Browse Source

scanner和agent启动EventExecutor

gitlink
Sydonian 2 years ago
parent
commit
9e88621ef0
6 changed files with 126 additions and 9 deletions
  1. +2
    -0
      internal/config/config.go
  2. +11
    -2
      internal/event/check_state.go
  3. +29
    -0
      internal/event/event.go
  4. +28
    -0
      internal/services/cmd/event.go
  5. +9
    -4
      internal/services/cmd/service.go
  6. +47
    -3
      main.go

+ 2
- 0
internal/config/config.go View File

@@ -4,6 +4,7 @@ import (
c "gitlink.org.cn/cloudream/common/utils/config"
"gitlink.org.cn/cloudream/common/utils/ipfs"
log "gitlink.org.cn/cloudream/common/utils/logger"
dbcfg "gitlink.org.cn/cloudream/db/config"
racfg "gitlink.org.cn/cloudream/rabbitmq/config"
)

@@ -15,6 +16,7 @@ type Config struct {
Logger log.Config `json:"logger"`
RabbitMQ racfg.Config `json:"rabbitMQ"`
IPFS ipfs.Config `json:"ipfs"`
DB dbcfg.Config `json:"db"`
}

var cfg Config


+ 11
- 2
internal/event/check_state.go View File

@@ -3,13 +3,18 @@ package event
import (
"gitlink.org.cn/cloudream/agent/internal/config"
"gitlink.org.cn/cloudream/common/consts"
agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
scmsg "gitlink.org.cn/cloudream/rabbitmq/message/scanner"
sctsk "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
)

type CheckState struct {
}

func NewCheckState() *CheckState {
return &CheckState{}
}

func (t *CheckState) TryMerge(other Event) bool {
_, ok := other.(*CheckState)
return ok
@@ -23,5 +28,9 @@ func (t *CheckState) Execute(execCtx ExecuteContext) {
}

// 紧急任务
execCtx.Args.Scanner.PostEvent(scmsg.NewPostEventBody(sctsk.NewUpdateAgentState(config.Cfg().ID, ipfsStatus), true, true))
execCtx.Args.Scanner.PostEvent(scmsg.NewPostEventBody(scevt.NewUpdateAgentState(config.Cfg().ID, ipfsStatus), true, true))
}

func init() {
Register(func(val agtevt.CheckState) Event { return NewCheckState() })
}

+ 29
- 0
internal/event/event.go View File

@@ -1,7 +1,11 @@
package event

