Browse Source

1、新增增量模型更新任务

2、完成执行器任务机制与消息推送等优化
gitlink
JeshuaRen 1 year ago
parent
commit
c44dc0225a
8 changed files with 277 additions and 84 deletions
  1. +2
    -2
      pkgs/actor/actor.go
  2. +89
    -0
      pkgs/async/unbound_channel.go
  3. +1
    -1
      pkgs/distlock/internal/acquire_actor.go
  4. +18
    -5
      pkgs/future/future.go
  5. +87
    -0
      pkgs/future/ready.go
  6. +63
    -62
      pkgs/future/set_value_future.go
  7. +15
    -12
      pkgs/future/set_void_future.go
  8. +2
    -2
      sdks/scheduler/models.go

+ 2
- 2
pkgs/actor/actor.go View File

@@ -126,7 +126,7 @@ func WaitValue[T any](ctx context.Context, c *CommandChannel, cmd func() (T, err
fut.SetComplete(val, err)
})

return fut.WaitValue(ctx)
return fut.Wait(ctx)
}

func WaitValue2[T1 any, T2 any](ctx context.Context, c *CommandChannel, cmd func() (T1, T2, error)) (T1, T2, error) {
@@ -137,5 +137,5 @@ func WaitValue2[T1 any, T2 any](ctx context.Context, c *CommandChannel, cmd func
fut.SetComplete(val1, val2, err)
})

return fut.WaitValue(ctx)
return fut.Wait(ctx)
}

+ 89
- 0
pkgs/async/unbound_channel.go View File

@@ -0,0 +1,89 @@
package async

import (
"container/list"
"errors"
"gitlink.org.cn/cloudream/common/pkgs/future"
"sync"
)

var ErrChannelClosed = errors.New("channel is closed")

type UnboundChannel[T any] struct {
values *list.List
waiters []*future.SetValueFuture[T]
lock sync.Mutex
err error
}

func NewUnboundChannel[T any]() *UnboundChannel[T] {
return &UnboundChannel[T]{
values: list.New(),
}
}

func (c *UnboundChannel[T]) Error() error {
return c.err
}

func (c *UnboundChannel[T]) Send(val T) error {
c.lock.Lock()
defer c.lock.Unlock()

if c.err != nil {
return c.err
}

c.values.PushBack(val)

for len(c.waiters) > 0 && c.values.Len() > 0 {
waiter := c.waiters[0]
waiter.SetValue(c.values.Front().Value.(T))
c.values.Remove(c.values.Front())
c.waiters = c.waiters[1:]
return nil
}

return nil
}

func (c *UnboundChannel[T]) Receive() future.Future1[T] {
c.lock.Lock()
defer c.lock.Unlock()

if c.err != nil {
return future.NewReadyError1[T](c.err)
}

if c.values.Len() > 0 {
ret := c.values.Front().Value.(T)
c.values.Remove(c.values.Front())
return future.NewReadyValue1[T](ret)
}

fut := future.NewSetValue[T]()
c.waiters = append(c.waiters, fut)

return fut
}

func (c *UnboundChannel[T]) Close() {
c.CloseWithError(ErrChannelClosed)
}

func (c *UnboundChannel[T]) CloseWithError(err error) {
c.lock.Lock()
defer c.lock.Unlock()

if c.err != nil {
return
}
c.err = err

for i := 0; i < len(c.waiters); i++ {
c.waiters[i].SetError(c.err)
}

c.waiters = nil
c.values = nil
}

+ 1
- 1
pkgs/distlock/internal/acquire_actor.go View File

@@ -93,7 +93,7 @@ func (a *AcquireActor) Acquire(ctx context.Context, req LockRequest) (string, er
}()

// 此处不能直接用ctx去等Callback,原因是Wait超时不代表锁没有获取到,这会导致锁泄露。
return info.Callback.WaitValue(context.Background())
return info.Callback.Wait(context.Background())
}

// TryAcquireNow 重试一下内部还没有成功的锁请求。不会阻塞调用者


+ 18
- 5
pkgs/future/future.go View File

@@ -6,18 +6,31 @@ import (
)

var ErrContextCancelled = fmt.Errorf("context cancelled")
var ErrCompleted = fmt.Errorf("context cancelled")

type Future interface {
Error() error
IsComplete() bool

Chan() <-chan error

Wait(ctx context.Context) error
}

