Browse Source

Merge branch 'master' into feature_rzs

gitlink
JeshuaRen 1 year ago
parent
commit
a5c038d1e5
11 changed files with 266 additions and 29 deletions
  1. +37
    -22
      pkgs/future/set_value_future.go
  2. +11
    -5
      pkgs/future/set_void_future.go
  3. +4
    -0
      sdks/storage/models.go
  4. +1
    -0
      utils/http/http.go
  5. +25
    -0
      utils/lo2/lo.go
  6. +2
    -0
      utils/sync2/counter_cond.go
  7. +48
    -0
      utils/sync2/event.go
  8. +11
    -2
      utils/sync2/sync2.go
  9. +74
    -0
      utils/sync2/unbound_channel.go
  10. +28
    -0
      utils/time2/duration.go
  11. +25
    -0
      utils/time2/time2_test.go

+ 37
- 22
pkgs/future/set_value_future.go View File

@@ -2,6 +2,7 @@ package future

import (
"context"
"sync"
)

type SetValueFuture[T any] struct {
@@ -9,6 +10,7 @@ type SetValueFuture[T any] struct {
err error
isCompleted bool
completeChan chan any
completeOnce sync.Once
}

func NewSetValue[T any]() *SetValueFuture[T] {
@@ -18,22 +20,28 @@ func NewSetValue[T any]() *SetValueFuture[T] {
}

func (f *SetValueFuture[T]) SetComplete(val T, err error) {
f.value = val
f.err = err
f.isCompleted = true
close(f.completeChan)
f.completeOnce.Do(func() {
f.value = val
f.err = err
f.isCompleted = true
close(f.completeChan)
})
}

func (f *SetValueFuture[T]) SetValue(val T) {
f.value = val
f.isCompleted = true
close(f.completeChan)
f.completeOnce.Do(func() {
f.value = val
f.isCompleted = true
close(f.completeChan)
})
}

func (f *SetValueFuture[T]) SetError(err error) {
f.err = err
f.isCompleted = true
close(f.completeChan)
f.completeOnce.Do(func() {
f.err = err
f.isCompleted = true
close(f.completeChan)
})
}

func (f *SetValueFuture[T]) Error() error {
@@ -77,6 +85,7 @@ type SetValueFuture2[T1 any, T2 any] struct {
err error
isCompleted bool
completeChan chan any
completeOnce sync.Once
}

func NewSetValue2[T1 any, T2 any]() *SetValueFuture2[T1, T2] {
@@ -86,24 +95,30 @@ func NewSetValue2[T1 any, T2 any]() *SetValueFuture2[T1, T2] {
}

func (f *SetValueFuture2[T1, T2]) SetComplete(val1 T1, val2 T2, err error) {
f.value1 = val1
f.value2 = val2
f.err = err
f.isCompleted = true
close(f.completeChan)
f.completeOnce.Do(func() {
f.value1 = val1
f.value2 = val2
f.err = err
f.isCompleted = true
close(f.completeChan)
})
}

func (f *SetValueFuture2[T1, T2]) SetValue(val1 T1, val2 T2) {
f.value1 = val1
f.value2 = val2
f.isCompleted = true
close(f.completeChan)
f.completeOnce.Do(func() {
f.value1 = val1
f.value2 = val2
f.isCompleted = true
close(f.completeChan)
})
}

func (f *SetValueFuture2[T1, T2]) SetError(err error) {
f.err = err
f.isCompleted = true
close(f.completeChan)
f.completeOnce.Do(func() {
f.err = err
f.isCompleted = true
close(f.completeChan)
})
}

func (f *SetValueFuture2[T1, T2]) Error() error {


+ 11
- 5
pkgs/future/set_void_future.go View File

@@ -2,12 +2,14 @@ package future

import (
"context"
"sync"
)

type SetVoidFuture struct {
err error
isCompleted bool
completeChan chan any
completeOnce sync.Once
}

func NewSetVoid() *SetVoidFuture {
@@ -17,14 +19,18 @@ func NewSetVoid() *SetVoidFuture {
}

func (f *SetVoidFuture) SetVoid() {
f.isCompleted = true
close(f.completeChan)
f.completeOnce.Do(func() {
f.isCompleted = true
close(f.completeChan)
})
}

func (f *SetVoidFuture) SetError(err error) {
f.err = err
f.isCompleted = true
close(f.completeChan)
f.completeOnce.Do(func() {
f.err = err
f.isCompleted = true
close(f.completeChan)
})
}

func (f *SetVoidFuture) Error() error {


+ 4
- 0
sdks/storage/models.go View File

@@ -128,6 +128,10 @@ type Node struct {
LastReportTime *time.Time `db:"LastReportTime" json:"lastReportTime"`
}

func (n Node) String() string {
return fmt.Sprintf("%v(%v)", n.Name, n.NodeID)
}

type PinnedObject struct {
ObjectID ObjectID `db:"ObjectID" json:"objectID"`
NodeID NodeID `db:"NodeID" json:"nodeID"`


+ 1
- 0
utils/http/http.go View File

@@ -24,6 +24,7 @@ const (
ContentTypeForm = "application/x-www-form-urlencoded"
ContentTypeMultiPart = "multipart/form-data"
ContentTypeOctetStream = "application/octet-stream"
ContentTypeEventStream = "text/event-stream"
)

var defaultClient = http.DefaultClient


+ 25
- 0
utils/lo2/lo.go View File

@@ -19,6 +19,22 @@ func RemoveAt[T any](arr []T, index int) []T {
return append(arr[:index], arr[index+1:]...)
}

func RemoveAllDefault[T comparable](arr []T) []T {
var def T
return lo.Filter(arr, func(i T, idx int) bool {
return i != def
})
}

func Clear[T comparable](arr []T, item T) {
var def T
for i := 0; i < len(arr); i++ {
if arr[i] == item {
arr[i] = def
}
}
}

func ArrayClone[T any](arr []T) []T {
return append([]T{}, arr...)
}
@@ -29,3 +45,12 @@ func Insert[T any](arr []T, index int, item T) []T {
arr[index] = item
return arr
}

func Deref[T any](arr []*T) []T {
result := make([]T, len(arr))
for i := 0; i < len(arr); i++ {
result[i] = *arr[i]
}

return result
}

+ 2
- 0
utils/sync2/counter_cond.go View File

@@ -32,7 +32,9 @@ func (c *CounterCond) Wait() bool {
}

func (c *CounterCond) Release() {
c.cond.L.Lock()
c.count++
c.cond.L.Unlock()
c.cond.Signal()
}



+ 48
- 0
utils/sync2/event.go View File

@@ -0,0 +1,48 @@
package sync2

import (
"context"
"errors"
"sync"
)

var ErrEventClosed = errors.New("event is closed")
var ErrContextCanceled = errors.New("context canceled")

type Event struct {
ch chan any
closeOnce sync.Once
}

func NewEvent() Event {
return Event{
ch: make(chan any, 1),
}
}

func (e *Event) Set() {
select {
case e.ch <- nil:
default:
}
}

func (e *Event) Wait(ctx context.Context) error {
select {
case _, ok := <-e.ch:
if ok {
return nil
}

return ErrEventClosed

case <-ctx.Done():
return ErrContextCanceled
}
}

func (e *Event) Close() {
e.closeOnce.Do(func() {
close(e.ch)
})
}

+ 11
- 2
utils/sync2/sync2.go View File

@@ -1,9 +1,13 @@
package sync2

import "sync"
import (
"sync"
)

func ParallelDo[T any](args []T, fn func(val T, index int) error) error {
lock := sync.Mutex{}
var err error

var wg sync.WaitGroup
wg.Add(len(args))
for i, arg := range args {
@@ -11,10 +15,15 @@ func ParallelDo[T any](args []T, fn func(val T, index int) error) error {
defer wg.Done()

if e := fn(arg, index); e != nil {
err = e
lock.Lock()
if err == nil {
err = e
}
lock.Unlock()
}
}(arg, i)
}
wg.Wait()

return err
}

+ 74
- 0
utils/sync2/unbound_channel.go View File

@@ -0,0 +1,74 @@
package sync2

import (
"container/list"
"sync"
)

type UnboundChannel[T any] struct {
values *list.List
cond *sync.Cond
err error
}

func NewUnboundChannel[T any]() *UnboundChannel[T] {
return &UnboundChannel[T]{
values: list.New(),
cond: sync.NewCond(&sync.Mutex{}),
}
}

func (c *UnboundChannel[T]) Error() error {
return c.err
}

func (c *UnboundChannel[T]) Send(val T) error {
c.cond.L.Lock()
if c.err != nil {
c.cond.L.Unlock()
return c.err
}
c.values.PushBack(val)
c.cond.L.Unlock()

c.cond.Signal()
return nil
}

func (c *UnboundChannel[T]) Receive() (T, error) {
c.cond.L.Lock()
defer c.cond.L.Unlock()

if c.values.Len() == 0 {
c.cond.Wait()
}

if c.values.Len() == 0 {
var ret T
return ret, c.err
}

ret := c.values.Front().Value.(T)
c.values.Remove(c.values.Front())
return ret, nil
}

func (c *UnboundChannel[T]) Close() {
c.cond.L.Lock()
if c.err != nil {
return
}
c.err = ErrChannelClosed
c.cond.L.Unlock()
c.cond.Broadcast()
}

func (c *UnboundChannel[T]) CloseWithError(err error) {
c.cond.L.Lock()
if c.err != nil {
return
}
c.err = err
c.cond.L.Unlock()
c.cond.Broadcast()
}

+ 28
- 0
utils/time2/duration.go View File

@@ -0,0 +1,28 @@
package time2

import (
"fmt"
"time"
)

type Duration struct {
time.Duration
}

func (d *Duration) Std() time.Duration {
return d.Duration
}

func (d *Duration) Scan(state fmt.ScanState, verb rune) error {
data, err := state.Token(true, nil)
if err != nil {
return err
}

d.Duration, err = time.ParseDuration(string(data))
if err != nil {
return err
}

return nil
}

+ 25
- 0
utils/time2/time2_test.go View File

@@ -0,0 +1,25 @@
package time2

import (
"fmt"
"testing"
"time"

. "github.com/smartystreets/goconvey/convey"
)

func Test_Duration(t *testing.T) {
Convey("从字符串解析", t, func() {
dur := Duration{}
_, err := fmt.Sscanf("10s", "%v", &dur)
So(err, ShouldEqual, nil)
So(dur.Std(), ShouldEqual, 10*time.Second)
})

Convey("包含空白字符", t, func() {
dur := Duration{}
_, err := fmt.Sscanf(" 10s\t\n\r", "%v", &dur)
So(err, ShouldEqual, nil)
So(dur.Std(), ShouldEqual, 10*time.Second)
})
}

Loading…
Cancel
Save