From f991338a865b338f8fda99556a0a3fe3dc2c3cda Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 22 May 2023 10:07:47 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0Agent=E7=9A=84=E6=A3=80?= =?UTF-8?q?=E6=9F=A5=E7=8A=B6=E6=80=81=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/task/check_state.go | 27 +++++++++ internal/task/executor.go | 105 +++++++++++++++++++++++++++++++++++ internal/task/task.go | 6 ++ 3 files changed, 138 insertions(+) create mode 100644 internal/task/check_state.go create mode 100644 internal/task/executor.go create mode 100644 internal/task/task.go diff --git a/internal/task/check_state.go b/internal/task/check_state.go new file mode 100644 index 0000000..7bb12ed --- /dev/null +++ b/internal/task/check_state.go @@ -0,0 +1,27 @@ +package task + +import ( + "gitlink.org.cn/cloudream/agent/internal/config" + "gitlink.org.cn/cloudream/common/consts" + scmsg "gitlink.org.cn/cloudream/rabbitmq/message/scanner" + sctsk "gitlink.org.cn/cloudream/rabbitmq/message/scanner/task" +) + +type CheckStateTask struct { +} + +func (t *CheckStateTask) TryMerge(other Task) bool { + _, ok := other.(*CheckStateTask) + return ok +} + +func (t *CheckStateTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) { + ipfsStatus := consts.IPFS_STATUS_OK + + if execCtx.IPFS.IsUp() { + ipfsStatus = consts.IPFS_STATUS_OK + } + + // 紧急任务 + execCtx.Scanner.PostTask(scmsg.NewPostTaskBody(sctsk.NewUpdateAgentState(config.Cfg().ID, ipfsStatus), true, true)) +} diff --git a/internal/task/executor.go b/internal/task/executor.go new file mode 100644 index 0000000..906d546 --- /dev/null +++ b/internal/task/executor.go @@ -0,0 +1,105 @@ +package task + +import ( + "context" + "sync" + + "github.com/zyedidia/generic/list" + mydb "gitlink.org.cn/cloudream/db" + sccli "gitlink.org.cn/cloudream/rabbitmq/client/scanner" + "gitlink.org.cn/cloudream/utils/ipfs" + "golang.org/x/sync/semaphore" +) + +type ExecuteOption struct { + IsEmergency bool + DontMerge bool +} +type ExecuteContext struct { + Executor *Executor + Scanner *sccli.ScannerClient + DB *mydb.DB + IPFS *ipfs.IPFS +} +type postedTask struct { + Task Task + Option ExecuteOption +} + +type Executor struct { + tasks list.List[postedTask] + locker sync.Mutex + taskSema semaphore.Weighted + execCtx *ExecuteContext +} + +func (e *Executor) Post(task Task, opts ...ExecuteOption) { + opt := ExecuteOption{ + IsEmergency: false, + DontMerge: false, + } + + if len(opts) > 0 { + opt = opts[0] + } + + e.locker.Lock() + defer e.locker.Unlock() + + // 紧急任务直接插入到队头,不进行合并 + if opt.IsEmergency { + e.tasks.PushFront(postedTask{ + Task: task, + Option: opt, + }) + e.taskSema.Release(1) + return + } + + // 合并任务 + if opt.DontMerge { + ptr := e.tasks.Front + for ptr != nil { + // 只与非紧急任务,且允许合并的任务进行合并 + if !ptr.Value.Option.IsEmergency && !ptr.Value.Option.DontMerge { + if ptr.Value.Task.TryMerge(task) { + return + } + } + + ptr = ptr.Next + } + } + + e.tasks.PushBack(postedTask{ + Task: task, + Option: opt, + }) + e.taskSema.Release(1) +} + +// Execute 开始执行任务 +func (e *Executor) Execute() error { + for { + // TODO 打印错误日志 + e.taskSema.Acquire(context.Background(), 1) + + task := e.popFrontTask() + if task == nil { + continue + } + + task.Task.Execute(e.execCtx, task.Option) + } +} + +func (e *Executor) popFrontTask() *postedTask { + e.locker.Lock() + defer e.locker.Unlock() + + if e.tasks.Front == nil { + return nil + } + + return &e.tasks.Front.Value +} diff --git a/internal/task/task.go b/internal/task/task.go new file mode 100644 index 0000000..40547df --- /dev/null +++ b/internal/task/task.go @@ -0,0 +1,6 @@ +package task + +type Task interface { + TryMerge(other Task) bool // 尝试将other任务与自身合并,如果成功返回true + Execute(ctx *ExecuteContext, myOpts ExecuteOption) +}