diff --git a/common/pkgs/sysevent/config.go b/common/pkgs/sysevent/config.go new file mode 100644 index 0000000..dfe1bf5 --- /dev/null +++ b/common/pkgs/sysevent/config.go @@ -0,0 +1,8 @@ +package sysevent + +type Config struct { + Address string `json:"address"` + Account string `json:"account"` + Password string `json:"password"` + VHost string `json:"vhost"` +} diff --git a/common/pkgs/sysevent/publisher.go b/common/pkgs/sysevent/publisher.go new file mode 100644 index 0000000..5b42c93 --- /dev/null +++ b/common/pkgs/sysevent/publisher.go @@ -0,0 +1,119 @@ +package sysevent + +import ( + "fmt" + + "github.com/streadway/amqp" + "gitlink.org.cn/cloudream/common/pkgs/async" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +type PublisherEvent interface{} + +type PublisherExited struct { + Err error +} + +type PublishError struct { + Err error +} + +type OtherError struct { + Err error +} + +type Publisher struct { + connection *amqp.Connection + channel *amqp.Channel + eventChan *async.UnboundChannel[SysEvent] + thisSource Source +} + +func NewPublisher(cfg Config, thisSource Source) (*Publisher, error) { + config := amqp.Config{ + Vhost: cfg.VHost, + } + + url := fmt.Sprintf("amqp://%s:%s@%s", cfg.Account, cfg.Password, cfg.Address) + connection, err := amqp.DialConfig(url, config) + if err != nil { + return nil, err + } + + channel, err := connection.Channel() + if err != nil { + connection.Close() + return nil, fmt.Errorf("openning channel on connection: %w", err) + } + + _, err = channel.QueueDeclare( + SysEventQueueName, + false, + true, + false, + false, + nil, + ) + if err != nil { + return nil, fmt.Errorf("declare queue: %w", err) + } + + pub := &Publisher{ + connection: connection, + channel: channel, + eventChan: async.NewUnboundChannel[SysEvent](), + thisSource: thisSource, + } + + return pub, nil +} + +func (p *Publisher) Start() *async.UnboundChannel[PublisherEvent] { + ch := async.NewUnboundChannel[PublisherEvent]() + go func() { + defer ch.Close() + defer p.channel.Close() + defer p.connection.Close() + + for { + event := <-p.eventChan.Receive().Chan() + if event.Err != nil { + if event.Err == async.ErrChannelClosed { + ch.Send(PublisherExited{Err: nil}) + } else { + ch.Send(PublisherExited{Err: event.Err}) + } + return + } + + eventData, err := serder.ObjectToJSONEx(event.Value) + if err != nil { + ch.Send(OtherError{Err: fmt.Errorf("serialize event data: %w", err)}) + continue + } + + err = p.channel.Publish("", SysEventQueueName, false, false, amqp.Publishing{ + ContentType: "text/plain", + Body: eventData, + Expiration: "60000", // 消息超时时间默认1分钟 + }) + if err != nil { + ch.Send(PublishError{Err: err}) + continue + } + } + }() + + return ch +} + +// Publish 发布事件,自动补齐时间戳和源信息 +func (p *Publisher) Publish(evt SysEvent) { + // TODO 补齐时间戳和源信息 + p.eventChan.Send(evt) +} + +// PublishRaw 完全原样发布事件,不补齐任何信息 +func (p *Publisher) PublishRaw(evt SysEvent) { + p.eventChan.Send(evt) +} diff --git a/common/pkgs/sysevent/sysevent.go b/common/pkgs/sysevent/sysevent.go new file mode 100644 index 0000000..082b93e --- /dev/null +++ b/common/pkgs/sysevent/sysevent.go @@ -0,0 +1,9 @@ +package sysevent + +const ( + SysEventQueueName = "SysEventQueue" +) + +type SysEvent = any // TODO 换成具体的类型 + +type Source = any // TODO 换成具体的类型 diff --git a/common/pkgs/sysevent/watcher.go b/common/pkgs/sysevent/watcher.go new file mode 100644 index 0000000..6ffbd72 --- /dev/null +++ b/common/pkgs/sysevent/watcher.go @@ -0,0 +1,121 @@ +package sysevent + +import ( + "fmt" + "sync" + + "github.com/streadway/amqp" + "gitlink.org.cn/cloudream/common/pkgs/async" + "gitlink.org.cn/cloudream/common/utils/lo2" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +type Watcher interface { + OnEvent(event SysEvent) +} + +type WatcherEvent interface{} + +type WatcherExited struct { + Err error +} + +type WatcherHost struct { + watchers []Watcher + lock sync.Mutex + connection *amqp.Connection + channel *amqp.Channel + recvChan <-chan amqp.Delivery +} + +func NewWatcherHost(cfg Config) (*WatcherHost, error) { + config := amqp.Config{ + Vhost: cfg.VHost, + } + + url := fmt.Sprintf("amqp://%s:%s@%s", cfg.Account, cfg.Password, cfg.Address) + connection, err := amqp.DialConfig(url, config) + if err != nil { + return nil, err + } + + channel, err := connection.Channel() + if err != nil { + connection.Close() + return nil, fmt.Errorf("openning channel on connection: %w", err) + } + + _, err = channel.QueueDeclare( + SysEventQueueName, + false, + true, + false, + false, + nil, + ) + if err != nil { + channel.Close() + connection.Close() + return nil, fmt.Errorf("declare queue: %w", err) + } + + recvChan, err := channel.Consume(SysEventQueueName, "", true, false, true, false, nil) + if err != nil { + channel.Close() + connection.Close() + return nil, fmt.Errorf("consume queue: %w", err) + } + + wat := &WatcherHost{ + connection: connection, + channel: channel, + recvChan: recvChan, + } + + return wat, nil +} + +func (w *WatcherHost) Start() *async.UnboundChannel[WatcherEvent] { + ch := async.NewUnboundChannel[WatcherEvent]() + + go func() { + defer ch.Close() + defer w.channel.Close() + defer w.connection.Close() + + for m := range w.recvChan { + evt, err := serder.JSONToObjectEx[SysEvent](m.Body) + if err != nil { + ch.Send(OtherError{Err: fmt.Errorf("deserialize event: %w", err)}) + continue + } + + w.lock.Lock() + ws := make([]Watcher, 0, len(w.watchers)) + ws = append(ws, w.watchers...) + w.lock.Unlock() + + for _, w := range ws { + w.OnEvent(evt) + } + } + + ch.Send(WatcherExited{Err: nil}) + }() + + return ch +} + +func (w *WatcherHost) AddWatcher(watcher Watcher) { + w.lock.Lock() + defer w.lock.Unlock() + + w.watchers = append(w.watchers, watcher) +} + +func (w *WatcherHost) RemoveWatcher(watcher Watcher) { + w.lock.Lock() + defer w.lock.Unlock() + + w.watchers = lo2.Remove(w.watchers, watcher) +}