| @@ -4,13 +4,13 @@ import ( | |||
| log "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| c "gitlink.org.cn/cloudream/common/utils/config" | |||
| db "gitlink.org.cn/cloudream/storage-common/pkgs/db/config" | |||
| racfg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/config" | |||
| stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" | |||
| ) | |||
| type Config struct { | |||
| Logger log.Config `json:"logger"` | |||
| DB db.Config `json:"db"` | |||
| RabbitMQ racfg.Config `json:"rabbitMQ"` | |||
| RabbitMQ stgmq.Config `json:"rabbitMQ"` | |||
| } | |||
| var cfg Config | |||
| @@ -1,14 +1,14 @@ | |||
| package services | |||
| import ( | |||
| coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" | |||
| coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" | |||
| ) | |||
| func (service *Service) TempCacheReport(msg *coormsg.TempCacheReport) { | |||
| service.db.BatchInsertOrUpdateCache(msg.Hashes, msg.NodeID) | |||
| func (service *Service) TempCacheReport(msg *coormq.TempCacheReport) { | |||
| //service.db.BatchInsertOrUpdateCache(msg.Hashes, msg.NodeID) | |||
| } | |||
| func (service *Service) AgentStatusReport(msg *coormsg.AgentStatusReport) { | |||
| func (service *Service) AgentStatusReport(msg *coormq.AgentStatusReport) { | |||
| //jh:根据command中的Ip,插入节点延迟表,和节点表的NodeStatus | |||
| //根据command中的Ip,插入节点延迟表 | |||
| @@ -5,10 +5,10 @@ import ( | |||
| "github.com/jmoiron/sqlx" | |||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||
| log "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" | |||
| coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" | |||
| coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" | |||
| ) | |||
| func (svc *Service) GetBucket(userID int, bucketID int) (model.Bucket, error) { | |||
| @@ -16,32 +16,32 @@ func (svc *Service) GetBucket(userID int, bucketID int) (model.Bucket, error) { | |||
| panic("not implement yet") | |||
| } | |||
| func (svc *Service) GetUserBuckets(msg *coormsg.GetUserBuckets) (*coormsg.GetUserBucketsResp, *mq.CodeMessage) { | |||
| func (svc *Service) GetUserBuckets(msg *coormq.GetUserBuckets) (*coormq.GetUserBucketsResp, *mq.CodeMessage) { | |||
| buckets, err := svc.db.Bucket().GetUserBuckets(svc.db.SQLCtx(), msg.UserID) | |||
| if err != nil { | |||
| log.WithField("UserID", msg.UserID). | |||
| logger.WithField("UserID", msg.UserID). | |||
| Warnf("get user buckets failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.GetUserBucketsResp](errorcode.OperationFailed, "get all buckets failed") | |||
| return mq.ReplyFailed[coormq.GetUserBucketsResp](errorcode.OperationFailed, "get all buckets failed") | |||
| } | |||
| return mq.ReplyOK(coormsg.NewGetUserBucketsResp(buckets)) | |||
| return mq.ReplyOK(coormq.NewGetUserBucketsResp(buckets)) | |||
| } | |||
| func (svc *Service) GetBucketObjects(msg *coormsg.GetBucketObjects) (*coormsg.GetBucketObjectsResp, *mq.CodeMessage) { | |||
| objects, err := svc.db.Object().GetBucketObjects(svc.db.SQLCtx(), msg.UserID, msg.BucketID) | |||
| func (svc *Service) GetBucketPackages(msg *coormq.GetBucketPackages) (*coormq.GetBucketPackagesResp, *mq.CodeMessage) { | |||
| packages, err := svc.db.Package().GetBucketPackages(svc.db.SQLCtx(), msg.UserID, msg.BucketID) | |||
| if err != nil { | |||
| log.WithField("UserID", msg.UserID). | |||
| logger.WithField("UserID", msg.UserID). | |||
| WithField("BucketID", msg.BucketID). | |||
| Warnf("get bucket objects failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.GetBucketObjectsResp](errorcode.OperationFailed, "get bucket objects failed") | |||
| Warnf("get bucket packages failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormq.GetBucketPackagesResp](errorcode.OperationFailed, "get bucket packages failed") | |||
| } | |||
| return mq.ReplyOK(coormsg.NewGetBucketObjectsResp(objects)) | |||
| return mq.ReplyOK(coormq.NewGetBucketPackagesResp(packages)) | |||
| } | |||
| func (svc *Service) CreateBucket(msg *coormsg.CreateBucket) (*coormsg.CreateBucketResp, *mq.CodeMessage) { | |||
| func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucketResp, *mq.CodeMessage) { | |||
| var bucketID int64 | |||
| var err error | |||
| svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { | |||
| @@ -50,25 +50,25 @@ func (svc *Service) CreateBucket(msg *coormsg.CreateBucket) (*coormsg.CreateBuck | |||
| return err | |||
| }) | |||
| if err != nil { | |||
| log.WithField("UserID", msg.UserID). | |||
| logger.WithField("UserID", msg.UserID). | |||
| WithField("BucketName", msg.BucketName). | |||
| Warnf("create bucket failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.CreateBucketResp](errorcode.OperationFailed, "create bucket failed") | |||
| return mq.ReplyFailed[coormq.CreateBucketResp](errorcode.OperationFailed, "create bucket failed") | |||
| } | |||
| return mq.ReplyOK(coormsg.NewCreateBucketResp(bucketID)) | |||
| return mq.ReplyOK(coormq.NewCreateBucketResp(bucketID)) | |||
| } | |||
| func (svc *Service) DeleteBucket(msg *coormsg.DeleteBucket) (*coormsg.DeleteBucketResp, *mq.CodeMessage) { | |||
| func (svc *Service) DeleteBucket(msg *coormq.DeleteBucket) (*coormq.DeleteBucketResp, *mq.CodeMessage) { | |||
| err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { | |||
| return svc.db.Bucket().Delete(tx, msg.BucketID) | |||
| }) | |||
| if err != nil { | |||
| log.WithField("UserID", msg.UserID). | |||
| logger.WithField("UserID", msg.UserID). | |||
| WithField("BucketID", msg.BucketID). | |||
| Warnf("delete bucket failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.DeleteBucketResp](errorcode.OperationFailed, "delete bucket failed") | |||
| return mq.ReplyFailed[coormq.DeleteBucketResp](errorcode.OperationFailed, "delete bucket failed") | |||
| } | |||
| return mq.ReplyOK(coormsg.NewDeleteBucketResp()) | |||
| return mq.ReplyOK(coormq.NewDeleteBucketResp()) | |||
| } | |||
| @@ -1,100 +0,0 @@ | |||
| package services | |||
| import ( | |||
| "database/sql" | |||
| "errors" | |||
| "github.com/jmoiron/sqlx" | |||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message" | |||
| coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" | |||
| ) | |||
| func (svc *Service) PreUploadEcObject(msg *coormsg.PreUploadEcObject) (*coormsg.PreUploadEcResp, *mq.CodeMessage) { | |||
| // 判断同名对象是否存在。等到UploadRepObject时再判断一次。 | |||
| // 此次的判断只作为参考,具体是否成功还是看UploadRepObject的结果 | |||
| isBucketAvai, err := svc.db.Bucket().IsAvailable(svc.db.SQLCtx(), msg.BucketID, msg.UserID) | |||
| if err != nil { | |||
| logger.WithField("BucketID", msg.BucketID). | |||
| Warnf("check bucket available failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreUploadEcResp](errorcode.OperationFailed, "check bucket available failed") | |||
| } | |||
| if !isBucketAvai { | |||
| logger.WithField("BucketID", msg.BucketID). | |||
| Warnf("bucket is not available to user") | |||
| return mq.ReplyFailed[coormsg.PreUploadEcResp](errorcode.OperationFailed, "bucket is not available to user") | |||
| } | |||
| _, err = svc.db.Object().GetByName(svc.db.SQLCtx(), msg.BucketID, msg.ObjectName) | |||
| if err == nil { | |||
| logger.WithField("BucketID", msg.BucketID). | |||
| WithField("ObjectName", msg.ObjectName). | |||
| Warnf("object with given Name and BucketID already exists") | |||
| return mq.ReplyFailed[coormsg.PreUploadEcResp](errorcode.OperationFailed, "object with given Name and BucketID already exists") | |||
| } | |||
| if !errors.Is(err, sql.ErrNoRows) { | |||
| logger.WithField("BucketID", msg.BucketID). | |||
| WithField("ObjectName", msg.ObjectName). | |||
| Warnf("get object by name failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreUploadEcResp](errorcode.OperationFailed, "get object by name failed") | |||
| } | |||
| //查询用户可用的节点IP | |||
| nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID) | |||
| if err != nil { | |||
| logger.WithField("UserID", msg.UserID). | |||
| Warnf("query user nodes failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreUploadEcResp](errorcode.OperationFailed, "query user nodes failed") | |||
| } | |||
| // 查询客户端所属节点 | |||
| foundBelongNode := true | |||
| belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP) | |||
| if err == sql.ErrNoRows { | |||
| foundBelongNode = false | |||
| } else if err != nil { | |||
| logger.WithField("ClientExternalIP", msg.ClientExternalIP). | |||
| Warnf("query client belong node failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreUploadEcResp](errorcode.OperationFailed, "query client belong node failed") | |||
| } | |||
| var respNodes []mymq.RespNode | |||
| for _, node := range nodes { | |||
| respNodes = append(respNodes, mymq.NewRespNode( | |||
| node.NodeID, | |||
| node.ExternalIP, | |||
| node.LocalIP, | |||
| // LocationID 相同则认为是在同一个地域 | |||
| foundBelongNode && belongNode.LocationID == node.LocationID, | |||
| )) | |||
| } | |||
| //查询纠删码参数 | |||
| ec, err := svc.db.Ec().GetEc(svc.db.SQLCtx(), msg.EcName) | |||
| if err != nil { | |||
| logger.WithField("Ec", msg.EcName). | |||
| Warnf("check ec type failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreUploadEcResp](errorcode.OperationFailed, "check bucket available failed") | |||
| } | |||
| ecc := mymq.NewEc(ec.EcID, ec.Name, ec.EcK, ec.EcN) | |||
| return mq.ReplyOK(coormsg.NewPreUploadEcResp(respNodes, ecc)) | |||
| } | |||
| func (svc *Service) CreateEcObject(msg *coormsg.CreateEcObject) (*coormsg.CreateObjectResp, *mq.CodeMessage) { | |||
| var objID int64 | |||
| err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { | |||
| var err error | |||
| objID, err = svc.db.Object().CreateEcObject(tx, msg.BucketID, msg.ObjectName, msg.FileSize, msg.UserID, msg.NodeIDs, msg.Hashes, msg.EcName, msg.DirName) | |||
| return err | |||
| }) | |||
| if err != nil { | |||
| logger.WithField("BucketName", msg.BucketID). | |||
| WithField("ObjectName", msg.ObjectName). | |||
| Warnf("create rep object failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.CreateObjectResp](errorcode.OperationFailed, "create rep object failed") | |||
| } | |||
| return mq.ReplyOK(coormsg.NewCreateObjectResp(objID)) | |||
| } | |||
| @@ -0,0 +1,30 @@ | |||
| package services | |||
| import ( | |||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" | |||
| ) | |||
| func (svc *Service) FindClientLocation(msg *coormq.FindClientLocation) (*coormq.FindClientLocationResp, *mq.CodeMessage) { | |||
| location, err := svc.db.Location().FindLocationByExternalIP(svc.db.SQLCtx(), msg.IP) | |||
| if err != nil { | |||
| logger.WithField("IP", msg.IP). | |||
| Warnf("query client location failed, err: %s", err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "query client location failed") | |||
| } | |||
| return mq.ReplyOK(coormq.NewFindClientLocationResp(location)) | |||
| } | |||
| func (svc *Service) GetECConfig(msg *coormq.GetECConfig) (*coormq.GetECConfigResp, *mq.CodeMessage) { | |||
| ec, err := svc.db.Ec().GetEc(svc.db.SQLCtx(), msg.ECName) | |||
| if err != nil { | |||
| logger.WithField("ECName", msg.ECName). | |||
| Warnf("query ec failed, err: %s", err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "query ec failed") | |||
| } | |||
| return mq.ReplyOK(coormq.NewGetECConfigResp(ec)) | |||
| } | |||
| @@ -0,0 +1,36 @@ | |||
| package services | |||
| import ( | |||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" | |||
| coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" | |||
| ) | |||
| func (svc *Service) GetUserNodes(msg *coormq.GetUserNodes) (*coormq.GetUserNodesResp, *mq.CodeMessage) { | |||
| nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID) | |||
| if err != nil { | |||
| logger.WithField("UserID", msg.UserID). | |||
| Warnf("query user nodes failed, err: %s", err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "query user nodes failed") | |||
| } | |||
| return mq.ReplyOK(coormq.NewGetUserNodesResp(nodes)) | |||
| } | |||
| func (svc *Service) GetNodes(msg *coormq.GetNodes) (*coormq.GetNodesResp, *mq.CodeMessage) { | |||
| var nodes []model.Node | |||
| for _, id := range msg.NodeIDs { | |||
| node, err := svc.db.Node().GetByID(svc.db.SQLCtx(), id) | |||
| if err != nil { | |||
| logger.WithField("NodeID", id). | |||
| Warnf("query node failed, err: %s", err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "query node failed") | |||
| } | |||
| nodes = append(nodes, node) | |||
| } | |||
| return mq.ReplyOK(coormq.NewGetNodesResp(nodes)) | |||
| } | |||
| @@ -1,358 +1,32 @@ | |||
| package services | |||
| import ( | |||
| "database/sql" | |||
| "errors" | |||
| "github.com/jmoiron/sqlx" | |||
| "github.com/samber/lo" | |||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||
| "gitlink.org.cn/cloudream/common/models" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" | |||
| mymq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message" | |||
| coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" | |||
| scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/scanner/event" | |||
| coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" | |||
| ) | |||
| func (svc *Service) GetObjectsByDirName(msg *coormsg.GetObjectsByDirName) (*coormsg.GetObjectsResp, *mq.CodeMessage) { | |||
| //查询dirName下所有文件 | |||
| objects, err := svc.db.Object().GetByDirName(svc.db.SQLCtx(), msg.DirName) | |||
| if err != nil { | |||
| logger.WithField("DirName", msg.DirName). | |||
| Warnf("query dirname failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.GetObjectsResp](errorcode.OperationFailed, "get objects failed") | |||
| } | |||
| return mq.ReplyOK(coormsg.NewGetObjectsResp(objects)) | |||
| } | |||
| func (svc *Service) PreDownloadObject(msg *coormsg.PreDownloadObject) (*coormsg.PreDownloadObjectResp, *mq.CodeMessage) { | |||
| // 查询文件对象 | |||
| object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.UserID, msg.ObjectID) | |||
| if err != nil { | |||
| logger.WithField("ObjectID", msg.ObjectID). | |||
| Warnf("query Object failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query Object failed") | |||
| } | |||
| // 查询客户端所属节点 | |||
| foundBelongNode := true | |||
| belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP) | |||
| if err == sql.ErrNoRows { | |||
| foundBelongNode = false | |||
| } else if err != nil { | |||
| logger.WithField("ClientExternalIP", msg.ClientExternalIP). | |||
| Warnf("query client belong node failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query client belong node failed") | |||
| } | |||
| logger.Debugf("client address %s is at location %d", msg.ClientExternalIP, belongNode.LocationID) | |||
| //-若redundancy是rep,查询对象副本表, 获得FileHash | |||
| if object.Redundancy == models.RedundancyRep { | |||
| objectRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), object.ObjectID) | |||
| if err != nil { | |||
| logger.WithField("ObjectID", object.ObjectID). | |||
| Warnf("get ObjectRep failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query ObjectRep failed") | |||
| } | |||
| // 注:由于采用了IPFS存储,因此每个备份文件的FileHash都是一样的 | |||
| nodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, objectRep.FileHash) | |||
| if err != nil { | |||
| logger.WithField("FileHash", objectRep.FileHash). | |||
| Warnf("query Cache failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query Cache failed") | |||
| } | |||
| var respNodes []mymq.RespNode | |||
| for _, node := range nodes { | |||
| respNodes = append(respNodes, mymq.NewRespNode( | |||
| node.NodeID, | |||
| node.ExternalIP, | |||
| node.LocalIP, | |||
| // LocationID 相同则认为是在同一个地域 | |||
| foundBelongNode && belongNode.LocationID == node.LocationID, | |||
| )) | |||
| } | |||
| return mq.ReplyOK(coormsg.NewPreDownloadObjectResp( | |||
| object.FileSize, | |||
| mymq.NewRespRepRedundancyData(objectRep.FileHash, respNodes), | |||
| )) | |||
| } else { | |||
| // TODO 参考上面进行重写 | |||
| ecName := object.Redundancy | |||
| blocks, err := svc.db.QueryObjectBlock(object.ObjectID) | |||
| if err != nil { | |||
| logger.WithField("ObjectID", object.ObjectID). | |||
| Warnf("query Blocks failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query Blocks failed") | |||
| } | |||
| logger.Debugf(blocks[4].BlockHash) | |||
| //查询纠删码参数 | |||
| ec, err := svc.db.Ec().GetEc(svc.db.SQLCtx(), ecName) | |||
| ecc := mymq.NewEc(ec.EcID, ec.Name, ec.EcK, ec.EcN) | |||
| //查询每个编码块存放的所有节点 | |||
| respNodes := make([][]mymq.RespNode, len(blocks)) | |||
| for i := 0; i < len(blocks); i++ { | |||
| nodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, blocks[i].BlockHash) | |||
| if err != nil { | |||
| logger.WithField("FileHash", blocks[i].BlockHash). | |||
| Warnf("query Cache failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreDownloadObjectResp](errorcode.OperationFailed, "query Cache failed") | |||
| } | |||
| var nd []mymq.RespNode | |||
| for _, node := range nodes { | |||
| nd = append(nd, mymq.NewRespNode( | |||
| node.NodeID, | |||
| node.ExternalIP, | |||
| node.LocalIP, | |||
| // LocationID 相同则认为是在同一个地域 | |||
| foundBelongNode && belongNode.LocationID == node.LocationID, | |||
| )) | |||
| } | |||
| respNodes[i] = nd | |||
| logger.Debugf("##%d\n", i) | |||
| } | |||
| var blockss []mymq.RespObjectBlock | |||
| for i := 0; i < len(blocks); i++ { | |||
| blockss = append(blockss, mymq.NewRespObjectBlock( | |||
| blocks[i].InnerID, | |||
| blocks[i].BlockHash, | |||
| )) | |||
| } | |||
| return mq.ReplyOK(coormsg.NewPreDownloadObjectResp( | |||
| object.FileSize, | |||
| mymq.NewRespEcRedundancyData(ecc, blockss, respNodes), | |||
| )) | |||
| } | |||
| } | |||
| func (svc *Service) PreUploadRepObject(msg *coormsg.PreUploadRepObject) (*coormsg.PreUploadResp, *mq.CodeMessage) { | |||
| // 判断同名对象是否存在。等到UploadRepObject时再判断一次。 | |||
| // 此次的判断只作为参考,具体是否成功还是看UploadRepObject的结果 | |||
| isBucketAvai, err := svc.db.Bucket().IsAvailable(svc.db.SQLCtx(), msg.BucketID, msg.UserID) | |||
| if err != nil { | |||
| logger.WithField("BucketID", msg.BucketID). | |||
| Warnf("check bucket available failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "check bucket available failed") | |||
| } | |||
| if !isBucketAvai { | |||
| logger.WithField("BucketID", msg.BucketID). | |||
| Warnf("bucket is not available to user") | |||
| return mq.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "bucket is not available to user") | |||
| } | |||
| _, err = svc.db.Object().GetByName(svc.db.SQLCtx(), msg.BucketID, msg.ObjectName) | |||
| if err == nil { | |||
| logger.WithField("BucketID", msg.BucketID). | |||
| WithField("ObjectName", msg.ObjectName). | |||
| Warnf("object with given Name and BucketID already exists") | |||
| return mq.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "object with given Name and BucketID already exists") | |||
| } | |||
| if !errors.Is(err, sql.ErrNoRows) { | |||
| logger.WithField("BucketID", msg.BucketID). | |||
| WithField("ObjectName", msg.ObjectName). | |||
| Warnf("get object by name failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "get object by name failed") | |||
| } | |||
| //查询用户可用的节点IP | |||
| nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID) | |||
| func (svc *Service) GetPackageObjectRepData(msg *coormq.GetPackageObjectRepData) (*coormq.GetPackageObjectRepDataResp, *mq.CodeMessage) { | |||
| data, err := svc.db.ObjectRep().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) | |||
| if err != nil { | |||
| logger.WithField("UserID", msg.UserID). | |||
| Warnf("query user nodes failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "query user nodes failed") | |||
| } | |||
| logger.WithField("PackageID", msg.PackageID). | |||
| Warnf("query object rep and node id in package: %s", err.Error()) | |||
| // 查询客户端所属节点 | |||
| foundBelongNode := true | |||
| belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP) | |||
| if err == sql.ErrNoRows { | |||
| foundBelongNode = false | |||
| } else if err != nil { | |||
| logger.WithField("ClientExternalIP", msg.ClientExternalIP). | |||
| Warnf("query client belong node failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreUploadResp](errorcode.OperationFailed, "query client belong node failed") | |||
| return nil, mq.Failed(errorcode.OperationFailed, "query object rep and node id in package failed") | |||
| } | |||
| var respNodes []mymq.RespNode | |||
| for _, node := range nodes { | |||
| respNodes = append(respNodes, mymq.NewRespNode( | |||
| node.NodeID, | |||
| node.ExternalIP, | |||
| node.LocalIP, | |||
| // LocationID 相同则认为是在同一个地域 | |||
| foundBelongNode && belongNode.LocationID == node.LocationID, | |||
| )) | |||
| } | |||
| return mq.ReplyOK(coormsg.NewPreUploadResp(respNodes)) | |||
| return mq.ReplyOK(coormq.NewGetPackageObjectRepDataResp(data)) | |||
| } | |||
| func (svc *Service) CreateRepObject(msg *coormsg.CreateRepObject) (*coormsg.CreateObjectResp, *mq.CodeMessage) { | |||
| var objID int64 | |||
| err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { | |||
| var err error | |||
| objID, err = svc.db.Object().CreateRepObject(tx, msg.BucketID, msg.ObjectName, msg.FileSize, msg.RepCount, msg.NodeIDs, msg.FileHash, msg.DirName) | |||
| return err | |||
| }) | |||
| if err != nil { | |||
| logger.WithField("BucketName", msg.BucketID). | |||
| WithField("ObjectName", msg.ObjectName). | |||
| Warnf("create rep object failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.CreateObjectResp](errorcode.OperationFailed, "create rep object failed") | |||
| } | |||
| // 紧急任务 | |||
| err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.FileHash}), true, true) | |||
| func (svc *Service) GetPackageObjectECData(msg *coormq.GetPackageObjectECData) (*coormq.GetPackageObjectECDataResp, *mq.CodeMessage) { | |||
| data, err := svc.db.ObjectBlock().GetWithNodeIDInPackage(svc.db.SQLCtx(), msg.PackageID) | |||
| if err != nil { | |||
| logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error()) | |||
| } | |||
| return mq.ReplyOK(coormsg.NewCreateObjectResp(objID)) | |||
| } | |||
| func (svc *Service) PreUpdateRepObject(msg *coormsg.PreUpdateRepObject) (*coormsg.PreUpdateRepObjectResp, *mq.CodeMessage) { | |||
| // TODO 检查用户是否有Object的权限 | |||
| // 获取对象信息 | |||
| obj, err := svc.db.Object().GetByID(svc.db.SQLCtx(), msg.ObjectID) | |||
| if err != nil { | |||
| logger.WithField("ObjectID", msg.ObjectID). | |||
| Warnf("get object failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "get object failed") | |||
| } | |||
| if obj.Redundancy != models.RedundancyRep { | |||
| logger.WithField("ObjectID", msg.ObjectID). | |||
| Warnf("this object is not a rep object") | |||
| return mq.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "this object is not a rep object") | |||
| } | |||
| // 获取对象Rep信息 | |||
| objRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), msg.ObjectID) | |||
| if err != nil { | |||
| logger.WithField("ObjectID", msg.ObjectID). | |||
| Warnf("get object rep failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "get object rep failed") | |||
| } | |||
| //查询用户可用的节点IP | |||
| nodes, err := svc.db.Node().GetUserNodes(svc.db.SQLCtx(), msg.UserID) | |||
| if err != nil { | |||
| logger.WithField("UserID", msg.UserID). | |||
| Warnf("query user nodes failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "query user nodes failed") | |||
| } | |||
| // 查询客户端所属节点 | |||
| foundBelongNode := true | |||
| belongNode, err := svc.db.Node().GetByExternalIP(svc.db.SQLCtx(), msg.ClientExternalIP) | |||
| if err == sql.ErrNoRows { | |||
| foundBelongNode = false | |||
| } else if err != nil { | |||
| logger.WithField("ClientExternalIP", msg.ClientExternalIP). | |||
| Warnf("query client belong node failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "query client belong node failed") | |||
| } | |||
| // 查询保存了旧文件的节点信息 | |||
| cachingNodes, err := svc.db.Cache().FindCachingFileUserNodes(svc.db.SQLCtx(), msg.UserID, objRep.FileHash) | |||
| if err != nil { | |||
| logger.Warnf("find caching file user nodes failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreUpdateRepObjectResp](errorcode.OperationFailed, "find caching file user nodes failed") | |||
| } | |||
| var retNodes []coormsg.PreUpdateRepObjectRespNode | |||
| for _, node := range nodes { | |||
| retNodes = append(retNodes, coormsg.NewPreUpdateRepObjectRespNode( | |||
| node.NodeID, | |||
| node.ExternalIP, | |||
| node.LocalIP, | |||
| // LocationID 相同则认为是在同一个地域 | |||
| foundBelongNode && belongNode.LocationID == node.LocationID, | |||
| // 此节点存储了对象旧文件 | |||
| lo.ContainsBy(cachingNodes, func(n model.Node) bool { return n.NodeID == node.NodeID }), | |||
| )) | |||
| } | |||
| return mq.ReplyOK(coormsg.NewPreUpdateRepObjectResp(retNodes)) | |||
| } | |||
| func (svc *Service) UpdateRepObject(msg *coormsg.UpdateRepObject) (*coormsg.UpdateRepObjectResp, *mq.CodeMessage) { | |||
| err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { | |||
| return svc.db.Object().UpdateRepObject(tx, msg.ObjectID, msg.FileSize, msg.NodeIDs, msg.FileHash) | |||
| }) | |||
| if err != nil { | |||
| logger.WithField("ObjectID", msg.ObjectID). | |||
| Warnf("update rep object failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.UpdateRepObjectResp](errorcode.OperationFailed, "update rep object failed") | |||
| } | |||
| // 紧急任务 | |||
| err = svc.scanner.PostEvent(scevt.NewCheckRepCount([]string{msg.FileHash}), true, true) | |||
| if err != nil { | |||
| logger.Warnf("post event to scanner failed, but this will not affect updating, err: %s", err.Error()) | |||
| } | |||
| return mq.ReplyOK(coormsg.NewUpdateRepObjectResp()) | |||
| } | |||
| func (svc *Service) DeleteObject(msg *coormsg.DeleteObject) (*coormsg.DeleteObjectResp, *mq.CodeMessage) { | |||
| isAva, err := svc.db.Object().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.ObjectID) | |||
| if err != nil { | |||
| logger.WithField("UserID", msg.UserID). | |||
| WithField("ObjectID", msg.ObjectID). | |||
| Warnf("check object available failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OperationFailed, "check object available failed") | |||
| } | |||
| if !isAva { | |||
| logger.WithField("UserID", msg.UserID). | |||
| WithField("ObjectID", msg.ObjectID). | |||
| Warnf("object is not available to the user") | |||
| return mq.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OperationFailed, "object is not available to the user") | |||
| } | |||
| err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { | |||
| return svc.db.Object().SoftDelete(tx, msg.ObjectID) | |||
| }) | |||
| if err != nil { | |||
| logger.WithField("UserID", msg.UserID). | |||
| WithField("ObjectID", msg.ObjectID). | |||
| Warnf("set object deleted failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.DeleteObjectResp](errorcode.OperationFailed, "set object deleted failed") | |||
| } | |||
| stgs, err := svc.db.StorageObject().FindObjectStorages(svc.db.SQLCtx(), msg.ObjectID) | |||
| if err != nil { | |||
| logger.Warnf("find object storages failed, but this will not affect the deleting, err: %s", err.Error()) | |||
| return mq.ReplyOK(coormsg.NewDeleteObjectResp()) | |||
| } | |||
| // 不追求及时、准确 | |||
| if len(stgs) == 0 { | |||
| // 如果没有被引用,直接投递CheckObject的任务 | |||
| err := svc.scanner.PostEvent(scevt.NewCheckObject([]int64{msg.ObjectID}), false, false) | |||
| if err != nil { | |||
| logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) | |||
| } | |||
| logger.Debugf("post check object event") | |||
| logger.WithField("PackageID", msg.PackageID). | |||
| Warnf("query object ec and node id in package: %s", err.Error()) | |||
| } else { | |||
| // 有引用则让Agent去检查StorageObject | |||
| for _, stg := range stgs { | |||
| err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.ObjectID}), false, false) | |||
| if err != nil { | |||
| logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) | |||
| } | |||
| } | |||
| logger.Debugf("post agent check storage event") | |||
| return nil, mq.Failed(errorcode.OperationFailed, "query object ec and node id in package failed") | |||
| } | |||
| return mq.ReplyOK(coormsg.NewDeleteObjectResp()) | |||
| return mq.ReplyOK(coormq.NewGetPackageObjectECDataResp(data)) | |||
| } | |||
| @@ -0,0 +1,189 @@ | |||
| package services | |||
| import ( | |||
| "database/sql" | |||
| "fmt" | |||
| "github.com/jmoiron/sqlx" | |||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" | |||
| scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner/event" | |||
| ) | |||
| func (svc *Service) GetPackage(msg *coormq.GetPackage) (*coormq.GetPackageResp, *mq.CodeMessage) { | |||
| pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) | |||
| if err != nil { | |||
| logger.WithField("PackageID", msg.PackageID). | |||
| Warnf("get package: %s", err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "get package failed") | |||
| } | |||
| return mq.ReplyOK(coormq.NewGetPackageResp(pkg)) | |||
| } | |||
| func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) { | |||
| // TODO 检查用户是否有权限 | |||
| objs, err := svc.db.Object().GetPackageObjects(svc.db.SQLCtx(), msg.PackageID) | |||
| if err != nil { | |||
| logger.WithField("PackageID", msg.PackageID). | |||
| Warnf("get package objects: %s", err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "get package objects failed") | |||
| } | |||
| return mq.ReplyOK(coormq.NewGetPackageObjectsResp(objs)) | |||
| } | |||
| func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePackageResp, *mq.CodeMessage) { | |||
| var pkgID int64 | |||
| err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { | |||
| var err error | |||
| pkgID, err = svc.db.Package().Create(svc.db.SQLCtx(), msg.BucketID, msg.Name, msg.Redundancy) | |||
| return err | |||
| }) | |||
| if err != nil { | |||
| logger.WithField("BucketID", msg.BucketID). | |||
| WithField("Name", msg.Name). | |||
| Warnf("creating package: %s", err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "creating package failed") | |||
| } | |||
| return mq.ReplyOK(coormq.NewCreatePackageResp(pkgID)) | |||
| } | |||
| func (svc *Service) UpdateRepPackage(msg *coormq.UpdateRepPackage) (*coormq.UpdateRepPackageResp, *mq.CodeMessage) { | |||
| _, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) | |||
| if err != nil { | |||
| logger.WithField("PackageID", msg.PackageID). | |||
| Warnf("get package: %s", err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "get package failed") | |||
| } | |||
| err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { | |||
| // 先执行删除操作 | |||
| if len(msg.Deletes) > 0 { | |||
| if err := svc.db.Object().BatchDelete(tx, msg.Deletes); err != nil { | |||
| return fmt.Errorf("deleting objects: %w", err) | |||
| } | |||
| } | |||
| // 再执行添加操作 | |||
| if len(msg.Adds) > 0 { | |||
| if _, err := svc.db.Object().BatchAddRep(tx, msg.PackageID, msg.Adds); err != nil { | |||
| return fmt.Errorf("adding objects: %w", err) | |||
| } | |||
| } | |||
| return nil | |||
| }) | |||
| if err != nil { | |||
| logger.Warn(err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "update rep package failed") | |||
| } | |||
| // 紧急任务 | |||
| var affectFileHashes []string | |||
| for _, add := range msg.Adds { | |||
| affectFileHashes = append(affectFileHashes, add.FileHash) | |||
| } | |||
| err = svc.scanner.PostEvent(scevt.NewCheckRepCount(affectFileHashes), true, true) | |||
| if err != nil { | |||
| logger.Warnf("post event to scanner failed, but this will not affect creating, err: %s", err.Error()) | |||
| } | |||
| return mq.ReplyOK(coormq.NewUpdateRepPackageResp()) | |||
| } | |||
| func (svc *Service) UpdateECPackage(msg *coormq.UpdateECPackage) (*coormq.UpdateECPackageResp, *mq.CodeMessage) { | |||
| _, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID) | |||
| if err != nil { | |||
| logger.WithField("PackageID", msg.PackageID). | |||
| Warnf("get package: %s", err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "get package failed") | |||
| } | |||
| err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { | |||
| // 先执行删除操作 | |||
| if len(msg.Deletes) > 0 { | |||
| if err := svc.db.Object().BatchDelete(tx, msg.Deletes); err != nil { | |||
| return fmt.Errorf("deleting objects: %w", err) | |||
| } | |||
| } | |||
| // 再执行添加操作 | |||
| if len(msg.Adds) > 0 { | |||
| if _, err := svc.db.Object().BatchAddEC(tx, msg.PackageID, msg.Adds); err != nil { | |||
| return fmt.Errorf("adding objects: %w", err) | |||
| } | |||
| } | |||
| return nil | |||
| }) | |||
| if err != nil { | |||
| logger.Warn(err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "update ec package failed") | |||
| } | |||
| return mq.ReplyOK(coormq.NewUpdateECPackageResp()) | |||
| } | |||
| func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePackageResp, *mq.CodeMessage) { | |||
| isAva, err := svc.db.Package().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.PackageID) | |||
| if err != nil { | |||
| logger.WithField("UserID", msg.UserID). | |||
| WithField("PackageID", msg.PackageID). | |||
| Warnf("check package available failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormq.DeletePackageResp](errorcode.OperationFailed, "check package available failed") | |||
| } | |||
| if !isAva { | |||
| logger.WithField("UserID", msg.UserID). | |||
| WithField("PackageID", msg.PackageID). | |||
| Warnf("package is not available to the user") | |||
| return mq.ReplyFailed[coormq.DeletePackageResp](errorcode.OperationFailed, "package is not available to the user") | |||
| } | |||
| err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { | |||
| return svc.db.Package().SoftDelete(tx, msg.PackageID) | |||
| }) | |||
| if err != nil { | |||
| logger.WithField("UserID", msg.UserID). | |||
| WithField("PackageID", msg.PackageID). | |||
| Warnf("set package deleted failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormq.DeletePackageResp](errorcode.OperationFailed, "set package deleted failed") | |||
| } | |||
| stgs, err := svc.db.StoragePackage().FindPackageStorages(svc.db.SQLCtx(), msg.PackageID) | |||
| if err != nil { | |||
| logger.Warnf("find package storages failed, but this will not affect the deleting, err: %s", err.Error()) | |||
| return mq.ReplyOK(coormq.NewDeletePackageResp()) | |||
| } | |||
| // 不追求及时、准确 | |||
| if len(stgs) == 0 { | |||
| // 如果没有被引用,直接投递CheckPackage的任务 | |||
| err := svc.scanner.PostEvent(scevt.NewCheckPackage([]int64{msg.PackageID}), false, false) | |||
| if err != nil { | |||
| logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) | |||
| } | |||
| logger.Debugf("post check package event") | |||
| } else { | |||
| // 有引用则让Agent去检查StoragePackage | |||
| for _, stg := range stgs { | |||
| err := svc.scanner.PostEvent(scevt.NewAgentCheckStorage(stg.StorageID, []int64{msg.PackageID}), false, false) | |||
| if err != nil { | |||
| logger.Warnf("post event to scanner failed, but this will not affect deleting, err: %s", err.Error()) | |||
| } | |||
| } | |||
| logger.Debugf("post agent check storage event") | |||
| } | |||
| return mq.ReplyOK(coormq.NewDeletePackageResp()) | |||
| } | |||
| @@ -2,15 +2,15 @@ package services | |||
| import ( | |||
| mydb "gitlink.org.cn/cloudream/storage-common/pkgs/db" | |||
| sccli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/scanner" | |||
| scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner" | |||
| ) | |||
| type Service struct { | |||
| db *mydb.DB | |||
| scanner *sccli.Client | |||
| scanner *scmq.Client | |||
| } | |||
| func NewService(db *mydb.DB, scanner *sccli.Client) *Service { | |||
| func NewService(db *mydb.DB, scanner *scmq.Client) *Service { | |||
| return &Service{ | |||
| db: db, | |||
| scanner: scanner, | |||
| @@ -5,100 +5,34 @@ import ( | |||
| "github.com/jmoiron/sqlx" | |||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||
| "gitlink.org.cn/cloudream/common/models" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| mymodels "gitlink.org.cn/cloudream/storage-common/models" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" | |||
| coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" | |||
| ) | |||
| func (svc *Service) GetStorageInfo(msg *coormsg.GetStorageInfo) (*coormsg.GetStorageInfoResp, *mq.CodeMessage) { | |||
| func (svc *Service) GetStorageInfo(msg *coormq.GetStorageInfo) (*coormq.GetStorageInfoResp, *mq.CodeMessage) { | |||
| stg, err := svc.db.Storage().GetUserStorage(svc.db.SQLCtx(), msg.UserID, msg.StorageID) | |||
| if err != nil { | |||
| logger.Warnf("getting user storage: %s", err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "get user storage failed") | |||
| } | |||
| return mq.ReplyOK(coormsg.NewGetStorageInfoResp(stg.StorageID, stg.Name, stg.NodeID, stg.Directory, stg.State)) | |||
| return mq.ReplyOK(coormq.NewGetStorageInfoResp(stg.StorageID, stg.Name, stg.NodeID, stg.Directory, stg.State)) | |||
| } | |||
| func (svc *Service) PreMoveObjectToStorage(msg *coormsg.PreMoveObjectToStorage) (*coormsg.PreMoveObjectToStorageResp, *mq.CodeMessage) { | |||
| // 查询用户关联的存储服务 | |||
| stg, err := svc.db.Storage().GetUserStorage(svc.db.SQLCtx(), msg.UserID, msg.StorageID) | |||
| if err != nil { | |||
| logger.WithField("UserID", msg.UserID). | |||
| WithField("StorageID", msg.StorageID). | |||
| Warnf("get user Storage failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OperationFailed, "get user Storage failed") | |||
| } | |||
| // 查询文件对象 | |||
| object, err := svc.db.Object().GetUserObject(svc.db.SQLCtx(), msg.UserID, msg.ObjectID) | |||
| if err != nil { | |||
| logger.WithField("ObjectID", msg.ObjectID). | |||
| Warnf("get user Object failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OperationFailed, "get user Object failed") | |||
| } | |||
| //-若redundancy是rep,查询对象副本表, 获得FileHash | |||
| if object.Redundancy == models.RedundancyRep { | |||
| objectRep, err := svc.db.ObjectRep().GetByID(svc.db.SQLCtx(), object.ObjectID) | |||
| if err != nil { | |||
| logger.Warnf("get ObjectRep failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OperationFailed, "get ObjectRep failed") | |||
| } | |||
| return mq.ReplyOK(coormsg.NewPreMoveObjectToStorageRespBody( | |||
| stg.NodeID, | |||
| stg.Directory, | |||
| object, | |||
| mymodels.NewRedundancyRepData(objectRep.FileHash), | |||
| )) | |||
| } else { | |||
| // TODO 以EC_开头的Redundancy才是EC策略 | |||
| ecName := object.Redundancy | |||
| blocks, err := svc.db.QueryObjectBlock(object.ObjectID) | |||
| if err != nil { | |||
| logger.WithField("ObjectID", object.ObjectID). | |||
| Warnf("query Blocks failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.PreMoveObjectToStorageResp](errorcode.OperationFailed, "query Blocks failed") | |||
| } | |||
| //查询纠删码参数 | |||
| ec, err := svc.db.Ec().GetEc(svc.db.SQLCtx(), ecName) | |||
| // TODO zkx 异常处理 | |||
| ecc := mymodels.NewEc(ec.EcID, ec.Name, ec.EcK, ec.EcN) | |||
| blockss := make([]mymodels.ObjectBlock, len(blocks)) | |||
| for i := 0; i < len(blocks); i++ { | |||
| blockss[i] = mymodels.NewObjectBlock( | |||
| blocks[i].InnerID, | |||
| blocks[i].BlockHash, | |||
| ) | |||
| } | |||
| return mq.ReplyOK(coormsg.NewPreMoveObjectToStorageRespBody( | |||
| stg.NodeID, | |||
| stg.Directory, | |||
| object, | |||
| mymodels.NewRedundancyEcData(ecc, blockss), | |||
| )) | |||
| } | |||
| } | |||
| func (svc *Service) MoveObjectToStorage(msg *coormsg.MoveObjectToStorage) (*coormsg.MoveObjectToStorageResp, *mq.CodeMessage) { | |||
| func (svc *Service) PackageMovedToStorage(msg *coormq.PackageMovedToStorage) (*coormq.PackageMovedToStorageResp, *mq.CodeMessage) { | |||
| // TODO: 对于的storage中已经存在的文件,直接覆盖已有文件 | |||
| err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error { | |||
| return svc.db.StorageObject().MoveObjectTo(tx, msg.ObjectID, msg.StorageID, msg.UserID) | |||
| return svc.db.StoragePackage().MovePackageTo(tx, msg.PackageID, msg.StorageID, msg.UserID) | |||
| }) | |||
| if err != nil { | |||
| logger.WithField("UserID", msg.UserID). | |||
| WithField("ObjectID", msg.ObjectID). | |||
| WithField("PackageID", msg.PackageID). | |||
| WithField("StorageID", msg.StorageID). | |||
| Warnf("user move object to storage failed, err: %s", err.Error()) | |||
| return mq.ReplyFailed[coormsg.MoveObjectToStorageResp](errorcode.OperationFailed, "user move object to storage failed") | |||
| Warnf("user move package to storage failed, err: %s", err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, "user move package to storage failed") | |||
| } | |||
| return mq.ReplyOK(coormsg.NewMoveObjectToStorageResp()) | |||
| return mq.ReplyOK(coormq.NewPackageMovedToStorageResp()) | |||
| } | |||
| @@ -5,10 +5,9 @@ import ( | |||
| "os" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| log "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| mydb "gitlink.org.cn/cloudream/storage-common/pkgs/db" | |||
| sccli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/scanner" | |||
| rasvr "gitlink.org.cn/cloudream/storage-common/pkgs/mq/server/coordinator" | |||
| coormq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/coordinator" | |||
| scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner" | |||
| "gitlink.org.cn/cloudream/storage-coordinator/internal/config" | |||
| "gitlink.org.cn/cloudream/storage-coordinator/internal/services" | |||
| ) | |||
| @@ -28,21 +27,21 @@ func main() { | |||
| db, err := mydb.NewDB(&config.Cfg().DB) | |||
| if err != nil { | |||
| log.Fatalf("new db failed, err: %s", err.Error()) | |||
| logger.Fatalf("new db failed, err: %s", err.Error()) | |||
| } | |||
| scanner, err := sccli.NewClient(&config.Cfg().RabbitMQ) | |||
| scanner, err := scmq.NewClient(&config.Cfg().RabbitMQ) | |||
| if err != nil { | |||
| log.Fatalf("new scanner client failed, err: %s", err.Error()) | |||
| logger.Fatalf("new scanner client failed, err: %s", err.Error()) | |||
| } | |||
| coorSvr, err := rasvr.NewServer(services.NewService(db, scanner), &config.Cfg().RabbitMQ) | |||
| coorSvr, err := coormq.NewServer(services.NewService(db, scanner), &config.Cfg().RabbitMQ) | |||
| if err != nil { | |||
| log.Fatalf("new coordinator server failed, err: %s", err.Error()) | |||
| logger.Fatalf("new coordinator server failed, err: %s", err.Error()) | |||
| } | |||
| coorSvr.OnError = func(err error) { | |||
| log.Warnf("coordinator server err: %s", err.Error()) | |||
| logger.Warnf("coordinator server err: %s", err.Error()) | |||
| } | |||
| // 启动服务 | |||
| @@ -52,13 +51,13 @@ func main() { | |||
| <-forever | |||
| } | |||
| func serveCoorServer(server *rasvr.Server) { | |||
| log.Info("start serving command server") | |||
| func serveCoorServer(server *coormq.Server) { | |||
| logger.Info("start serving command server") | |||
| err := server.Serve() | |||
| if err != nil { | |||
| log.Errorf("command server stopped with error: %s", err.Error()) | |||
| logger.Errorf("command server stopped with error: %s", err.Error()) | |||
| } | |||
| log.Info("command server stopped") | |||
| logger.Info("command server stopped") | |||
| } | |||