From 9d165bfb7e3812b281c7c55602bf504df8ccd5bd Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 14 Aug 2023 11:28:23 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E9=A1=B9=E7=9B=AE=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 33 +++++---------- go.sum | 21 +++++----- internal/config/config.go | 2 +- internal/services/cmd/agent.go | 8 ++-- internal/services/cmd/ipfs.go | 16 +++---- internal/services/cmd/object.go | 20 ++++----- internal/services/cmd/storage.go | 58 +++++++++++++------------- internal/services/grpc/grpc_service.go | 2 +- internal/task/ec_read.go | 4 +- internal/task/task.go | 2 +- internal/task/upload_rep_objects.go | 8 ++-- main.go | 6 +-- status_report.go | 4 +- 13 files changed, 85 insertions(+), 99 deletions(-) diff --git a/go.mod b/go.mod index 3f4e6f8..e5be43f 100644 --- a/go.mod +++ b/go.mod @@ -6,11 +6,8 @@ require ( github.com/ipfs/go-ipfs-api v0.6.0 github.com/samber/lo v1.38.1 gitlink.org.cn/cloudream/common v0.0.0 - gitlink.org.cn/cloudream/ec v0.0.0 - gitlink.org.cn/cloudream/proto v0.0.0 - gitlink.org.cn/cloudream/rabbitmq v0.0.0 - google.golang.org/grpc v1.54.0 - magefiles v0.0.0 + gitlink.org.cn/cloudream/storage-common v0.0.0 + google.golang.org/grpc v1.57.0 ) require ( @@ -36,7 +33,6 @@ require ( github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/libp2p/go-flow-metrics v0.1.0 // indirect github.com/libp2p/go-libp2p v0.27.0 // indirect - github.com/magefile/mage v1.15.0 // indirect github.com/minio/sha256-simd v1.0.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect @@ -51,13 +47,11 @@ require ( github.com/multiformats/go-multihash v0.2.1 // indirect github.com/multiformats/go-multistream v0.4.1 // indirect github.com/multiformats/go-varint v0.0.7 // indirect - github.com/otiai10/copy v1.12.0 // indirect github.com/sirupsen/logrus v1.9.2 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect - github.com/streadway/amqp v1.0.0 // indirect + github.com/streadway/amqp v1.1.0 // indirect github.com/whyrusleeping/tar-utils v0.0.0-20201201191210-20a61371de5b // indirect github.com/zyedidia/generic v1.2.1 // indirect - gitlink.org.cn/cloudream/db v0.0.0 // indirect go.etcd.io/etcd/api/v3 v3.5.9 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.9 // indirect go.etcd.io/etcd/client/v3 v3.5.9 // indirect @@ -70,22 +64,15 @@ require ( golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.7.0 // indirect golang.org/x/text v0.9.0 // indirect - google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd // indirect + google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect google.golang.org/protobuf v1.30.0 // indirect lukechampine.com/blake3 v1.1.7 // indirect ) -go 1.18 - // 运行go mod tidy时需要将下面几行取消注释 -// replace gitlink.org.cn/cloudream/ec => ../ec -// -// replace gitlink.org.cn/cloudream/proto => ../proto -// -// replace gitlink.org.cn/cloudream/rabbitmq => ../rabbitmq -// -// replace gitlink.org.cn/cloudream/common => ../common -// -// replace gitlink.org.cn/cloudream/db => ../db -// -// replace magefiles => ../magefiles + +replace gitlink.org.cn/cloudream/common => ../../common + +replace gitlink.org.cn/cloudream/storage-common => ../storage-common diff --git a/go.sum b/go.sum index 6e35240..60ed8ef 100644 --- a/go.sum +++ b/go.sum @@ -62,8 +62,6 @@ github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFG github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro= github.com/libp2p/go-libp2p v0.27.0 h1:QbhrTuB0ln9j9op6yAOR0o+cx/qa9NyNZ5ov0Tql8ZU= github.com/libp2p/go-libp2p v0.27.0/go.mod h1:FAvvfQa/YOShUYdiSS03IR9OXzkcJXwcNA2FUCh9ImE= -github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= -github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g= github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= @@ -92,9 +90,6 @@ github.com/multiformats/go-multistream v0.4.1 h1:rFy0Iiyn3YT0asivDUIR05leAdwZq3d github.com/multiformats/go-multistream v0.4.1/go.mod h1:Mz5eykRVAjJWckE2U78c6xqdtyNUEhKSM0Lwar2p77Q= github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= -github.com/otiai10/copy v1.12.0 h1:cLMgSQnXBs1eehF0Wy/FAGsgDTDmAqFR7rQylBb1nDY= -github.com/otiai10/copy v1.12.0/go.mod h1:rSaLseMUsZFFbsFGc7wCJnnkTAvdc5L6VWxPE4308Ww= -github.com/otiai10/mint v1.5.1 h1:XaPLeE+9vGbuyEHem1JNk3bYc7KKqyI/na0/mLd/Kks= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -106,8 +101,8 @@ github.com/smartystreets/assertions v1.13.1 h1:Ef7KhSmjZcK6AVf9YbJdvPYG9avaF0Zxu github.com/smartystreets/goconvey v1.8.0 h1:Oi49ha/2MURE0WexF052Z0m+BNSGirfjg5RL+JXWq3w= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= -github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= -github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= +github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM= +github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -175,10 +170,14 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd h1:sLpv7bNL1AsX3fdnWh9WVh7ejIzXdOc1RRHGeAmeStU= -google.golang.org/genproto v0.0.0-20230403163135-c38d8f061ccd/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak= -google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= -google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= +google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54 h1:9NWlQfY2ePejTmfwUH1OWwmznFa+0kKcHGPDvcPza9M= +google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54/go.mod h1:zqTuNwFlFRsw5zIts5VnzLQxSRqh+CGOTVMlYbY0Eyk= +google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 h1:m8v1xLLLzMe1m5P+gCTF8nJB9epwZQUBERm20Oy1poQ= +google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= +google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= diff --git a/internal/config/config.go b/internal/config/config.go index 47a9a1e..565448e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -5,7 +5,7 @@ import ( log "gitlink.org.cn/cloudream/common/pkg/logger" c "gitlink.org.cn/cloudream/common/utils/config" "gitlink.org.cn/cloudream/common/utils/ipfs" - racfg "gitlink.org.cn/cloudream/rabbitmq/config" + racfg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/config" ) type Config struct { diff --git a/internal/services/cmd/agent.go b/internal/services/cmd/agent.go index 443480a..3c976f9 100644 --- a/internal/services/cmd/agent.go +++ b/internal/services/cmd/agent.go @@ -2,11 +2,11 @@ package cmd import ( "gitlink.org.cn/cloudream/common/consts" - ramsg "gitlink.org.cn/cloudream/rabbitmq/message" - agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" + "gitlink.org.cn/cloudream/common/pkg/mq" + agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent" ) -func (svc *Service) GetState(msg *agtmsg.GetState) (*agtmsg.GetStateResp, *ramsg.CodeMessage) { +func (svc *Service) GetState(msg *agtmsg.GetState) (*agtmsg.GetStateResp, *mq.CodeMessage) { var ipfsState string if svc.ipfs.IsUp() { @@ -15,5 +15,5 @@ func (svc *Service) GetState(msg *agtmsg.GetState) (*agtmsg.GetStateResp, *ramsg ipfsState = consts.IPFSStateOK } - return ramsg.ReplyOK(agtmsg.NewGetStateRespBody(ipfsState)) + return mq.ReplyOK(agtmsg.NewGetStateRespBody(ipfsState)) } diff --git a/internal/services/cmd/ipfs.go b/internal/services/cmd/ipfs.go index 774691b..582a38a 100644 --- a/internal/services/cmd/ipfs.go +++ b/internal/services/cmd/ipfs.go @@ -7,17 +7,17 @@ import ( "gitlink.org.cn/cloudream/common/consts" "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkg/logger" - ramsg "gitlink.org.cn/cloudream/rabbitmq/message" - agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" + "gitlink.org.cn/cloudream/common/pkg/mq" "gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/task" + agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent" ) -func (svc *Service) CheckIPFS(msg *agtmsg.CheckIPFS) (*agtmsg.CheckIPFSResp, *ramsg.CodeMessage) { +func (svc *Service) CheckIPFS(msg *agtmsg.CheckIPFS) (*agtmsg.CheckIPFSResp, *mq.CodeMessage) { filesMap, err := svc.ipfs.GetPinnedFiles() if err != nil { logger.Warnf("get pinned files from ipfs failed, err: %s", err.Error()) - return ramsg.ReplyFailed[agtmsg.CheckIPFSResp](errorcode.OperationFailed, "get pinned files from ipfs failed") + return mq.ReplyFailed[agtmsg.CheckIPFSResp](errorcode.OperationFailed, "get pinned files from ipfs failed") } // TODO 根据锁定清单过滤被锁定的文件的记录 @@ -28,7 +28,7 @@ func (svc *Service) CheckIPFS(msg *agtmsg.CheckIPFS) (*agtmsg.CheckIPFSResp, *ra } } -func (svc *Service) checkIncrement(msg *agtmsg.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmsg.CheckIPFSResp, *ramsg.CodeMessage) { +func (svc *Service) checkIncrement(msg *agtmsg.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmsg.CheckIPFSResp, *mq.CodeMessage) { var entries []agtmsg.CheckIPFSRespEntry for _, cache := range msg.Caches { _, ok := filesMap[cache.FileHash] @@ -60,10 +60,10 @@ func (svc *Service) checkIncrement(msg *agtmsg.CheckIPFS, filesMap map[string]sh // 增量情况下,不需要对filesMap中没检查的记录进行处理 - return ramsg.ReplyOK(agtmsg.NewCheckIPFSResp(entries)) + return mq.ReplyOK(agtmsg.NewCheckIPFSResp(entries)) } -func (svc *Service) checkComplete(msg *agtmsg.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmsg.CheckIPFSResp, *ramsg.CodeMessage) { +func (svc *Service) checkComplete(msg *agtmsg.CheckIPFS, filesMap map[string]shell.PinInfo) (*agtmsg.CheckIPFSResp, *mq.CodeMessage) { var entries []agtmsg.CheckIPFSRespEntry for _, cache := range msg.Caches { _, ok := filesMap[cache.FileHash] @@ -103,5 +103,5 @@ func (svc *Service) checkComplete(msg *agtmsg.CheckIPFS, filesMap map[string]she entries = append(entries, agtmsg.NewCheckIPFSRespEntry(hash, agtmsg.CHECK_IPFS_RESP_OP_CREATE_TEMP)) } - return ramsg.ReplyOK(agtmsg.NewCheckIPFSResp(entries)) + return mq.ReplyOK(agtmsg.NewCheckIPFSResp(entries)) } diff --git a/internal/services/cmd/object.go b/internal/services/cmd/object.go index 1cb997e..38388d3 100644 --- a/internal/services/cmd/object.go +++ b/internal/services/cmd/object.go @@ -5,12 +5,12 @@ import ( "gitlink.org.cn/cloudream/common/consts/errorcode" log "gitlink.org.cn/cloudream/common/pkg/logger" - ramsg "gitlink.org.cn/cloudream/rabbitmq/message" - agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" + "gitlink.org.cn/cloudream/common/pkg/mq" "gitlink.org.cn/cloudream/storage-agent/internal/task" + agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent" ) -func (svc *Service) StartPinningObject(msg *agtmsg.StartPinningObject) (*agtmsg.StartPinningObjectResp, *ramsg.CodeMessage) { +func (svc *Service) StartPinningObject(msg *agtmsg.StartPinningObject) (*agtmsg.StartPinningObjectResp, *mq.CodeMessage) { log.WithField("FileHash", msg.FileHash).Debugf("pin object") tsk := svc.taskManager.StartComparable(task.NewIPFSPin(msg.FileHash)) @@ -18,18 +18,18 @@ func (svc *Service) StartPinningObject(msg *agtmsg.StartPinningObject) (*agtmsg. if tsk.Error() != nil { log.WithField("FileHash", msg.FileHash). Warnf("pin object failed, err: %s", tsk.Error().Error()) - return ramsg.ReplyFailed[agtmsg.StartPinningObjectResp](errorcode.OperationFailed, "pin object failed") + return mq.ReplyFailed[agtmsg.StartPinningObjectResp](errorcode.OperationFailed, "pin object failed") } - return ramsg.ReplyOK(agtmsg.NewStartPinningObjectResp(tsk.ID())) + return mq.ReplyOK(agtmsg.NewStartPinningObjectResp(tsk.ID())) } -func (svc *Service) WaitPinningObject(msg *agtmsg.WaitPinningObject) (*agtmsg.WaitPinningObjectResp, *ramsg.CodeMessage) { +func (svc *Service) WaitPinningObject(msg *agtmsg.WaitPinningObject) (*agtmsg.WaitPinningObjectResp, *mq.CodeMessage) { log.WithField("TaskID", msg.TaskID).Debugf("wait pinning object") tsk := svc.taskManager.FindByID(msg.TaskID) if tsk == nil { - return ramsg.ReplyFailed[agtmsg.WaitPinningObjectResp](errorcode.TaskNotFound, "task not found") + return mq.ReplyFailed[agtmsg.WaitPinningObjectResp](errorcode.TaskNotFound, "task not found") } if msg.WaitTimeoutMs == 0 { @@ -40,7 +40,7 @@ func (svc *Service) WaitPinningObject(msg *agtmsg.WaitPinningObject) (*agtmsg.Wa errMsg = tsk.Error().Error() } - return ramsg.ReplyOK(agtmsg.NewWaitPinningObjectResp(true, errMsg)) + return mq.ReplyOK(agtmsg.NewWaitPinningObjectResp(true, errMsg)) } else { if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { @@ -50,9 +50,9 @@ func (svc *Service) WaitPinningObject(msg *agtmsg.WaitPinningObject) (*agtmsg.Wa errMsg = tsk.Error().Error() } - return ramsg.ReplyOK(agtmsg.NewWaitPinningObjectResp(true, errMsg)) + return mq.ReplyOK(agtmsg.NewWaitPinningObjectResp(true, errMsg)) } - return ramsg.ReplyOK(agtmsg.NewWaitPinningObjectResp(false, "")) + return mq.ReplyOK(agtmsg.NewWaitPinningObjectResp(false, "")) } } diff --git a/internal/services/cmd/storage.go b/internal/services/cmd/storage.go index 2a1e513..1690954 100644 --- a/internal/services/cmd/storage.go +++ b/internal/services/cmd/storage.go @@ -10,19 +10,19 @@ import ( "github.com/samber/lo" "gitlink.org.cn/cloudream/common/consts" - "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/common/utils" - "gitlink.org.cn/cloudream/ec" "gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/task" + "gitlink.org.cn/cloudream/storage-common/models" + "gitlink.org.cn/cloudream/storage-common/pkgs/ec" "gitlink.org.cn/cloudream/common/consts/errorcode" - ramsg "gitlink.org.cn/cloudream/rabbitmq/message" - agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" + "gitlink.org.cn/cloudream/common/pkg/mq" + agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent" ) -func (service *Service) StartStorageMoveObject(msg *agtmsg.StartStorageMoveObject) (*agtmsg.StartStorageMoveObjectResp, *ramsg.CodeMessage) { +func (service *Service) StartStorageMoveObject(msg *agtmsg.StartStorageMoveObject) (*agtmsg.StartStorageMoveObjectResp, *mq.CodeMessage) { // TODO 修改文件名,可用objectname outFileName := utils.MakeMoveOperationFileName(msg.ObjectID, msg.UserID) objectDir := filepath.Dir(msg.ObjectName) @@ -32,22 +32,22 @@ func (service *Service) StartStorageMoveObject(msg *agtmsg.StartStorageMoveObjec taskID, err := service.moveRepObject(repRed, outFilePath) if err != nil { logger.Warnf("move rep object as %s failed, err: %s", outFilePath, err.Error()) - return ramsg.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OperationFailed, "move rep object failed") + return mq.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OperationFailed, "move rep object failed") } - return ramsg.ReplyOK(agtmsg.NewStartStorageMoveObjectResp(taskID)) + return mq.ReplyOK(agtmsg.NewStartStorageMoveObjectResp(taskID)) } else if repRed, ok := msg.Redundancy.(models.ECRedundancyData); ok { taskID, err := service.moveEcObject(msg.ObjectID, msg.FileSize, repRed, outFilePath) if err != nil { logger.Warnf("move ec object as %s failed, err: %s", outFilePath, err.Error()) - return ramsg.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OperationFailed, "move ec object failed") + return mq.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OperationFailed, "move ec object failed") } - return ramsg.ReplyOK(agtmsg.NewStartStorageMoveObjectResp(taskID)) + return mq.ReplyOK(agtmsg.NewStartStorageMoveObjectResp(taskID)) } - return ramsg.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OperationFailed, "not rep or ec object???") + return mq.ReplyFailed[agtmsg.StartStorageMoveObjectResp](errorcode.OperationFailed, "not rep or ec object???") } func (svc *Service) moveRepObject(repData models.RepRedundancyData, outFilePath string) (string, error) { @@ -67,12 +67,12 @@ func (svc *Service) moveEcObject(objID int64, fileSize int64, ecData models.ECRe return tsk.ID(), nil } -func (svc *Service) WaitStorageMoveObject(msg *agtmsg.WaitStorageMoveObject) (*agtmsg.WaitStorageMoveObjectResp, *ramsg.CodeMessage) { +func (svc *Service) WaitStorageMoveObject(msg *agtmsg.WaitStorageMoveObject) (*agtmsg.WaitStorageMoveObjectResp, *mq.CodeMessage) { logger.WithField("TaskID", msg.TaskID).Debugf("wait moving object") tsk := svc.taskManager.FindByID(msg.TaskID) if tsk == nil { - return ramsg.ReplyFailed[agtmsg.WaitStorageMoveObjectResp](errorcode.TaskNotFound, "task not found") + return mq.ReplyFailed[agtmsg.WaitStorageMoveObjectResp](errorcode.TaskNotFound, "task not found") } if msg.WaitTimeoutMs == 0 { @@ -83,7 +83,7 @@ func (svc *Service) WaitStorageMoveObject(msg *agtmsg.WaitStorageMoveObject) (*a errMsg = tsk.Error().Error() } - return ramsg.ReplyOK(agtmsg.NewWaitStorageMoveObjectResp(true, errMsg)) + return mq.ReplyOK(agtmsg.NewWaitStorageMoveObjectResp(true, errMsg)) } else { if tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { @@ -93,20 +93,20 @@ func (svc *Service) WaitStorageMoveObject(msg *agtmsg.WaitStorageMoveObject) (*a errMsg = tsk.Error().Error() } - return ramsg.ReplyOK(agtmsg.NewWaitStorageMoveObjectResp(true, errMsg)) + return mq.ReplyOK(agtmsg.NewWaitStorageMoveObjectResp(true, errMsg)) } - return ramsg.ReplyOK(agtmsg.NewWaitStorageMoveObjectResp(false, "")) + return mq.ReplyOK(agtmsg.NewWaitStorageMoveObjectResp(false, "")) } } -func (svc *Service) StorageCheck(msg *agtmsg.StorageCheck) (*agtmsg.StorageCheckResp, *ramsg.CodeMessage) { +func (svc *Service) StorageCheck(msg *agtmsg.StorageCheck) (*agtmsg.StorageCheckResp, *mq.CodeMessage) { dirFullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.Directory) infos, err := ioutil.ReadDir(dirFullPath) if err != nil { logger.Warnf("list storage directory failed, err: %s", err.Error()) - return ramsg.ReplyOK(agtmsg.NewStorageCheckResp( + return mq.ReplyOK(agtmsg.NewStorageCheckResp( err.Error(), nil, )) @@ -121,7 +121,7 @@ func (svc *Service) StorageCheck(msg *agtmsg.StorageCheck) (*agtmsg.StorageCheck } } -func (svc *Service) checkStorageIncrement(msg *agtmsg.StorageCheck, fileInfos []fs.FileInfo) (*agtmsg.StorageCheckResp, *ramsg.CodeMessage) { +func (svc *Service) checkStorageIncrement(msg *agtmsg.StorageCheck, fileInfos []fs.FileInfo) (*agtmsg.StorageCheckResp, *mq.CodeMessage) { infosMap := make(map[string]fs.FileInfo) for _, info := range fileInfos { infosMap[info.Name()] = info @@ -145,10 +145,10 @@ func (svc *Service) checkStorageIncrement(msg *agtmsg.StorageCheck, fileInfos [] // 增量情况下,不需要对infosMap中没检查的记录进行处理 - return ramsg.ReplyOK(agtmsg.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries)) + return mq.ReplyOK(agtmsg.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries)) } -func (svc *Service) checkStorageComplete(msg *agtmsg.StorageCheck, fileInfos []fs.FileInfo) (*agtmsg.StorageCheckResp, *ramsg.CodeMessage) { +func (svc *Service) checkStorageComplete(msg *agtmsg.StorageCheck, fileInfos []fs.FileInfo) (*agtmsg.StorageCheckResp, *mq.CodeMessage) { infosMap := make(map[string]fs.FileInfo) for _, info := range fileInfos { @@ -171,7 +171,7 @@ func (svc *Service) checkStorageComplete(msg *agtmsg.StorageCheck, fileInfos []f } } - return ramsg.ReplyOK(agtmsg.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries)) + return mq.ReplyOK(agtmsg.NewStorageCheckResp(consts.StorageDirectoryStateOK, entries)) } func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int, numPacket int64) { @@ -213,20 +213,20 @@ func decode(inBufs []chan []byte, outBufs []chan []byte, blockSeq []int, ecK int } } -func (svc *Service) StartStorageUploadRepObject(msg *agtmsg.StartStorageUploadRepObject) (*agtmsg.StartStorageUploadRepObjectResp, *ramsg.CodeMessage) { +func (svc *Service) StartStorageUploadRepObject(msg *agtmsg.StartStorageUploadRepObject) (*agtmsg.StartStorageUploadRepObjectResp, *mq.CodeMessage) { fullPath := filepath.Join(config.Cfg().StorageBaseDir, msg.StorageDirectory, msg.FilePath) file, err := os.Open(fullPath) if err != nil { logger.Warnf("opening file %s: %s", fullPath, err.Error()) - return nil, ramsg.Failed(errorcode.OperationFailed, "open file failed") + return nil, mq.Failed(errorcode.OperationFailed, "open file failed") } fileInfo, err := file.Stat() if err != nil { file.Close() logger.Warnf("getting file %s state: %s", fullPath, err.Error()) - return nil, ramsg.Failed(errorcode.OperationFailed, "get file info failed") + return nil, mq.Failed(errorcode.OperationFailed, "get file info failed") } fileSize := fileInfo.Size() @@ -239,19 +239,19 @@ func (svc *Service) StartStorageUploadRepObject(msg *agtmsg.StartStorageUploadRe // Task会关闭文件流 tsk := svc.taskManager.StartNew(task.NewUploadRepObjects(msg.UserID, msg.BucketID, uploadObjects, msg.RepCount)) - return ramsg.ReplyOK(agtmsg.NewStartStorageUploadRepObjectResp(tsk.ID())) + return mq.ReplyOK(agtmsg.NewStartStorageUploadRepObjectResp(tsk.ID())) } -func (svc *Service) WaitStorageUploadRepObject(msg *agtmsg.WaitStorageUploadRepObject) (*agtmsg.WaitStorageUploadRepObjectResp, *ramsg.CodeMessage) { +func (svc *Service) WaitStorageUploadRepObject(msg *agtmsg.WaitStorageUploadRepObject) (*agtmsg.WaitStorageUploadRepObjectResp, *mq.CodeMessage) { tsk := svc.taskManager.FindByID(msg.TaskID) if tsk == nil { - return nil, ramsg.Failed(errorcode.TaskNotFound, "task not found") + return nil, mq.Failed(errorcode.TaskNotFound, "task not found") } if msg.WaitTimeoutMs == 0 { tsk.Wait() } else if !tsk.WaitTimeout(time.Duration(msg.WaitTimeoutMs)) { - return ramsg.ReplyOK(agtmsg.NewWaitStorageUploadRepObjectResp(false, "", 0, "")) + return mq.ReplyOK(agtmsg.NewWaitStorageUploadRepObjectResp(false, "", 0, "")) } uploadTask := tsk.Body().(*task.UploadRepObjects) @@ -266,5 +266,5 @@ func (svc *Service) WaitStorageUploadRepObject(msg *agtmsg.WaitStorageUploadRepO errMsg = uploadRet.Error.Error() } - return ramsg.ReplyOK(agtmsg.NewWaitStorageUploadRepObjectResp(true, errMsg, uploadRet.ObjectID, uploadRet.FileHash)) + return mq.ReplyOK(agtmsg.NewWaitStorageUploadRepObjectResp(true, errMsg, uploadRet.ObjectID, uploadRet.FileHash)) } diff --git a/internal/services/grpc/grpc_service.go b/internal/services/grpc/grpc_service.go index d3b5100..3e94980 100644 --- a/internal/services/grpc/grpc_service.go +++ b/internal/services/grpc/grpc_service.go @@ -7,7 +7,7 @@ import ( log "gitlink.org.cn/cloudream/common/pkg/logger" myio "gitlink.org.cn/cloudream/common/utils/io" "gitlink.org.cn/cloudream/common/utils/ipfs" - agentserver "gitlink.org.cn/cloudream/proto" + agentserver "gitlink.org.cn/cloudream/storage-common/pkgs/proto" ) type GRPCService struct { diff --git a/internal/task/ec_read.go b/internal/task/ec_read.go index 7b885be..6b711aa 100644 --- a/internal/task/ec_read.go +++ b/internal/task/ec_read.go @@ -7,10 +7,10 @@ import ( "path/filepath" "time" - "gitlink.org.cn/cloudream/common/models" "gitlink.org.cn/cloudream/common/pkg/logger" - "gitlink.org.cn/cloudream/ec" "gitlink.org.cn/cloudream/storage-agent/internal/config" + "gitlink.org.cn/cloudream/storage-common/models" + "gitlink.org.cn/cloudream/storage-common/pkgs/ec" ) type EcRead struct { diff --git a/internal/task/task.go b/internal/task/task.go index 47fb437..a6a3634 100644 --- a/internal/task/task.go +++ b/internal/task/task.go @@ -4,7 +4,7 @@ import ( distsvc "gitlink.org.cn/cloudream/common/pkg/distlock/service" "gitlink.org.cn/cloudream/common/pkg/task" "gitlink.org.cn/cloudream/common/utils/ipfs" - coorcli "gitlink.org.cn/cloudream/rabbitmq/client/coordinator" + coorcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/coordinator" ) type TaskContext struct { diff --git a/internal/task/upload_rep_objects.go b/internal/task/upload_rep_objects.go index ab1c185..087dddb 100644 --- a/internal/task/upload_rep_objects.go +++ b/internal/task/upload_rep_objects.go @@ -10,13 +10,13 @@ import ( "gitlink.org.cn/cloudream/common/pkg/distlock/reqbuilder" "gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/common/utils" - mygrpc "gitlink.org.cn/cloudream/common/utils/grpc" "gitlink.org.cn/cloudream/common/utils/ipfs" "gitlink.org.cn/cloudream/storage-agent/internal/config" + mygrpc "gitlink.org.cn/cloudream/storage-common/utils/grpc" - agentcaller "gitlink.org.cn/cloudream/proto" - ramsg "gitlink.org.cn/cloudream/rabbitmq/message" - coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator" + ramsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message" + coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" + agentcaller "gitlink.org.cn/cloudream/storage-common/pkgs/proto" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" diff --git a/main.go b/main.go index 88fe819..d720817 100644 --- a/main.go +++ b/main.go @@ -9,14 +9,14 @@ import ( distsvc "gitlink.org.cn/cloudream/common/pkg/distlock/service" log "gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/common/utils/ipfs" - agentserver "gitlink.org.cn/cloudream/proto" "gitlink.org.cn/cloudream/storage-agent/internal/config" "gitlink.org.cn/cloudream/storage-agent/internal/task" + agentserver "gitlink.org.cn/cloudream/storage-common/pkgs/proto" "google.golang.org/grpc" - "gitlink.org.cn/cloudream/rabbitmq/client/coordinator" - rasvr "gitlink.org.cn/cloudream/rabbitmq/server/agent" + "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/coordinator" + rasvr "gitlink.org.cn/cloudream/storage-common/pkgs/mq/server/agent" cmdsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/cmd" grpcsvc "gitlink.org.cn/cloudream/storage-agent/internal/services/grpc" diff --git a/status_report.go b/status_report.go index d1fc8f0..40f1072 100644 --- a/status_report.go +++ b/status_report.go @@ -7,9 +7,9 @@ import ( "gitlink.org.cn/cloudream/common/consts" log "gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/common/utils" - coorcli "gitlink.org.cn/cloudream/rabbitmq/client/coordinator" - coormsg "gitlink.org.cn/cloudream/rabbitmq/message/coordinator" "gitlink.org.cn/cloudream/storage-agent/internal/config" + coorcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/coordinator" + coormsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/coordinator" ) func reportStatus(wg *sync.WaitGroup) {