Browse Source

redis to lock tasks

tags/v1.22.9.2^2
liuzx 3 years ago
parent
commit
6896276b44
4 changed files with 28 additions and 8 deletions
  1. +3
    -2
      modules/auth/wechat/access_token.go
  2. +7
    -0
      modules/redis/redis_key/cloudbrain_redis_key.go
  3. +7
    -6
      modules/redis/redis_lock/lock.go
  4. +11
    -0
      routers/repo/modelarts.go

+ 3
- 2
modules/auth/wechat/access_token.go View File

@@ -1,10 +1,11 @@
package wechat

import (
"time"

"code.gitea.io/gitea/modules/redis/redis_client"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"time"
)

const EMPTY_REDIS_VAL = "Nil"
@@ -35,7 +36,7 @@ func refreshAccessToken() {
}

func refreshAndGetAccessToken() string {
if ok := accessTokenLock.LockWithWait(3*time.Second, 3*time.Second); ok {
if ok, _ := accessTokenLock.LockWithWait(3*time.Second, 3*time.Second); ok {
defer accessTokenLock.UnLock()
token, _ := redis_client.Get(redis_key.WechatAccessTokenKey())
if token != "" {


+ 7
- 0
modules/redis/redis_key/cloudbrain_redis_key.go View File

@@ -0,0 +1,7 @@
package redis_key

const CLOUDBRAIN_PREFIX = "cloudbrain"

func CloudbrainBindingJobNameKey(repoId string, jobType string, jobName string) string {
return KeyJoin(CLOUDBRAIN_PREFIX, repoId, jobType, jobName, "redis_key")
}

+ 7
- 6
modules/redis/redis_lock/lock.go View File

@@ -1,8 +1,9 @@
package redis_lock

import (
"code.gitea.io/gitea/modules/redis/redis_client"
"time"

"code.gitea.io/gitea/modules/redis/redis_client"
)

type DistributeLock struct {
@@ -18,21 +19,21 @@ func (lock *DistributeLock) Lock(expireTime time.Duration) bool {
return isOk
}

func (lock *DistributeLock) LockWithWait(expireTime time.Duration, waitTime time.Duration) bool {
func (lock *DistributeLock) LockWithWait(expireTime time.Duration, waitTime time.Duration) (bool, error) {
start := time.Now().Unix() * 1000
duration := waitTime.Milliseconds()
for {
isOk, _ := redis_client.Setnx(lock.lockKey, "", expireTime)
isOk, err := redis_client.Setnx(lock.lockKey, "", expireTime)
if isOk {
return true
return true, nil
}
if time.Now().Unix()*1000-start > duration {
return false
return false, err
}
time.Sleep(50 * time.Millisecond)
}

return false
return false, nil
}

func (lock *DistributeLock) UnLock() error {


+ 11
- 0
routers/repo/modelarts.go View File

@@ -25,6 +25,8 @@ import (
"code.gitea.io/gitea/modules/modelarts"
"code.gitea.io/gitea/modules/notification"
"code.gitea.io/gitea/modules/obs"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/modules/timeutil"
@@ -1090,6 +1092,15 @@ func TrainJobCreate(ctx *context.Context, form auth.CreateModelArtsTrainJobForm)
VersionCount := modelarts.VersionCountOne
EngineName := form.EngineName

lock := redis_lock.NewDistributeLock(redis_key.CloudbrainBindingJobNameKey(fmt.Sprint(repo.ID), string(models.JobTypeTrain), displayJobName))
isOk, err := lock.LockWithWait(60*time.Second, 60*time.Second)
if !isOk {
log.Error("The Task have been process:%v", err, ctx.Data["MsgID"])
trainJobErrorNewDataPrepare(ctx, form)
ctx.RenderWithErr("system error", tplModelArtsTrainJobNew, &form)
return
}

count, err := models.GetCloudbrainTrainJobCountByUserID(ctx.User.ID)
if err != nil {
log.Error("GetCloudbrainTrainJobCountByUserID failed:%v", err, ctx.Data["MsgID"])


Loading…
Cancel
Save