| @@ -0,0 +1,32 @@ | |||
| package actor | |||
| import ( | |||
| "testing" | |||
| "time" | |||
| . "github.com/smartystreets/goconvey/convey" | |||
| ) | |||
| func Test_CommandChannel(t *testing.T) { | |||
| wait := func(ch <-chan CommandFn) (CommandFn, bool) { | |||
| select { | |||
| case <-time.After(time.Second * 5): | |||
| return nil, false | |||
| case cmd := <-ch: | |||
| return cmd, true | |||
| } | |||
| } | |||
| Convey("BeginChanReceive", t, func() { | |||
| cmdChan := NewCommandChannel() | |||
| cmdChan.Send(func() {}) | |||
| ch := cmdChan.BeginChanReceive() | |||
| defer cmdChan.CloseChanReceive() | |||
| _, ok := wait(ch) | |||
| So(ok, ShouldBeTrue) | |||
| }) | |||
| } | |||
| @@ -42,7 +42,6 @@ type MainActor struct { | |||
| commandChan *actor.CommandChannel | |||
| watchEtcdActor *WatchEtcdActor | |||
| providersActor *ProvidersActor | |||
| lockRequestLeaseID clientv3.LeaseID | |||
| @@ -56,8 +55,7 @@ func NewMainActor(cfg *distlock.Config, etcdCli *clientv3.Client) *MainActor { | |||
| } | |||
| } | |||
| func (a *MainActor) Init(watchEtcdActor *WatchEtcdActor, providersActor *ProvidersActor) { | |||
| a.watchEtcdActor = watchEtcdActor | |||
| func (a *MainActor) Init(providersActor *ProvidersActor) { | |||
| a.providersActor = providersActor | |||
| } | |||
| @@ -69,7 +67,7 @@ func (a *MainActor) Acquire(req distlock.LockRequest) (reqID string, err error) | |||
| } | |||
| if rets[0].Err != nil { | |||
| return "", err | |||
| return "", rets[0].Err | |||
| } | |||
| return rets[0].RequestID, nil | |||
| @@ -145,7 +143,9 @@ func (a *MainActor) submitLockRequest(reqData LockRequestData) error { | |||
| etcdOps = []clientv3.Op{ | |||
| clientv3.OpPut(LOCK_REQUEST_INDEX, reqData.ID), | |||
| // 归属到当前连接的租约,在当前连接断开后,能自动解锁 | |||
| clientv3.OpPut(makeEtcdLockRequestKey(reqData.ID), string(reqBytes), clientv3.WithLease(a.lockRequestLeaseID)), | |||
| // TODO 不能直接给RequestData上租约,因为如果在别的服务已经获取到锁的情况下, | |||
| // 如果当前服务崩溃,删除消息会立刻发送出去,这就破坏了锁的约定(在锁定期间其他服务不能修改数据) | |||
| clientv3.OpPut(makeEtcdLockRequestKey(reqData.ID), string(reqBytes)), //, clientv3.WithLease(a.lockRequestLeaseID)), | |||
| } | |||
| } | |||
| txResp, err := a.etcdCli.Txn(context.Background()).Then(etcdOps...).Commit() | |||
| @@ -287,23 +287,11 @@ func (a *MainActor) ReloadEtcdData() error { | |||
| reqData = append(reqData, req) | |||
| } | |||
| // 先停止监听,再重置锁状态,最后恢复监听 | |||
| err = a.watchEtcdActor.StopWatching() | |||
| if err != nil { | |||
| return fmt.Errorf("stop watching etcd failed, err: %w", err) | |||
| } | |||
| err = a.providersActor.ResetState(index, reqData) | |||
| if err != nil { | |||
| return fmt.Errorf("reset lock providers state failed, err: %w", err) | |||
| } | |||
| err = a.watchEtcdActor.StartWatching() | |||
| if err != nil { | |||
| return fmt.Errorf("start watching etcd failed, err: %w", err) | |||
| } | |||
| return nil | |||
| }) | |||
| } | |||
| @@ -8,6 +8,7 @@ import ( | |||
| "gitlink.org.cn/cloudream/common/pkg/actor" | |||
| "gitlink.org.cn/cloudream/common/pkg/distlock" | |||
| "gitlink.org.cn/cloudream/common/pkg/future" | |||
| "gitlink.org.cn/cloudream/common/pkg/logger" | |||
| mylo "gitlink.org.cn/cloudream/common/utils/lo" | |||
| ) | |||
| @@ -35,7 +36,7 @@ func (a *RetryActor) Init(mainActor *MainActor) { | |||
| a.mainActor = mainActor | |||
| } | |||
| func (a *RetryActor) Retry(req distlock.LockRequest, timeout time.Duration, lastErr error) (*future.SetValueFuture[string], error) { | |||
| func (a *RetryActor) Retry(req distlock.LockRequest, timeout time.Duration, lastErr error) (future.ValueFuture[string], error) { | |||
| fut := future.NewSetValue[string]() | |||
| var info *retryInfo | |||
| @@ -76,8 +77,14 @@ func (a *RetryActor) Retry(req distlock.LockRequest, timeout time.Duration, last | |||
| func (a *RetryActor) OnLocalStateUpdated() { | |||
| a.commandChan.Send(func() { | |||
| if len(a.retrys) == 0 { | |||
| return | |||
| } | |||
| rets, err := a.mainActor.AcquireMany(a.retrys) | |||
| if err != nil { | |||
| // TODO 处理错误 | |||
| logger.Debugf("acquire many lock requests failed, err: %s", err.Error()) | |||
| return | |||
| } | |||
| @@ -45,7 +45,7 @@ func NewService(cfg *distlock.Config) (*Service, error) { | |||
| leaseActor := internal.NewLeaseActor() | |||
| retryActor := internal.NewRetryActor() | |||
| mainActor.Init(watchEtcdActor, providersActor) | |||
| mainActor.Init(providersActor) | |||
| providersActor.Init() | |||
| watchEtcdActor.Init() | |||
| leaseActor.Init(mainActor) | |||
| @@ -112,6 +112,11 @@ func (svc *Service) Release(reqID string) error { | |||
| } | |||
| func (svc *Service) Serve() error { | |||
| // TODO 需要停止service的方法 | |||
| // 目前已知问题: | |||
| // 1. client退出时直接中断进程,此时RetryActor可能正在进行Retry,于是导致Etcd锁没有解除就退出了进程。 | |||
| // 虽然由于租约的存在不会导致系统长期卡死,但会影响client的使用 | |||
| go func() { | |||
| // TODO 处理错误 | |||
| err := svc.providersActor.Serve() | |||
| @@ -160,6 +165,7 @@ func (svc *Service) Serve() error { | |||
| svc.lockReqEventWatcher.OnEvent = func(events []internal.LockRequestEvent) { | |||
| svc.providersActor.ApplyLockRequestEvents(events) | |||
| svc.retryActor.OnLocalStateUpdated() | |||
| } | |||
| err = svc.watchEtcdActor.AddEventWatcher(&svc.lockReqEventWatcher) | |||
| if err != nil { | |||
| @@ -16,7 +16,7 @@ func NewCounterCond(initCount int) *CounterCond { | |||
| func (c *CounterCond) Wait() bool { | |||
| c.cond.L.Lock() | |||
| defer c.cond.L.Lock() | |||
| defer c.cond.L.Unlock() | |||
| for c.count == 0 { | |||
| c.cond.Wait() | |||