diff --git a/client/internal/cmdline/cache.go b/client/internal/cmdline/cache.go index 7a65745..a94859e 100644 --- a/client/internal/cmdline/cache.go +++ b/client/internal/cmdline/cache.go @@ -29,6 +29,12 @@ func CacheMovePackage(ctx CommandContext, packageID cdssdk.PackageID, nodeID cds } } +func CacheRemovePackage(ctx CommandContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { + return ctx.Cmdline.Svc.CacheSvc().CacheRemovePackage(packageID, nodeID) +} + func init() { commands.Add(CacheMovePackage, "cache", "move") + + commands.Add(CacheRemovePackage, "cache", "remove") } diff --git a/client/internal/services/cache.go b/client/internal/services/cache.go index ac37be9..a32e456 100644 --- a/client/internal/services/cache.go +++ b/client/internal/services/cache.go @@ -8,6 +8,7 @@ import ( stgglb "gitlink.org.cn/cloudream/storage/common/globals" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" + coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) type CacheService struct { @@ -55,3 +56,18 @@ func (svc *CacheService) WaitCacheMovePackage(nodeID cdssdk.NodeID, taskID strin return true, nil } + +func (svc *CacheService) CacheRemovePackage(packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { + coorCli, err := stgglb.CoordinatorMQPool.Acquire() + if err != nil { + return fmt.Errorf("new agent client: %w", err) + } + defer stgglb.CoordinatorMQPool.Release(coorCli) + + _, err = coorCli.CacheRemovePackage(coormq.ReqCacheRemoveMovedPackage(packageID, nodeID)) + if err != nil { + return fmt.Errorf("requesting to coordinator: %w", err) + } + + return nil +} diff --git a/common/pkgs/db/cache.go b/common/pkgs/db/cache.go index 964326f..84ff1ca 100644 --- a/common/pkgs/db/cache.go +++ b/common/pkgs/db/cache.go @@ -68,12 +68,13 @@ func (*CacheDB) BatchCreateOnSameNode(ctx SQLContext, fileHashes []string, nodeI }) } - _, err := sqlx.NamedExec(ctx, + return BatchNamedExec(ctx, "insert into Cache(FileHash,NodeID,CreateTime,Priority) values(:FileHash,:NodeID,:CreateTime,:Priority)"+ " on duplicate key update CreateTime=values(CreateTime), Priority=values(Priority)", + 4, caches, + nil, ) - return err } func (*CacheDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, fileHashes []string) error { diff --git a/common/pkgs/db/pinned_object.go b/common/pkgs/db/pinned_object.go index 1cfd981..6853315 100644 --- a/common/pkgs/db/pinned_object.go +++ b/common/pkgs/db/pinned_object.go @@ -88,6 +88,11 @@ func (*PinnedObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageI return err } +func (*PinnedObjectDB) DeleteInPackageAtNode(ctx SQLContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { + _, err := ctx.Exec("delete PinnedObject from PinnedObject inner join Object on PinnedObject.ObjectID = Object.ObjectID where PackageID = ? and NodeID = ?", packageID, nodeID) + return err +} + func (*PinnedObjectDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, objectIDs []cdssdk.ObjectID) error { query, args, err := sqlx.In("delete from PinnedObject where NodeID = ? and ObjectID in (?)", objectIDs) if err != nil { diff --git a/common/pkgs/ioswitch/ops/clone.go b/common/pkgs/ioswitch/ops/clone.go new file mode 100644 index 0000000..670159a --- /dev/null +++ b/common/pkgs/ioswitch/ops/clone.go @@ -0,0 +1,43 @@ +package ops + +import ( + "io" + "sync" + + myio "gitlink.org.cn/cloudream/common/utils/io" + "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" +) + +type Clone struct { + InputID ioswitch.StreamID `json:"inputID"` + OutputIDs []ioswitch.StreamID `json:"outputIDs"` +} + +func (o *Clone) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { + strs, err := sw.WaitStreams(planID, o.InputID) + if err != nil { + return err + } + defer strs[0].Stream.Close() + + wg := sync.WaitGroup{} + cloned := myio.Clone(strs[0].Stream, len(o.OutputIDs)) + for i, s := range cloned { + wg.Add(1) + + sw.StreamReady(planID, + ioswitch.NewStream(o.OutputIDs[i], + myio.AfterReadClosedOnce(s, func(closer io.ReadCloser) { + wg.Done() + }), + ), + ) + } + + wg.Wait() + return nil +} + +func init() { + OpUnion.AddT((*Clone)(nil)) +} diff --git a/common/pkgs/ioswitch/ops/file.go b/common/pkgs/ioswitch/ops/file.go index 0219a53..8f9adcd 100644 --- a/common/pkgs/ioswitch/ops/file.go +++ b/common/pkgs/ioswitch/ops/file.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "os" + "path" "gitlink.org.cn/cloudream/common/pkgs/future" myio "gitlink.org.cn/cloudream/common/utils/io" @@ -23,6 +24,12 @@ func (o *FileWrite) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { } defer str[0].Stream.Close() + dir := path.Dir(o.FilePath) + err = os.MkdirAll(dir, 0777) + if err != nil { + return fmt.Errorf("mkdir: %w", err) + } + file, err := os.Create(o.FilePath) if err != nil { return fmt.Errorf("opening file: %w", err) diff --git a/common/pkgs/ioswitch/plans/agent_plan.go b/common/pkgs/ioswitch/plans/agent_plan.go index 1f78a37..9c75a8a 100644 --- a/common/pkgs/ioswitch/plans/agent_plan.go +++ b/common/pkgs/ioswitch/plans/agent_plan.go @@ -243,3 +243,24 @@ func (b *AgentPlanBuilder) ChunkedJoin(chunkSize int, streams ...*AgentStream) * return agtStr } + +func (s *AgentStream) Clone(cnt int) *MultiStream { + mstr := &MultiStream{} + + var outputStrIDs []ioswitch.StreamID + for i := 0; i < cnt; i++ { + info := s.owner.owner.newStream() + mstr.Streams = append(mstr.Streams, &AgentStream{ + owner: s.owner, + info: info, + }) + outputStrIDs = append(outputStrIDs, info.ID) + } + + s.owner.ops = append(s.owner.ops, &ops.Clone{ + InputID: s.info.ID, + OutputIDs: outputStrIDs, + }) + + return mstr +} diff --git a/common/pkgs/mq/coordinator/cache.go b/common/pkgs/mq/coordinator/cache.go index 92acbf6..ff75895 100644 --- a/common/pkgs/mq/coordinator/cache.go +++ b/common/pkgs/mq/coordinator/cache.go @@ -7,6 +7,8 @@ import ( type CacheService interface { CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, *mq.CodeMessage) + + CacheRemovePackage(msg *CacheRemovePackage) (*CacheRemovePackageResp, *mq.CodeMessage) } // Package的Object移动到了节点的Cache中 @@ -33,3 +35,28 @@ func NewCachePackageMovedResp() *CachePackageMovedResp { func (client *Client) CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, error) { return mq.Request(Service.CachePackageMoved, client.rabbitCli, msg) } + +// 删除移动到指定节点Cache中的Package +var _ = Register(Service.CacheRemovePackage) + +type CacheRemovePackage struct { + mq.MessageBodyBase + PackageID cdssdk.PackageID `json:"packageID"` + NodeID cdssdk.NodeID `json:"nodeID"` +} +type CacheRemovePackageResp struct { + mq.MessageBodyBase +} + +func ReqCacheRemoveMovedPackage(packageID cdssdk.PackageID, nodeID cdssdk.NodeID) *CacheRemovePackage { + return &CacheRemovePackage{ + PackageID: packageID, + NodeID: nodeID, + } +} +func RespCacheRemovePackage() *CacheRemovePackageResp { + return &CacheRemovePackageResp{} +} +func (client *Client) CacheRemovePackage(msg *CacheRemovePackage) (*CacheRemovePackageResp, error) { + return mq.Request(Service.CacheRemovePackage, client.rabbitCli, msg) +} diff --git a/coordinator/internal/mq/cache.go b/coordinator/internal/mq/cache.go index 215f8a0..0e097f8 100644 --- a/coordinator/internal/mq/cache.go +++ b/coordinator/internal/mq/cache.go @@ -37,3 +37,30 @@ func (svc *Service) CachePackageMoved(msg *coormq.CachePackageMoved) (*coormq.Ca return mq.ReplyOK(coormq.NewCachePackageMovedResp()) } + +func (svc *Service) CacheRemovePackage(msg *coormq.CacheRemovePackage) (*coormq.CacheRemovePackageResp, *mq.CodeMessage) { + err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + _, err := svc.db.Package().GetByID(tx, msg.PackageID) + if err != nil { + return fmt.Errorf("getting package by id: %w", err) + } + + _, err = svc.db.Node().GetByID(tx, msg.NodeID) + if err != nil { + return fmt.Errorf("getting node by id: %w", err) + } + + err = svc.db.PinnedObject().DeleteInPackageAtNode(tx, msg.PackageID, msg.NodeID) + if err != nil { + return fmt.Errorf("delete pinned objects in package at node: %w", err) + } + + return nil + }) + if err != nil { + logger.WithField("PackageID", msg.PackageID).WithField("NodeID", msg.NodeID).Warn(err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "remove pinned package failed") + } + + return mq.ReplyOK(coormq.RespCacheRemovePackage()) +}