From 922ffc8693a8138d2928090ba06eeb7cf99adfc6 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 1 Jul 2024 10:03:38 +0800 Subject: [PATCH 1/6] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E9=83=A8=E5=88=86?= =?UTF-8?q?=E5=AE=9E=E7=94=A8=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/sync2/counter_cond.go | 2 + utils/sync2/unbound_channel.go | 74 ++++++++++++++++++++++++++++++++++ utils/time2/duration.go | 28 +++++++++++++ utils/time2/time2_test.go | 25 ++++++++++++ 4 files changed, 129 insertions(+) create mode 100644 utils/sync2/unbound_channel.go create mode 100644 utils/time2/duration.go create mode 100644 utils/time2/time2_test.go diff --git a/utils/sync2/counter_cond.go b/utils/sync2/counter_cond.go index 8da29e2..03bc8b6 100644 --- a/utils/sync2/counter_cond.go +++ b/utils/sync2/counter_cond.go @@ -32,7 +32,9 @@ func (c *CounterCond) Wait() bool { } func (c *CounterCond) Release() { + c.cond.L.Lock() c.count++ + c.cond.L.Unlock() c.cond.Signal() } diff --git a/utils/sync2/unbound_channel.go b/utils/sync2/unbound_channel.go new file mode 100644 index 0000000..ff051a4 --- /dev/null +++ b/utils/sync2/unbound_channel.go @@ -0,0 +1,74 @@ +package sync2 + +import ( + "container/list" + "sync" +) + +type UnboundChannel[T any] struct { + values *list.List + cond *sync.Cond + err error +} + +func NewUnboundChannel[T any]() *UnboundChannel[T] { + return &UnboundChannel[T]{ + values: list.New(), + cond: sync.NewCond(&sync.Mutex{}), + } +} + +func (c *UnboundChannel[T]) Error() error { + return c.err +} + +func (c *UnboundChannel[T]) Send(val T) error { + c.cond.L.Lock() + if c.err != nil { + c.cond.L.Unlock() + return c.err + } + c.values.PushBack(val) + c.cond.L.Unlock() + + c.cond.Signal() + return nil +} + +func (c *UnboundChannel[T]) Receive() (T, error) { + c.cond.L.Lock() + defer c.cond.L.Unlock() + + if c.values.Len() == 0 { + c.cond.Wait() + } + + if c.values.Len() == 0 { + var ret T + return ret, c.err + } + + ret := c.values.Front().Value.(T) + c.values.Remove(c.values.Front()) + return ret, nil +} + +func (c *UnboundChannel[T]) Close() { + c.cond.L.Lock() + if c.err != nil { + return + } + c.err = ErrChannelClosed + c.cond.L.Unlock() + c.cond.Broadcast() +} + +func (c *UnboundChannel[T]) CloseWithError(err error) { + c.cond.L.Lock() + if c.err != nil { + return + } + c.err = err + c.cond.L.Unlock() + c.cond.Broadcast() +} diff --git a/utils/time2/duration.go b/utils/time2/duration.go new file mode 100644 index 0000000..ebe0425 --- /dev/null +++ b/utils/time2/duration.go @@ -0,0 +1,28 @@ +package time2 + +import ( + "fmt" + "time" +) + +type Duration struct { + time.Duration +} + +func (d *Duration) Std() time.Duration { + return d.Duration +} + +func (d *Duration) Scan(state fmt.ScanState, verb rune) error { + data, err := state.Token(true, nil) + if err != nil { + return err + } + + d.Duration, err = time.ParseDuration(string(data)) + if err != nil { + return err + } + + return nil +} diff --git a/utils/time2/time2_test.go b/utils/time2/time2_test.go new file mode 100644 index 0000000..2dd0202 --- /dev/null +++ b/utils/time2/time2_test.go @@ -0,0 +1,25 @@ +package time2 + +import ( + "fmt" + "testing" + "time" + + . "github.com/smartystreets/goconvey/convey" +) + +func Test_Duration(t *testing.T) { + Convey("从字符串解析", t, func() { + dur := Duration{} + _, err := fmt.Sscanf("10s", "%v", &dur) + So(err, ShouldEqual, nil) + So(dur.Std(), ShouldEqual, 10*time.Second) + }) + + Convey("包含空白字符", t, func() { + dur := Duration{} + _, err := fmt.Sscanf(" 10s\t\n\r", "%v", &dur) + So(err, ShouldEqual, nil) + So(dur.Std(), ShouldEqual, 10*time.Second) + }) +} From 3978d820a8e67ac37a76eab9dae294dd4ecce2db Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Tue, 2 Jul 2024 17:36:52 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=B7=A5=E5=85=B7?= =?UTF-8?q?=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/sync2/event.go | 48 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 utils/sync2/event.go diff --git a/utils/sync2/event.go b/utils/sync2/event.go new file mode 100644 index 0000000..157c066 --- /dev/null +++ b/utils/sync2/event.go @@ -0,0 +1,48 @@ +package sync2 + +import ( + "context" + "errors" + "sync" +) + +var ErrEventClosed = errors.New("event is closed") +var ErrContextCanceled = errors.New("context canceled") + +type Event struct { + ch chan any + closeOnce sync.Once +} + +func NewEvent() Event { + return Event{ + ch: make(chan any, 1), + } +} + +func (e *Event) Set() { + select { + case e.ch <- nil: + default: + } +} + +func (e *Event) Wait(ctx context.Context) error { + select { + case _, ok := <-e.ch: + if ok { + return nil + } + + return ErrEventClosed + + case <-ctx.Done(): + return ErrContextCanceled + } +} + +func (e *Event) Close() { + e.closeOnce.Do(func() { + close(e.ch) + }) +} From 0981794a9815cc84d701b8521c33f7b6dcf1d9aa Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 10 Jul 2024 11:10:09 +0800 Subject: [PATCH 3/6] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkgs/future/set_value_future.go | 59 +++++++++++++++++++++------------ pkgs/future/set_void_future.go | 16 ++++++--- utils/sync2/sync2.go | 13 +++++--- 3 files changed, 57 insertions(+), 31 deletions(-) diff --git a/pkgs/future/set_value_future.go b/pkgs/future/set_value_future.go index 4aea9e8..bf7f94a 100644 --- a/pkgs/future/set_value_future.go +++ b/pkgs/future/set_value_future.go @@ -2,6 +2,7 @@ package future import ( "context" + "sync" ) type SetValueFuture[T any] struct { @@ -9,6 +10,7 @@ type SetValueFuture[T any] struct { err error isCompleted bool completeChan chan any + completeOnce sync.Once } func NewSetValue[T any]() *SetValueFuture[T] { @@ -18,22 +20,28 @@ func NewSetValue[T any]() *SetValueFuture[T] { } func (f *SetValueFuture[T]) SetComplete(val T, err error) { - f.value = val - f.err = err - f.isCompleted = true - close(f.completeChan) + f.completeOnce.Do(func() { + f.value = val + f.err = err + f.isCompleted = true + close(f.completeChan) + }) } func (f *SetValueFuture[T]) SetValue(val T) { - f.value = val - f.isCompleted = true - close(f.completeChan) + f.completeOnce.Do(func() { + f.value = val + f.isCompleted = true + close(f.completeChan) + }) } func (f *SetValueFuture[T]) SetError(err error) { - f.err = err - f.isCompleted = true - close(f.completeChan) + f.completeOnce.Do(func() { + f.err = err + f.isCompleted = true + close(f.completeChan) + }) } func (f *SetValueFuture[T]) Error() error { @@ -77,6 +85,7 @@ type SetValueFuture2[T1 any, T2 any] struct { err error isCompleted bool completeChan chan any + completeOnce sync.Once } func NewSetValue2[T1 any, T2 any]() *SetValueFuture2[T1, T2] { @@ -86,24 +95,30 @@ func NewSetValue2[T1 any, T2 any]() *SetValueFuture2[T1, T2] { } func (f *SetValueFuture2[T1, T2]) SetComplete(val1 T1, val2 T2, err error) { - f.value1 = val1 - f.value2 = val2 - f.err = err - f.isCompleted = true - close(f.completeChan) + f.completeOnce.Do(func() { + f.value1 = val1 + f.value2 = val2 + f.err = err + f.isCompleted = true + close(f.completeChan) + }) } func (f *SetValueFuture2[T1, T2]) SetValue(val1 T1, val2 T2) { - f.value1 = val1 - f.value2 = val2 - f.isCompleted = true - close(f.completeChan) + f.completeOnce.Do(func() { + f.value1 = val1 + f.value2 = val2 + f.isCompleted = true + close(f.completeChan) + }) } func (f *SetValueFuture2[T1, T2]) SetError(err error) { - f.err = err - f.isCompleted = true - close(f.completeChan) + f.completeOnce.Do(func() { + f.err = err + f.isCompleted = true + close(f.completeChan) + }) } func (f *SetValueFuture2[T1, T2]) Error() error { diff --git a/pkgs/future/set_void_future.go b/pkgs/future/set_void_future.go index d1cbca2..ed96a65 100644 --- a/pkgs/future/set_void_future.go +++ b/pkgs/future/set_void_future.go @@ -2,12 +2,14 @@ package future import ( "context" + "sync" ) type SetVoidFuture struct { err error isCompleted bool completeChan chan any + completeOnce sync.Once } func NewSetVoid() *SetVoidFuture { @@ -17,14 +19,18 @@ func NewSetVoid() *SetVoidFuture { } func (f *SetVoidFuture) SetVoid() { - f.isCompleted = true - close(f.completeChan) + f.completeOnce.Do(func() { + f.isCompleted = true + close(f.completeChan) + }) } func (f *SetVoidFuture) SetError(err error) { - f.err = err - f.isCompleted = true - close(f.completeChan) + f.completeOnce.Do(func() { + f.err = err + f.isCompleted = true + close(f.completeChan) + }) } func (f *SetVoidFuture) Error() error { diff --git a/utils/sync2/sync2.go b/utils/sync2/sync2.go index c6ab32f..aa6bd50 100644 --- a/utils/sync2/sync2.go +++ b/utils/sync2/sync2.go @@ -1,9 +1,14 @@ package sync2 -import "sync" +import ( + "sync" + "sync/atomic" +) func ParallelDo[T any](args []T, fn func(val T, index int) error) error { - var err error + err := atomic.Value{} + err.Store((error)(nil)) + var wg sync.WaitGroup wg.Add(len(args)) for i, arg := range args { @@ -11,10 +16,10 @@ func ParallelDo[T any](args []T, fn func(val T, index int) error) error { defer wg.Done() if e := fn(arg, index); e != nil { - err = e + err.CompareAndSwap((error)(nil), e) } }(arg, i) } wg.Wait() - return err + return err.Load().(error) } From 610a39b66e9c5384e8ad7fd9cfadda3e39d47642 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 15 Jul 2024 09:22:41 +0800 Subject: [PATCH 4/6] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/storage/models.go | 4 ++++ utils/sync2/sync2.go | 14 +++++++++----- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/sdks/storage/models.go b/sdks/storage/models.go index b1dad0c..9a79cd6 100644 --- a/sdks/storage/models.go +++ b/sdks/storage/models.go @@ -128,6 +128,10 @@ type Node struct { LastReportTime *time.Time `db:"LastReportTime" json:"lastReportTime"` } +func (n Node) String() string { + return fmt.Sprintf("%v(%v)", n.Name, n.NodeID) +} + type PinnedObject struct { ObjectID ObjectID `db:"ObjectID" json:"objectID"` NodeID NodeID `db:"NodeID" json:"nodeID"` diff --git a/utils/sync2/sync2.go b/utils/sync2/sync2.go index aa6bd50..8555a3b 100644 --- a/utils/sync2/sync2.go +++ b/utils/sync2/sync2.go @@ -2,12 +2,11 @@ package sync2 import ( "sync" - "sync/atomic" ) func ParallelDo[T any](args []T, fn func(val T, index int) error) error { - err := atomic.Value{} - err.Store((error)(nil)) + lock := sync.Mutex{} + var err error var wg sync.WaitGroup wg.Add(len(args)) @@ -16,10 +15,15 @@ func ParallelDo[T any](args []T, fn func(val T, index int) error) error { defer wg.Done() if e := fn(arg, index); e != nil { - err.CompareAndSwap((error)(nil), e) + lock.Lock() + if err == nil { + err = e + } + lock.Unlock() } }(arg, i) } wg.Wait() - return err.Load().(error) + + return err } From c9d770089ed7b675dd64a7fdbf6c3fafa3d1cd0c Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Wed, 17 Jul 2024 14:21:55 +0800 Subject: [PATCH 5/6] =?UTF-8?q?=E5=B0=86executor=E6=94=B9=E9=80=A0?= =?UTF-8?q?=E6=88=90restful?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/http/http.go | 1 + 1 file changed, 1 insertion(+) diff --git a/utils/http/http.go b/utils/http/http.go index 55f5a3b..477218b 100644 --- a/utils/http/http.go +++ b/utils/http/http.go @@ -24,6 +24,7 @@ const ( ContentTypeForm = "application/x-www-form-urlencoded" ContentTypeMultiPart = "multipart/form-data" ContentTypeOctetStream = "application/octet-stream" + ContentTypeEventStream = "text/event-stream" ) var defaultClient = http.DefaultClient From 170160d35da59450ecdffd64e1d5e4f09e96f293 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 25 Jul 2024 10:08:53 +0800 Subject: [PATCH 6/6] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AE=9E=E7=94=A8?= =?UTF-8?q?=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/lo2/lo.go | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/utils/lo2/lo.go b/utils/lo2/lo.go index 9011fe8..69e6150 100644 --- a/utils/lo2/lo.go +++ b/utils/lo2/lo.go @@ -19,6 +19,22 @@ func RemoveAt[T any](arr []T, index int) []T { return append(arr[:index], arr[index+1:]...) } +func RemoveAllDefault[T comparable](arr []T) []T { + var def T + return lo.Filter(arr, func(i T, idx int) bool { + return i != def + }) +} + +func Clear[T comparable](arr []T, item T) { + var def T + for i := 0; i < len(arr); i++ { + if arr[i] == item { + arr[i] = def + } + } +} + func ArrayClone[T any](arr []T) []T { return append([]T{}, arr...) } @@ -29,3 +45,12 @@ func Insert[T any](arr []T, index int, item T) []T { arr[index] = item return arr } + +func Deref[T any](arr []*T) []T { + result := make([]T, len(arr)) + for i := 0; i < len(arr); i++ { + result[i] = *arr[i] + } + + return result +}