diff --git a/main.go b/main.go index e71abf4..f6ba6fb 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( log "github.com/sirupsen/logrus" "gitlink.org.cn/cloudream/coordinator/config" + "gitlink.org.cn/cloudream/coordinator/services" mydb "gitlink.org.cn/cloudream/db" rasvr "gitlink.org.cn/cloudream/rabbitmq/server" "gitlink.org.cn/cloudream/utils/logger" @@ -29,7 +30,7 @@ func main() { log.Fatalf("new db failed, err: %s", err.Error()) } - cmdSvr, err := rasvr.NewCoordinatorServer(NewCommandService(db)) + cmdSvr, err := rasvr.NewCoordinatorServer(services.NewService(db)) if err != nil { log.Fatalf("new coordinator server failed, err: %s", err.Error()) } diff --git a/services/bucket.go b/services/bucket.go new file mode 100644 index 0000000..659c2c0 --- /dev/null +++ b/services/bucket.go @@ -0,0 +1,48 @@ +package services + +import ( + log "github.com/sirupsen/logrus" + "gitlink.org.cn/cloudream/db/model" + ramsg "gitlink.org.cn/cloudream/rabbitmq/message" + "gitlink.org.cn/cloudream/utils/consts/errorcode" +) + +func (svc *Service) GetBucket(userID int, bucketID int) (model.Bucket, error) { + // TODO + panic("not implement yet") +} + +func (svc *Service) GetUserBuckets(msg *ramsg.GetUserBucketsCommand) ramsg.GetUserBucketsResp { + buckets, err := svc.db.GetUserBuckets(msg.UserID) + + if err != nil { + log.WithField("UserID", msg.UserID). + Warnf("get user buckets failed, err: %s", err.Error()) + return ramsg.NewGetUserBucketsRespFailed(errorcode.OPERATION_FAILED, "get all buckets failed") + } + + return ramsg.NewGetUserBucketsRespOK(buckets) +} + +func (svc *Service) GetBucketObjects(msg *ramsg.GetBucketObjectsCommand) ramsg.GetBucketObjectsResp { + objects, err := svc.db.GetBucketObjects(msg.UserID, msg.BucketID) + + if err != nil { + log.WithField("UserID", msg.UserID). + WithField("BucketID", msg.BucketID). + Warnf("get bucket objects failed, err: %s", err.Error()) + return ramsg.NewGetBucketObjectsRespFailed(errorcode.OPERATION_FAILED, "get all buckets failed") + } + + return ramsg.NewGetBucketObjectsRespOK(objects) +} + +func (svc *Service) CreateBucket(userID int, bucketName string) (model.Bucket, error) { + // TODO + panic("not implement yet") +} + +func (src *Service) DeleteBucket(userID int, bucketID int) error { + // TODO + panic("not implement yet") +} diff --git a/command_service.go b/services/command_service.go similarity index 92% rename from command_service.go rename to services/command_service.go index d319edf..ce19cf8 100644 --- a/command_service.go +++ b/services/command_service.go @@ -1,8 +1,7 @@ -package main +package services import ( log "github.com/sirupsen/logrus" - mydb "gitlink.org.cn/cloudream/db" ramsg "gitlink.org.cn/cloudream/rabbitmq/message" "gitlink.org.cn/cloudream/utils" @@ -10,17 +9,7 @@ import ( "gitlink.org.cn/cloudream/utils/consts/errorcode" ) -type CommandService struct { - db *mydb.DB -} - -func NewCommandService(db *mydb.DB) *CommandService { - return &CommandService{ - db: db, - } -} - -func (service *CommandService) Read(msg *ramsg.ReadCommand) ramsg.ReadResp { +func (service *Service) Read(msg *ramsg.ReadCommand) ramsg.ReadResp { var hashes []string blockIDs := []int{0} @@ -106,7 +95,7 @@ func (service *CommandService) Read(msg *ramsg.ReadCommand) ramsg.ReadResp { ) } -func (service *CommandService) RepWrite(msg *ramsg.RepWriteCommand) ramsg.WriteResp { +func (service *Service) RepWrite(msg *ramsg.RepWriteCommand) ramsg.WriteResp { // TODO 需要在此处判断同名对象是否存在。等到WriteRepHash时再判断一次。 // 此次的判断只作为参考,具体是否成功还是看WriteRepHash的结果 @@ -138,7 +127,7 @@ func (service *CommandService) RepWrite(msg *ramsg.RepWriteCommand) ramsg.WriteR return ramsg.NewCoorWriteRespOK(ids, ips) } -func (service *CommandService) WriteRepHash(msg *ramsg.WriteRepHashCommand) ramsg.WriteHashResp { +func (service *Service) WriteRepHash(msg *ramsg.WriteRepHashCommand) ramsg.WriteHashResp { _, err := service.db.CreateRepObject(msg.BucketID, msg.ObjectName, msg.FileSizeInBytes, msg.ReplicateNumber, msg.NodeIDs, msg.Hashes) if err != nil { log.WithField("BucketName", msg.BucketID). @@ -150,7 +139,7 @@ func (service *CommandService) WriteRepHash(msg *ramsg.WriteRepHashCommand) rams return ramsg.NewCoorWriteHashRespOK() } -func (service *CommandService) Move(msg *ramsg.MoveCommand) ramsg.MoveResp { +func (service *Service) Move(msg *ramsg.MoveCommand) ramsg.MoveResp { //查询数据库,获取冗余类型,冗余参数 //jh:使用command中的bucketname和objectname查询对象表,获得redundancy,EcName,fileSizeInBytes //-若redundancy是rep,查询对象副本表, 获得repHash @@ -239,11 +228,11 @@ func (service *CommandService) Move(msg *ramsg.MoveCommand) ramsg.MoveResp { ) } -func (service *CommandService) TempCacheReport(msg *ramsg.TempCacheReport) { +func (service *Service) TempCacheReport(msg *ramsg.TempCacheReport) { service.db.BatchInsertOrUpdateCache(msg.Hashes, msg.NodeID) } -func (service *CommandService) AgentStatusReport(msg *ramsg.AgentStatusReport) { +func (service *Service) AgentStatusReport(msg *ramsg.AgentStatusReport) { //jh:根据command中的Ip,插入节点延迟表,和节点表的NodeStatus //根据command中的Ip,插入节点延迟表 diff --git a/command_service_ec.go b/services/command_service_ec.go similarity index 92% rename from command_service_ec.go rename to services/command_service_ec.go index beb4f30..49db386 100644 --- a/command_service_ec.go +++ b/services/command_service_ec.go @@ -1,10 +1,10 @@ -package main +package services import ( ramsg "gitlink.org.cn/cloudream/rabbitmq/message" ) -func (service *CommandService) ECWrite(msg *ramsg.ECWriteCommand) ramsg.WriteResp { +func (service *Service) ECWrite(msg *ramsg.ECWriteCommand) ramsg.WriteResp { panic("not implement yet!") /* @@ -56,7 +56,7 @@ func (service *CommandService) ECWrite(msg *ramsg.ECWriteCommand) ramsg.WriteRes */ } -func (service *CommandService) WriteECHash(msg *ramsg.WriteECHashCommand) ramsg.WriteHashResp { +func (service *Service) WriteECHash(msg *ramsg.WriteECHashCommand) ramsg.WriteHashResp { panic("not implement yet!") /* diff --git a/services/service.go b/services/service.go new file mode 100644 index 0000000..9bc1d23 --- /dev/null +++ b/services/service.go @@ -0,0 +1,13 @@ +package services + +import mydb "gitlink.org.cn/cloudream/db" + +type Service struct { + db *mydb.DB +} + +func NewService(db *mydb.DB) *Service { + return &Service{ + db: db, + } +}