diff --git a/agent/internal/config/config.go b/agent/internal/config/config.go index 21c842d..bdc345d 100644 --- a/agent/internal/config/config.go +++ b/agent/internal/config/config.go @@ -2,7 +2,6 @@ package config import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" - "gitlink.org.cn/cloudream/common/pkgs/ipfs" log "gitlink.org.cn/cloudream/common/pkgs/logger" c "gitlink.org.cn/cloudream/common/utils/config" stgmodels "gitlink.org.cn/cloudream/storage/common/models" @@ -19,7 +18,6 @@ type Config struct { GRPC *grpc.Config `json:"grpc"` Logger log.Config `json:"logger"` RabbitMQ stgmq.Config `json:"rabbitMQ"` - IPFS ipfs.Config `json:"ipfs"` DistLock distlock.Config `json:"distlock"` Connectivity connectivity.Config `json:"connectivity"` Downloader downloader.Config `json:"downloader"` diff --git a/agent/internal/mq/agent.go b/agent/internal/mq/agent.go index b442beb..cbfef37 100644 --- a/agent/internal/mq/agent.go +++ b/agent/internal/mq/agent.go @@ -1,29 +1,10 @@ package mq import ( - "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" - "gitlink.org.cn/cloudream/storage/common/consts" - stgglb "gitlink.org.cn/cloudream/storage/common/globals" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" ) func (svc *Service) GetState(msg *agtmq.GetState) (*agtmq.GetStateResp, *mq.CodeMessage) { - var ipfsState string - - ipfsCli, err := stgglb.IPFSPool.Acquire() - if err != nil { - logger.Warnf("new ipfs client: %s", err.Error()) - ipfsState = consts.IPFSStateUnavailable - - } else { - if ipfsCli.IsUp() { - ipfsState = consts.IPFSStateOK - } else { - ipfsState = consts.IPFSStateUnavailable - } - ipfsCli.Close() - } - - return mq.ReplyOK(agtmq.NewGetStateResp(ipfsState)) + return mq.ReplyOK(agtmq.NewGetStateResp()) } diff --git a/agent/internal/mq/object.go b/agent/internal/mq/object.go deleted file mode 100644 index dbdaacc..0000000 --- a/agent/internal/mq/object.go +++ /dev/null @@ -1,28 +0,0 @@ -package mq - -import ( - "gitlink.org.cn/cloudream/common/consts/errorcode" - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/mq" - "gitlink.org.cn/cloudream/storage/agent/internal/task" - agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" -) - -func (svc *Service) PinObject(msg *agtmq.PinObject) (*agtmq.PinObjectResp, *mq.CodeMessage) { - logger.WithField("FileHash", msg.FileHashes).Debugf("pin object") - - tsk := svc.taskManager.StartNew(task.NewIPFSPin(msg.FileHashes)) - - if tsk.Error() != nil { - logger.WithField("FileHash", msg.FileHashes). - Warnf("pin object failed, err: %s", tsk.Error().Error()) - return nil, mq.Failed(errorcode.OperationFailed, "pin object failed") - } - - if msg.IsBackground { - return mq.ReplyOK(agtmq.RespPinObject()) - } - - tsk.Wait() - return mq.ReplyOK(agtmq.RespPinObject()) -} diff --git a/agent/internal/task/ipfs_pin.go b/agent/internal/task/ipfs_pin.go deleted file mode 100644 index 622b45d..0000000 --- a/agent/internal/task/ipfs_pin.go +++ /dev/null @@ -1,55 +0,0 @@ -package task - -import ( - "fmt" - "time" - - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/task" - stgglb "gitlink.org.cn/cloudream/storage/common/globals" -) - -type IPFSPin struct { - FileHashes []string -} - -func NewIPFSPin(fileHashes []string) *IPFSPin { - return &IPFSPin{ - FileHashes: fileHashes, - } -} - -func (t *IPFSPin) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - log := logger.WithType[IPFSPin]("Task") - log.Debugf("begin with %v", logger.FormatStruct(t)) - defer log.Debugf("end") - - ipfsCli, err := stgglb.IPFSPool.Acquire() - if err != nil { - err := fmt.Errorf("new ipfs client: %w", err) - log.Warn(err.Error()) - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) - return - } - defer ipfsCli.Close() - - for _, fileHash := range t.FileHashes { - err = ipfsCli.Pin(fileHash) - if err != nil { - err := fmt.Errorf("pin file failed, err: %w", err) - log.WithField("FileHash", fileHash).Warn(err.Error()) - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) - return - } - } - - complete(nil, CompleteOption{ - RemovingDelay: time.Minute, - }) -} diff --git a/agent/internal/task/ipfs_read.go b/agent/internal/task/ipfs_read.go deleted file mode 100644 index aab48bf..0000000 --- a/agent/internal/task/ipfs_read.go +++ /dev/null @@ -1,103 +0,0 @@ -package task - -import ( - "fmt" - "io" - "os" - "path/filepath" - "time" - - "gitlink.org.cn/cloudream/common/pkgs/logger" - "gitlink.org.cn/cloudream/common/pkgs/task" - stgglb "gitlink.org.cn/cloudream/storage/common/globals" -) - -type IPFSRead struct { - FileHash string - LocalPath string -} - -func NewIPFSRead(fileHash string, localPath string) *IPFSRead { - return &IPFSRead{ - FileHash: fileHash, - LocalPath: localPath, - } -} - -func (t *IPFSRead) Compare(other *Task) bool { - tsk, ok := other.Body().(*IPFSRead) - if !ok { - return false - } - - return t.FileHash == tsk.FileHash && t.LocalPath == tsk.LocalPath -} - -func (t *IPFSRead) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) { - log := logger.WithType[IPFSRead]("Task") - log.Debugf("begin with %v", logger.FormatStruct(t)) - defer log.Debugf("end") - - outputFileDir := filepath.Dir(t.LocalPath) - - err := os.MkdirAll(outputFileDir, os.ModePerm) - if err != nil { - err := fmt.Errorf("create output file directory %s failed, err: %w", outputFileDir, err) - log.WithField("LocalPath", t.LocalPath).Warn(err.Error()) - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) - return - } - - outputFile, err := os.Create(t.LocalPath) - if err != nil { - err := fmt.Errorf("create output file %s failed, err: %w", t.LocalPath, err) - log.WithField("LocalPath", t.LocalPath).Warn(err.Error()) - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) - return - } - defer outputFile.Close() - - ipfsCli, err := stgglb.IPFSPool.Acquire() - if err != nil { - err := fmt.Errorf("new ipfs client: %w", err) - log.Warn(err.Error()) - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) - return - } - defer ipfsCli.Close() - - rd, err := ipfsCli.OpenRead(t.FileHash) - if err != nil { - err := fmt.Errorf("read ipfs file failed, err: %w", err) - log.WithField("FileHash", t.FileHash).Warn(err.Error()) - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) - return - } - - _, err = io.Copy(outputFile, rd) - if err != nil { - err := fmt.Errorf("copy ipfs file to local file failed, err: %w", err) - log.WithField("LocalPath", t.LocalPath).Warn(err.Error()) - - complete(err, CompleteOption{ - RemovingDelay: time.Minute, - }) - return - } - - complete(nil, CompleteOption{ - RemovingDelay: time.Minute, - }) -} diff --git a/agent/main.go b/agent/main.go index f47600f..69f9620 100644 --- a/agent/main.go +++ b/agent/main.go @@ -50,7 +50,6 @@ func main() { stgglb.InitLocal(&config.Cfg().Local) stgglb.InitMQPool(&config.Cfg().RabbitMQ) stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) - stgglb.InitIPFSPool(&config.Cfg().IPFS) sw := exec.NewWorker() diff --git a/client/internal/config/config.go b/client/internal/config/config.go index df506ec..9c04e2e 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -2,7 +2,6 @@ package config import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" - "gitlink.org.cn/cloudream/common/pkgs/ipfs" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/utils/config" stgmodels "gitlink.org.cn/cloudream/storage/common/models" @@ -17,7 +16,6 @@ type Config struct { AgentGRPC agtrpc.PoolConfig `json:"agentGRPC"` Logger logger.Config `json:"logger"` RabbitMQ stgmq.Config `json:"rabbitMQ"` - IPFS *ipfs.Config `json:"ipfs"` // 此字段非空代表客户端上存在ipfs daemon DistLock distlock.Config `json:"distlock"` Connectivity connectivity.Config `json:"connectivity"` Downloader downloader.Config `json:"downloader"` diff --git a/client/main.go b/client/main.go index 7b90e81..ea172d1 100644 --- a/client/main.go +++ b/client/main.go @@ -37,11 +37,6 @@ func main() { stgglb.InitLocal(&config.Cfg().Local) stgglb.InitMQPool(&config.Cfg().RabbitMQ) stgglb.InitAgentRPCPool(&config.Cfg().AgentGRPC) - if config.Cfg().IPFS != nil { - logger.Infof("IPFS config is not empty, so create a ipfs client") - - stgglb.InitIPFSPool(config.Cfg().IPFS) - } var conCol connectivity.Collector if config.Cfg().Local.NodeID != nil { diff --git a/common/consts/consts.go b/common/consts/consts.go index 1fb4563..450e5f2 100644 --- a/common/consts/consts.go +++ b/common/consts/consts.go @@ -1,9 +1,6 @@ package consts const ( - IPFSStateOK = "OK" - IPFSStateUnavailable = "Unavailable" - StorageDirectoryStateOK = "OK" NodeStateNormal = "Normal" diff --git a/common/globals/pools.go b/common/globals/pools.go index 0a09378..9b7ec7c 100644 --- a/common/globals/pools.go +++ b/common/globals/pools.go @@ -1,7 +1,6 @@ package stgglb import ( - "gitlink.org.cn/cloudream/common/pkgs/ipfs" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" stgmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq" agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" @@ -28,9 +27,3 @@ var AgentRPCPool *agtrpc.Pool func InitAgentRPCPool(cfg *agtrpc.PoolConfig) { AgentRPCPool = agtrpc.NewPool(cfg) } - -var IPFSPool *ipfs.Pool - -func InitIPFSPool(cfg *ipfs.Config) { - IPFSPool = ipfs.NewPool(cfg) -} diff --git a/common/pkgs/cmd/upload_objects.go b/common/pkgs/cmd/upload_objects.go index a1e39e4..cfc9a15 100644 --- a/common/pkgs/cmd/upload_objects.go +++ b/common/pkgs/cmd/upload_objects.go @@ -12,7 +12,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec" - "gitlink.org.cn/cloudream/common/pkgs/logger" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/sort2" @@ -22,7 +21,6 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2" "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch2/parser" "gitlink.org.cn/cloudream/storage/common/pkgs/iterator" - agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" ) @@ -216,33 +214,9 @@ func uploadAndUpdatePackage(packageID cdssdk.PackageID, objectIter iterator.Uplo } func uploadFile(file io.Reader, uploadNode UploadNodeInfo) (string, error) { - // 本地有IPFS,则直接从本地IPFS上传 - if stgglb.IPFSPool != nil { - logger.Debug("try to use local IPFS to upload file") - - // 只有本地IPFS不是存储系统中的一个节点,才需要Pin文件 - fileHash, err := uploadToLocalIPFS(file, uploadNode.Node.NodeID, stgglb.Local.NodeID == nil) - if err == nil { - return fileHash, nil - - } else { - logger.Warnf("upload to local IPFS failed, so try to upload to node %d, err: %s", uploadNode.Node.NodeID, err.Error()) - } - } - - // 否则发送到agent上传 - fileHash, err := uploadToNode(file, uploadNode.Node) - if err != nil { - return "", fmt.Errorf("uploading to node %v: %w", uploadNode.Node.NodeID, err) - } - - return fileHash, nil -} - -func uploadToNode(file io.Reader, node cdssdk.Node) (string, error) { ft := ioswitch2.NewFromTo() fromExec, hd := ioswitch2.NewFromDriver(-1) - ft.AddFrom(fromExec).AddTo(ioswitch2.NewToNode(node, -1, "fileHash")) + ft.AddFrom(fromExec).AddTo(ioswitch2.NewToNode(uploadNode.Node, -1, "fileHash")) parser := parser.NewParser(cdssdk.DefaultECRedundancy) plans := exec.NewPlanBuilder() @@ -260,44 +234,3 @@ func uploadToNode(file io.Reader, node cdssdk.Node) (string, error) { return ret["fileHash"].(string), nil } - -func uploadToLocalIPFS(file io.Reader, nodeID cdssdk.NodeID, shouldPin bool) (string, error) { - ipfsCli, err := stgglb.IPFSPool.Acquire() - if err != nil { - return "", fmt.Errorf("new ipfs client: %w", err) - } - defer ipfsCli.Close() - - // 从本地IPFS上传文件 - fileHash, err := ipfsCli.CreateFile(file) - if err != nil { - return "", fmt.Errorf("creating ipfs file: %w", err) - } - - if !shouldPin { - return fileHash, nil - } - - err = pinIPFSFile(nodeID, fileHash) - if err != nil { - return "", err - } - - return fileHash, nil -} - -func pinIPFSFile(nodeID cdssdk.NodeID, fileHash string) error { - agtCli, err := stgglb.AgentMQPool.Acquire(nodeID) - if err != nil { - return fmt.Errorf("new agent client: %w", err) - } - defer stgglb.AgentMQPool.Release(agtCli) - - // 然后让最近节点pin本地上传的文件 - _, err = agtCli.PinObject(agtmq.ReqPinObject([]string{fileHash}, false)) - if err != nil { - return fmt.Errorf("start pinning object: %w", err) - } - - return nil -} diff --git a/common/pkgs/db/storage.go b/common/pkgs/db/storage.go deleted file mode 100644 index bcf4f60..0000000 --- a/common/pkgs/db/storage.go +++ /dev/null @@ -1,75 +0,0 @@ -package db - -import ( - "database/sql" - "fmt" - - "github.com/jmoiron/sqlx" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gitlink.org.cn/cloudream/storage/common/pkgs/db/model" -) - -type StorageDB struct { - *DB -} - -func (db *DB) Storage() *StorageDB { - return &StorageDB{DB: db} -} - -func (db *StorageDB) GetByID(ctx SQLContext, stgID cdssdk.StorageID) (model.Storage, error) { - var stg model.Storage - err := sqlx.Get(ctx, &stg, "select * from Storage where StorageID = ?", stgID) - return stg, err -} - -func (db *StorageDB) BatchGetAllStorageIDs(ctx SQLContext, start int, count int) ([]cdssdk.StorageID, error) { - var ret []cdssdk.StorageID - err := sqlx.Select(ctx, &ret, "select StorageID from Storage limit ?, ?", start, count) - return ret, err -} - -func (db *StorageDB) IsAvailable(ctx SQLContext, userID cdssdk.UserID, storageID cdssdk.StorageID) (bool, error) { - var stgID int64 - err := sqlx.Get(ctx, &stgID, - "select Storage.StorageID from Storage, UserStorage where"+ - " Storage.StorageID = ? and"+ - " Storage.StorageID = UserStorage.StorageID and"+ - " UserStorage.UserID = ?", - storageID, userID) - - if err == sql.ErrNoRows { - return false, nil - } - - if err != nil { - return false, fmt.Errorf("find storage failed, err: %w", err) - } - - return true, nil -} - -func (db *StorageDB) GetUserStorage(ctx SQLContext, userID cdssdk.UserID, storageID cdssdk.StorageID) (model.Storage, error) { - var stg model.Storage - err := sqlx.Get(ctx, &stg, - "select Storage.* from UserStorage, Storage where UserID = ? and UserStorage.StorageID = ? and UserStorage.StorageID = Storage.StorageID", - userID, - storageID) - - return stg, err -} - -func (db *StorageDB) GetUserStorageByName(ctx SQLContext, userID cdssdk.UserID, name string) (model.Storage, error) { - var stg model.Storage - err := sqlx.Get(ctx, &stg, - "select Storage.* from UserStorage, Storage where UserID = ? and UserStorage.StorageID = Storage.StorageID and Storage.Name = ?", - userID, - name) - - return stg, err -} - -func (db *StorageDB) ChangeState(ctx SQLContext, storageID cdssdk.StorageID, state string) error { - _, err := ctx.Exec("update Storage set State = ? where StorageID = ?", state, storageID) - return err -} diff --git a/common/pkgs/ioswitch2/ops2/shard_store.go b/common/pkgs/ioswitch2/ops2/shard_store.go index dfae8ad..b013b2a 100644 --- a/common/pkgs/ioswitch2/ops2/shard_store.go +++ b/common/pkgs/ioswitch2/ops2/shard_store.go @@ -108,7 +108,7 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } func (o *ShardWrite) String() string { - return fmt.Sprintf("IPFSWrite %v -> %v", o.Input.ID, o.FileHash.ID) + return fmt.Sprintf("ShardWrite %v -> %v", o.Input.ID, o.FileHash.ID) } type ShardReadNode struct { @@ -117,7 +117,7 @@ type ShardReadNode struct { Open types.OpenOption } -func (b *GraphNodeBuilder) NewIPFSRead(stgID cdssdk.StorageID, open types.OpenOption) *ShardReadNode { +func (b *GraphNodeBuilder) NewShardRead(stgID cdssdk.StorageID, open types.OpenOption) *ShardReadNode { node := &ShardReadNode{ StorageID: stgID, Open: open, @@ -146,37 +146,37 @@ func (t *ShardReadNode) GenerateOp() (exec.Op, error) { // return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node)) // } -type IPFSWriteNode struct { +type ShardWriteNode struct { dag.NodeBase FileHashStoreKey string } -func (b *GraphNodeBuilder) NewIPFSWrite(fileHashStoreKey string) *IPFSWriteNode { - node := &IPFSWriteNode{ +func (b *GraphNodeBuilder) NewShardWrite(fileHashStoreKey string) *ShardWriteNode { + node := &ShardWriteNode{ FileHashStoreKey: fileHashStoreKey, } b.AddNode(node) return node } -func (t *IPFSWriteNode) SetInput(input *dag.StreamVar) { +func (t *ShardWriteNode) SetInput(input *dag.StreamVar) { t.InputStreams().EnsureSize(1) input.Connect(t, 0) t.OutputValues().SetupNew(t, t.Graph().NewValueVar(dag.StringValueVar)) } -func (t *IPFSWriteNode) Input() dag.StreamSlot { +func (t *ShardWriteNode) Input() dag.StreamSlot { return dag.StreamSlot{ Var: t.InputStreams().Get(0), Index: 0, } } -func (t *IPFSWriteNode) FileHashVar() *dag.ValueVar { +func (t *ShardWriteNode) FileHashVar() *dag.ValueVar { return t.OutputValues().Get(0) } -func (t *IPFSWriteNode) GenerateOp() (exec.Op, error) { +func (t *ShardWriteNode) GenerateOp() (exec.Op, error) { return &ShardWrite{ Input: t.InputStreams().Get(0).Var, FileHash: t.OutputValues().Get(0).Var.(*exec.StringVar), diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index 746dc2a..b04ce75 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -235,7 +235,7 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2 switch f := f.(type) { case *ioswitch2.FromNode: - t := ctx.DAG.NewIPFSRead(f.Storage.StorageID, types.NewOpen(f.FileHash)) + t := ctx.DAG.NewShardRead(f.Storage.StorageID, types.NewOpen(f.FileHash)) if f.DataIndex == -1 { t.Open.WithNullableLength(repRange.Offset, repRange.Length) @@ -281,7 +281,7 @@ func (p *DefaultParser) buildFromNode(ctx *ParseContext, f ioswitch2.From) (ops2 func (p *DefaultParser) buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToNode, error) { switch t := t.(type) { case *ioswitch2.ToNode: - n := ctx.DAG.NewIPFSWrite(t.FileHashStoreKey) + n := ctx.DAG.NewShardWrite(t.FileHashStoreKey) n.Env().ToEnvWorker(&ioswitch2.AgentWorker{Node: t.Node}) n.Env().Pinned = true @@ -498,7 +498,7 @@ func (p *DefaultParser) dropUnused(ctx *ParseContext) { // 为IPFS写入指令存储结果 func (p *DefaultParser) storeIPFSWriteResult(ctx *ParseContext) { - dag.WalkOnlyType[*ops2.IPFSWriteNode](ctx.DAG.Graph, func(n *ops2.IPFSWriteNode) bool { + dag.WalkOnlyType[*ops2.ShardWriteNode](ctx.DAG.Graph, func(n *ops2.ShardWriteNode) bool { if n.FileHashStoreKey == "" { return true } diff --git a/common/pkgs/ioswitchlrc/ops2/shard_store.go b/common/pkgs/ioswitchlrc/ops2/shard_store.go index dfae8ad..b013b2a 100644 --- a/common/pkgs/ioswitchlrc/ops2/shard_store.go +++ b/common/pkgs/ioswitchlrc/ops2/shard_store.go @@ -108,7 +108,7 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { } func (o *ShardWrite) String() string { - return fmt.Sprintf("IPFSWrite %v -> %v", o.Input.ID, o.FileHash.ID) + return fmt.Sprintf("ShardWrite %v -> %v", o.Input.ID, o.FileHash.ID) } type ShardReadNode struct { @@ -117,7 +117,7 @@ type ShardReadNode struct { Open types.OpenOption } -func (b *GraphNodeBuilder) NewIPFSRead(stgID cdssdk.StorageID, open types.OpenOption) *ShardReadNode { +func (b *GraphNodeBuilder) NewShardRead(stgID cdssdk.StorageID, open types.OpenOption) *ShardReadNode { node := &ShardReadNode{ StorageID: stgID, Open: open, @@ -146,37 +146,37 @@ func (t *ShardReadNode) GenerateOp() (exec.Op, error) { // return fmt.Sprintf("IPFSRead[%s,%v+%v]%v%v", t.FileHash, t.Option.Offset, t.Option.Length, formatStreamIO(node), formatValueIO(node)) // } -type IPFSWriteNode struct { +type ShardWriteNode struct { dag.NodeBase FileHashStoreKey string } -func (b *GraphNodeBuilder) NewIPFSWrite(fileHashStoreKey string) *IPFSWriteNode { - node := &IPFSWriteNode{ +func (b *GraphNodeBuilder) NewShardWrite(fileHashStoreKey string) *ShardWriteNode { + node := &ShardWriteNode{ FileHashStoreKey: fileHashStoreKey, } b.AddNode(node) return node } -func (t *IPFSWriteNode) SetInput(input *dag.StreamVar) { +func (t *ShardWriteNode) SetInput(input *dag.StreamVar) { t.InputStreams().EnsureSize(1) input.Connect(t, 0) t.OutputValues().SetupNew(t, t.Graph().NewValueVar(dag.StringValueVar)) } -func (t *IPFSWriteNode) Input() dag.StreamSlot { +func (t *ShardWriteNode) Input() dag.StreamSlot { return dag.StreamSlot{ Var: t.InputStreams().Get(0), Index: 0, } } -func (t *IPFSWriteNode) FileHashVar() *dag.ValueVar { +func (t *ShardWriteNode) FileHashVar() *dag.ValueVar { return t.OutputValues().Get(0) } -func (t *IPFSWriteNode) GenerateOp() (exec.Op, error) { +func (t *ShardWriteNode) GenerateOp() (exec.Op, error) { return &ShardWrite{ Input: t.InputStreams().Get(0).Var, FileHash: t.OutputValues().Get(0).Var.(*exec.StringVar), diff --git a/common/pkgs/mq/agent/agent.go b/common/pkgs/mq/agent/agent.go index 287d9b6..3109663 100644 --- a/common/pkgs/mq/agent/agent.go +++ b/common/pkgs/mq/agent/agent.go @@ -16,16 +16,13 @@ type GetState struct { } type GetStateResp struct { mq.MessageBodyBase - IPFSState string `json:"ipfsState"` } func NewGetState() *GetState { return &GetState{} } -func NewGetStateResp(ipfsState string) *GetStateResp { - return &GetStateResp{ - IPFSState: ipfsState, - } +func NewGetStateResp() *GetStateResp { + return &GetStateResp{} } func (client *Client) GetState(msg *GetState, opts ...mq.RequestOption) (*GetStateResp, error) { return mq.Request(Service.GetState, client.rabbitCli, msg, opts...) diff --git a/common/pkgs/mq/coordinator/agent.go b/common/pkgs/mq/coordinator/agent.go index 1de6f52..bd97423 100644 --- a/common/pkgs/mq/coordinator/agent.go +++ b/common/pkgs/mq/coordinator/agent.go @@ -35,16 +35,14 @@ type AgentStatusReport struct { NodeID int64 `json:"nodeID"` NodeDelayIDs []int64 `json:"nodeDelayIDs"` NodeDelays []int `json:"nodeDelays"` - IPFSStatus string `json:"ipfsStatus"` LocalDirStatus string `json:"localDirStatus"` } -func NewAgentStatusReportBody(nodeID int64, nodeDelayIDs []int64, nodeDelays []int, ipfsStatus string, localDirStatus string) *AgentStatusReport { +func NewAgentStatusReportBody(nodeID int64, nodeDelayIDs []int64, nodeDelays []int, localDirStatus string) *AgentStatusReport { return &AgentStatusReport{ NodeID: nodeID, NodeDelayIDs: nodeDelayIDs, NodeDelays: nodeDelays, - IPFSStatus: ipfsStatus, LocalDirStatus: localDirStatus, } } diff --git a/scanner/internal/event/agent_check_state.go b/scanner/internal/event/agent_check_state.go index 66dfb70..12818a5 100644 --- a/scanner/internal/event/agent_check_state.go +++ b/scanner/internal/event/agent_check_state.go @@ -54,7 +54,7 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { } defer stgglb.AgentMQPool.Release(agtCli) - getResp, err := agtCli.GetState(agtmq.NewGetState(), mq.RequestOption{Timeout: time.Second * 30}) + _, err = agtCli.GetState(agtmq.NewGetState(), mq.RequestOption{Timeout: time.Second * 30}) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("getting state: %s", err.Error()) @@ -69,17 +69,6 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { return } - // 根据返回结果修改节点状态 - if getResp.IPFSState != consts.IPFSStateOK { - log.WithField("NodeID", t.NodeID).Warnf("IPFS status is %s, set node state unavailable", getResp.IPFSState) - - err := execCtx.Args.DB.Node().UpdateState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NodeStateUnavailable) - if err != nil { - log.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error()) - } - return - } - // TODO 如果以后还有其他的状态,要判断哪些状态下能设置Normal err = execCtx.Args.DB.Node().UpdateState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NodeStateNormal) if err != nil {