Browse Source

合并serve命令和mount命令;优化模块启动方式

feature_gxh
Sydonian 7 months ago
parent
commit
fb06f47441
22 changed files with 539 additions and 367 deletions
  1. +29
    -7
      client/internal/accessstat/access_stat.go
  2. +0
    -123
      client/internal/cmdline/mount.go
  3. +112
    -88
      client/internal/cmdline/serve.go
  4. +4
    -5
      client/internal/config/config.go
  5. +2
    -2
      client/internal/downloader/downloader.go
  6. +28
    -11
      client/internal/http/aws_auth.go
  7. +12
    -0
      client/internal/http/config.go
  8. +41
    -0
      client/internal/http/mount.go
  9. +4
    -5
      client/internal/http/object.go
  10. +4
    -5
      client/internal/http/presigned.go
  11. +101
    -63
      client/internal/http/server.go
  12. +1
    -0
      client/internal/mount/config/config.go
  13. +21
    -0
      client/internal/mount/mount.go
  14. +46
    -29
      client/internal/mount/mount_linux.go
  15. +15
    -21
      client/internal/mount/mount_win.go
  16. +24
    -0
      client/internal/mount/vfs/cache/cache.go
  17. +12
    -0
      client/internal/mount/vfs/cache/status.go
  18. +4
    -0
      client/internal/mount/vfs/vfs.go
  19. +6
    -2
      client/internal/services/service.go
  20. +38
    -0
      client/sdk/api/mount.go
  21. +21
    -4
      common/assets/confs/client.config.json
  22. +14
    -2
      common/pkgs/sysevent/publisher.go

+ 29
- 7
client/internal/accessstat/access_stat.go View File

@@ -4,16 +4,26 @@ import (
"sync"
"time"

"gitlink.org.cn/cloudream/common/pkgs/async"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/sync2"
"gitlink.org.cn/cloudream/storage2/client/internal/db"
clitypes "gitlink.org.cn/cloudream/storage2/client/types"
)

type AccessStatEvent interface{}
type AccessStatEventChan = async.UnboundChannel[AccessStatEvent]

type AccessStatEvent interface {
IsAccessStatEvent() bool
}

type ExitEvent struct {
AccessStatEvent
Err error
}

type AccessStat struct {
cfg Config
done chan any
stats []db.AddAccessStatEntry
lock sync.Mutex
db *db.DB
@@ -21,8 +31,9 @@ type AccessStat struct {

func NewAccessStat(cfg Config, db *db.DB) *AccessStat {
return &AccessStat{
cfg: cfg,
db: db,
cfg: cfg,
done: make(chan any),
db: db,
}
}

@@ -38,13 +49,20 @@ func (p *AccessStat) AddAccessCounter(objID clitypes.ObjectID, pkgID clitypes.Pa
})
}

func (p *AccessStat) Start() *sync2.UnboundChannel[AccessStatEvent] {
ch := sync2.NewUnboundChannel[AccessStatEvent]()
func (p *AccessStat) Start() *AccessStatEventChan {
ch := async.NewUnboundChannel[AccessStatEvent]()

go func() {
ticker := time.NewTicker(p.cfg.ReportInterval)
defer ticker.Stop()

for {
<-ticker.C
select {
case <-ticker.C:
case <-p.done:
ch.Send(ExitEvent{})
return
}

p.lock.Lock()
st := p.stats
@@ -68,3 +86,7 @@ func (p *AccessStat) Start() *sync2.UnboundChannel[AccessStatEvent] {
}()
return ch
}

func (p *AccessStat) Stop() {
close(p.done)
}

+ 0
- 123
client/internal/cmdline/mount.go View File

@@ -1,123 +0,0 @@
package cmdline

import (
"fmt"
"os"
"time"

"github.com/spf13/cobra"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage2/client/internal/config"
db2 "gitlink.org.cn/cloudream/storage2/client/internal/db"
"gitlink.org.cn/cloudream/storage2/client/internal/downloader"
"gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy"
"gitlink.org.cn/cloudream/storage2/client/internal/metacache"
"gitlink.org.cn/cloudream/storage2/client/internal/mount"
mntcfg "gitlink.org.cn/cloudream/storage2/client/internal/mount/config"
"gitlink.org.cn/cloudream/storage2/client/internal/uploader"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
"gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity"
"gitlink.org.cn/cloudream/storage2/common/pkgs/distlock"
hubrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub"
hubpool "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool"
)

func init() {
var configPath string

cmd := &cobra.Command{
Use: "mount",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
mountCmd(args[0], configPath)
},
}
cmd.Flags().StringVarP(&configPath, "config", "c", "", "path to config file")

RootCmd.AddCommand(cmd)
}

func mountCmd(mountPoint string, configPath string) {
err := config.Init(configPath)
if err != nil {
fmt.Printf("init config failed, err: %s", err.Error())
os.Exit(1)
}

err = logger.Init(&config.Cfg().Logger)
if err != nil {
fmt.Printf("init logger failed, err: %s", err.Error())
os.Exit(1)
}

stgglb.InitLocal(config.Cfg().Local)
stgglb.InitMQPool(config.Cfg().RabbitMQ)
stgglb.InitHubRPCPool(&hubrpc.PoolConfig{})
// stgglb.Stats.SetupHubStorageTransfer(*config.Cfg().Local.HubID)
// stgglb.Stats.SetupHubTransfer(*config.Cfg().Local.HubID)

// 初始化存储服务管理器
stgPool := hubpool.NewPool()

db, err := db2.NewDB(&config.Cfg().DB)
if err != nil {
logger.Fatalf("new db failed, err: %s", err.Error())
}

// 启动网络连通性检测,并就地检测一次
conCol := connectivity.NewCollector(&config.Cfg().Connectivity, nil)
// conCol.CollectInPlace()

// 初始化元数据缓存服务
metacacheHost := metacache.NewHost(db)
go metacacheHost.Serve()
stgMeta := metacacheHost.AddStorageMeta()
hubMeta := metacacheHost.AddHubMeta()
conMeta := metacacheHost.AddConnectivity()

// 分布式锁
distlockSvc, err := distlock.NewService(&config.Cfg().DistLock)
if err != nil {
logger.Warnf("new distlock service failed, err: %s", err.Error())
os.Exit(1)
}
go serveDistLock(distlockSvc)

// 初始化下载策略选择器
strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta)

// 初始化下载器
dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgPool, strgSel, db)

// 上传器
uploader := uploader.NewUploader(distlockSvc, &conCol, stgPool, stgMeta, db)

mnt := mount.NewMount(&mntcfg.Config{
DataDir: "./cache/data",
MetaDir: "./cache/meta",
MountPoint: mountPoint,
AttrTimeout: time.Second * 5,
UploadPendingTime: time.Second * 10,
CacheActiveTime: time.Second * 10,
CacheExpireTime: time.Second * 60,
ScanDataDirInterval: 30 * time.Second,
}, db, uploader, &dlder)

ch := mnt.Start()
for {
evt, err := ch.Receive()
if err != nil {
break
}

switch e := evt.(type) {
case mount.MountingFailedEvent:
fmt.Println("mounting failed:", e.Err)
return

case mount.MountExitEvent:
fmt.Printf("mount exit\n")
return
}
}
}

