diff --git a/pkgs/future/set_value_future.go b/pkgs/future/set_value_future.go index 4aea9e8..bf7f94a 100644 --- a/pkgs/future/set_value_future.go +++ b/pkgs/future/set_value_future.go @@ -2,6 +2,7 @@ package future import ( "context" + "sync" ) type SetValueFuture[T any] struct { @@ -9,6 +10,7 @@ type SetValueFuture[T any] struct { err error isCompleted bool completeChan chan any + completeOnce sync.Once } func NewSetValue[T any]() *SetValueFuture[T] { @@ -18,22 +20,28 @@ func NewSetValue[T any]() *SetValueFuture[T] { } func (f *SetValueFuture[T]) SetComplete(val T, err error) { - f.value = val - f.err = err - f.isCompleted = true - close(f.completeChan) + f.completeOnce.Do(func() { + f.value = val + f.err = err + f.isCompleted = true + close(f.completeChan) + }) } func (f *SetValueFuture[T]) SetValue(val T) { - f.value = val - f.isCompleted = true - close(f.completeChan) + f.completeOnce.Do(func() { + f.value = val + f.isCompleted = true + close(f.completeChan) + }) } func (f *SetValueFuture[T]) SetError(err error) { - f.err = err - f.isCompleted = true - close(f.completeChan) + f.completeOnce.Do(func() { + f.err = err + f.isCompleted = true + close(f.completeChan) + }) } func (f *SetValueFuture[T]) Error() error { @@ -77,6 +85,7 @@ type SetValueFuture2[T1 any, T2 any] struct { err error isCompleted bool completeChan chan any + completeOnce sync.Once } func NewSetValue2[T1 any, T2 any]() *SetValueFuture2[T1, T2] { @@ -86,24 +95,30 @@ func NewSetValue2[T1 any, T2 any]() *SetValueFuture2[T1, T2] { } func (f *SetValueFuture2[T1, T2]) SetComplete(val1 T1, val2 T2, err error) { - f.value1 = val1 - f.value2 = val2 - f.err = err - f.isCompleted = true - close(f.completeChan) + f.completeOnce.Do(func() { + f.value1 = val1 + f.value2 = val2 + f.err = err + f.isCompleted = true + close(f.completeChan) + }) } func (f *SetValueFuture2[T1, T2]) SetValue(val1 T1, val2 T2) { - f.value1 = val1 - f.value2 = val2 - f.isCompleted = true - close(f.completeChan) + f.completeOnce.Do(func() { + f.value1 = val1 + f.value2 = val2 + f.isCompleted = true + close(f.completeChan) + }) } func (f *SetValueFuture2[T1, T2]) SetError(err error) { - f.err = err - f.isCompleted = true - close(f.completeChan) + f.completeOnce.Do(func() { + f.err = err + f.isCompleted = true + close(f.completeChan) + }) } func (f *SetValueFuture2[T1, T2]) Error() error { diff --git a/pkgs/future/set_void_future.go b/pkgs/future/set_void_future.go index d1cbca2..ed96a65 100644 --- a/pkgs/future/set_void_future.go +++ b/pkgs/future/set_void_future.go @@ -2,12 +2,14 @@ package future import ( "context" + "sync" ) type SetVoidFuture struct { err error isCompleted bool completeChan chan any + completeOnce sync.Once } func NewSetVoid() *SetVoidFuture { @@ -17,14 +19,18 @@ func NewSetVoid() *SetVoidFuture { } func (f *SetVoidFuture) SetVoid() { - f.isCompleted = true - close(f.completeChan) + f.completeOnce.Do(func() { + f.isCompleted = true + close(f.completeChan) + }) } func (f *SetVoidFuture) SetError(err error) { - f.err = err - f.isCompleted = true - close(f.completeChan) + f.completeOnce.Do(func() { + f.err = err + f.isCompleted = true + close(f.completeChan) + }) } func (f *SetVoidFuture) Error() error { diff --git a/sdks/storage/models.go b/sdks/storage/models.go index 20e47f6..6234a19 100644 --- a/sdks/storage/models.go +++ b/sdks/storage/models.go @@ -128,6 +128,10 @@ type Node struct { LastReportTime *time.Time `db:"LastReportTime" json:"lastReportTime"` } +func (n Node) String() string { + return fmt.Sprintf("%v(%v)", n.Name, n.NodeID) +} + type PinnedObject struct { ObjectID ObjectID `db:"ObjectID" json:"objectID"` NodeID NodeID `db:"NodeID" json:"nodeID"` diff --git a/utils/http/http.go b/utils/http/http.go index d4deb41..e8ffbbc 100644 --- a/utils/http/http.go +++ b/utils/http/http.go @@ -24,6 +24,7 @@ const ( ContentTypeForm = "application/x-www-form-urlencoded" ContentTypeMultiPart = "multipart/form-data" ContentTypeOctetStream = "application/octet-stream" + ContentTypeEventStream = "text/event-stream" ) var defaultClient = http.DefaultClient diff --git a/utils/lo2/lo.go b/utils/lo2/lo.go index 9011fe8..69e6150 100644 --- a/utils/lo2/lo.go +++ b/utils/lo2/lo.go @@ -19,6 +19,22 @@ func RemoveAt[T any](arr []T, index int) []T { return append(arr[:index], arr[index+1:]...) } +func RemoveAllDefault[T comparable](arr []T) []T { + var def T + return lo.Filter(arr, func(i T, idx int) bool { + return i != def + }) +} + +func Clear[T comparable](arr []T, item T) { + var def T + for i := 0; i < len(arr); i++ { + if arr[i] == item { + arr[i] = def + } + } +} + func ArrayClone[T any](arr []T) []T { return append([]T{}, arr...) } @@ -29,3 +45,12 @@ func Insert[T any](arr []T, index int, item T) []T { arr[index] = item return arr } + +func Deref[T any](arr []*T) []T { + result := make([]T, len(arr)) + for i := 0; i < len(arr); i++ { + result[i] = *arr[i] + } + + return result +} diff --git a/utils/sync2/counter_cond.go b/utils/sync2/counter_cond.go index 8da29e2..03bc8b6 100644 --- a/utils/sync2/counter_cond.go +++ b/utils/sync2/counter_cond.go @@ -32,7 +32,9 @@ func (c *CounterCond) Wait() bool { } func (c *CounterCond) Release() { + c.cond.L.Lock() c.count++ + c.cond.L.Unlock() c.cond.Signal() } diff --git a/utils/sync2/event.go b/utils/sync2/event.go new file mode 100644 index 0000000..157c066 --- /dev/null +++ b/utils/sync2/event.go @@ -0,0 +1,48 @@ +package sync2 + +import ( + "context" + "errors" + "sync" +) + +var ErrEventClosed = errors.New("event is closed") +var ErrContextCanceled = errors.New("context canceled") + +type Event struct { + ch chan any + closeOnce sync.Once +} + +func NewEvent() Event { + return Event{ + ch: make(chan any, 1), + } +} + +func (e *Event) Set() { + select { + case e.ch <- nil: + default: + } +} + +func (e *Event) Wait(ctx context.Context) error { + select { + case _, ok := <-e.ch: + if ok { + return nil + } + + return ErrEventClosed + + case <-ctx.Done(): + return ErrContextCanceled + } +} + +func (e *Event) Close() { + e.closeOnce.Do(func() { + close(e.ch) + }) +} diff --git a/utils/sync2/sync2.go b/utils/sync2/sync2.go index c6ab32f..8555a3b 100644 --- a/utils/sync2/sync2.go +++ b/utils/sync2/sync2.go @@ -1,9 +1,13 @@ package sync2 -import "sync" +import ( + "sync" +) func ParallelDo[T any](args []T, fn func(val T, index int) error) error { + lock := sync.Mutex{} var err error + var wg sync.WaitGroup wg.Add(len(args)) for i, arg := range args { @@ -11,10 +15,15 @@ func ParallelDo[T any](args []T, fn func(val T, index int) error) error { defer wg.Done() if e := fn(arg, index); e != nil { - err = e + lock.Lock() + if err == nil { + err = e + } + lock.Unlock() } }(arg, i) } wg.Wait() + return err } diff --git a/utils/sync2/unbound_channel.go b/utils/sync2/unbound_channel.go new file mode 100644 index 0000000..ff051a4 --- /dev/null +++ b/utils/sync2/unbound_channel.go @@ -0,0 +1,74 @@ +package sync2 + +import ( + "container/list" + "sync" +) + +type UnboundChannel[T any] struct { + values *list.List + cond *sync.Cond + err error +} + +func NewUnboundChannel[T any]() *UnboundChannel[T] { + return &UnboundChannel[T]{ + values: list.New(), + cond: sync.NewCond(&sync.Mutex{}), + } +} + +func (c *UnboundChannel[T]) Error() error { + return c.err +} + +func (c *UnboundChannel[T]) Send(val T) error { + c.cond.L.Lock() + if c.err != nil { + c.cond.L.Unlock() + return c.err + } + c.values.PushBack(val) + c.cond.L.Unlock() + + c.cond.Signal() + return nil +} + +func (c *UnboundChannel[T]) Receive() (T, error) { + c.cond.L.Lock() + defer c.cond.L.Unlock() + + if c.values.Len() == 0 { + c.cond.Wait() + } + + if c.values.Len() == 0 { + var ret T + return ret, c.err + } + + ret := c.values.Front().Value.(T) + c.values.Remove(c.values.Front()) + return ret, nil +} + +func (c *UnboundChannel[T]) Close() { + c.cond.L.Lock() + if c.err != nil { + return + } + c.err = ErrChannelClosed + c.cond.L.Unlock() + c.cond.Broadcast() +} + +func (c *UnboundChannel[T]) CloseWithError(err error) { + c.cond.L.Lock() + if c.err != nil { + return + } + c.err = err + c.cond.L.Unlock() + c.cond.Broadcast() +} diff --git a/utils/time2/duration.go b/utils/time2/duration.go new file mode 100644 index 0000000..ebe0425 --- /dev/null +++ b/utils/time2/duration.go @@ -0,0 +1,28 @@ +package time2 + +import ( + "fmt" + "time" +) + +type Duration struct { + time.Duration +} + +func (d *Duration) Std() time.Duration { + return d.Duration +} + +func (d *Duration) Scan(state fmt.ScanState, verb rune) error { + data, err := state.Token(true, nil) + if err != nil { + return err + } + + d.Duration, err = time.ParseDuration(string(data)) + if err != nil { + return err + } + + return nil +} diff --git a/utils/time2/time2_test.go b/utils/time2/time2_test.go new file mode 100644 index 0000000..2dd0202 --- /dev/null +++ b/utils/time2/time2_test.go @@ -0,0 +1,25 @@ +package time2 + +import ( + "fmt" + "testing" + "time" + + . "github.com/smartystreets/goconvey/convey" +) + +func Test_Duration(t *testing.T) { + Convey("从字符串解析", t, func() { + dur := Duration{} + _, err := fmt.Sscanf("10s", "%v", &dur) + So(err, ShouldEqual, nil) + So(dur.Std(), ShouldEqual, 10*time.Second) + }) + + Convey("包含空白字符", t, func() { + dur := Duration{} + _, err := fmt.Sscanf(" 10s\t\n\r", "%v", &dur) + So(err, ShouldEqual, nil) + So(dur.Std(), ShouldEqual, 10*time.Second) + }) +}