diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index 197507b..833ebc9 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -3,6 +3,7 @@ package coordinator import ( "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" @@ -143,7 +144,7 @@ var _ = Register(Service.UpdateObjectInfos) type UpdateObjectInfos struct { mq.MessageBodyBase UserID cdssdk.UserID `json:"userID"` - Updatings []cdssdk.UpdatingObject `json:"updatings"` + Updatings []cdsapi.UpdatingObject `json:"updatings"` } type UpdateObjectInfosResp struct { @@ -151,7 +152,7 @@ type UpdateObjectInfosResp struct { Successes []cdssdk.ObjectID `json:"successes"` } -func ReqUpdateObjectInfos(userID cdssdk.UserID, updatings []cdssdk.UpdatingObject) *UpdateObjectInfos { +func ReqUpdateObjectInfos(userID cdssdk.UserID, updatings []cdsapi.UpdatingObject) *UpdateObjectInfos { return &UpdateObjectInfos{ UserID: userID, Updatings: updatings, @@ -172,7 +173,7 @@ var _ = Register(Service.MoveObjects) type MoveObjects struct { mq.MessageBodyBase UserID cdssdk.UserID `json:"userID"` - Movings []cdssdk.MovingObject `json:"movings"` + Movings []cdsapi.MovingObject `json:"movings"` } type MoveObjectsResp struct { @@ -180,7 +181,7 @@ type MoveObjectsResp struct { Successes []cdssdk.ObjectID `json:"successes"` } -func ReqMoveObjects(userID cdssdk.UserID, movings []cdssdk.MovingObject) *MoveObjects { +func ReqMoveObjects(userID cdssdk.UserID, movings []cdsapi.MovingObject) *MoveObjects { return &MoveObjects{ UserID: userID, Movings: movings, diff --git a/common/pkgs/mq/coordinator/storage.go b/common/pkgs/mq/coordinator/storage.go index bc6bfac..a9585f8 100644 --- a/common/pkgs/mq/coordinator/storage.go +++ b/common/pkgs/mq/coordinator/storage.go @@ -10,6 +10,8 @@ import ( type StorageService interface { GetStorage(msg *GetStorage) (*GetStorageResp, *mq.CodeMessage) + GetStorageDetail(msg *GetStorageDetail) (*GetStorageDetailResp, *mq.CodeMessage) + GetStorageByName(msg *GetStorageByName) (*GetStorageByNameResp, *mq.CodeMessage) StoragePackageLoaded(msg *StoragePackageLoaded) (*StoragePackageLoadedResp, *mq.CodeMessage) @@ -43,6 +45,32 @@ func (client *Client) GetStorage(msg *GetStorage) (*GetStorageResp, error) { return mq.Request(Service.GetStorage, client.rabbitCli, msg) } +// 获取Storage的详细信息 +var _ = Register(Service.GetStorageDetail) + +type GetStorageDetail struct { + mq.MessageBodyBase + StorageID cdssdk.StorageID `json:"storageID"` +} +type GetStorageDetailResp struct { + mq.MessageBodyBase + Storage stgmod.StorageDetail `json:"storage"` +} + +func ReqGetStorageDetail(storageID cdssdk.StorageID) *GetStorageDetail { + return &GetStorageDetail{ + StorageID: storageID, + } +} +func RespGetStorageDetail(stg stgmod.StorageDetail) *GetStorageDetailResp { + return &GetStorageDetailResp{ + Storage: stg, + } +} +func (client *Client) GetStorageDetail(msg *GetStorageDetail) (*GetStorageDetailResp, error) { + return mq.Request(Service.GetStorageDetail, client.rabbitCli, msg) +} + var _ = Register(Service.GetStorageByName) type GetStorageByName struct { diff --git a/coordinator/internal/cmd/serve.go b/coordinator/internal/cmd/serve.go index adfd720..7f08173 100644 --- a/coordinator/internal/cmd/serve.go +++ b/coordinator/internal/cmd/serve.go @@ -6,6 +6,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" + "gitlink.org.cn/cloudream/storage/common/pkgs/db2" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" "gitlink.org.cn/cloudream/storage/coordinator/internal/config" "gitlink.org.cn/cloudream/storage/coordinator/internal/mq" @@ -29,7 +30,12 @@ func serve(configPath string) { logger.Fatalf("new db failed, err: %s", err.Error()) } - coorSvr, err := coormq.NewServer(mq.NewService(db), &config.Cfg().RabbitMQ) + db2, err := db2.NewDB(&config.Cfg().DB) + if err != nil { + logger.Fatalf("new db2 failed, err: %s", err.Error()) + } + + coorSvr, err := coormq.NewServer(mq.NewService(db, db2), &config.Cfg().RabbitMQ) if err != nil { logger.Fatalf("new coordinator server failed, err: %s", err.Error()) } diff --git a/coordinator/internal/mq/node.go b/coordinator/internal/mq/node.go index fadea66..5642314 100644 --- a/coordinator/internal/mq/node.go +++ b/coordinator/internal/mq/node.go @@ -3,6 +3,7 @@ package mq import ( "database/sql" "fmt" + "github.com/jmoiron/sqlx" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" @@ -27,7 +28,7 @@ func (svc *Service) GetNodes(msg *coormq.GetNodes) (*coormq.GetNodesResp, *mq.Co if msg.NodeIDs == nil { var err error - nodes, err = svc.db2.Node().GetAllNodes() + nodes, err = svc.db2.Node().GetAllNodes(svc.db2.DefCtx()) if err != nil { logger.Warnf("getting all nodes: %s", err.Error()) return nil, mq.Failed(errorcode.OperationFailed, "get all node failed") @@ -36,7 +37,7 @@ func (svc *Service) GetNodes(msg *coormq.GetNodes) (*coormq.GetNodesResp, *mq.Co } else { // 可以不用事务 for _, id := range msg.NodeIDs { - node, err := svc.db2.Node().GetByID(id) + node, err := svc.db2.Node().GetByID(svc.db2.DefCtx(), id) if err != nil { logger.WithField("NodeID", id). Warnf("query node failed, err: %s", err.Error()) diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index 4890bb3..cceaabc 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -10,6 +10,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" "gitlink.org.cn/cloudream/common/utils/sort2" stgmod "gitlink.org.cn/cloudream/storage/common/models" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" @@ -167,7 +168,7 @@ func (svc *Service) UpdateObjectRedundancy(msg *coormq.UpdateObjectRedundancy) ( func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.UpdateObjectInfosResp, *mq.CodeMessage) { var sucs []cdssdk.ObjectID err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { - msg.Updatings = sort2.Sort(msg.Updatings, func(o1, o2 cdssdk.UpdatingObject) int { + msg.Updatings = sort2.Sort(msg.Updatings, func(o1, o2 cdsapi.UpdatingObject) int { return sort2.Cmp(o1.ObjectID, o2.ObjectID) }) @@ -185,7 +186,7 @@ func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.Up oldObjIDs[i] = obj.ObjectID } - avaiUpdatings, notExistsObjs := pickByObjectIDs(msg.Updatings, oldObjIDs, func(obj cdssdk.UpdatingObject) cdssdk.ObjectID { return obj.ObjectID }) + avaiUpdatings, notExistsObjs := pickByObjectIDs(msg.Updatings, oldObjIDs, func(obj cdsapi.UpdatingObject) cdssdk.ObjectID { return obj.ObjectID }) if len(notExistsObjs) > 0 { // TODO 部分对象已经不存在 } @@ -237,7 +238,7 @@ func pickByObjectIDs[T any](objs []T, objIDs []cdssdk.ObjectID, getID func(T) cd func (svc *Service) MoveObjects(msg *coormq.MoveObjects) (*coormq.MoveObjectsResp, *mq.CodeMessage) { var sucs []cdssdk.ObjectID err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { - msg.Movings = sort2.Sort(msg.Movings, func(o1, o2 cdssdk.MovingObject) int { + msg.Movings = sort2.Sort(msg.Movings, func(o1, o2 cdsapi.MovingObject) int { return sort2.Cmp(o1.ObjectID, o2.ObjectID) }) @@ -255,7 +256,7 @@ func (svc *Service) MoveObjects(msg *coormq.MoveObjects) (*coormq.MoveObjectsRes oldObjIDs[i] = obj.ObjectID } - avaiMovings, notExistsObjs := pickByObjectIDs(msg.Movings, oldObjIDs, func(obj cdssdk.MovingObject) cdssdk.ObjectID { return obj.ObjectID }) + avaiMovings, notExistsObjs := pickByObjectIDs(msg.Movings, oldObjIDs, func(obj cdsapi.MovingObject) cdssdk.ObjectID { return obj.ObjectID }) if len(notExistsObjs) > 0 { // TODO 部分对象已经不存在 } diff --git a/coordinator/internal/mq/storage.go b/coordinator/internal/mq/storage.go index 4bf759d..283d383 100644 --- a/coordinator/internal/mq/storage.go +++ b/coordinator/internal/mq/storage.go @@ -9,6 +9,8 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/db2" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -22,6 +24,35 @@ func (svc *Service) GetStorage(msg *coormq.GetStorage) (*coormq.GetStorageResp, return mq.ReplyOK(coormq.RespGetStorage(stg)) } +func (svc *Service) GetStorageDetail(msg *coormq.GetStorageDetail) (*coormq.GetStorageDetailResp, *mq.CodeMessage) { + var ret stgmod.StorageDetail + + svc.db2.DoTx(func(tx db2.SQLContext) error { + stg, err := svc.db2.Storage().GetByID(tx, msg.StorageID) + if err != nil { + return fmt.Errorf("getting storage: %w", err) + } + + shard, err := svc.db2.ShardStorage().GetByStorageID(tx, msg.StorageID) + if err != nil { + return fmt.Errorf("getting shard storage: %w", err) + } + + shared, err := svc.db2.SharedStorage().GetByStorageID(tx, msg.StorageID) + if err != nil { + return fmt.Errorf("getting shared storage: %w", err) + } + + ret.Storage = stg + ret.Shard = shard + ret.Shared = shared + + return nil + }) + + return mq.ReplyOK(coormq.RespGetStorageDetail(ret)) +} + func (svc *Service) GetStorageByName(msg *coormq.GetStorageByName) (*coormq.GetStorageByNameResp, *mq.CodeMessage) { stg, err := svc.db.Storage().GetUserStorageByName(svc.db.SQLCtx(), msg.UserID, msg.Name) if err != nil { diff --git a/coordinator/main.go b/coordinator/main.go index d7a4798..212b22e 100644 --- a/coordinator/main.go +++ b/coordinator/main.go @@ -1,66 +1,9 @@ package main import ( - "fmt" - "os" - - "gitlink.org.cn/cloudream/common/pkgs/logger" - mydb "gitlink.org.cn/cloudream/storage/common/pkgs/db" - mydb2 "gitlink.org.cn/cloudream/storage/common/pkgs/db2" - coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" - "gitlink.org.cn/cloudream/storage/coordinator/internal/config" - "gitlink.org.cn/cloudream/storage/coordinator/internal/mq" + "gitlink.org.cn/cloudream/storage/coordinator/internal/cmd" ) func main() { - err := config.Init() - if err != nil { - fmt.Printf("init config failed, err: %s", err.Error()) - os.Exit(1) - } - - err = logger.Init(&config.Cfg().Logger) - if err != nil { - fmt.Printf("init logger failed, err: %s", err.Error()) - os.Exit(1) - } - - db, err := mydb.NewDB(&config.Cfg().DB) - if err != nil { - logger.Fatalf("new db failed, err: %s", err.Error()) - } - - db2, err := mydb2.NewDB(&config.Cfg().DB) - if err != nil { - logger.Fatalf("new db failed, err: %s", err.Error()) - } - - coorSvr, err := coormq.NewServer(mq.NewService(db, db2), &config.Cfg().RabbitMQ) - if err != nil { - logger.Fatalf("new coordinator server failed, err: %s", err.Error()) - } - - coorSvr.OnError(func(err error) { - logger.Warnf("coordinator server err: %s", err.Error()) - }) - - // 启动服务 - go serveCoorServer(coorSvr) - - forever := make(chan bool) - <-forever -} - -func serveCoorServer(server *coormq.Server) { - logger.Info("start serving command server") - - err := server.Serve() - if err != nil { - logger.Errorf("command server stopped with error: %s", err.Error()) - } - - logger.Info("command server stopped") - - // TODO 仅简单结束了程序 - os.Exit(1) + cmd.RootCmd.Execute() }