import (
"fmt"
"reflect"

event "gitlink.org.cn/cloudream/common/pkg/event"
"gitlink.org.cn/cloudream/common/pkg/typedispatcher"
"gitlink.org.cn/cloudream/common/utils/ipfs"
mydb "gitlink.org.cn/cloudream/db"
sccli "gitlink.org.cn/cloudream/rabbitmq/client/scanner"
@@ -18,3 +22,28 @@ type Executor = event.Executor[ExecuteArgs]
type ExecuteContext = event.ExecuteContext[ExecuteArgs]

type Event = event.Event[ExecuteArgs]

type ExecuteOption = event.ExecuteOption

func NewExecutor(scanner *sccli.ScannerClient, db *mydb.DB, ipfs *ipfs.IPFS) Executor {
return event.NewExecutor(ExecuteArgs{
Scanner: scanner,
DB: db,
IPFS: ipfs,
})
}

var msgDispatcher typedispatcher.TypeDispatcher[Event]

func FromMessage(msg any) (Event, error) {
event, ok := msgDispatcher.Dispatch(msg)
if !ok {
return nil, fmt.Errorf("unknow event message type: %s", reflect.TypeOf(msg).Name())
}

return event, nil
}

func Register[T any](converter func(msg T) Event) {
typedispatcher.Add(msgDispatcher, converter)
}

+ 28
- 0
internal/services/cmd/event.go View File

@@ -0,0 +1,28 @@
package cmd

import (
"gitlink.org.cn/cloudream/agent/internal/event"
"gitlink.org.cn/cloudream/common/utils/logger"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
)

func (svc *Service) PostEvent(msg *agtmsg.PostEvent) {

evtMsg, err := agtevt.MapToMessage(msg.Body.Event.(map[string]any))
if err != nil {
logger.Warnf("convert map to event message failed, err: %s", err.Error())
return
}

evt, err := event.FromMessage(evtMsg)
if err != nil {
logger.Warnf("create event from event message failed, err: %s", err.Error())
return
}

svc.eventExecutor.Post(evt, event.ExecuteOption{
IsEmergency: msg.Body.IsEmergency,
DontMerge: msg.Body.DontMerge,
})
}

+ 9
- 4
internal/services/cmd/service.go View File

@@ -1,13 +1,18 @@
package cmd

import "gitlink.org.cn/cloudream/common/utils/ipfs"
import (
"gitlink.org.cn/cloudream/agent/internal/event"
"gitlink.org.cn/cloudream/common/utils/ipfs"
)

type Service struct {
ipfs *ipfs.IPFS
ipfs *ipfs.IPFS
eventExecutor *event.Executor
}

func NewService(ipfs *ipfs.IPFS) *Service {
func NewService(ipfs *ipfs.IPFS, eventExecutor *event.Executor) *Service {
return &Service{
ipfs: ipfs,
ipfs: ipfs,
eventExecutor: eventExecutor,
}
}

+ 47
- 3
main.go View File

@@ -7,8 +7,10 @@ import (
"sync"

"gitlink.org.cn/cloudream/agent/internal/config"
"gitlink.org.cn/cloudream/agent/internal/event"
"gitlink.org.cn/cloudream/common/utils/ipfs"
log "gitlink.org.cn/cloudream/common/utils/logger"
"gitlink.org.cn/cloudream/db"
agentserver "gitlink.org.cn/cloudream/proto"

"google.golang.org/grpc"
@@ -17,6 +19,7 @@ import (

cmdsvc "gitlink.org.cn/cloudream/agent/internal/services/cmd"
grpcsvc "gitlink.org.cn/cloudream/agent/internal/services/grpc"
sccli "gitlink.org.cn/cloudream/rabbitmq/client/scanner"
)

// TODO 此数据是否在运行时会发生变化?
@@ -38,6 +41,16 @@ func main() {
os.Exit(1)
}

scanner, err := sccli.NewScannerClient(&config.Cfg().RabbitMQ)
if err != nil {
log.Fatalf("new scanner client failed, err: %s", err.Error())
}

db, err := db.NewDB(&config.Cfg().DB)
if err != nil {
log.Fatalf("new db failed, err: %s", err.Error())
}

ipfs, err := ipfs.NewIPFS(&config.Cfg().IPFS)
if err != nil {
log.Fatalf("new ipfs failed, err: %s", err.Error())
@@ -45,11 +58,14 @@ func main() {

//处置协调端、客户端命令(可多建几个)
wg := sync.WaitGroup{}
wg.Add(2)
wg.Add(4)

eventExecutor := event.NewExecutor(scanner, db, ipfs)
go serveEventExecutor(&eventExecutor, &wg)

// 启动命令服务器
// TODO 需要设计AgentID持久化机制
agtSvr, err := rasvr.NewAgentServer(cmdsvc.NewService(ipfs), config.Cfg().ID, &config.Cfg().RabbitMQ)
agtSvr, err := rasvr.NewAgentServer(cmdsvc.NewService(ipfs, &eventExecutor), config.Cfg().ID, &config.Cfg().RabbitMQ)
if err != nil {
log.Fatalf("new agent server failed, err: %s", err.Error())
}
@@ -69,7 +85,7 @@ func main() {

s := grpc.NewServer()
agentserver.RegisterFileTransportServer(s, grpcsvc.NewService(ipfs))
s.Serve(lis)
go serveGRPC(s, lis, &wg)

wg.Wait()
}
@@ -87,3 +103,31 @@ func serveAgentServer(server *rasvr.AgentServer, wg *sync.WaitGroup) {

wg.Done()
}

func serveEventExecutor(executor *event.Executor, wg *sync.WaitGroup) {
log.Info("start serving event executor")

err := executor.Execute()

if err != nil {
log.Errorf("event executor stopped with error: %s", err.Error())
}

log.Info("event executor stopped")

wg.Done()
}

func serveGRPC(s *grpc.Server, lis net.Listener, wg *sync.WaitGroup) {
log.Info("start serving grpc")

err := s.Serve(lis)

if err != nil {
log.Errorf("grpc stopped with error: %s", err.Error())
}

log.Info("grpc stopped")

wg.Done()
}

Loading…
Cancel
Save