Browse Source

Task改名成Event

gitlink
Sydonian 2 years ago
parent
commit
c52a111441
15 changed files with 205 additions and 296 deletions
  1. +16
    -16
      internal/event/agent_check_cache.go
  2. +14
    -14
      internal/event/agent_check_state.go
  3. +17
    -17
      internal/event/agent_check_storage.go
  4. +36
    -0
      internal/event/check_object.go
  5. +11
    -11
      internal/event/check_rep_count.go
  6. +1
    -1
      internal/event/check_rep_count_test.go
  7. +10
    -10
      internal/event/check_unavailable_cache.go
  8. +16
    -0
      internal/event/task.go
  9. +6
    -6
      internal/event/update_agent_state.go
  10. +60
    -0
      internal/event/update_cache.go
  11. +18
    -18
      internal/event/update_storage.go
  12. +0
    -36
      internal/task/check_object.go
  13. +0
    -101
      internal/task/executor.go
  14. +0
    -6
      internal/task/task.go
  15. +0
    -60
      internal/task/update_cache.go

internal/task/agent_check_cache.go → internal/event/agent_check_cache.go View File

@@ -1,4 +1,4 @@
package task
package event

import (
"database/sql"
@@ -11,38 +11,38 @@ import (

agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/task"
agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
)

type AgentCheckCacheTask struct {
type AgentCheckCache struct {
NodeID int
FileHashes []string // 需要检查的FileHash列表,如果为nil(不是为空),则代表进行全量检查
}

func NewAgentCheckCacheTask(nodeID int, fileHashes []string) *AgentCheckCacheTask {
return &AgentCheckCacheTask{
func NewAgentCheckCache(nodeID int, fileHashes []string) *AgentCheckCache {
return &AgentCheckCache{
NodeID: nodeID,
FileHashes: fileHashes,
}
}

func (t *AgentCheckCacheTask) TryMerge(other Task) bool {
task, ok := other.(*AgentCheckCacheTask)
func (t *AgentCheckCache) TryMerge(other Event) bool {
event, ok := other.(*AgentCheckCache)
if !ok {
return false
}

// FileHashes为nil时代表全量检查
if task.FileHashes == nil {
if event.FileHashes == nil {
t.FileHashes = nil
} else if t.FileHashes != nil {
t.FileHashes = lo.Union(t.FileHashes, task.FileHashes)
t.FileHashes = lo.Union(t.FileHashes, event.FileHashes)
}

return true
}

func (t *AgentCheckCacheTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) {
func (t *AgentCheckCache) Execute(execCtx ExecuteContext) {
var isComplete bool
var caches []model.Cache

@@ -50,7 +50,7 @@ func (t *AgentCheckCacheTask) Execute(execCtx *ExecuteContext, execOpts ExecuteO

if t.FileHashes == nil {
var err error
caches, err = mysql.Cache.GetNodeCaches(execCtx.DB.SQLCtx(), t.NodeID)
caches, err = mysql.Cache.GetNodeCaches(execCtx.Args.DB.SQLCtx(), t.NodeID)
if err != nil {
logger.WithField("NodeID", t.NodeID).Warnf("get node caches failed, err: %s", err.Error())
return
@@ -59,7 +59,7 @@ func (t *AgentCheckCacheTask) Execute(execCtx *ExecuteContext, execOpts ExecuteO

} else {
for _, hash := range t.FileHashes {
ch, err := mysql.Cache.Get(execCtx.DB.SQLCtx(), hash, t.NodeID)
ch, err := mysql.Cache.Get(execCtx.Args.DB.SQLCtx(), hash, t.NodeID)
// 记录不存在则跳过
if err == sql.ErrNoRows {
continue
@@ -83,10 +83,10 @@ func (t *AgentCheckCacheTask) Execute(execCtx *ExecuteContext, execOpts ExecuteO
}
defer agentClient.Close()

err = agentClient.PostTask(agtmsg.NewPostTaskBody(
agttsk.NewCheckCacheTask(isComplete, caches),
execOpts.IsEmergency, // 继承本任务的执行选项
execOpts.DontMerge))
err = agentClient.PostEvent(agtmsg.NewPostEventBody(
agttsk.NewCheckCache(isComplete, caches),
execCtx.Option.IsEmergency, // 继承本任务的执行选项
execCtx.Option.DontMerge))
if err != nil {
logger.WithField("NodeID", t.NodeID).Warnf("request to agent failed, err: %s", err.Error())
return

internal/task/agent_check_state.go → internal/event/agent_check_state.go View File

@@ -1,4 +1,4 @@
package task
package event

import (
"database/sql"
@@ -11,33 +11,33 @@ import (
mysql "gitlink.org.cn/cloudream/db/sql"
agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/task"
agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
"gitlink.org.cn/cloudream/scanner/internal/config"
)

type AgentCheckStateTask struct {
type AgentCheckState struct {
NodeIDs []int
}

func NewAgentCheckStateTask(nodeIDs []int) AgentCheckStateTask {
return AgentCheckStateTask{
func NewAgentCheckState(nodeIDs []int) AgentCheckState {
return AgentCheckState{
NodeIDs: nodeIDs,
}
}

func (t *AgentCheckStateTask) TryMerge(other Task) bool {
task, ok := other.(*AgentCheckStateTask)
func (t *AgentCheckState) TryMerge(other Event) bool {
event, ok := other.(*AgentCheckState)
if !ok {
return false
}

t.NodeIDs = lo.Union(t.NodeIDs, task.NodeIDs)
t.NodeIDs = lo.Union(t.NodeIDs, event.NodeIDs)
return true
}

func (t *AgentCheckStateTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) {
func (t *AgentCheckState) Execute(execCtx ExecuteContext) {
for _, nodeID := range t.NodeIDs {
node, err := mysql.Node.GetByID(execCtx.DB.SQLCtx(), nodeID)
node, err := mysql.Node.GetByID(execCtx.Args.DB.SQLCtx(), nodeID)
if err == sql.ErrNoRows {
continue
}
@@ -53,20 +53,20 @@ func (t *AgentCheckStateTask) Execute(execCtx *ExecuteContext, execOpts ExecuteO

// 检查上次上报时间,超时的设置为不可用
if time.Since(node.LastReportTime) > time.Duration(config.Cfg().NodeUnavailableSeconds)*time.Second {
err := mysql.Node.ChangeState(execCtx.DB.SQLCtx(), nodeID, consts.NODE_STATE_UNAVAILABLE)
err := mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), nodeID, consts.NODE_STATE_UNAVAILABLE)
if err != nil {
logger.WithField("NodeID", nodeID).Warnf("set node state failed, err: %s", err.Error())
continue
}

caches, err := mysql.Cache.GetNodeCaches(execCtx.DB.SQLCtx(), nodeID)
caches, err := mysql.Cache.GetNodeCaches(execCtx.Args.DB.SQLCtx(), nodeID)
if err != nil {
logger.WithField("NodeID", nodeID).Warnf("get node caches failed, err: %s", err.Error())
continue
}

// 补充备份数
execCtx.Executor.Post(NewCheckRepCountTask(lo.Map(caches, func(ch model.Cache, index int) string { return ch.HashValue })))
execCtx.Executor.Post(NewCheckRepCount(lo.Map(caches, func(ch model.Cache, index int) string { return ch.HashValue })))
continue
}

@@ -78,7 +78,7 @@ func (t *AgentCheckStateTask) Execute(execCtx *ExecuteContext, execOpts ExecuteO
defer agentClient.Close()

// 紧急任务
err = agentClient.PostTask(agtmsg.NewPostTaskBody(agttsk.NewCheckStateTask(), true, true))
err = agentClient.PostEvent(agtmsg.NewPostEventBody(agttsk.NewCheckState(), true, true))
if err != nil {
logger.WithField("NodeID", nodeID).Warnf("request to agent failed, err: %s", err.Error())
}

internal/task/agent_check_storage.go → internal/event/agent_check_storage.go View File

@@ -1,4 +1,4 @@
package task
package event

import (
"database/sql"
@@ -10,37 +10,37 @@ import (
mysql "gitlink.org.cn/cloudream/db/sql"
agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/task"
agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
"gitlink.org.cn/cloudream/scanner/internal/config"
)

type AgentCheckStorageTask struct {
type AgentCheckStorage struct {
StorageID int
ObjectIDs []int // 需要检查的Object文件列表,如果为nil(不是为空),则代表进行全量检查
}

func (t *AgentCheckStorageTask) TryMerge(other Task) bool {
task, ok := other.(*AgentCheckStorageTask)
func (t *AgentCheckStorage) TryMerge(other Event) bool {
event, ok := other.(*AgentCheckStorage)
if !ok {
return false
}

if t.StorageID != task.StorageID {
if t.StorageID != event.StorageID {
return false
}

// ObjectIDs为nil时代表全量检查
if task.ObjectIDs == nil {
if event.ObjectIDs == nil {
t.ObjectIDs = nil
} else if t.ObjectIDs != nil {
t.ObjectIDs = lo.Union(t.ObjectIDs, task.ObjectIDs)
t.ObjectIDs = lo.Union(t.ObjectIDs, event.ObjectIDs)
}

return true
}

func (t *AgentCheckStorageTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) {
stg, err := mysql.Storage.GetByID(execCtx.DB.SQLCtx(), t.StorageID)
func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) {
stg, err := mysql.Storage.GetByID(execCtx.Args.DB.SQLCtx(), t.StorageID)
if err != nil {
if err != sql.ErrNoRows {
logger.WithField("StorageID", t.StorageID).Warnf("get storage failed, err: %s", err.Error())
@@ -48,7 +48,7 @@ func (t *AgentCheckStorageTask) Execute(execCtx *ExecuteContext, execOpts Execut
return
}

node, err := mysql.Node.GetByID(execCtx.DB.SQLCtx(), stg.NodeID)
node, err := mysql.Node.GetByID(execCtx.Args.DB.SQLCtx(), stg.NodeID)
if err != nil {
if err != sql.ErrNoRows {
logger.WithField("StorageID", t.StorageID).Warnf("get storage node failed, err: %s", err.Error())
@@ -66,7 +66,7 @@ func (t *AgentCheckStorageTask) Execute(execCtx *ExecuteContext, execOpts Execut
var objects []model.StorageObject
if t.ObjectIDs == nil {
var err error
objects, err = mysql.StorageObject.GetAllByStorageID(execCtx.DB.SQLCtx(), t.StorageID)
objects, err = mysql.StorageObject.GetAllByStorageID(execCtx.Args.DB.SQLCtx(), t.StorageID)
if err != nil {
logger.WithField("StorageID", t.StorageID).Warnf("get storage objects failed, err: %s", err.Error())
return
@@ -74,7 +74,7 @@ func (t *AgentCheckStorageTask) Execute(execCtx *ExecuteContext, execOpts Execut
isComplete = true
} else {
for _, objID := range t.ObjectIDs {
obj, err := mysql.StorageObject.Get(execCtx.DB.SQLCtx(), t.StorageID, objID)
obj, err := mysql.StorageObject.Get(execCtx.Args.DB.SQLCtx(), t.StorageID, objID)
if err == sql.ErrNoRows {
continue
}
@@ -98,10 +98,10 @@ func (t *AgentCheckStorageTask) Execute(execCtx *ExecuteContext, execOpts Execut
}
defer agentClient.Close()

err = agentClient.PostTask(agtmsg.NewPostTaskBody(
agttsk.NewCheckStorageTask(stg.Directory, isComplete, objects),
execOpts.IsEmergency, // 继承本任务的执行选项
execOpts.DontMerge))
err = agentClient.PostEvent(agtmsg.NewPostEventBody(
agttsk.NewCheckStorage(stg.Directory, isComplete, objects),
execCtx.Option.IsEmergency, // 继承本任务的执行选项
execCtx.Option.DontMerge))
if err != nil {
logger.WithField("NodeID", stg.NodeID).Warnf("request to agent failed, err: %s", stg.NodeID, err.Error())
}

+ 36
- 0
internal/event/check_object.go View File

@@ -0,0 +1,36 @@
package event

import (
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/utils/logger"
mysql "gitlink.org.cn/cloudream/db/sql"
)

type CheckObject struct {
ObjectIDs []int
}

func NewCheckObject(objIDs []int) CheckObject {
return CheckObject{
ObjectIDs: objIDs,
}
}

func (t *CheckObject) TryMerge(other Event) bool {
event, ok := other.(*CheckObject)
if !ok {
return false
}

t.ObjectIDs = lo.Union(t.ObjectIDs, event.ObjectIDs)
return true
}

func (t *CheckObject) Execute(execCtx ExecuteContext) {
for _, objID := range t.ObjectIDs {
err := mysql.Object.DeleteUnused(execCtx.Args.DB.SQLCtx(), objID)
if err != nil {
logger.WithField("ObjectID", objID).Warnf("delete unused object failed, err: %s", err.Error())
}
}
}

internal/task/check_rep_count.go → internal/event/check_rep_count.go View File

@@ -1,4 +1,4 @@
package task
package event

import (
"database/sql"
@@ -17,27 +17,27 @@ import (
mysql "gitlink.org.cn/cloudream/db/sql"
)

type CheckRepCountTask struct {
type CheckRepCount struct {
FileHashes []string
}

func NewCheckRepCountTask(fileHashes []string) *CheckRepCountTask {
return &CheckRepCountTask{
func NewCheckRepCount(fileHashes []string) *CheckRepCount {
return &CheckRepCount{
FileHashes: fileHashes,
}
}

func (t *CheckRepCountTask) TryMerge(other Task) bool {
task, ok := other.(*CheckRepCountTask)
func (t *CheckRepCount) TryMerge(other Event) bool {
event, ok := other.(*CheckRepCount)
if !ok {
return false
}

t.FileHashes = lo.Union(t.FileHashes, task.FileHashes)
t.FileHashes = lo.Union(t.FileHashes, event.FileHashes)
return true
}

func (t *CheckRepCountTask) Execute(execCtx *ExecuteContext, myOpts ExecuteOption) {
func (t *CheckRepCount) Execute(execCtx ExecuteContext) {
updatedNodeAndHashes := make(map[int][]string)

for _, fileHash := range t.FileHashes {
@@ -55,13 +55,13 @@ func (t *CheckRepCountTask) Execute(execCtx *ExecuteContext, myOpts ExecuteOptio

for nodeID, hashes := range updatedNodeAndHashes {
// 新任务继承本任务的执行设定(紧急任务依然保持紧急任务)
execCtx.Executor.Post(NewAgentCheckCacheTask(nodeID, hashes), myOpts)
execCtx.Executor.Post(NewAgentCheckCache(nodeID, hashes), execCtx.Option)
}
}

func (t *CheckRepCountTask) checkOneRepCount(fileHash string, execCtx *ExecuteContext) ([]int, error) {
func (t *CheckRepCount) checkOneRepCount(fileHash string, execCtx ExecuteContext) ([]int, error) {
var updatedNodeIDs []int
err := execCtx.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
err := execCtx.Args.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
repMaxCnt, err := mysql.ObjectRep.GetFileMaxRepCount(tx, fileHash)
if err != nil {
return fmt.Errorf("get file max rep count failed, err: %w", err)

internal/task/check_rep_count_test.go → internal/event/check_rep_count_test.go View File

@@ -1,4 +1,4 @@
package task
package event

import (
"testing"

internal/task/check_unavailable_cache.go → internal/event/check_unavailable_cache.go View File

@@ -1,4 +1,4 @@
package task
package event

import (
"database/sql"
@@ -12,30 +12,30 @@ import (
mysql "gitlink.org.cn/cloudream/db/sql"
)

type CheckUnavailableCacheTask struct {
type CheckUnavailableCache struct {
NodeID int
}

func NewCheckUnavailableCacheTask(nodeID int) CheckUnavailableCacheTask {
return CheckUnavailableCacheTask{
func NewCheckUnavailableCache(nodeID int) CheckUnavailableCache {
return CheckUnavailableCache{
NodeID: nodeID,
}
}

func (t *CheckUnavailableCacheTask) TryMerge(other Task) bool {
task, ok := other.(*CheckUnavailableCacheTask)
func (t *CheckUnavailableCache) TryMerge(other Event) bool {
event, ok := other.(*CheckUnavailableCache)
if !ok {
return false
}
if task.NodeID != t.NodeID {
if event.NodeID != t.NodeID {
return false
}

return true
}

func (t *CheckUnavailableCacheTask) Execute(execCtx *ExecuteContext, myOpts ExecuteOption) {
err := execCtx.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
func (t *CheckUnavailableCache) Execute(execCtx ExecuteContext) {
err := execCtx.Args.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error {
node, err := mysql.Node.GetByID(tx, t.NodeID)
if err == sql.ErrNoRows {
return nil
@@ -58,7 +58,7 @@ func (t *CheckUnavailableCacheTask) Execute(execCtx *ExecuteContext, myOpts Exec
return fmt.Errorf("delete node all caches failed, err: %w", err)
}

execCtx.Executor.Post(NewCheckRepCountTask(lo.Map(caches, func(ch model.Cache, index int) string { return ch.HashValue })))
execCtx.Executor.Post(NewCheckRepCount(lo.Map(caches, func(ch model.Cache, index int) string { return ch.HashValue })))
return nil
})


+ 16
- 0
internal/event/task.go View File

@@ -0,0 +1,16 @@
package event

import (
event "gitlink.org.cn/cloudream/common/pkg/event"
mydb "gitlink.org.cn/cloudream/db"
)

type ExecuteArgs struct {
DB *mydb.DB
}

type Executor = event.Executor[ExecuteArgs]

type ExecuteContext = event.ExecuteContext[ExecuteArgs]

type Event = event.Event[ExecuteArgs]

internal/task/update_agent_state.go → internal/event/update_agent_state.go View File

@@ -1,4 +1,4 @@
package task
package event

import (
"gitlink.org.cn/cloudream/common/consts"
@@ -6,20 +6,20 @@ import (
mysql "gitlink.org.cn/cloudream/db/sql"
)

type UpdateAgentStateTask struct {
type UpdateAgentState struct {
NodeID int
IPFSStatus string
}

func (t *UpdateAgentStateTask) TryMerge(other Task) bool {
func (t *UpdateAgentState) TryMerge(other Event) bool {
return false
}

func (t *UpdateAgentStateTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) {
func (t *UpdateAgentState) Execute(execCtx ExecuteContext) {
if t.IPFSStatus != consts.IPFS_STATUS_OK {
logger.WithField("NodeID", t.NodeID).Warnf("IPFS status is %s, set node state unavailable", t.IPFSStatus)

err := mysql.Node.ChangeState(execCtx.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE)
err := mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE)
if err != nil {
logger.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error())
}
@@ -27,7 +27,7 @@ func (t *UpdateAgentStateTask) Execute(execCtx *ExecuteContext, execOpts Execute
}

// TODO 如果以后还有其他的状态,要判断哪些状态下能设置Normal
err := mysql.Node.ChangeState(execCtx.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_NORMAL)
err := mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_NORMAL)
if err != nil {
logger.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error())
}

+ 60
- 0
internal/event/update_cache.go View File

@@ -0,0 +1,60 @@
package event

import (
tskcst "gitlink.org.cn/cloudream/common/consts/event"
"gitlink.org.cn/cloudream/common/utils/logger"
mysql "gitlink.org.cn/cloudream/db/sql"
)

type UpdateCacheEntry struct {
FileHash string
Operation string
}

func NewUpdateCacheEntry(fileHash string, op string) UpdateCacheEntry {
return UpdateCacheEntry{
FileHash: fileHash,
Operation: op,
}
}

type UpdateCache struct {
NodeID int
Entries []UpdateCacheEntry
}

func NewUpdateCache(nodeID int, entries []UpdateCacheEntry) UpdateCache {
return UpdateCache{
NodeID: nodeID,
Entries: entries,
}
}

func (t *UpdateCache) TryMerge(other Event) bool {
event, ok := other.(*UpdateCache)
if !ok {
return false
}
if event.NodeID != t.NodeID {
return false
}

// TODO 可以考虑合并同FileHash和NodeID的记录
t.Entries = append(t.Entries, event.Entries...)
return true
}

func (t *UpdateCache) Execute(execCtx ExecuteContext) {
for _, entry := range t.Entries {
switch entry.Operation {
case tskcst.UPDATE_CACHE_OP_UNTEMP:
err := mysql.Cache.DeleteTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID)

if err != nil {
logger.WithField("FileHash", entry.FileHash).
WithField("NodeID", t.NodeID).
Warnf("delete temp cache failed, err: %s", err.Error())
}
}
}
}

internal/task/update_storage.go → internal/event/update_storage.go View File

@@ -1,55 +1,55 @@
package task
package event

import (
tskcst "gitlink.org.cn/cloudream/common/consts/task"
tskcst "gitlink.org.cn/cloudream/common/consts/event"
"gitlink.org.cn/cloudream/common/utils/logger"
mysql "gitlink.org.cn/cloudream/db/sql"
)

type UpdateStorageTaskEntry struct {
type UpdateStorageEntry struct {
ObjectID int
Operation string
}

func NewUpdateStorageTaskEntry(objectID int, op string) UpdateStorageTaskEntry {
return UpdateStorageTaskEntry{
func NewUpdateStorageEntry(objectID int, op string) UpdateStorageEntry {
return UpdateStorageEntry{
ObjectID: objectID,
Operation: op,
}
}

type UpdateStorageTask struct {
type UpdateStorage struct {
StorageID int
DirectoryStatus string
Entries []UpdateStorageTaskEntry
Entries []UpdateStorageEntry
}

func NewUpdateStorageTask(dirStatus string, entries []UpdateStorageTaskEntry) UpdateStorageTask {
return UpdateStorageTask{
func NewUpdateStorage(dirStatus string, entries []UpdateStorageEntry) UpdateStorage {
return UpdateStorage{
DirectoryStatus: dirStatus,
Entries: entries,
}
}

func (t *UpdateStorageTask) TryMerge(other Task) bool {
task, ok := other.(*UpdateStorageTask)
func (t *UpdateStorage) TryMerge(other Event) bool {
event, ok := other.(*UpdateStorage)
if !ok {
return false
}
if task.StorageID != t.StorageID {
if event.StorageID != t.StorageID {
return false
}

// 后投递的任务的状态更新一些
t.DirectoryStatus = task.DirectoryStatus
t.DirectoryStatus = event.DirectoryStatus
// TODO 可以考虑合并同FileHash和NodeID的记录
t.Entries = append(t.Entries, task.Entries...)
t.Entries = append(t.Entries, event.Entries...)
return true
}

func (t *UpdateStorageTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) {
func (t *UpdateStorage) Execute(execCtx ExecuteContext) {

err := mysql.Storage.ChangeState(execCtx.DB.SQLCtx(), t.StorageID, t.DirectoryStatus)
err := mysql.Storage.ChangeState(execCtx.Args.DB.SQLCtx(), t.StorageID, t.DirectoryStatus)
if err != nil {
logger.WithField("StorageID", t.StorageID).Warnf("change storage state failed, err: %s", err.Error())
}
@@ -57,7 +57,7 @@ func (t *UpdateStorageTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOpt
for _, entry := range t.Entries {
switch entry.Operation {
case tskcst.UPDATE_STORAGE_DELETE:
err := mysql.StorageObject.Delete(execCtx.DB.SQLCtx(), t.StorageID, entry.ObjectID)
err := mysql.StorageObject.Delete(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID)
if err != nil {
logger.WithField("StorageID", t.StorageID).
WithField("ObjectID", entry.ObjectID).
@@ -65,7 +65,7 @@ func (t *UpdateStorageTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOpt
}

case tskcst.UPDATE_STORAGE_SET_NORMAL:
err := mysql.StorageObject.SetStateNormal(execCtx.DB.SQLCtx(), t.StorageID, entry.ObjectID)
err := mysql.StorageObject.SetStateNormal(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID)
if err != nil {
logger.WithField("StorageID", t.StorageID).
WithField("ObjectID", entry.ObjectID).

+ 0
- 36
internal/task/check_object.go View File

@@ -1,36 +0,0 @@
package task

import (
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/utils/logger"
mysql "gitlink.org.cn/cloudream/db/sql"
)

type CheckObjectTask struct {
ObjectIDs []int
}

func NewCheckObjectTask(objIDs []int) CheckObjectTask {
return CheckObjectTask{
ObjectIDs: objIDs,
}
}

func (t *CheckObjectTask) TryMerge(other Task) bool {
task, ok := other.(*CheckObjectTask)
if !ok {
return false
}

t.ObjectIDs = lo.Union(t.ObjectIDs, task.ObjectIDs)
return true
}

func (t *CheckObjectTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) {
for _, objID := range t.ObjectIDs {
err := mysql.Object.DeleteUnused(execCtx.DB.SQLCtx(), objID)
if err != nil {
logger.WithField("ObjectID", objID).Warnf("delete unused object failed, err: %s", err.Error())
}
}
}

+ 0
- 101
internal/task/executor.go View File

@@ -1,101 +0,0 @@
package task

import (
"context"
"sync"

"github.com/zyedidia/generic/list"
mydb "gitlink.org.cn/cloudream/db"
"golang.org/x/sync/semaphore"
)

type ExecuteOption struct {
IsEmergency bool
DontMerge bool
}
type ExecuteContext struct {
Executor *Executor
DB *mydb.DB
}
type postedTask struct {
Task Task
Option ExecuteOption
}

type Executor struct {
tasks list.List[postedTask]
locker sync.Mutex
taskSema semaphore.Weighted
execCtx *ExecuteContext
}

func (e *Executor) Post(task Task, opts ...ExecuteOption) {
opt := ExecuteOption{
IsEmergency: false,
DontMerge: false,
}

if len(opts) > 0 {
opt = opts[0]
}

e.locker.Lock()
defer e.locker.Unlock()

// 紧急任务直接插入到队头,不进行合并
if opt.IsEmergency {
e.tasks.PushFront(postedTask{
Task: task,
Option: opt,
})
e.taskSema.Release(1)
return
}

// 合并任务
if opt.DontMerge {
ptr := e.tasks.Front
for ptr != nil {
// 只与非紧急任务,且允许合并的任务进行合并
if !ptr.Value.Option.IsEmergency && !ptr.Value.Option.DontMerge {
if ptr.Value.Task.TryMerge(task) {
return
}
}

ptr = ptr.Next
}
}

e.tasks.PushBack(postedTask{
Task: task,
Option: opt,
})
e.taskSema.Release(1)
}

// Execute 开始执行任务
func (e *Executor) Execute() error {
for {
// TODO 打印错误日志
e.taskSema.Acquire(context.Background(), 1)

task := e.popFrontTask()
if task == nil {
continue
}

task.Task.Execute(e.execCtx, task.Option)
}
}

func (e *Executor) popFrontTask() *postedTask {
e.locker.Lock()
defer e.locker.Unlock()

if e.tasks.Front == nil {
return nil
}

return &e.tasks.Front.Value
}

+ 0
- 6
internal/task/task.go View File

@@ -1,6 +0,0 @@
package task

type Task interface {
TryMerge(other Task) bool // 尝试将other任务与自身合并,如果成功返回true
Execute(ctx *ExecuteContext, myOpts ExecuteOption)
}

+ 0
- 60
internal/task/update_cache.go View File

@@ -1,60 +0,0 @@
package task

import (
tskcst "gitlink.org.cn/cloudream/common/consts/task"
"gitlink.org.cn/cloudream/common/utils/logger"
mysql "gitlink.org.cn/cloudream/db/sql"
)

type UpdateCacheTaskEntry struct {
FileHash string
Operation string
}

func NewUpdateCacheTaskEntry(fileHash string, op string) UpdateCacheTaskEntry {
return UpdateCacheTaskEntry{
FileHash: fileHash,
Operation: op,
}
}

type UpdateCacheTask struct {
NodeID int
Entries []UpdateCacheTaskEntry
}

func NewUpdateCacheTask(nodeID int, entries []UpdateCacheTaskEntry) UpdateCacheTask {
return UpdateCacheTask{
NodeID: nodeID,
Entries: entries,
}
}

func (t *UpdateCacheTask) TryMerge(other Task) bool {
task, ok := other.(*UpdateCacheTask)
if !ok {
return false
}
if task.NodeID != t.NodeID {
return false
}

// TODO 可以考虑合并同FileHash和NodeID的记录
t.Entries = append(t.Entries, task.Entries...)
return true
}

func (t *UpdateCacheTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) {
for _, entry := range t.Entries {
switch entry.Operation {
case tskcst.UPDATE_CACHE_OP_UNTEMP:
err := mysql.Cache.DeleteTemp(execCtx.DB.SQLCtx(), entry.FileHash, t.NodeID)

if err != nil {
logger.WithField("FileHash", entry.FileHash).
WithField("NodeID", t.NodeID).
Warnf("delete temp cache failed, err: %s", err.Error())
}
}
}
}

Loading…
Cancel
Save