type ValueFuture[T any] interface {
Future
type ChanValue1[T any] struct {
Value T
Err error
}

type ChanValue2[T1 any, T2 any] struct {
Value1 T1
Value2 T2
Err error
}

type Future1[T any] interface {
IsComplete() bool

Value() T
Chan() <-chan ChanValue1[T]

WaitValue(ctx context.Context) (T, error)
Wait(ctx context.Context) (T, error)
}

+ 87
- 0
pkgs/future/ready.go View File

@@ -0,0 +1,87 @@
package future

import "context"

type Ready struct {
ch chan error
}

func NewReady(err error) *Ready {
ch := make(chan error, 1)
ch <- err
close(ch)

return &Ready{
ch: ch,
}
}

func (f *Ready) IsComplete() bool {
return true
}

func (f *Ready) Wait(ctx context.Context) error {
select {
case v, ok := <-f.ch:
if !ok {
return ErrCompleted
}
return v

case <-ctx.Done():
return ErrContextCancelled
}
}

func (f *Ready) Chan() <-chan error {
return f.ch
}

type Ready1[T any] struct {
ch chan ChanValue1[T]
}

func NewReady1[T any](val T, err error) *Ready1[T] {
ch := make(chan ChanValue1[T], 1)
ch <- ChanValue1[T]{
Err: err,
Value: val,
}
close(ch)

return &Ready1[T]{
ch: ch,
}
}

func NewReadyValue1[T any](val T) *Ready1[T] {
return NewReady1[T](val, nil)
}

func NewReadyError1[T any](err error) *Ready1[T] {
var ret T
return NewReady1[T](ret, err)
}

func (f *Ready1[T]) IsComplete() bool {
return true
}

func (f *Ready1[T]) Wait(ctx context.Context) (T, error) {
select {
case cv, ok := <-f.ch:
if !ok {
var ret T
return ret, cv.Err
}
return cv.Value, cv.Err

case <-ctx.Done():
var ret T
return ret, ErrContextCancelled
}
}

func (f *Ready1[T]) Chan() <-chan ChanValue1[T] {
return f.ch
}

+ 63
- 62
pkgs/future/set_value_future.go View File

@@ -6,72 +6,76 @@ import (
)

type SetValueFuture[T any] struct {
value T
err error
isCompleted bool
completeChan chan any
ch chan ChanValue1[T]
completeOnce sync.Once
}

func NewSetValue[T any]() *SetValueFuture[T] {
return &SetValueFuture[T]{
completeChan: make(chan any),
ch: make(chan ChanValue1[T], 1),
}
}

func (f *SetValueFuture[T]) SetComplete(val T, err error) {
f.completeOnce.Do(func() {
f.value = val
f.err = err
f.ch <- ChanValue1[T]{
Err: err,
Value: val,
}
close(f.ch)
f.isCompleted = true
close(f.completeChan)
})
}

func (f *SetValueFuture[T]) SetValue(val T) {
f.completeOnce.Do(func() {
f.value = val
f.ch <- ChanValue1[T]{
Value: val,
}
close(f.ch)
f.isCompleted = true
close(f.completeChan)
})
}

func (f *SetValueFuture[T]) SetError(err error) {
f.completeOnce.Do(func() {
f.err = err
f.ch <- ChanValue1[T]{
Err: err,
}
close(f.ch)
f.isCompleted = true
close(f.completeChan)
})
}

func (f *SetValueFuture[T]) Error() error {
return f.err
}

func (f *SetValueFuture[T]) Value() T {
return f.value
}

func (f *SetValueFuture[T]) IsComplete() bool {
return f.isCompleted
}

// 等待直到Complete或者ctx被取消。
// 注:返回ErrContextCancelled不代表产生结果的过程没有执行过,甚至不代表Future没有Complete
func (f *SetValueFuture[T]) Wait(ctx context.Context) error {
select {
case <-f.completeChan:
return f.err

case <-ctx.Done():
return ErrContextCancelled
}
func (f *SetValueFuture[T]) Chan() <-chan ChanValue1[T] {
return f.ch
}

func (f *SetValueFuture[T]) WaitValue(ctx context.Context) (T, error) {
// 等待直到Complete或者ctx被取消。
// 注:返回ErrContextCancelled不代表产生结果的过程没有执行过,甚至不代表Future没有Complete
//func (f *SetValueFuture[T]) Wait(ctx context.Context) error {
// select {
// case <-f.ch:
// return f.err
//
// case <-ctx.Done():
// return ErrContextCancelled
// }
//}

func (f *SetValueFuture[T]) Wait(ctx context.Context) (T, error) {
select {
case <-f.completeChan:
return f.value, f.err
case cv, ok := <-f.ch:
if !ok {
var ret T
return ret, cv.Err
}
return cv.Value, cv.Err

case <-ctx.Done():
var ret T
@@ -80,68 +84,61 @@ func (f *SetValueFuture[T]) WaitValue(ctx context.Context) (T, error) {
}

type SetValueFuture2[T1 any, T2 any] struct {
value1 T1
value2 T2
err error
isCompleted bool
completeChan chan any
ch chan ChanValue2[T1, T2]
completeOnce sync.Once
}

func NewSetValue2[T1 any, T2 any]() *SetValueFuture2[T1, T2] {
return &SetValueFuture2[T1, T2]{
completeChan: make(chan any),
ch: make(chan ChanValue2[T1, T2], 1),
}
}

func (f *SetValueFuture2[T1, T2]) SetComplete(val1 T1, val2 T2, err error) {
f.completeOnce.Do(func() {
f.value1 = val1
f.value2 = val2
f.err = err
f.ch <- ChanValue2[T1, T2]{
Value1: val1,
Value2: val2,
Err: err,
}
close(f.ch)
f.isCompleted = true
close(f.completeChan)
})
}

func (f *SetValueFuture2[T1, T2]) SetValue(val1 T1, val2 T2) {
f.completeOnce.Do(func() {
f.value1 = val1
f.value2 = val2
f.ch <- ChanValue2[T1, T2]{
Value1: val1,
Value2: val2,
}
close(f.ch)
f.isCompleted = true
close(f.completeChan)
})
}

func (f *SetValueFuture2[T1, T2]) SetError(err error) {
f.completeOnce.Do(func() {
f.err = err
f.ch <- ChanValue2[T1, T2]{
Err: err,
}
close(f.ch)
f.isCompleted = true
close(f.completeChan)
})
}

func (f *SetValueFuture2[T1, T2]) Error() error {
return f.err
}

func (f *SetValueFuture2[T1, T2]) Value() (T1, T2) {
return f.value1, f.value2
}

func (f *SetValueFuture2[T1, T2]) IsComplete() bool {
return f.isCompleted
}

func (f *SetValueFuture2[T1, T2]) Wait() error {
<-f.completeChan
return f.err
}

func (f *SetValueFuture2[T1, T2]) WaitValue(ctx context.Context) (T1, T2, error) {
func (f *SetValueFuture2[T1, T2]) Wait(ctx context.Context) (T1, T2, error) {
select {
case <-f.completeChan:
return f.value1, f.value2, f.err
case cv, ok := <-f.ch:
if !ok {
return cv.Value1, cv.Value2, cv.Err
}
return cv.Value1, cv.Value2, cv.Err

case <-ctx.Done():
var ret1 T1
@@ -149,3 +146,7 @@ func (f *SetValueFuture2[T1, T2]) WaitValue(ctx context.Context) (T1, T2, error)
return ret1, ret2, ErrContextCancelled
}
}

func (f *SetValueFuture2[T1, T2]) Chan() <-chan ChanValue2[T1, T2] {
return f.ch
}

+ 15
- 12
pkgs/future/set_void_future.go View File

@@ -6,47 +6,50 @@ import (
)

type SetVoidFuture struct {
err error
isCompleted bool
completeChan chan any
ch chan error
completeOnce sync.Once
}

func NewSetVoid() *SetVoidFuture {
return &SetVoidFuture{
completeChan: make(chan any),
ch: make(chan error, 1),
}
}

func (f *SetVoidFuture) SetVoid() {
f.completeOnce.Do(func() {
f.ch <- nil
close(f.ch)
f.isCompleted = true
close(f.completeChan)
})
}

func (f *SetVoidFuture) SetError(err error) {
f.completeOnce.Do(func() {
f.err = err
f.ch <- err
close(f.ch)
f.isCompleted = true
close(f.completeChan)
})
}

func (f *SetVoidFuture) Error() error {
return f.err
}

func (f *SetVoidFuture) IsComplete() bool {
return f.isCompleted
}

func (f *SetVoidFuture) Wait(ctx context.Context) error {
select {
case <-f.completeChan:
return f.err
case v, ok := <-f.ch:
if !ok {
return ErrCompleted
}
return v

case <-ctx.Done():
return ErrContextCancelled
}
}

func (f *SetVoidFuture) Chan() <-chan error {
return f.ch
}

+ 2
- 2
sdks/scheduler/models.go View File

@@ -90,8 +90,8 @@ type UpdateMultiInstanceJobInfo struct {
Files JobFilesInfo `json:"files"`
Runtime JobRuntimeInfo `json:"runtime"`
MultiInstanceJobSetID JobSetID `json:"multiInstanceJobSetID"`
InstanceIDs []JobID `json:"instanceIDs"`
UpdateStrategy string `json:"updateStrategy"`
//InstanceIDs []JobID `json:"instanceIDs"`
UpdateStrategy string `json:"updateStrategy"`
}

type ModelJobInfo struct {


Loading…
Cancel
Save