| @@ -5,30 +5,21 @@ import ( | |||||
| "fmt" | "fmt" | ||||
| "net" | "net" | ||||
| "os" | "os" | ||||
| "time" | |||||
| "github.com/go-co-op/gocron/v2" | "github.com/go-co-op/gocron/v2" | ||||
| "gitlink.org.cn/cloudream/storage2/agent/internal/http" | "gitlink.org.cn/cloudream/storage2/agent/internal/http" | ||||
| "gitlink.org.cn/cloudream/storage2/agent/internal/tickevent" | |||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | ||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | "gitlink.org.cn/cloudream/common/pkgs/logger" | ||||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | "gitlink.org.cn/cloudream/common/pkgs/mq" | ||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| "gitlink.org.cn/cloudream/storage2/agent/internal/config" | "gitlink.org.cn/cloudream/storage2/agent/internal/config" | ||||
| "gitlink.org.cn/cloudream/storage2/agent/internal/task" | |||||
| stgglb "gitlink.org.cn/cloudream/storage2/common/globals" | stgglb "gitlink.org.cn/cloudream/storage2/common/globals" | ||||
| stgmod "gitlink.org.cn/cloudream/storage2/common/models" | |||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/accessstat" | |||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" | |||||
| "gitlink.org.cn/cloudream/storage2/common/models/datamap" | |||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" | "gitlink.org.cn/cloudream/storage2/common/pkgs/distlock" | ||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" | |||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy" | |||||
| agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" | agtrpc "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" | ||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/metacache" | |||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" | |||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent" | "gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent" | ||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/uploader" | |||||
| cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" | |||||
| "google.golang.org/grpc" | "google.golang.org/grpc" | ||||
| @@ -52,83 +43,75 @@ func serve(configPath string) { | |||||
| os.Exit(1) | os.Exit(1) | ||||
| } | } | ||||
| stgglb.InitLocal(&config.Cfg().Local) | |||||
| stgglb.InitLocal(config.Cfg().Local) | |||||
| stgglb.InitMQPool(config.Cfg().RabbitMQ) | stgglb.InitMQPool(config.Cfg().RabbitMQ) | ||||
| stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) | stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) | ||||
| stgglb.Stats.SetupHubStorageTransfer(*config.Cfg().Local.HubID) | |||||
| stgglb.Stats.SetupHubTransfer(*config.Cfg().Local.HubID) | |||||
| // stgglb.Stats.SetupHubStorageTransfer(*config.Cfg().Local.HubID) | |||||
| // stgglb.Stats.SetupHubTransfer(*config.Cfg().Local.HubID) | |||||
| // 获取Hub配置 | // 获取Hub配置 | ||||
| hubCfg := downloadHubConfig() | hubCfg := downloadHubConfig() | ||||
| // 初始化存储服务管理器 | // 初始化存储服务管理器 | ||||
| stgAgts := agtpool.NewPool() | |||||
| for _, stg := range hubCfg.Storages { | |||||
| err := stgAgts.SetupAgent(stg) | |||||
| if err != nil { | |||||
| fmt.Printf("init storage %v: %v", stg.Storage.String(), err) | |||||
| os.Exit(1) | |||||
| } | |||||
| } | |||||
| stgPool := pool.NewPool() | |||||
| // 初始化执行器 | // 初始化执行器 | ||||
| worker := exec.NewWorker() | worker := exec.NewWorker() | ||||
| // 初始化HTTP服务 | // 初始化HTTP服务 | ||||
| httpSvr, err := http.NewServer(config.Cfg().ListenAddr, http.NewService(&worker, stgAgts)) | |||||
| httpSvr, err := http.NewServer(config.Cfg().ListenAddr, http.NewService(&worker, stgPool)) | |||||
| if err != nil { | if err != nil { | ||||
| logger.Fatalf("new http server failed, err: %s", err.Error()) | logger.Fatalf("new http server failed, err: %s", err.Error()) | ||||
| } | } | ||||
| go serveHTTP(httpSvr) | go serveHTTP(httpSvr) | ||||
| // 启动网络连通性检测,并就地检测一次 | // 启动网络连通性检测,并就地检测一次 | ||||
| conCol := connectivity.NewCollector(&config.Cfg().Connectivity, func(collector *connectivity.Collector) { | |||||
| log := logger.WithField("Connectivity", "") | |||||
| coorCli, err := stgglb.CoordinatorMQPool.Acquire() | |||||
| if err != nil { | |||||
| log.Warnf("acquire coordinator mq failed, err: %s", err.Error()) | |||||
| return | |||||
| } | |||||
| defer stgglb.CoordinatorMQPool.Release(coorCli) | |||||
| cons := collector.GetAll() | |||||
| hubCons := make([]cdssdk.HubConnectivity, 0, len(cons)) | |||||
| for _, con := range cons { | |||||
| var delay *float32 | |||||
| if con.Latency != nil { | |||||
| v := float32(con.Latency.Microseconds()) / 1000 | |||||
| delay = &v | |||||
| } | |||||
| hubCons = append(hubCons, cdssdk.HubConnectivity{ | |||||
| FromHubID: *stgglb.Local.HubID, | |||||
| ToHubID: con.ToHubID, | |||||
| Latency: delay, | |||||
| TestTime: con.TestTime, | |||||
| }) | |||||
| } | |||||
| _, err = coorCli.UpdateHubConnectivities(coormq.ReqUpdateHubConnectivities(hubCons)) | |||||
| if err != nil { | |||||
| log.Warnf("update hub connectivities: %v", err) | |||||
| } | |||||
| }) | |||||
| conCol.CollectInPlace() | |||||
| // conCol := connectivity.NewCollector(&config.Cfg().Connectivity, func(collector *connectivity.Collector) { | |||||
| // log := logger.WithField("Connectivity", "") | |||||
| // coorCli, err := stgglb.CoordinatorMQPool.Acquire() | |||||
| // if err != nil { | |||||
| // log.Warnf("acquire coordinator mq failed, err: %s", err.Error()) | |||||
| // return | |||||
| // } | |||||
| // defer stgglb.CoordinatorMQPool.Release(coorCli) | |||||
| // cons := collector.GetAll() | |||||
| // hubCons := make([]cortypes.HubConnectivity, 0, len(cons)) | |||||
| // for _, con := range cons { | |||||
| // var delay *float32 | |||||
| // if con.Latency != nil { | |||||
| // v := float32(con.Latency.Microseconds()) / 1000 | |||||
| // delay = &v | |||||
| // } | |||||
| // hubCons = append(hubCons, cortypes.HubConnectivity{ | |||||
| // FromHubID: *stgglb.Local.HubID, | |||||
| // ToHubID: con.ToHubID, | |||||
| // Latency: delay, | |||||
| // TestTime: con.TestTime, | |||||
| // }) | |||||
| // } | |||||
| // _, err = coorCli.UpdateHubConnectivities(coormq.ReqUpdateHubConnectivities(hubCons)) | |||||
| // if err != nil { | |||||
| // log.Warnf("update hub connectivities: %v", err) | |||||
| // } | |||||
| // }) | |||||
| // conCol.CollectInPlace() | |||||
| // 初始化元数据缓存服务 | // 初始化元数据缓存服务 | ||||
| metacacheHost := metacache.NewHost() | |||||
| go metacacheHost.Serve() | |||||
| stgMeta := metacacheHost.AddStorageMeta() | |||||
| hubMeta := metacacheHost.AddHubMeta() | |||||
| conMeta := metacacheHost.AddConnectivity() | |||||
| // metacacheHost := metacache.NewHost() | |||||
| // go metacacheHost.Serve() | |||||
| // stgMeta := metacacheHost.AddStorageMeta() | |||||
| // hubMeta := metacacheHost.AddHubMeta() | |||||
| // conMeta := metacacheHost.AddConnectivity() | |||||
| // 启动访问统计服务 | // 启动访问统计服务 | ||||
| acStat := accessstat.NewAccessStat(accessstat.Config{ | |||||
| // TODO 考虑放到配置里 | |||||
| ReportInterval: time.Second * 10, | |||||
| }) | |||||
| go serveAccessStat(acStat) | |||||
| // acStat := accessstat.NewAccessStat(accessstat.Config{ | |||||
| // // TODO 考虑放到配置里 | |||||
| // ReportInterval: time.Second * 10, | |||||
| // }) | |||||
| // go serveAccessStat(acStat) | |||||
| // 初始化分布式锁服务 | // 初始化分布式锁服务 | ||||
| distlock, err := distlock.NewService(&config.Cfg().DistLock) | distlock, err := distlock.NewService(&config.Cfg().DistLock) | ||||
| @@ -136,20 +119,8 @@ func serve(configPath string) { | |||||
| logger.Fatalf("new ipfs failed, err: %s", err.Error()) | logger.Fatalf("new ipfs failed, err: %s", err.Error()) | ||||
| } | } | ||||
| // 初始化下载策略选择器 | |||||
| strgSel := strategy.NewSelector(config.Cfg().DownloadStrategy, stgMeta, hubMeta, conMeta) | |||||
| // 初始化下载器 | |||||
| dlder := downloader.NewDownloader(config.Cfg().Downloader, &conCol, stgAgts, strgSel) | |||||
| // 初始化上传器 | |||||
| uploader := uploader.NewUploader(distlock, &conCol, stgAgts, stgMeta) | |||||
| // 初始化任务管理器 | |||||
| taskMgr := task.NewManager(distlock, &conCol, &dlder, acStat, stgAgts, uploader) | |||||
| // 初始化系统事件发布器 | // 初始化系统事件发布器 | ||||
| evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &stgmod.SourceHub{ | |||||
| evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &datamap.SourceHub{ | |||||
| HubID: hubCfg.Hub.HubID, | HubID: hubCfg.Hub.HubID, | ||||
| HubName: hubCfg.Hub.Name, | HubName: hubCfg.Hub.Name, | ||||
| }) | }) | ||||
| @@ -160,13 +131,13 @@ func serve(configPath string) { | |||||
| go servePublisher(evtPub) | go servePublisher(evtPub) | ||||
| // 初始化定时任务执行器 | // 初始化定时任务执行器 | ||||
| sch := setupTickTask(stgAgts, evtPub) | |||||
| sch := setupTickTask(stgPool, evtPub) | |||||
| sch.Start() | sch.Start() | ||||
| defer sch.Shutdown() | defer sch.Shutdown() | ||||
| // 启动命令服务器 | // 启动命令服务器 | ||||
| // TODO 需要设计AgentID持久化机制 | // TODO 需要设计AgentID持久化机制 | ||||
| agtSvr, err := agtmq.NewServer(cmdsvc.NewService(&taskMgr, stgAgts, uploader), config.Cfg().ID, config.Cfg().RabbitMQ) | |||||
| agtSvr, err := agtmq.NewServer(cmdsvc.NewService(stgPool), config.Cfg().ID, config.Cfg().RabbitMQ) | |||||
| if err != nil { | if err != nil { | ||||
| logger.Fatalf("new agent server failed, err: %s", err.Error()) | logger.Fatalf("new agent server failed, err: %s", err.Error()) | ||||
| } | } | ||||
| @@ -182,7 +153,7 @@ func serve(configPath string) { | |||||
| logger.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error()) | logger.Fatalf("listen on %s failed, err: %s", listenAddr, err.Error()) | ||||
| } | } | ||||
| s := grpc.NewServer() | s := grpc.NewServer() | ||||
| agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&worker, stgAgts)) | |||||
| agtrpc.RegisterAgentServer(s, grpcsvc.NewService(&worker, stgPool)) | |||||
| go serveGRPC(s, lis) | go serveGRPC(s, lis) | ||||
| go serveDistLock(distlock) | go serveDistLock(distlock) | ||||
| @@ -199,7 +170,7 @@ func downloadHubConfig() coormq.GetHubConfigResp { | |||||
| } | } | ||||
| defer stgglb.CoordinatorMQPool.Release(coorCli) | defer stgglb.CoordinatorMQPool.Release(coorCli) | ||||
| cfgResp, err := coorCli.GetHubConfig(coormq.ReqGetHubConfig(cdssdk.HubID(config.Cfg().ID))) | |||||
| cfgResp, err := coorCli.GetHubConfig(coormq.ReqGetHubConfig(cortypes.HubID(config.Cfg().ID))) | |||||
| if err != nil { | if err != nil { | ||||
| logger.Errorf("getting hub config: %v", err) | logger.Errorf("getting hub config: %v", err) | ||||
| os.Exit(1) | os.Exit(1) | ||||
| @@ -243,30 +214,24 @@ loop: | |||||
| os.Exit(1) | os.Exit(1) | ||||
| } | } | ||||
| func setupTickTask(agtPool *agtpool.AgentPool, evtPub *sysevent.Publisher) gocron.Scheduler { | |||||
| func setupTickTask(agtPool *pool.Pool, evtPub *sysevent.Publisher) gocron.Scheduler { | |||||
| sch, err := gocron.NewScheduler() | sch, err := gocron.NewScheduler() | ||||
| if err != nil { | if err != nil { | ||||
| logger.Errorf("new cron scheduler: %s", err.Error()) | logger.Errorf("new cron scheduler: %s", err.Error()) | ||||
| os.Exit(1) | os.Exit(1) | ||||
| } | } | ||||
| sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes( | |||||
| gocron.NewAtTime(0, 0, 0), | |||||
| )), gocron.NewTask(tickevent.ReportStorageStats, agtPool, evtPub)) | |||||
| sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes( | |||||
| gocron.NewAtTime(0, 0, 1), | |||||
| )), gocron.NewTask(tickevent.ReportHubTransferStats, evtPub)) | |||||
| // sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes( | |||||
| // gocron.NewAtTime(0, 0, 0), | |||||
| // )), gocron.NewTask(tickevent.ReportStorageStats, agtPool, evtPub)) | |||||
| sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes( | |||||
| gocron.NewAtTime(0, 0, 2), | |||||
| )), gocron.NewTask(tickevent.ReportHubStorageTransferStats, evtPub)) | |||||
| // sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes( | |||||
| // gocron.NewAtTime(0, 0, 1), | |||||
| // )), gocron.NewTask(tickevent.ReportHubTransferStats, evtPub)) | |||||
| // sch.NewJob(gocron.DurationJob(time.Minute), gocron.NewTask(tickevent.ReportStorageStats, agtPool, evtPub)) | |||||
| // sch.NewJob(gocron.DurationJob(time.Minute), gocron.NewTask(tickevent.ReportHubTransferStats, evtPub)) | |||||
| // sch.NewJob(gocron.DurationJob(time.Minute), gocron.NewTask(tickevent.ReportHubStorageTransferStats, agtPool, evtPub)) | |||||
| // sch.NewJob(gocron.DailyJob(1, gocron.NewAtTimes( | |||||
| // gocron.NewAtTime(0, 0, 2), | |||||
| // )), gocron.NewTask(tickevent.ReportHubStorageTransferStats, evtPub)) | |||||
| return sch | return sch | ||||
| } | } | ||||
| @@ -346,26 +311,26 @@ func serveDistLock(svc *distlock.Service) { | |||||
| os.Exit(1) | os.Exit(1) | ||||
| } | } | ||||
| func serveAccessStat(svc *accessstat.AccessStat) { | |||||
| logger.Info("start serving access stat") | |||||
| ch := svc.Start() | |||||
| loop: | |||||
| for { | |||||
| val, err := ch.Receive() | |||||
| if err != nil { | |||||
| logger.Errorf("access stat stopped with error: %v", err) | |||||
| break | |||||
| } | |||||
| switch val := val.(type) { | |||||
| case error: | |||||
| logger.Errorf("access stat stopped with error: %v", val) | |||||
| break loop | |||||
| } | |||||
| } | |||||
| logger.Info("access stat stopped") | |||||
| // TODO 仅简单结束了程序 | |||||
| os.Exit(1) | |||||
| } | |||||
| // func serveAccessStat(svc *accessstat.AccessStat) { | |||||
| // logger.Info("start serving access stat") | |||||
| // ch := svc.Start() | |||||
| // loop: | |||||
| // for { | |||||
| // val, err := ch.Receive() | |||||
| // if err != nil { | |||||
| // logger.Errorf("access stat stopped with error: %v", err) | |||||
| // break | |||||
| // } | |||||
| // switch val := val.(type) { | |||||
| // case error: | |||||
| // logger.Errorf("access stat stopped with error: %v", val) | |||||
| // break loop | |||||
| // } | |||||
| // } | |||||
| // logger.Info("access stat stopped") | |||||
| // // TODO 仅简单结束了程序 | |||||
| // os.Exit(1) | |||||
| // } | |||||
| @@ -6,24 +6,20 @@ import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | "gitlink.org.cn/cloudream/common/pkgs/mq" | ||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | ||||
| c "gitlink.org.cn/cloudream/common/utils/config" | c "gitlink.org.cn/cloudream/common/utils/config" | ||||
| stgmodels "gitlink.org.cn/cloudream/storage2/common/models" | |||||
| stgglb "gitlink.org.cn/cloudream/storage2/common/globals" | |||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" | "gitlink.org.cn/cloudream/storage2/common/pkgs/connectivity" | ||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader" | |||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/downloader/strategy" | |||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc" | "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc" | ||||
| ) | ) | ||||
| type Config struct { | type Config struct { | ||||
| ID cdssdk.HubID `json:"id"` | |||||
| ListenAddr string `json:"listenAddr"` | |||||
| Local stgmodels.LocalMachineInfo `json:"local"` | |||||
| GRPC *grpc.Config `json:"grpc"` | |||||
| Logger log.Config `json:"logger"` | |||||
| RabbitMQ mq.Config `json:"rabbitMQ"` | |||||
| DistLock distlock.Config `json:"distlock"` | |||||
| Connectivity connectivity.Config `json:"connectivity"` | |||||
| Downloader downloader.Config `json:"downloader"` | |||||
| DownloadStrategy strategy.Config `json:"downloadStrategy"` | |||||
| ID cdssdk.HubID `json:"id"` | |||||
| ListenAddr string `json:"listenAddr"` | |||||
| Local stgglb.LocalMachineInfo `json:"local"` | |||||
| GRPC *grpc.Config `json:"grpc"` | |||||
| Logger log.Config `json:"logger"` | |||||
| RabbitMQ mq.Config `json:"rabbitMQ"` | |||||
| DistLock distlock.Config `json:"distlock"` | |||||
| Connectivity connectivity.Config `json:"connectivity"` | |||||
| } | } | ||||
| var cfg Config | var cfg Config | ||||
| @@ -3,16 +3,16 @@ package grpc | |||||
| import ( | import ( | ||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | ||||
| agentserver "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" | agentserver "gitlink.org.cn/cloudream/storage2/common/pkgs/grpc/agent" | ||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" | |||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" | |||||
| ) | ) | ||||
| type Service struct { | type Service struct { | ||||
| agentserver.AgentServer | agentserver.AgentServer | ||||
| swWorker *exec.Worker | swWorker *exec.Worker | ||||
| stgAgts *agtpool.AgentPool | |||||
| stgAgts *pool.Pool | |||||
| } | } | ||||
| func NewService(swWorker *exec.Worker, stgAgts *agtpool.AgentPool) *Service { | |||||
| func NewService(swWorker *exec.Worker, stgAgts *pool.Pool) *Service { | |||||
| return &Service{ | return &Service{ | ||||
| swWorker: swWorker, | swWorker: swWorker, | ||||
| stgAgts: stgAgts, | stgAgts: stgAgts, | ||||
| @@ -2,15 +2,15 @@ package http | |||||
| import ( | import ( | ||||
| "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" | ||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" | |||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" | |||||
| ) | ) | ||||
| type Service struct { | type Service struct { | ||||
| swWorker *exec.Worker | swWorker *exec.Worker | ||||
| stgAgts *agtpool.AgentPool | |||||
| stgAgts *pool.Pool | |||||
| } | } | ||||
| func NewService(swWorker *exec.Worker, stgAgts *agtpool.AgentPool) *Service { | |||||
| func NewService(swWorker *exec.Worker, stgAgts *pool.Pool) *Service { | |||||
| return &Service{ | return &Service{ | ||||
| swWorker: swWorker, | swWorker: swWorker, | ||||
| stgAgts: stgAgts, | stgAgts: stgAgts, | ||||
| @@ -1,81 +0,0 @@ | |||||
| package mq | |||||
| import ( | |||||
| "fmt" | |||||
| "time" | |||||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| mytask "gitlink.org.cn/cloudream/storage2/agent/internal/task" | |||||
| agtmq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/agent" | |||||
| ) | |||||
| func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *mq.CodeMessage) { | |||||
| store, err := svc.stgAgts.GetShardStore(msg.StorageID) | |||||
| if err != nil { | |||||
| return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get shard store of storage %v: %v", msg.StorageID, err)) | |||||
| } | |||||
| infos, err := store.ListAll() | |||||
| if err != nil { | |||||
| return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("listting file in shard store: %v", err)) | |||||
| } | |||||
| var fileHashes []cdssdk.FileHash | |||||
| for _, info := range infos { | |||||
| fileHashes = append(fileHashes, info.Hash) | |||||
| } | |||||
| return mq.ReplyOK(agtmq.NewCheckCacheResp(fileHashes)) | |||||
| } | |||||
| func (svc *Service) CacheGC(msg *agtmq.CacheGC) (*agtmq.CacheGCResp, *mq.CodeMessage) { | |||||
| store, err := svc.stgAgts.GetShardStore(msg.StorageID) | |||||
| if err != nil { | |||||
| return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get shard store of storage %v: %v", msg.StorageID, err)) | |||||
| } | |||||
| err = store.GC(msg.Avaiables) | |||||
| if err != nil { | |||||
| return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("purging cache: %v", err)) | |||||
| } | |||||
| return mq.ReplyOK(agtmq.RespCacheGC()) | |||||
| } | |||||
| func (svc *Service) StartCacheMovePackage(msg *agtmq.StartCacheMovePackage) (*agtmq.StartCacheMovePackageResp, *mq.CodeMessage) { | |||||
| tsk := svc.taskManager.StartNew(mytask.NewCacheMovePackage(msg.UserID, msg.PackageID, msg.StorageID)) | |||||
| return mq.ReplyOK(agtmq.NewStartCacheMovePackageResp(tsk.ID())) | |||||
| } | |||||
| func (svc *Service) WaitCacheMovePackage(msg *agtmq.WaitCacheMovePackage) (*agtmq.WaitCacheMovePackageResp, *mq.CodeMessage) { | |||||
| tsk := svc.taskManager.FindByID(msg.TaskID) | |||||
| if tsk == nil { | |||||
| return nil, mq.Failed(errorcode.TaskNotFound, "task not found") | |||||
| } | |||||
| if msg.WaitTimeoutMs == 0 { | |||||
| tsk.Wait() | |||||
| errMsg := "" | |||||
| if tsk.Error() != nil { | |||||
| errMsg = tsk.Error().Error() | |||||
| } | |||||
| return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg)) | |||||
| } else { | |||||
| if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs) * time.Millisecond) { | |||||
| errMsg := "" | |||||
| if tsk.Error() != nil { | |||||
| errMsg = tsk.Error().Error() | |||||
| } | |||||
| return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(true, errMsg)) | |||||
| } | |||||
| return mq.ReplyOK(agtmq.NewWaitCacheMovePackageResp(false, "")) | |||||
| } | |||||
| } | |||||
| @@ -1,21 +1,15 @@ | |||||
| package mq | package mq | ||||
| import ( | import ( | ||||
| "gitlink.org.cn/cloudream/storage2/agent/internal/task" | |||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" | |||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/uploader" | |||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/pool" | |||||
| ) | ) | ||||
| type Service struct { | type Service struct { | ||||
| taskManager *task.Manager | |||||
| stgAgts *agtpool.AgentPool | |||||
| uploader *uploader.Uploader | |||||
| stgAgts *pool.Pool | |||||
| } | } | ||||
| func NewService(taskMgr *task.Manager, stgAgts *agtpool.AgentPool, uplodaer *uploader.Uploader) *Service { | |||||
| func NewService(stgAgts *pool.Pool) *Service { | |||||
| return &Service{ | return &Service{ | ||||
| taskManager: taskMgr, | |||||
| stgAgts: stgAgts, | |||||
| uploader: uplodaer, | |||||
| stgAgts: stgAgts, | |||||
| } | } | ||||
| } | } | ||||
| @@ -1,5 +1,6 @@ | |||||
| package mq | package mq | ||||
| /* | |||||
| import ( | import ( | ||||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | "gitlink.org.cn/cloudream/common/consts/errorcode" | ||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | "gitlink.org.cn/cloudream/common/pkgs/logger" | ||||
| @@ -55,3 +56,4 @@ func (svc *Service) StorageCreatePackage(msg *agtmq.StorageCreatePackage) (*agtm | |||||
| return mq.ReplyOK(agtmq.RespStorageCreatePackage(createResp.Package)) | return mq.ReplyOK(agtmq.RespStorageCreatePackage(createResp.Package)) | ||||
| } | } | ||||
| */ | |||||
| @@ -1,5 +1,6 @@ | |||||
| package task | package task | ||||
| /* | |||||
| import ( | import ( | ||||
| "fmt" | "fmt" | ||||
| "time" | "time" | ||||
| @@ -88,3 +89,4 @@ func (t *CacheMovePackage) do(ctx TaskContext) error { | |||||
| return nil | return nil | ||||
| } | } | ||||
| */ | |||||
| @@ -1,5 +1,6 @@ | |||||
| package task | package task | ||||
| /* | |||||
| import ( | import ( | ||||
| "fmt" | "fmt" | ||||
| "path/filepath" | "path/filepath" | ||||
| @@ -146,3 +147,4 @@ func (t *CreatePackage) Execute(task *task.Task[TaskContext], ctx TaskContext, c | |||||
| RemovingDelay: time.Minute, | RemovingDelay: time.Minute, | ||||
| }) | }) | ||||
| } | } | ||||
| */ | |||||
| @@ -1,5 +1,6 @@ | |||||
| package task | package task | ||||
| /* | |||||
| import ( | import ( | ||||
| "gitlink.org.cn/cloudream/common/pkgs/distlock" | "gitlink.org.cn/cloudream/common/pkgs/distlock" | ||||
| "gitlink.org.cn/cloudream/common/pkgs/task" | "gitlink.org.cn/cloudream/common/pkgs/task" | ||||
| @@ -45,3 +46,4 @@ func NewManager(distlock *distlock.Service, connectivity *connectivity.Collector | |||||
| uploader: uploader, | uploader: uploader, | ||||
| }) | }) | ||||
| } | } | ||||
| */ | |||||
| @@ -1,5 +1,6 @@ | |||||
| package tickevent | package tickevent | ||||
| /* | |||||
| import ( | import ( | ||||
| "gitlink.org.cn/cloudream/common/utils/math2" | "gitlink.org.cn/cloudream/common/utils/math2" | ||||
| stgglb "gitlink.org.cn/cloudream/storage2/common/globals" | stgglb "gitlink.org.cn/cloudream/storage2/common/globals" | ||||
| @@ -59,3 +60,4 @@ func ReportHubStorageTransferStats(stgAgts *agtpool.AgentPool, evtPub *sysevent. | |||||
| }) | }) | ||||
| } | } | ||||
| } | } | ||||
| */ | |||||
| @@ -1,5 +1,6 @@ | |||||
| package tickevent | package tickevent | ||||
| /* | |||||
| import ( | import ( | ||||
| stgmod "gitlink.org.cn/cloudream/storage2/common/models" | stgmod "gitlink.org.cn/cloudream/storage2/common/models" | ||||
| "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" | "gitlink.org.cn/cloudream/storage2/common/pkgs/storage/agtpool" | ||||
| @@ -21,3 +22,4 @@ func ReportStorageStats(agtPool *agtpool.AgentPool, evtPub *sysevent.Publisher) | |||||
| }) | }) | ||||
| } | } | ||||
| } | } | ||||
| */ | |||||
| @@ -1,124 +0,0 @@ | |||||
| package agent | |||||
| import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| ) | |||||
| type CacheService interface { | |||||
| CheckCache(msg *CheckCache) (*CheckCacheResp, *mq.CodeMessage) | |||||
| CacheGC(msg *CacheGC) (*CacheGCResp, *mq.CodeMessage) | |||||
| StartCacheMovePackage(msg *StartCacheMovePackage) (*StartCacheMovePackageResp, *mq.CodeMessage) | |||||
| WaitCacheMovePackage(msg *WaitCacheMovePackage) (*WaitCacheMovePackageResp, *mq.CodeMessage) | |||||
| } | |||||
| // 检查节点上的IPFS | |||||
| var _ = Register(Service.CheckCache) | |||||
| type CheckCache struct { | |||||
| mq.MessageBodyBase | |||||
| StorageID cdssdk.StorageID `json:"storageID"` | |||||
| } | |||||
| type CheckCacheResp struct { | |||||
| mq.MessageBodyBase | |||||
| FileHashes []cdssdk.FileHash `json:"fileHashes"` | |||||
| } | |||||
| func NewCheckCache(stgID cdssdk.StorageID) *CheckCache { | |||||
| return &CheckCache{StorageID: stgID} | |||||
| } | |||||
| func NewCheckCacheResp(fileHashes []cdssdk.FileHash) *CheckCacheResp { | |||||
| return &CheckCacheResp{ | |||||
| FileHashes: fileHashes, | |||||
| } | |||||
| } | |||||
| func (client *Client) CheckCache(msg *CheckCache, opts ...mq.RequestOption) (*CheckCacheResp, error) { | |||||
| return mq.Request(Service.CheckCache, client.rabbitCli, msg, opts...) | |||||
| } | |||||
| // 清理Cache中不用的文件 | |||||
| var _ = Register(Service.CacheGC) | |||||
| type CacheGC struct { | |||||
| mq.MessageBodyBase | |||||
| StorageID cdssdk.StorageID `json:"storageID"` | |||||
| Avaiables []cdssdk.FileHash `json:"avaiables"` | |||||
| } | |||||
| type CacheGCResp struct { | |||||
| mq.MessageBodyBase | |||||
| } | |||||
| func ReqCacheGC(stgID cdssdk.StorageID, avaiables []cdssdk.FileHash) *CacheGC { | |||||
| return &CacheGC{ | |||||
| StorageID: stgID, | |||||
| Avaiables: avaiables, | |||||
| } | |||||
| } | |||||
| func RespCacheGC() *CacheGCResp { | |||||
| return &CacheGCResp{} | |||||
| } | |||||
| func (client *Client) CacheGC(msg *CacheGC, opts ...mq.RequestOption) (*CacheGCResp, error) { | |||||
| return mq.Request(Service.CacheGC, client.rabbitCli, msg, opts...) | |||||
| } | |||||
| // 将Package的缓存移动到这个节点 | |||||
| var _ = Register(Service.StartCacheMovePackage) | |||||
| type StartCacheMovePackage struct { | |||||
| mq.MessageBodyBase | |||||
| UserID cdssdk.UserID `json:"userID"` | |||||
| PackageID cdssdk.PackageID `json:"packageID"` | |||||
| StorageID cdssdk.StorageID `json:"storageID"` | |||||
| } | |||||
| type StartCacheMovePackageResp struct { | |||||
| mq.MessageBodyBase | |||||
| TaskID string `json:"taskID"` | |||||
| } | |||||
| func NewStartCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, stgID cdssdk.StorageID) *StartCacheMovePackage { | |||||
| return &StartCacheMovePackage{ | |||||
| UserID: userID, | |||||
| PackageID: packageID, | |||||
| StorageID: stgID, | |||||
| } | |||||
| } | |||||
| func NewStartCacheMovePackageResp(taskID string) *StartCacheMovePackageResp { | |||||
| return &StartCacheMovePackageResp{ | |||||
| TaskID: taskID, | |||||
| } | |||||
| } | |||||
| func (client *Client) StartCacheMovePackage(msg *StartCacheMovePackage, opts ...mq.RequestOption) (*StartCacheMovePackageResp, error) { | |||||
| return mq.Request(Service.StartCacheMovePackage, client.rabbitCli, msg, opts...) | |||||
| } | |||||
| // 将Package的缓存移动到这个节点 | |||||
| var _ = Register(Service.WaitCacheMovePackage) | |||||
| type WaitCacheMovePackage struct { | |||||
| mq.MessageBodyBase | |||||
| TaskID string `json:"taskID"` | |||||
| WaitTimeoutMs int64 `json:"waitTimeout"` | |||||
| } | |||||
| type WaitCacheMovePackageResp struct { | |||||
| mq.MessageBodyBase | |||||
| IsComplete bool `json:"isComplete"` | |||||
| Error string `json:"error"` | |||||
| } | |||||
| func NewWaitCacheMovePackage(taskID string, waitTimeoutMs int64) *WaitCacheMovePackage { | |||||
| return &WaitCacheMovePackage{ | |||||
| TaskID: taskID, | |||||
| WaitTimeoutMs: waitTimeoutMs, | |||||
| } | |||||
| } | |||||
| func NewWaitCacheMovePackageResp(isComplete bool, err string) *WaitCacheMovePackageResp { | |||||
| return &WaitCacheMovePackageResp{ | |||||
| IsComplete: isComplete, | |||||
| Error: err, | |||||
| } | |||||
| } | |||||
| func (client *Client) WaitCacheMovePackage(msg *WaitCacheMovePackage, opts ...mq.RequestOption) (*WaitCacheMovePackageResp, error) { | |||||
| return mq.Request(Service.WaitCacheMovePackage, client.rabbitCli, msg, opts...) | |||||
| } | |||||
| @@ -10,7 +10,7 @@ import ( | |||||
| type Service interface { | type Service interface { | ||||
| // UserSpaceService | // UserSpaceService | ||||
| CacheService | |||||
| // CacheService | |||||
| AgentService | AgentService | ||||
| } | } | ||||
| @@ -6,11 +6,38 @@ import ( | |||||
| ) | ) | ||||
| type HubService interface { | type HubService interface { | ||||
| GetHubConfig(msg *GetHubConfig) (*GetHubConfigResp, *mq.CodeMessage) | |||||
| GetHubs(msg *GetHubs) (*GetHubsResp, *mq.CodeMessage) | GetHubs(msg *GetHubs) (*GetHubsResp, *mq.CodeMessage) | ||||
| GetHubConnectivities(msg *GetHubConnectivities) (*GetHubConnectivitiesResp, *mq.CodeMessage) | GetHubConnectivities(msg *GetHubConnectivities) (*GetHubConnectivitiesResp, *mq.CodeMessage) | ||||
| } | } | ||||
| var _ = Register(Service.GetHubConfig) | |||||
| type GetHubConfig struct { | |||||
| mq.MessageBodyBase | |||||
| HubID cortypes.HubID `json:"hubID"` | |||||
| } | |||||
| type GetHubConfigResp struct { | |||||
| mq.MessageBodyBase | |||||
| Hub cortypes.Hub `json:"hub"` | |||||
| } | |||||
| func ReqGetHubConfig(hubID cortypes.HubID) *GetHubConfig { | |||||
| return &GetHubConfig{ | |||||
| HubID: hubID, | |||||
| } | |||||
| } | |||||
| func RespGetHubConfig(hub cortypes.Hub) *GetHubConfigResp { | |||||
| return &GetHubConfigResp{ | |||||
| Hub: hub, | |||||
| } | |||||
| } | |||||
| func (client *Client) GetHubConfig(msg *GetHubConfig) (*GetHubConfigResp, error) { | |||||
| return mq.Request(Service.GetHubConfig, client.rabbitCli, msg) | |||||
| } | |||||
| // 获取指定节点的信息。如果HubIDs为nil,则返回所有Hub | // 获取指定节点的信息。如果HubIDs为nil,则返回所有Hub | ||||
| var _ = Register(Service.GetHubs) | var _ = Register(Service.GetHubs) | ||||
| @@ -10,6 +10,18 @@ import ( | |||||
| cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" | cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types" | ||||
| ) | ) | ||||
| func (svc *Service) GetHubConfig(msg *coormq.GetHubConfig) (*coormq.GetHubConfigResp, *mq.CodeMessage) { | |||||
| log := logger.WithField("HubID", msg.HubID) | |||||
| hub, err := svc.db.Hub().GetByID(svc.db.DefCtx(), msg.HubID) | |||||
| if err != nil { | |||||
| log.Warnf("getting hub: %v", err) | |||||
| return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("getting hub: %v", err)) | |||||
| } | |||||
| return mq.ReplyOK(coormq.RespGetHubConfig(hub)) | |||||
| } | |||||
| func (svc *Service) GetHubs(msg *coormq.GetHubs) (*coormq.GetHubsResp, *mq.CodeMessage) { | func (svc *Service) GetHubs(msg *coormq.GetHubs) (*coormq.GetHubsResp, *mq.CodeMessage) { | ||||
| var hubs []*cortypes.Hub | var hubs []*cortypes.Hub | ||||