| @@ -29,6 +29,12 @@ func CacheMovePackage(ctx CommandContext, packageID cdssdk.PackageID, nodeID cds | |||||
| } | } | ||||
| } | } | ||||
| func CacheRemovePackage(ctx CommandContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { | |||||
| return ctx.Cmdline.Svc.CacheSvc().CacheRemovePackage(packageID, nodeID) | |||||
| } | |||||
| func init() { | func init() { | ||||
| commands.Add(CacheMovePackage, "cache", "move") | commands.Add(CacheMovePackage, "cache", "move") | ||||
| commands.Add(CacheRemovePackage, "cache", "remove") | |||||
| } | } | ||||
| @@ -8,6 +8,7 @@ import ( | |||||
| stgglb "gitlink.org.cn/cloudream/storage/common/globals" | stgglb "gitlink.org.cn/cloudream/storage/common/globals" | ||||
| agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" | agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" | ||||
| coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | |||||
| ) | ) | ||||
| type CacheService struct { | type CacheService struct { | ||||
| @@ -55,3 +56,18 @@ func (svc *CacheService) WaitCacheMovePackage(nodeID cdssdk.NodeID, taskID strin | |||||
| return true, nil | return true, nil | ||||
| } | } | ||||
| func (svc *CacheService) CacheRemovePackage(packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { | |||||
| coorCli, err := stgglb.CoordinatorMQPool.Acquire() | |||||
| if err != nil { | |||||
| return fmt.Errorf("new agent client: %w", err) | |||||
| } | |||||
| defer stgglb.CoordinatorMQPool.Release(coorCli) | |||||
| _, err = coorCli.CacheRemovePackage(coormq.ReqCacheRemoveMovedPackage(packageID, nodeID)) | |||||
| if err != nil { | |||||
| return fmt.Errorf("requesting to coordinator: %w", err) | |||||
| } | |||||
| return nil | |||||
| } | |||||
| @@ -68,12 +68,13 @@ func (*CacheDB) BatchCreateOnSameNode(ctx SQLContext, fileHashes []string, nodeI | |||||
| }) | }) | ||||
| } | } | ||||
| _, err := sqlx.NamedExec(ctx, | |||||
| return BatchNamedExec(ctx, | |||||
| "insert into Cache(FileHash,NodeID,CreateTime,Priority) values(:FileHash,:NodeID,:CreateTime,:Priority)"+ | "insert into Cache(FileHash,NodeID,CreateTime,Priority) values(:FileHash,:NodeID,:CreateTime,:Priority)"+ | ||||
| " on duplicate key update CreateTime=values(CreateTime), Priority=values(Priority)", | " on duplicate key update CreateTime=values(CreateTime), Priority=values(Priority)", | ||||
| 4, | |||||
| caches, | caches, | ||||
| nil, | |||||
| ) | ) | ||||
| return err | |||||
| } | } | ||||
| func (*CacheDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, fileHashes []string) error { | func (*CacheDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, fileHashes []string) error { | ||||
| @@ -88,6 +88,11 @@ func (*PinnedObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageI | |||||
| return err | return err | ||||
| } | } | ||||
| func (*PinnedObjectDB) DeleteInPackageAtNode(ctx SQLContext, packageID cdssdk.PackageID, nodeID cdssdk.NodeID) error { | |||||
| _, err := ctx.Exec("delete PinnedObject from PinnedObject inner join Object on PinnedObject.ObjectID = Object.ObjectID where PackageID = ? and NodeID = ?", packageID, nodeID) | |||||
| return err | |||||
| } | |||||
| func (*PinnedObjectDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, objectIDs []cdssdk.ObjectID) error { | func (*PinnedObjectDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, objectIDs []cdssdk.ObjectID) error { | ||||
| query, args, err := sqlx.In("delete from PinnedObject where NodeID = ? and ObjectID in (?)", objectIDs) | query, args, err := sqlx.In("delete from PinnedObject where NodeID = ? and ObjectID in (?)", objectIDs) | ||||
| if err != nil { | if err != nil { | ||||
| @@ -0,0 +1,43 @@ | |||||
| package ops | |||||
| import ( | |||||
| "io" | |||||
| "sync" | |||||
| myio "gitlink.org.cn/cloudream/common/utils/io" | |||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch" | |||||
| ) | |||||
| type Clone struct { | |||||
| InputID ioswitch.StreamID `json:"inputID"` | |||||
| OutputIDs []ioswitch.StreamID `json:"outputIDs"` | |||||
| } | |||||
| func (o *Clone) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { | |||||
| strs, err := sw.WaitStreams(planID, o.InputID) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| defer strs[0].Stream.Close() | |||||
| wg := sync.WaitGroup{} | |||||
| cloned := myio.Clone(strs[0].Stream, len(o.OutputIDs)) | |||||
| for i, s := range cloned { | |||||
| wg.Add(1) | |||||
| sw.StreamReady(planID, | |||||
| ioswitch.NewStream(o.OutputIDs[i], | |||||
| myio.AfterReadClosedOnce(s, func(closer io.ReadCloser) { | |||||
| wg.Done() | |||||
| }), | |||||
| ), | |||||
| ) | |||||
| } | |||||
| wg.Wait() | |||||
| return nil | |||||
| } | |||||
| func init() { | |||||
| OpUnion.AddT((*Clone)(nil)) | |||||
| } | |||||
| @@ -5,6 +5,7 @@ import ( | |||||
| "fmt" | "fmt" | ||||
| "io" | "io" | ||||
| "os" | "os" | ||||
| "path" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/future" | "gitlink.org.cn/cloudream/common/pkgs/future" | ||||
| myio "gitlink.org.cn/cloudream/common/utils/io" | myio "gitlink.org.cn/cloudream/common/utils/io" | ||||
| @@ -23,6 +24,12 @@ func (o *FileWrite) Execute(sw *ioswitch.Switch, planID ioswitch.PlanID) error { | |||||
| } | } | ||||
| defer str[0].Stream.Close() | defer str[0].Stream.Close() | ||||
| dir := path.Dir(o.FilePath) | |||||
| err = os.MkdirAll(dir, 0777) | |||||
| if err != nil { | |||||
| return fmt.Errorf("mkdir: %w", err) | |||||
| } | |||||
| file, err := os.Create(o.FilePath) | file, err := os.Create(o.FilePath) | ||||
| if err != nil { | if err != nil { | ||||
| return fmt.Errorf("opening file: %w", err) | return fmt.Errorf("opening file: %w", err) | ||||
| @@ -243,3 +243,24 @@ func (b *AgentPlanBuilder) ChunkedJoin(chunkSize int, streams ...*AgentStream) * | |||||
| return agtStr | return agtStr | ||||
| } | } | ||||
| func (s *AgentStream) Clone(cnt int) *MultiStream { | |||||
| mstr := &MultiStream{} | |||||
| var outputStrIDs []ioswitch.StreamID | |||||
| for i := 0; i < cnt; i++ { | |||||
| info := s.owner.owner.newStream() | |||||
| mstr.Streams = append(mstr.Streams, &AgentStream{ | |||||
| owner: s.owner, | |||||
| info: info, | |||||
| }) | |||||
| outputStrIDs = append(outputStrIDs, info.ID) | |||||
| } | |||||
| s.owner.ops = append(s.owner.ops, &ops.Clone{ | |||||
| InputID: s.info.ID, | |||||
| OutputIDs: outputStrIDs, | |||||
| }) | |||||
| return mstr | |||||
| } | |||||
| @@ -7,6 +7,8 @@ import ( | |||||
| type CacheService interface { | type CacheService interface { | ||||
| CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, *mq.CodeMessage) | CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, *mq.CodeMessage) | ||||
| CacheRemovePackage(msg *CacheRemovePackage) (*CacheRemovePackageResp, *mq.CodeMessage) | |||||
| } | } | ||||
| // Package的Object移动到了节点的Cache中 | // Package的Object移动到了节点的Cache中 | ||||
| @@ -33,3 +35,28 @@ func NewCachePackageMovedResp() *CachePackageMovedResp { | |||||
| func (client *Client) CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, error) { | func (client *Client) CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, error) { | ||||
| return mq.Request(Service.CachePackageMoved, client.rabbitCli, msg) | return mq.Request(Service.CachePackageMoved, client.rabbitCli, msg) | ||||
| } | } | ||||
| // 删除移动到指定节点Cache中的Package | |||||
| var _ = Register(Service.CacheRemovePackage) | |||||
| type CacheRemovePackage struct { | |||||
| mq.MessageBodyBase | |||||
| PackageID cdssdk.PackageID `json:"packageID"` | |||||
| NodeID cdssdk.NodeID `json:"nodeID"` | |||||
| } | |||||
| type CacheRemovePackageResp struct { | |||||
| mq.MessageBodyBase | |||||
| } | |||||
| func ReqCacheRemoveMovedPackage(packageID cdssdk.PackageID, nodeID cdssdk.NodeID) *CacheRemovePackage { | |||||
| return &CacheRemovePackage{ | |||||
| PackageID: packageID, | |||||
| NodeID: nodeID, | |||||
| } | |||||
| } | |||||
| func RespCacheRemovePackage() *CacheRemovePackageResp { | |||||
| return &CacheRemovePackageResp{} | |||||
| } | |||||
| func (client *Client) CacheRemovePackage(msg *CacheRemovePackage) (*CacheRemovePackageResp, error) { | |||||
| return mq.Request(Service.CacheRemovePackage, client.rabbitCli, msg) | |||||
| } | |||||
| @@ -37,3 +37,30 @@ func (svc *Service) CachePackageMoved(msg *coormq.CachePackageMoved) (*coormq.Ca | |||||
| return mq.ReplyOK(coormq.NewCachePackageMovedResp()) | return mq.ReplyOK(coormq.NewCachePackageMovedResp()) | ||||
| } | } | ||||
| func (svc *Service) CacheRemovePackage(msg *coormq.CacheRemovePackage) (*coormq.CacheRemovePackageResp, *mq.CodeMessage) { | |||||
| err := svc.db.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { | |||||
| _, err := svc.db.Package().GetByID(tx, msg.PackageID) | |||||
| if err != nil { | |||||
| return fmt.Errorf("getting package by id: %w", err) | |||||
| } | |||||
| _, err = svc.db.Node().GetByID(tx, msg.NodeID) | |||||
| if err != nil { | |||||
| return fmt.Errorf("getting node by id: %w", err) | |||||
| } | |||||
| err = svc.db.PinnedObject().DeleteInPackageAtNode(tx, msg.PackageID, msg.NodeID) | |||||
| if err != nil { | |||||
| return fmt.Errorf("delete pinned objects in package at node: %w", err) | |||||
| } | |||||
| return nil | |||||
| }) | |||||
| if err != nil { | |||||
| logger.WithField("PackageID", msg.PackageID).WithField("NodeID", msg.NodeID).Warn(err.Error()) | |||||
| return nil, mq.Failed(errorcode.OperationFailed, "remove pinned package failed") | |||||
| } | |||||
| return mq.ReplyOK(coormq.RespCacheRemovePackage()) | |||||
| } | |||||