diff --git a/pkgs/distlock/internal/acquire_actor.go b/pkgs/distlock/internal/acquire_actor.go index 78589b8..d53f199 100644 --- a/pkgs/distlock/internal/acquire_actor.go +++ b/pkgs/distlock/internal/acquire_actor.go @@ -10,7 +10,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/logger" - mylo "gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/serder" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" @@ -84,7 +84,7 @@ func (a *AcquireActor) Acquire(ctx context.Context, req LockRequest) (string, er return } - a.acquirings = mylo.Remove(a.acquirings, info) + a.acquirings = lo2.Remove(a.acquirings, info) if info.LastErr != nil { info.Callback.SetError(info.LastErr) } else { @@ -213,7 +213,7 @@ func (a *AcquireActor) doAcquiring() error { } req.Callback.SetValue(reqData.ID) - a.acquirings = mylo.RemoveAt(a.acquirings, i) + a.acquirings = lo2.RemoveAt(a.acquirings, i) break } diff --git a/pkgs/distlock/internal/service_info_actor.go b/pkgs/distlock/internal/service_info_actor.go index a940574..dcbac71 100644 --- a/pkgs/distlock/internal/service_info_actor.go +++ b/pkgs/distlock/internal/service_info_actor.go @@ -8,7 +8,7 @@ import ( "github.com/google/uuid" "gitlink.org.cn/cloudream/common/pkgs/logger" - mylo "gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/serder" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -191,6 +191,6 @@ func (a *ServiceInfoActor) OnLockRequestEvent(evt LockRequestEvent) { if evt.IsLocking { status.LockRequestIDs = append(status.LockRequestIDs, evt.Data.ID) } else { - status.LockRequestIDs = mylo.Remove(status.LockRequestIDs, evt.Data.ID) + status.LockRequestIDs = lo2.Remove(status.LockRequestIDs, evt.Data.ID) } } diff --git a/pkgs/mq/client.go b/pkgs/mq/client.go index 078cdc2..8709347 100644 --- a/pkgs/mq/client.go +++ b/pkgs/mq/client.go @@ -300,7 +300,7 @@ func (c *RabbitMQTransport) Close() error { // 发送消息并等待回应。因为无法自动推断出TResp的类型,所以将其放在第一个手工填写,之后的TBody可以自动推断出来 func Request[TSvc any, TReq MessageBody, TResp MessageBody](_ func(svc TSvc, msg TReq) (TResp, *CodeMessage), cli RoundTripper, req TReq, opts ...RequestOption) (TResp, error) { - opt := RequestOption{Timeout: time.Second * 15} + opt := RequestOption{Timeout: time.Second * 15, KeepAlive: true} if len(opts) > 0 { opt = opts[0] } diff --git a/pkgs/task/manager.go b/pkgs/task/manager.go index 9f7b276..b925e18 100644 --- a/pkgs/task/manager.go +++ b/pkgs/task/manager.go @@ -5,7 +5,7 @@ import ( "sync" "time" - mylo "gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/lo2" ) type Manager[TCtx any] struct { @@ -108,12 +108,12 @@ func (m *Manager[TCtx]) executeTask(task *Task[TCtx]) { // 立刻删除任务,或者延迟一段时间再删除 if opt.RemovingDelay == 0 { - m.tasks = mylo.Remove(m.tasks, task) + m.tasks = lo2.Remove(m.tasks, task) } else { go func() { <-time.After(opt.RemovingDelay) m.lock.Lock() - m.tasks = mylo.Remove(m.tasks, task) + m.tasks = lo2.Remove(m.tasks, task) m.lock.Unlock() }() } diff --git a/pkgs/task/task.go b/pkgs/task/task.go index 7f6ff35..2210f86 100644 --- a/pkgs/task/task.go +++ b/pkgs/task/task.go @@ -5,7 +5,7 @@ import ( "sync/atomic" "time" - mylo "gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/lo2" ) type CompleteOption struct { @@ -82,7 +82,7 @@ func (t *Task[TCtx]) WaitTimeout(timeout time.Duration) bool { select { case <-time.After(timeout): t.waiterLock.Lock() - t.waiters = mylo.Remove(t.waiters, waiter) + t.waiters = lo2.Remove(t.waiters, waiter) t.waiterLock.Unlock() return false diff --git a/sdks/storage/models.go b/sdks/storage/models.go index 9c60ce1..577479b 100644 --- a/sdks/storage/models.go +++ b/sdks/storage/models.go @@ -71,7 +71,7 @@ func (b *RepRedundancy) Value() (driver.Value, error) { return serder.ObjectToJSONEx[Redundancy](b) } -var DefaultECRedundancy = *NewECRedundancy(2, 3, 1024*1024*5) +var DefaultECRedundancy = *NewECRedundancy(3, 5, 1024*1024*5) type ECRedundancy struct { serder.Metadata `union:"ec"` diff --git a/utils/io/join.go b/utils/io/join.go index 09c294e..cbd1a37 100644 --- a/utils/io/join.go +++ b/utils/io/join.go @@ -3,7 +3,7 @@ package io import ( "io" - "gitlink.org.cn/cloudream/common/utils/lo" + "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/math" ) @@ -81,7 +81,7 @@ func (s *chunkedJoin) Read(buf []byte) (int, error) { } if err == io.EOF { - s.inputs = lo.RemoveAt(s.inputs, s.currentInput) + s.inputs = lo2.RemoveAt(s.inputs, s.currentInput) // 此处不需要+1 if len(s.inputs) > 0 { s.currentInput = s.currentInput % len(s.inputs) diff --git a/utils/lo/lo.go b/utils/lo2/lo.go similarity index 96% rename from utils/lo/lo.go rename to utils/lo2/lo.go index 1852af3..f399ed7 100644 --- a/utils/lo/lo.go +++ b/utils/lo2/lo.go @@ -1,4 +1,4 @@ -package lo +package lo2 import "github.com/samber/lo" diff --git a/utils/lo/lo_test.go b/utils/lo2/lo_test.go similarity index 99% rename from utils/lo/lo_test.go rename to utils/lo2/lo_test.go index b6ff303..9182cb7 100644 --- a/utils/lo/lo_test.go +++ b/utils/lo2/lo_test.go @@ -1,4 +1,4 @@ -package lo +package lo2 import ( "testing" diff --git a/utils/sort/sort.go b/utils/sort2/sort.go similarity index 98% rename from utils/sort/sort.go rename to utils/sort2/sort.go index f469e0b..d594352 100644 --- a/utils/sort/sort.go +++ b/utils/sort2/sort.go @@ -1,4 +1,4 @@ -package sort +package sort2 import ( "sort" diff --git a/utils/time2/measurement.go b/utils/time2/measurement.go new file mode 100644 index 0000000..f5f77c9 --- /dev/null +++ b/utils/time2/measurement.go @@ -0,0 +1,113 @@ +package time2 + +import ( + "fmt" + "path" + "runtime" + "strings" + "time" +) + +type Measurement struct { + startTime time.Time + lastPointTime time.Time + printer func(string) + on bool + title string +} + +func NewMeasurement(printer func(string)) Measurement { + return Measurement{ + printer: printer, + } +} + +func (m *Measurement) Begin(on bool, title ...string) { + if m == nil { + return + } + + m.on = on + m.title = strings.Join(title, ".") + + if on { + m.startTime = time.Now() + m.lastPointTime = m.startTime + + _, file, line, ok := runtime.Caller(1) + + titlePart := "" + if m.title != "" { + titlePart = fmt.Sprintf(":%s", m.title) + } + + if ok { + m.printer(fmt.Sprintf("[BEGIN%v]<%v:%v>", titlePart, path.Base(file), line)) + } else { + m.printer(fmt.Sprintf("[BEGIN%v]", titlePart)) + } + } +} + +func (m *Measurement) Point(desc ...string) { + if m == nil { + return + } + + if m.on { + m.printer(m.makePointString(strings.Join(desc, "."))) + } +} + +func (m *Measurement) makePointString(desc string) string { + last := m.lastPointTime + now := time.Now() + m.lastPointTime = now + + _, file, line, ok := runtime.Caller(2) + + titlePart := "" + if m.title != "" { + titlePart = fmt.Sprintf("(%s)", m.title) + } + + if desc != "" { + desc = fmt.Sprintf("@%s", desc) + } + + if ok { + return fmt.Sprintf("%v {%v/%v} %v<%v:%v>", titlePart, now.Sub(last), now.Sub(m.startTime), desc, path.Base(file), line) + } + + return fmt.Sprintf("{%v/%v}%v", now.Sub(last), now.Sub(m.startTime), desc) +} + +func (m *Measurement) End(descs ...string) { + if m == nil { + return + } + + if m.on { + last := m.lastPointTime + now := time.Now() + m.lastPointTime = now + + _, file, line, ok := runtime.Caller(1) + + titlePart := "" + if m.title != "" { + titlePart = fmt.Sprintf(":%s", m.title) + } + + desc := strings.Join(descs, ".") + if desc != "" { + desc = fmt.Sprintf("@%s", desc) + } + + if ok { + m.printer(fmt.Sprintf("[END%v] {%v/%v} %v<%v:%v>", titlePart, now.Sub(last), now.Sub(m.startTime), desc, path.Base(file), line)) + } else { + m.printer(fmt.Sprintf("[END%v] {%v/%v} %v", titlePart, now.Sub(last), now.Sub(m.startTime), desc)) + } + } +}