Browse Source

Merge pull request '修复综合测试过程中的问题' (#40) from feature_gxh into master

gitlink
Sydonian 1 year ago
parent
commit
04b361bd9a
11 changed files with 130 additions and 17 deletions
  1. +3
    -3
      pkgs/distlock/internal/acquire_actor.go
  2. +2
    -2
      pkgs/distlock/internal/service_info_actor.go
  3. +1
    -1
      pkgs/mq/client.go
  4. +3
    -3
      pkgs/task/manager.go
  5. +2
    -2
      pkgs/task/task.go
  6. +1
    -1
      sdks/storage/models.go
  7. +2
    -2
      utils/io/join.go
  8. +1
    -1
      utils/lo2/lo.go
  9. +1
    -1
      utils/lo2/lo_test.go
  10. +1
    -1
      utils/sort2/sort.go
  11. +113
    -0
      utils/time2/measurement.go

+ 3
- 3
pkgs/distlock/internal/acquire_actor.go View File

@@ -10,7 +10,7 @@ import (

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
@@ -84,7 +84,7 @@ func (a *AcquireActor) Acquire(ctx context.Context, req LockRequest) (string, er
return
}

a.acquirings = mylo.Remove(a.acquirings, info)
a.acquirings = lo2.Remove(a.acquirings, info)
if info.LastErr != nil {
info.Callback.SetError(info.LastErr)
} else {
@@ -213,7 +213,7 @@ func (a *AcquireActor) doAcquiring() error {
}

req.Callback.SetValue(reqData.ID)
a.acquirings = mylo.RemoveAt(a.acquirings, i)
a.acquirings = lo2.RemoveAt(a.acquirings, i)
break
}



+ 2
- 2
pkgs/distlock/internal/service_info_actor.go View File

@@ -8,7 +8,7 @@ import (

"github.com/google/uuid"
"gitlink.org.cn/cloudream/common/pkgs/logger"
mylo "gitlink.org.cn/cloudream/common/utils/lo"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/serder"
clientv3 "go.etcd.io/etcd/client/v3"
)
@@ -191,6 +191,6 @@ func (a *ServiceInfoActor) OnLockRequestEvent(evt LockRequestEvent) {
if evt.IsLocking {
status.LockRequestIDs = append(status.LockRequestIDs, evt.Data.ID)
} else {
status.LockRequestIDs = mylo.Remove(status.LockRequestIDs, evt.Data.ID)
status.LockRequestIDs = lo2.Remove(status.LockRequestIDs, evt.Data.ID)
}
}

+ 1
- 1
pkgs/mq/client.go View File

@@ -300,7 +300,7 @@ func (c *RabbitMQTransport) Close() error {

// 发送消息并等待回应。因为无法自动推断出TResp的类型,所以将其放在第一个手工填写,之后的TBody可以自动推断出来
func Request[TSvc any, TReq MessageBody, TResp MessageBody](_ func(svc TSvc, msg TReq) (TResp, *CodeMessage), cli RoundTripper, req TReq, opts ...RequestOption) (TResp, error) {
opt := RequestOption{Timeout: time.Second * 15}
opt := RequestOption{Timeout: time.Second * 15, KeepAlive: true}
if len(opts) > 0 {
opt = opts[0]
}


+ 3
- 3
pkgs/task/manager.go View File

@@ -5,7 +5,7 @@ import (
"sync"
"time"

mylo "gitlink.org.cn/cloudream/common/utils/lo"
"gitlink.org.cn/cloudream/common/utils/lo2"
)

type Manager[TCtx any] struct {
@@ -108,12 +108,12 @@ func (m *Manager[TCtx]) executeTask(task *Task[TCtx]) {

// 立刻删除任务,或者延迟一段时间再删除
if opt.RemovingDelay == 0 {
m.tasks = mylo.Remove(m.tasks, task)
m.tasks = lo2.Remove(m.tasks, task)
} else {
go func() {
<-time.After(opt.RemovingDelay)
m.lock.Lock()
m.tasks = mylo.Remove(m.tasks, task)
m.tasks = lo2.Remove(m.tasks, task)
m.lock.Unlock()
}()
}


+ 2
- 2
pkgs/task/task.go View File

@@ -5,7 +5,7 @@ import (
"sync/atomic"
"time"

mylo "gitlink.org.cn/cloudream/common/utils/lo"
"gitlink.org.cn/cloudream/common/utils/lo2"
)

type CompleteOption struct {
@@ -82,7 +82,7 @@ func (t *Task[TCtx]) WaitTimeout(timeout time.Duration) bool {
select {
case <-time.After(timeout):
t.waiterLock.Lock()
t.waiters = mylo.Remove(t.waiters, waiter)
t.waiters = lo2.Remove(t.waiters, waiter)
t.waiterLock.Unlock()

return false


+ 1
- 1
sdks/storage/models.go View File

@@ -71,7 +71,7 @@ func (b *RepRedundancy) Value() (driver.Value, error) {
return serder.ObjectToJSONEx[Redundancy](b)
}

var DefaultECRedundancy = *NewECRedundancy(2, 3, 1024*1024*5)
var DefaultECRedundancy = *NewECRedundancy(3, 5, 1024*1024*5)

type ECRedundancy struct {
serder.Metadata `union:"ec"`


+ 2
- 2
utils/io/join.go View File

@@ -3,7 +3,7 @@ package io
import (
"io"

"gitlink.org.cn/cloudream/common/utils/lo"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/math"
)

@@ -81,7 +81,7 @@ func (s *chunkedJoin) Read(buf []byte) (int, error) {
}

if err == io.EOF {
s.inputs = lo.RemoveAt(s.inputs, s.currentInput)
s.inputs = lo2.RemoveAt(s.inputs, s.currentInput)
// 此处不需要+1
if len(s.inputs) > 0 {
s.currentInput = s.currentInput % len(s.inputs)


utils/lo/lo.go → utils/lo2/lo.go View File

@@ -1,4 +1,4 @@
package lo
package lo2

import "github.com/samber/lo"


utils/lo/lo_test.go → utils/lo2/lo_test.go View File

@@ -1,4 +1,4 @@
package lo
package lo2

import (
"testing"

utils/sort/sort.go → utils/sort2/sort.go View File

@@ -1,4 +1,4 @@
package sort
package sort2

import (
"sort"

+ 113
- 0
utils/time2/measurement.go View File

@@ -0,0 +1,113 @@
package time2

import (
"fmt"
"path"
"runtime"
"strings"
"time"
)

type Measurement struct {
startTime time.Time
lastPointTime time.Time
printer func(string)
on bool
title string
}

func NewMeasurement(printer func(string)) Measurement {
return Measurement{
printer: printer,
}
}

func (m *Measurement) Begin(on bool, title ...string) {
if m == nil {
return
}

m.on = on
m.title = strings.Join(title, ".")

if on {
m.startTime = time.Now()
m.lastPointTime = m.startTime

_, file, line, ok := runtime.Caller(1)

titlePart := ""
if m.title != "" {
titlePart = fmt.Sprintf(":%s", m.title)
}

if ok {
m.printer(fmt.Sprintf("[BEGIN%v]<%v:%v>", titlePart, path.Base(file), line))
} else {
m.printer(fmt.Sprintf("[BEGIN%v]<UNKNOWN>", titlePart))
}
}
}

func (m *Measurement) Point(desc ...string) {
if m == nil {
return
}

if m.on {
m.printer(m.makePointString(strings.Join(desc, ".")))
}
}

func (m *Measurement) makePointString(desc string) string {
last := m.lastPointTime
now := time.Now()
m.lastPointTime = now

_, file, line, ok := runtime.Caller(2)

titlePart := ""
if m.title != "" {
titlePart = fmt.Sprintf("(%s)", m.title)
}

if desc != "" {
desc = fmt.Sprintf("@%s", desc)
}

if ok {
return fmt.Sprintf("%v {%v/%v} %v<%v:%v>", titlePart, now.Sub(last), now.Sub(m.startTime), desc, path.Base(file), line)
}

return fmt.Sprintf("{%v/%v}%v<UNKNOWN>", now.Sub(last), now.Sub(m.startTime), desc)
}

func (m *Measurement) End(descs ...string) {
if m == nil {
return
}

if m.on {
last := m.lastPointTime
now := time.Now()
m.lastPointTime = now

_, file, line, ok := runtime.Caller(1)

titlePart := ""
if m.title != "" {
titlePart = fmt.Sprintf(":%s", m.title)
}

desc := strings.Join(descs, ".")
if desc != "" {
desc = fmt.Sprintf("@%s", desc)
}

if ok {
m.printer(fmt.Sprintf("[END%v] {%v/%v} %v<%v:%v>", titlePart, now.Sub(last), now.Sub(m.startTime), desc, path.Base(file), line))
} else {
m.printer(fmt.Sprintf("[END%v] {%v/%v} %v<UNKNOWN>", titlePart, now.Sub(last), now.Sub(m.startTime), desc))
}
}
}

Loading…
Cancel
Save