+ 112
- 88
client/internal/cmdline/serve.go View File

@@ -1,7 +1,6 @@
package cmdline

import (
"context"
"fmt"
"os"
"time"
@@ -15,6 +14,7 @@ import (
"gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy"
"gitlink.org.cn/cloudream/storage2/client/internal/http"
"gitlink.org.cn/cloudream/storage2/client/internal/metacache"
"gitlink.org.cn/cloudream/storage2/client/internal/mount"
"gitlink.org.cn/cloudream/storage2/client/internal/services"
"gitlink.org.cn/cloudream/storage2/client/internal/uploader"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
@@ -27,20 +27,31 @@ import (

// 初始化函数,将ServeHTTP命令注册到命令列表中。
func init() {
var configPath, listenAddr string
var configPath string
var opt serveHTTPOptions
cmd := cobra.Command{
Use: "serve",
Short: "start serving storage service",
Run: func(cmd *cobra.Command, args []string) {
serveHTTP(configPath, listenAddr)
serveHTTP(configPath, opt)
},
}
cmd.Flags().StringVarP(&configPath, "config", "c", "", "config file path")
cmd.Flags().StringVarP(&listenAddr, "listen", "l", "", "listen address")
cmd.Flags().BoolVarP(&opt.DisableHTTP, "no-http", "", false, "disable http server")
cmd.Flags().StringVarP(&opt.HTTPListenAddr, "listen", "", "", "http listen address, will override config file")
cmd.Flags().BoolVarP(&opt.DisableMount, "no-mount", "", false, "disable mount")
cmd.Flags().StringVarP(&opt.MountPoint, "mount", "", "", "mount point, will override config file")
RootCmd.AddCommand(&cmd)
}

func serveHTTP(configPath string, listenAddr string) {
type serveHTTPOptions struct {
DisableHTTP bool
HTTPListenAddr string
DisableMount bool
MountPoint string
}

func serveHTTP(configPath string, opts serveHTTPOptions) {
err := config.Init(configPath)
if err != nil {
fmt.Printf("init config failed, err: %s", err.Error())
@@ -71,7 +82,8 @@ func serveHTTP(configPath string, listenAddr string) {
logger.Errorf("new sysevent publisher: %v", err)
os.Exit(1)
}
go servePublisher(evtPub)
evtPubChan := evtPub.Start()
defer evtPub.Stop()

// 连接性信息收集
conCol := connectivity.NewCollector(&config.Cfg().Connectivity, nil)
@@ -97,7 +109,8 @@ func serveHTTP(configPath string, listenAddr string) {
// TODO 考虑放到配置里
ReportInterval: time.Second * 10,
}, db)
go serveAccessStat(acStat)
acStatChan := acStat.Start()
defer acStat.Stop()

// 存储管理器
stgPool := pool.NewPool()
@@ -111,107 +124,118 @@ func serveHTTP(configPath string, listenAddr string) {
// 上传器
uploader := uploader.NewUploader(distlockSvc, &conCol, stgPool, stgMeta, db)

svc, err := services.NewService(distlockSvc, &dlder, acStat, uploader, strgSel, stgMeta, db, evtPub)
if err != nil {
logger.Warnf("new services failed, err: %s", err.Error())
os.Exit(1)
}
svc := services.NewService(distlockSvc, dlder, acStat, uploader, strgSel, stgMeta, db, evtPub, nil)

// 默认监听地址为":7890",如果提供了命令行参数,则使用参数指定的地址。
if listenAddr == "" {
listenAddr = ":7890"
// HTTP接口
httpCfg := config.Cfg().HTTP
if !opts.DisableHTTP && httpCfg != nil && httpCfg.Enabled {
if opts.HTTPListenAddr != "" {
httpCfg.Listen = opts.HTTPListenAddr
}
} else {
httpCfg = nil
}

awsAuth, err := http.NewAWSAuth(config.Cfg().AuthAccessKey, config.Cfg().AuthSecretKey)
if err != nil {
logger.Warnf("new aws auth: %v", err)
os.Exit(1)
httpSvr := http.NewServer(httpCfg, svc)
httpChan := httpSvr.Start()
defer httpSvr.Stop()

// 挂载
mntCfg := config.Cfg().Mount
if !opts.DisableMount && mntCfg != nil && mntCfg.Enabled {
if opts.MountPoint != "" {
mntCfg.MountPoint = opts.MountPoint
}
} else {
mntCfg = nil
}
mnt := mount.NewMount(mntCfg, db, uploader, dlder)
mntChan := mnt.Start()
defer mnt.Stop()

// 创建一个新的HTTP服务器实例。
httpSvr, err := http.NewServer(listenAddr, svc, awsAuth)
if err != nil {
logger.Warnf("new http server: %v", err)
os.Exit(1)
}
/// 开始监听各个模块的事件

// 启动HTTP服务。
err = httpSvr.Serve()
if err != nil {
logger.Warnf("serving http: %v", err)
os.Exit(1)
}
}
evtPubEvt := evtPubChan.Receive()
acStatEvt := acStatChan.Receive()
httpEvt := httpChan.Receive()
mntEvt := mntChan.Receive()

func serveDistLock(svc *distlock.Service) {
logger.Info("start serving distlock")
loop:
for {
select {
case e := <-evtPubEvt.Chan():
if e.Err != nil {
logger.Errorf("receive publisher event: %v", err)
break loop
}

err := svc.Serve()
switch val := e.Value.(type) {
case sysevent.PublishError:
logger.Errorf("publishing event: %v", val)

if err != nil {
logger.Errorf("distlock stopped with error: %s", err.Error())
}
case sysevent.PublisherExited:
if val.Err != nil {
logger.Errorf("publisher exited with error: %v", val.Err)
} else {
logger.Info("publisher exited")
}
break loop

logger.Info("distlock stopped")
case sysevent.OtherError:
logger.Errorf("sysevent: %v", val)
}
evtPubEvt = evtPubChan.Receive()

// TODO 仅简单结束了程序
os.Exit(1)
}
case e := <-acStatEvt.Chan():
if e.Err != nil {
logger.Errorf("receive access stat event: %v", err)
break loop
}
switch e := e.Value.(type) {
case accessstat.ExitEvent:
logger.Infof("access stat exited, err: %v", e.Err)
break loop
}
acStatEvt = acStatChan.Receive()

func serveAccessStat(svc *accessstat.AccessStat) {
logger.Info("start serving access stat")
case e := <-httpEvt.Chan():
if e.Err != nil {
logger.Errorf("receive http event: %v", err)
break loop
}

ch := svc.Start()
loop:
for {
val, err := ch.Receive()
if err != nil {
logger.Errorf("access stat stopped with error: %v", err)
break
}
switch e := e.Value.(type) {
case http.ExitEvent:
logger.Infof("http server exited, err: %v", e.Err)
break loop
}
httpEvt = httpChan.Receive()

switch val := val.(type) {
case error:
logger.Errorf("access stat stopped with error: %v", val)
break loop
case e := <-mntEvt.Chan():
if e.Err != nil {
logger.Errorf("receive mount event: %v", e.Err)
break loop
}

switch e := e.Value.(type) {
case mount.ExitEvent:
logger.Infof("mount exited, err: %v", e.Err)
break loop
}
mntEvt = mntChan.Receive()
}
}
logger.Info("access stat stopped")

// TODO 仅简单结束了程序
os.Exit(1)
}

func servePublisher(evtPub *sysevent.Publisher) {
logger.Info("start serving sysevent publisher")

ch := evtPub.Start()

loop:
for {
val, err := ch.Receive().Wait(context.Background())
if err != nil {
logger.Errorf("sysevent publisher stopped with error: %s", err.Error())
break
}

switch val := val.(type) {
case sysevent.PublishError:
logger.Errorf("publishing event: %v", val)
func serveDistLock(svc *distlock.Service) {
logger.Info("start serving distlock")

case sysevent.PublisherExited:
if val.Err != nil {
logger.Errorf("publisher exited with error: %v", val.Err)
} else {
logger.Info("publisher exited")
}
break loop
err := svc.Serve()

case sysevent.OtherError:
logger.Errorf("sysevent: %v", val)
}
if err != nil {
logger.Errorf("distlock stopped with error: %s", err.Error())
}
logger.Info("sysevent publisher stopped")

logger.Info("distlock stopped")

// TODO 仅简单结束了程序
os.Exit(1)


+ 4
- 5
client/internal/config/config.go View File

@@ -8,7 +8,8 @@ import (
"gitlink.org.cn/cloudream/storage2/client/internal/db"
"gitlink.org.cn/cloudream/storage2/client/internal/downloader"
"gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy"
clitypes "gitlink.org.cn/cloudream/storage2/client/types"
"gitlink.org.cn/cloudream/storage2/client/internal/http"
mntcfg "gitlink.org.cn/cloudream/storage2/client/internal/mount/config"
stgglb "gitlink.org.cn/cloudream/storage2/common/globals"
"gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity"
hubrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/hub"
@@ -24,10 +25,8 @@ type Config struct {
Connectivity connectivity.Config `json:"connectivity"`
Downloader downloader.Config `json:"downloader"`
DownloadStrategy strategy.Config `json:"downloadStrategy"`
UserSpaceID clitypes.UserSpaceID `json:"userSpaceID"` // TODO 进行访问量统计时,当前客户端所属的存储ID。临时解决方案。
AuthAccessKey string `json:"authAccessKey"` // TODO 临时办法
AuthSecretKey string `json:"authSecretKey"`
MaxHTTPBodySize int64 `json:"maxHttpBodySize"`
HTTP *http.Config `json:"http"`
Mount *mntcfg.Config `json:"mount"`
}

var cfg Config


+ 2
- 2
client/internal/downloader/downloader.go View File

@@ -45,13 +45,13 @@ type Downloader struct {
db *db.DB
}

func NewDownloader(cfg Config, conn *connectivity.Collector, stgPool *pool.Pool, sel *strategy.Selector, db *db.DB) Downloader {
func NewDownloader(cfg Config, conn *connectivity.Collector, stgPool *pool.Pool, sel *strategy.Selector, db *db.DB) *Downloader {
if cfg.MaxStripCacheCount == 0 {
cfg.MaxStripCacheCount = DefaultMaxStripCacheCount
}

ch, _ := lru.New[ECStripKey, ObjectECStrip](cfg.MaxStripCacheCount)
return Downloader{
return &Downloader{
strips: ch,
cfg: cfg,
conn: conn,


+ 28
- 11
client/internal/http/aws_auth.go View File

@@ -18,7 +18,6 @@ import (
"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage2/client/internal/config"
)

const (
@@ -28,24 +27,32 @@ const (
)

type AWSAuth struct {
cfg *Config
cred aws.Credentials
signer *v4.Signer
}

func NewAWSAuth(accessKey string, secretKey string) (*AWSAuth, error) {
prod := credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")
cred, err := prod.Retrieve(context.TODO())
if err != nil {
return nil, err
func NewAWSAuth(cfg *Config) *AWSAuth {
auth := &AWSAuth{
cfg: cfg,
}

if cfg.AuthAccessKey != "" && cfg.AuthSecretKey != "" {
prod := credentials.NewStaticCredentialsProvider(cfg.AuthAccessKey, cfg.AuthSecretKey, "")
cred, _ := prod.Retrieve(context.TODO())
auth.cred = cred
auth.signer = v4.NewSigner()
}

return &AWSAuth{
cred: cred,
signer: v4.NewSigner(),
}, nil
return auth
}

func (a *AWSAuth) Auth(c *gin.Context) {
if a.signer == nil {
c.Next()
return
}

authorizationHeader := c.GetHeader(AuthorizationHeader)
if authorizationHeader == "" {
c.AbortWithStatusJSON(http.StatusBadRequest, Failed(errorcode.Unauthorized, "authorization header is missing"))
@@ -59,7 +66,7 @@ func (a *AWSAuth) Auth(c *gin.Context) {
}

// 限制请求体大小
rd := io.LimitReader(c.Request.Body, config.Cfg().MaxHTTPBodySize)
rd := io.LimitReader(c.Request.Body, a.cfg.MaxBodySize)
body, err := io.ReadAll(rd)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "read request body failed"))
@@ -117,6 +124,11 @@ func (a *AWSAuth) Auth(c *gin.Context) {
}

func (a *AWSAuth) AuthWithoutBody(c *gin.Context) {
if a.signer == nil {
c.Next()
return
}

authorizationHeader := c.GetHeader(AuthorizationHeader)
if authorizationHeader == "" {
c.AbortWithStatusJSON(http.StatusBadRequest, Failed(errorcode.Unauthorized, "authorization header is missing"))
@@ -175,6 +187,11 @@ func (a *AWSAuth) AuthWithoutBody(c *gin.Context) {
}

func (a *AWSAuth) PresignedAuth(c *gin.Context) {
if a.signer == nil {
c.Next()
return
}

query := c.Request.URL.Query()

signature := query.Get("X-Amz-Signature")


+ 12
- 0
client/internal/http/config.go View File

@@ -0,0 +1,12 @@
package http

import "gitlink.org.cn/cloudream/storage2/client/types"

type Config struct {
Enabled bool `json:"enabled"`
Listen string `json:"listen"`
AuthAccessKey string `json:"authAccessKey"` // TODO 临时办法
AuthSecretKey string `json:"authSecretKey"`
MaxBodySize int64 `json:"maxBodySize"`
UserSpaceID types.UserSpaceID `json:"userSpaceID"` // TODO 进行访问量统计时,当前客户端所属的存储ID。临时解决方案。
}

+ 41
- 0
client/internal/http/mount.go View File

@@ -0,0 +1,41 @@
package http

import (
"net/http"

"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
cliapi "gitlink.org.cn/cloudream/storage2/client/sdk/api"
)

type MountService struct {
*Server
}

func (s *Server) Mount() *MountService {
return &MountService{
Server: s,
}
}

func (m *MountService) DumpStatus(ctx *gin.Context) {
log := logger.WithField("HTTP", "Object.ListByPath")

var req cliapi.MountDumpStatus
if err := ctx.ShouldBindQuery(&req); err != nil {
log.Warnf("binding body: %s", err.Error())
ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument"))
return
}

if m.svc.Mount == nil {
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "mount service not initialized"))
return
}

dumpStatus := m.svc.Mount.Dump()
ctx.JSON(http.StatusOK, cliapi.MountDumpStatusPathResp{
MountStatus: dumpStatus,
})
}

+ 4
- 5
client/internal/http/object.go View File

@@ -13,7 +13,6 @@ import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/storage2/client/internal/config"
"gitlink.org.cn/cloudream/storage2/client/internal/downloader"
cliapi "gitlink.org.cn/cloudream/storage2/client/sdk/api"
clitypes "gitlink.org.cn/cloudream/storage2/client/types"
@@ -176,8 +175,8 @@ func (s *ObjectService) Download(ctx *gin.Context) {
}

// TODO 当client不在某个代理节点上时如何处理?
if config.Cfg().UserSpaceID > 0 {
s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, config.Cfg().UserSpaceID, math2.DivOrDefault(float64(n), float64(file.Object.Size), 1))
if s.cfg.UserSpaceID > 0 {
s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, s.cfg.UserSpaceID, math2.DivOrDefault(float64(n), float64(file.Object.Size), 1))
}
}

@@ -233,8 +232,8 @@ func (s *ObjectService) DownloadByPath(ctx *gin.Context) {
log.Warnf("copying file: %s", err.Error())
}

if config.Cfg().UserSpaceID > 0 {
s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, config.Cfg().UserSpaceID, math2.DivOrDefault(float64(n), float64(file.Object.Size), 1))
if s.cfg.UserSpaceID > 0 {
s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, s.cfg.UserSpaceID, math2.DivOrDefault(float64(n), float64(file.Object.Size), 1))
}
}



+ 4
- 5
client/internal/http/presigned.go View File

@@ -12,7 +12,6 @@ import (
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/math2"
"gitlink.org.cn/cloudream/storage2/client/internal/config"
"gitlink.org.cn/cloudream/storage2/client/internal/downloader"
cliapi "gitlink.org.cn/cloudream/storage2/client/sdk/api"
)
@@ -98,8 +97,8 @@ func (s *PresignedService) ObjectDownloadByPath(ctx *gin.Context) {
log.Warnf("copying file: %s", err.Error())
}

if config.Cfg().UserSpaceID > 0 {
s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, config.Cfg().UserSpaceID, math2.DivOrDefault(float64(n), float64(file.Object.Size), 1))
if s.cfg.UserSpaceID > 0 {
s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, s.cfg.UserSpaceID, math2.DivOrDefault(float64(n), float64(file.Object.Size), 1))
}
}

@@ -140,8 +139,8 @@ func (s *PresignedService) ObjectDownload(ctx *gin.Context) {
log.Warnf("copying file: %s", err.Error())
}

if config.Cfg().UserSpaceID > 0 {
s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, config.Cfg().UserSpaceID, math2.DivOrDefault(float64(n), float64(file.Object.Size), 1))
if s.cfg.UserSpaceID > 0 {
s.svc.AccessStat.AddAccessCounter(file.Object.ObjectID, file.Object.PackageID, s.cfg.UserSpaceID, math2.DivOrDefault(float64(n), float64(file.Object.Size), 1))
}
}



+ 101
- 63
client/internal/http/server.go View File

@@ -1,51 +1,84 @@
package http

import (
"context"
"net/http"

"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/pkgs/async"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/storage2/client/internal/services"
cliapi "gitlink.org.cn/cloudream/storage2/client/sdk/api"
)

type Server struct {
engine *gin.Engine
listenAddr string
svc *services.Service
awsAuth *AWSAuth
type ServerEventChan = async.UnboundChannel[ServerEvent]

type ServerEvent interface {
IsServerEvent() bool
}

func NewServer(listenAddr string, svc *services.Service, awsAuth *AWSAuth) (*Server, error) {
engine := gin.New()
type ExitEvent struct {
ServerEvent
Err error
}

type Server struct {
cfg *Config
httpSrv *http.Server
svc *services.Service
eventChan *ServerEventChan
}

func NewServer(cfg *Config, svc *services.Service) *Server {
return &Server{
engine: engine,
listenAddr: listenAddr,
svc: svc,
awsAuth: awsAuth,
}, nil
svc: svc,
eventChan: async.NewUnboundChannel[ServerEvent](),
}
}

func (s *Server) Serve() error {
s.initRouters()
func (s *Server) Start() *ServerEventChan {
go func() {
if s.cfg == nil {
return
}

engine := gin.New()
s.httpSrv = &http.Server{
Addr: s.cfg.Listen,
Handler: engine,
}

s.initRouters(engine)

logger.Infof("start serving http at: %s", s.cfg.Listen)

logger.Infof("start serving http at: %s", s.listenAddr)
err := s.engine.Run(s.listenAddr)
err := s.httpSrv.ListenAndServe()
if err != nil {
logger.Infof("http stopped with error: %s", err.Error())
} else {
logger.Infof("http stopped")
}

if err != nil {
logger.Infof("http stopped with error: %s", err.Error())
return err
s.eventChan.Send(ExitEvent{Err: err})
}()
return s.eventChan
}

func (s *Server) Stop() {
if s.httpSrv == nil {
s.eventChan.Send(ExitEvent{})
return
}

logger.Infof("http stopped")
return nil
s.httpSrv.Shutdown(context.Background())
}

func (s *Server) initRouters() {
rt := s.engine.Use()
func (s *Server) initRouters(engine *gin.Engine) {
rt := engine.Use()

// initTemp(rt, s)

s.routeV1(s.engine, rt)
s.routeV1(engine, rt)

rt.GET(cliapi.ObjectListPathByPath, s.Object().ListByPath)
rt.GET(cliapi.ObjectListByIDsPath, s.Object().ListByIDs)
@@ -84,49 +117,54 @@ func (s *Server) initRouters() {
func (s *Server) routeV1(eg *gin.Engine, rt gin.IRoutes) {
v1 := eg.Group("/v1")

v1.GET(cliapi.ObjectListPathByPath, s.awsAuth.Auth, s.Object().ListByPath)
v1.GET(cliapi.ObjectListByIDsPath, s.awsAuth.Auth, s.Object().ListByIDs)
v1.GET(cliapi.ObjectDownloadPath, s.awsAuth.Auth, s.Object().Download)
v1.GET(cliapi.ObjectDownloadByPathPath, s.awsAuth.Auth, s.Object().DownloadByPath)
v1.POST(cliapi.ObjectUploadPath, s.awsAuth.AuthWithoutBody, s.Object().Upload)
v1.GET(cliapi.ObjectGetPackageObjectsPath, s.awsAuth.Auth, s.Object().GetPackageObjects)
v1.POST(cliapi.ObjectUpdateInfoPath, s.awsAuth.Auth, s.Object().UpdateInfo)
v1.POST(cliapi.ObjectUpdateInfoByPathPath, s.awsAuth.Auth, s.Object().UpdateInfoByPath)
v1.POST(cliapi.ObjectMovePath, s.awsAuth.Auth, s.Object().Move)
v1.POST(cliapi.ObjectDeletePath, s.awsAuth.Auth, s.Object().Delete)
v1.POST(cliapi.ObjectDeleteByPathPath, s.awsAuth.Auth, s.Object().DeleteByPath)
v1.POST(cliapi.ObjectClonePath, s.awsAuth.Auth, s.Object().Clone)

v1.GET(cliapi.PackageGetPath, s.awsAuth.Auth, s.Package().Get)
v1.GET(cliapi.PackageGetByFullNamePath, s.awsAuth.Auth, s.Package().GetByFullName)
v1.POST(cliapi.PackageCreatePath, s.awsAuth.Auth, s.Package().Create)
v1.POST(cliapi.PackageCreateLoadPath, s.awsAuth.Auth, s.Package().CreateLoad)
v1.POST(cliapi.PackageDeletePath, s.awsAuth.Auth, s.Package().Delete)
v1.POST(cliapi.PackageClonePath, s.awsAuth.Auth, s.Package().Clone)
v1.GET(cliapi.PackageListBucketPackagesPath, s.awsAuth.Auth, s.Package().ListBucketPackages)
// v1.GET(cdsapi.PackageGetCachedStoragesPath, s.awsAuth.Auth, s.Package().GetCachedStorages)

v1.POST(cliapi.UserSpaceLoadPackagePath, s.awsAuth.Auth, s.UserSpace().LoadPackage)
v1.POST(cliapi.UserSpaceCreatePackagePath, s.awsAuth.Auth, s.UserSpace().CreatePackage)
v1.GET(cliapi.UserSpaceGetPath, s.awsAuth.Auth, s.UserSpace().Get)

// v1.POST(cdsapi.CacheMovePackagePath, s.awsAuth.Auth, s.Cache().MovePackage)

v1.GET(cliapi.BucketGetByNamePath, s.awsAuth.Auth, s.Bucket().GetByName)
v1.POST(cliapi.BucketCreatePath, s.awsAuth.Auth, s.Bucket().Create)
v1.POST(cliapi.BucketDeletePath, s.awsAuth.Auth, s.Bucket().Delete)
v1.GET(cliapi.BucketListAllPath, s.awsAuth.Auth, s.Bucket().ListAll)
awsAuth := NewAWSAuth(s.cfg)

v1.GET(cliapi.ObjectListPathByPath, awsAuth.Auth, s.Object().ListByPath)
v1.GET(cliapi.ObjectListByIDsPath, awsAuth.Auth, s.Object().ListByIDs)
v1.GET(cliapi.ObjectDownloadPath, awsAuth.Auth, s.Object().Download)
v1.GET(cliapi.ObjectDownloadByPathPath, awsAuth.Auth, s.Object().DownloadByPath)
v1.POST(cliapi.ObjectUploadPath, awsAuth.AuthWithoutBody, s.Object().Upload)
v1.GET(cliapi.ObjectGetPackageObjectsPath, awsAuth.Auth, s.Object().GetPackageObjects)
v1.POST(cliapi.ObjectUpdateInfoPath, awsAuth.Auth, s.Object().UpdateInfo)
v1.POST(cliapi.ObjectUpdateInfoByPathPath, awsAuth.Auth, s.Object().UpdateInfoByPath)
v1.POST(cliapi.ObjectMovePath, awsAuth.Auth, s.Object().Move)
v1.POST(cliapi.ObjectDeletePath, awsAuth.Auth, s.Object().Delete)
v1.POST(cliapi.ObjectDeleteByPathPath, awsAuth.Auth, s.Object().DeleteByPath)
v1.POST(cliapi.ObjectClonePath, awsAuth.Auth, s.Object().Clone)

v1.GET(cliapi.PackageGetPath, awsAuth.Auth, s.Package().Get)
v1.GET(cliapi.PackageGetByFullNamePath, awsAuth.Auth, s.Package().GetByFullName)
v1.POST(cliapi.PackageCreatePath, awsAuth.Auth, s.Package().Create)
v1.POST(cliapi.PackageCreateLoadPath, awsAuth.Auth, s.Package().CreateLoad)
v1.POST(cliapi.PackageDeletePath, awsAuth.Auth, s.Package().Delete)
v1.POST(cliapi.PackageClonePath, awsAuth.Auth, s.Package().Clone)
v1.GET(cliapi.PackageListBucketPackagesPath, awsAuth.Auth, s.Package().ListBucketPackages)
// v1.GET(cdsapi.PackageGetCachedStoragesPath, awsAuth.Auth, s.Package().GetCachedStorages)

v1.POST(cliapi.UserSpaceLoadPackagePath, awsAuth.Auth, s.UserSpace().LoadPackage)
v1.POST(cliapi.UserSpaceCreatePackagePath, awsAuth.Auth, s.UserSpace().CreatePackage)
v1.GET(cliapi.UserSpaceGetPath, awsAuth.Auth, s.UserSpace().Get)

// v1.POST(cdsapi.CacheMovePackagePath, awsAuth.Auth, s.Cache().MovePackage)

v1.GET(cliapi.BucketGetByNamePath, awsAuth.Auth, s.Bucket().GetByName)
v1.POST(cliapi.BucketCreatePath, awsAuth.Auth, s.Bucket().Create)
v1.POST(cliapi.BucketDeletePath, awsAuth.Auth, s.Bucket().Delete)
v1.GET(cliapi.BucketListAllPath, awsAuth.Auth, s.Bucket().ListAll)

rt.POST(cliapi.ObjectNewMultipartUploadPath, s.Object().NewMultipartUpload)
rt.POST(cliapi.ObjectUploadPartPath, s.Object().UploadPart)
rt.POST(cliapi.ObjectCompleteMultipartUploadPath, s.Object().CompleteMultipartUpload)

rt.GET(cliapi.PresignedObjectListByPathPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectListByPath)
rt.GET(cliapi.PresignedObjectDownloadByPathPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectDownloadByPath)
rt.GET(cliapi.PresignedObjectDownloadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectDownload)
rt.POST(cliapi.PresignedObjectUploadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectUpload)
rt.GET(cliapi.PresignedObjectListByPathPath, awsAuth.PresignedAuth, s.Presigned().ObjectListByPath)
rt.GET(cliapi.PresignedObjectDownloadByPathPath, awsAuth.PresignedAuth, s.Presigned().ObjectDownloadByPath)
rt.GET(cliapi.PresignedObjectDownloadPath, awsAuth.PresignedAuth, s.Presigned().ObjectDownload)
rt.POST(cliapi.PresignedObjectUploadPath, awsAuth.PresignedAuth, s.Presigned().ObjectUpload)

rt.POST(cliapi.PresignedObjectNewMultipartUploadPath, awsAuth.PresignedAuth, s.Presigned().ObjectNewMultipartUpload)
rt.POST(cliapi.PresignedObjectUploadPartPath, awsAuth.PresignedAuth, s.Presigned().ObjectUploadPart)
rt.POST(cliapi.PresignedObjectCompleteMultipartUploadPath, awsAuth.PresignedAuth, s.Presigned().ObjectCompleteMultipartUpload)

rt.POST(cliapi.PresignedObjectNewMultipartUploadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectNewMultipartUpload)
rt.POST(cliapi.PresignedObjectUploadPartPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectUploadPart)
rt.POST(cliapi.PresignedObjectCompleteMultipartUploadPath, s.awsAuth.PresignedAuth, s.Presigned().ObjectCompleteMultipartUpload)
// 不需要鉴权
rt.GET(cliapi.MountDumpStatusPath, s.Mount().DumpStatus)
}

+ 1
- 0
client/internal/mount/config/config.go View File

@@ -3,6 +3,7 @@ package config
import "time"

type Config struct {
Enabled bool `json:"enabled"`
MountPoint string `json:"mountPoint"`
GID uint32 `json:"gid"`
UID uint32 `json:"uid"`


+ 21
- 0
client/internal/mount/mount.go View File

@@ -0,0 +1,21 @@
package mount

import (
"gitlink.org.cn/cloudream/common/pkgs/async"
"gitlink.org.cn/cloudream/storage2/client/internal/mount/vfs/cache"
)

type MountEventChan = async.UnboundChannel[MountEvent]

type MountEvent interface {
IsMountEvent() bool
}

type ExitEvent struct {
MountEvent
Err error
}

type MountStatus struct {
Cache cache.CacheStatus `json:"cache"`
}

+ 46
- 29
client/internal/mount/mount_linux.go View File

@@ -5,7 +5,7 @@ package mount
import (
fusefs "github.com/hanwen/go-fuse/v2/fs"
"github.com/hanwen/go-fuse/v2/fuse"
"gitlink.org.cn/cloudream/common/utils/sync2"
"gitlink.org.cn/cloudream/common/pkgs/async"
db2 "gitlink.org.cn/cloudream/storage2/client/internal/db"
"gitlink.org.cn/cloudream/storage2/client/internal/downloader"
"gitlink.org.cn/cloudream/storage2/client/internal/mount/config"
@@ -15,40 +15,35 @@ import (
clitypes "gitlink.org.cn/cloudream/storage2/client/types"
)

type MountEvent interface {
IsMountEvent() bool
}

type MountExitEvent struct {
MountEvent
}

type MountingFailedEvent struct {
MountEvent
Err error
}

type Mount struct {
cfg *config.Config
vfs *vfs.Vfs
fuse *fuse2.Fuse
cfg *config.Config
vfs *vfs.Vfs
fuse *fuse2.Fuse
fuseServer *fuse.Server
eventChan *MountEventChan
}

func NewMount(cfg *config.Config, db *db2.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Mount {
vfs := vfs.NewVfs(cfg, db, uploader, downloader)
fuse := fuse2.NewFuse(cfg, vfs)
mnt := &Mount{
cfg: cfg,
eventChan: async.NewUnboundChannel[MountEvent](),
}

return &Mount{
cfg: cfg,
vfs: vfs,
fuse: fuse,
if cfg != nil {
mnt.vfs = vfs.NewVfs(cfg, db, uploader, downloader)
mnt.fuse = fuse2.NewFuse(cfg, mnt.vfs)
}

return mnt
}

func (m *Mount) Start() *sync2.UnboundChannel[MountEvent] {
ch := sync2.NewUnboundChannel[MountEvent]()
m.vfs.Start()
func (m *Mount) Start() *MountEventChan {
if m.cfg == nil {
return m.eventChan
}

go func() {
m.vfs.Start()
defer m.vfs.Stop()

nodeFsOpt := &fusefs.Options{
@@ -61,14 +56,36 @@ func (m *Mount) Start() *sync2.UnboundChannel[MountEvent] {

svr, err := fuse.NewServer(rawFs, m.cfg.MountPoint, &nodeFsOpt.MountOptions)
if err != nil {
ch.Send(MountingFailedEvent{Err: err})
m.eventChan.Send(ExitEvent{Err: err})
return
}

m.fuseServer = svr

svr.Serve()
ch.Send(MountExitEvent{})
m.eventChan.Send(ExitEvent{})
}()
return ch
return m.eventChan
}

func (m *Mount) Stop() {
if m.fuseServer == nil {
m.eventChan.Send(ExitEvent{})
return
}

m.fuseServer.Unmount()
}

func (m *Mount) Dump() MountStatus {
if m.vfs == nil {
return MountStatus{}
}

cacheStatus := m.vfs.Dump()
return MountStatus{
Cache: cacheStatus,
}
}

func (m *Mount) NotifyObjectInvalid(obj clitypes.Object) {


+ 15
- 21
client/internal/mount/mount_win.go View File

@@ -3,9 +3,7 @@
package mount

import (
"fmt"

"gitlink.org.cn/cloudream/common/utils/sync2"
"gitlink.org.cn/cloudream/common/pkgs/async"
"gitlink.org.cn/cloudream/storage2/client/internal/db"
"gitlink.org.cn/cloudream/storage2/client/internal/downloader"
"gitlink.org.cn/cloudream/storage2/client/internal/mount/config"
@@ -13,32 +11,28 @@ import (
clitypes "gitlink.org.cn/cloudream/storage2/client/types"
)

type MountEvent interface {
IsMountEvent() bool
}

type MountExitEvent struct {
MountEvent
}

type MountingFailedEvent struct {
MountEvent
Err error
}

type Mount struct {
eventChan *MountEventChan
}

func NewMount(cfg *config.Config, db *db.DB, uploader *uploader.Uploader, downloader *downloader.Downloader) *Mount {
return &Mount{}
return &Mount{
eventChan: async.NewUnboundChannel[MountEvent](),
}
}

func (m *Mount) Start() *sync2.UnboundChannel[MountEvent] {
ch := sync2.NewUnboundChannel[MountEvent]()
func (m *Mount) Start() *MountEventChan {
go func() {
ch.Send(MountingFailedEvent{Err: fmt.Errorf("not implemented")})
}()
return ch
return m.eventChan
}

func (m *Mount) Stop() {
m.eventChan.Send(ExitEvent{})
}

func (m *Mount) Dump() MountStatus {
return MountStatus{}
}

func (m *Mount) NotifyObjectInvalid(obj clitypes.Object) {


+ 24
- 0
client/internal/mount/vfs/cache/cache.go View File

@@ -82,6 +82,30 @@ func (c *Cache) GetCacheMetaPath(comps ...string) string {
return filepath.Join(comps2...)
}

func (c *Cache) Dump() CacheStatus {
c.lock.RLock()
defer c.lock.RUnlock()

var activeFiles []CacheFileStatus
c.activeCache.Iterate(func(path []string, node *trie.Node[*CacheFile], isWordNode bool) trie.VisitCtrl {
if node.Value == nil {
return trie.VisitContinue
}

activeFiles = append(activeFiles, CacheFileStatus{
Path: filepath.Join(path...),
RefCount: node.Value.state.refCount,
IsLoaded: node.Value.state.isLoaded,
IsUploading: node.Value.state.uploading != nil,
})
return trie.VisitContinue
})

return CacheStatus{
ActiveFiles: activeFiles,
}
}

// 获取指定位置的缓存条目信息。如果路径不存在,则返回nil。
func (c *Cache) Stat(pathComps []string) *CacheEntryInfo {
c.lock.RLock()


+ 12
- 0
client/internal/mount/vfs/cache/status.go View File

@@ -0,0 +1,12 @@
package cache

type CacheStatus struct {
ActiveFiles []CacheFileStatus `json:"activeFiles"`
}

type CacheFileStatus struct {
Path string `json:"path"`
RefCount int `json:"refCount"`
IsLoaded bool `json:"isLoaded"`
IsUploading bool `json:"isUploading"`
}

+ 4
- 0
client/internal/mount/vfs/vfs.go View File

@@ -38,3 +38,7 @@ func (v *Vfs) Root() fuse.FsDir {
func (v *Vfs) Stats() fuse.FsStats {
return fuse.FsStats{}
}

func (v *Vfs) Dump() cache.CacheStatus {
return v.cache.Dump()
}

+ 6
- 2
client/internal/services/service.go View File

@@ -7,6 +7,7 @@ import (
"gitlink.org.cn/cloudream/storage2/client/internal/downloader"
"gitlink.org.cn/cloudream/storage2/client/internal/downloader/strategy"
"gitlink.org.cn/cloudream/storage2/client/internal/metacache"
"gitlink.org.cn/cloudream/storage2/client/internal/mount"
"gitlink.org.cn/cloudream/storage2/client/internal/uploader"
"gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent"
)
@@ -21,6 +22,7 @@ type Service struct {
UserSpaceMeta *metacache.UserSpaceMeta
DB *db.DB
EvtPub *sysevent.Publisher
Mount *mount.Mount
}

func NewService(
@@ -32,7 +34,8 @@ func NewService(
userSpaceMeta *metacache.UserSpaceMeta,
db *db.DB,
evtPub *sysevent.Publisher,
) (*Service, error) {
mount *mount.Mount,
) *Service {
return &Service{
DistLock: distlock,
Downloader: downloader,
@@ -42,5 +45,6 @@ func NewService(
UserSpaceMeta: userSpaceMeta,
DB: db,
EvtPub: evtPub,
}, nil
Mount: mount,
}
}

+ 38
- 0
client/sdk/api/mount.go View File

@@ -0,0 +1,38 @@
package api

import (
"net/http"

"gitlink.org.cn/cloudream/common/sdks"
"gitlink.org.cn/cloudream/storage2/client/internal/mount"
)

type MountService struct {
*Client
}

func (c *Client) Mount() *MountService {
return &MountService{
Client: c,
}
}

const MountDumpStatusPath = "/mount/dumpStatus"

type MountDumpStatus struct{}

func (r *MountDumpStatus) MakeParam() *sdks.RequestParam {
return sdks.MakeQueryParam(http.MethodGet, MountDumpStatusPath, r)
}

type MountDumpStatusPathResp struct {
mount.MountStatus
}

func (r *MountDumpStatusPathResp) ParseResponse(resp *http.Response) error {
return sdks.ParseCodeDataJSONResponse(resp, r)
}

func (c *MountService) DumpStatus(req MountDumpStatus) (*MountDumpStatusPathResp, error) {
return JSONAPI(c.cfg, http.DefaultClient, &req, &MountDumpStatusPathResp{})
}

+ 21
- 4
common/assets/confs/client.config.json View File

@@ -46,8 +46,25 @@
"downloadStrategy": {
"highLatencyHub": 35
},
"userSpaceID": 0,
"authAccessKey": "",
"authSecretKey": "",
"maxHttpBodySize": 5242880
"http": {
"enabled": true,
"listen": "127.0.0.1:7890",
"userSpaceID": 0,
"authAccessKey": "",
"authSecretKey": "",
"maxBodySize": 5242880
},
"mount": {
"enabled": false,
"mountPoint": "",
"gid": 0,
"uid": 0,
"dataDir": "",
"metaDir": "",
"attrTimeout": "10s",
"uploadPendingTime": "30s",
"cacheActiveTime": "1m",
"cacheExpireTime": "1m",
"scanDataDirInterval": "10m"
}
}

+ 14
- 2
common/pkgs/sysevent/publisher.go View File

@@ -10,17 +10,24 @@ import (
"gitlink.org.cn/cloudream/storage2/common/models/datamap"
)

type PublisherEvent interface{}
type PublisherEventChan = async.UnboundChannel[PublisherEvent]

type PublisherEvent interface {
IsPublisherEvent() bool
}

type PublisherExited struct {
PublisherEvent
Err error
}

type PublishError struct {
PublisherEvent
Err error
}

type OtherError struct {
PublisherEvent
Err error
}

@@ -64,7 +71,7 @@ func NewPublisher(cfg Config, thisSource Source) (*Publisher, error) {
return pub, nil
}

func (p *Publisher) Start() *async.UnboundChannel[PublisherEvent] {
func (p *Publisher) Start() *PublisherEventChan {
ch := async.NewUnboundChannel[PublisherEvent]()
go func() {
defer ch.Close()
@@ -103,6 +110,11 @@ func (p *Publisher) Start() *async.UnboundChannel[PublisherEvent] {
return ch
}

func (p *Publisher) Stop() {
p.channel.Close()
p.connection.Close()
}

// Publish 发布事件,会自动补齐必要信息
func (p *Publisher) Publish(eventBody datamap.SysEventBody) {
p.eventChan.Send(datamap.SysEvent{


Loading…
Cancel
Save