From 44586c567ba4842335b006559416bc1cb610a6a5 Mon Sep 17 00:00:00 2001 From: chenyifan01 Date: Tue, 28 Jun 2022 18:14:23 +0800 Subject: [PATCH] #2225 update --- models/action.go | 15 ++ models/cloudbrain.go | 110 ++++++++++++- models/reward_admin_log.go | 15 ++ models/reward_operate_record.go | 147 +++++++++++++++--- models/reward_periodic_task.go | 2 +- models/task_accomplish_log.go | 1 - modules/cron/tasks_basic.go | 2 +- routers/repo/cloudbrain.go | 10 +- routers/repo/modelarts.go | 10 +- routers/reward/point/point.go | 1 - services/reward/cloubrain_deduct.go | 82 ++++------ services/reward/operator.go | 51 +++--- services/reward/period_task.go | 33 ++-- .../reward/point/account/point_account.go | 18 +++ services/reward/record.go | 18 ++- services/task/task.go | 7 +- 16 files changed, 386 insertions(+), 136 deletions(-) diff --git a/models/action.go b/models/action.go index 456d5c6bc..ff16dcd3f 100755 --- a/models/action.go +++ b/models/action.go @@ -412,3 +412,18 @@ func GetUnTransformedActions() ([]*Action, error) { Find(&actions) return actions, err } + +func GetActionByIds(ids []int64) ([]*Action, error) { + if len(ids) == 0 { + return nil, nil + } + actions := make([]*Action, 0) + err := x.In("id", ids).Find(&actions) + if err != nil { + return nil, err + } + if err := ActionList(actions).LoadAttributes(); err != nil { + return nil, fmt.Errorf("ActionList loadAttributes: %v", err) + } + return actions, nil +} diff --git a/models/cloudbrain.go b/models/cloudbrain.go index 33b85de20..06cd42258 100755 --- a/models/cloudbrain.go +++ b/models/cloudbrain.go @@ -168,6 +168,72 @@ type Cloudbrain struct { EndTime timeutil.TimeStamp } +type CloudbrainShow struct { + JobID string `xorm:"INDEX NOT NULL"` + JobType string `xorm:"INDEX NOT NULL DEFAULT 'DEBUG'"` + JobName string + DisplayJobName string + Status string + UserID int64 `xorm:"INDEX NOT NULL"` + RepoID int64 `xorm:"INDEX NOT NULL"` + SubTaskName string + ContainerID string + ContainerIp string + CreatedUnix timeutil.TimeStamp `xorm:"INDEX"` + UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"` + Duration int64 `xorm:"DEFAULT 0"` //运行时长 单位秒 + TrainJobDuration string `xorm:"DEFAULT '00:00:00'"` + Image string //镜像名称 + GpuQueue string //GPU类型即GPU队列 + ResourceSpecId int //GPU规格id + DeletedAt time.Time `xorm:"deleted"` + CanDebug bool `xorm:"-"` + CanDel bool `xorm:"-"` + CanModify bool `xorm:"-"` + Type int + BenchmarkTypeID int + BenchmarkChildTypeID int + + VersionID int64 //版本id + VersionName string `xorm:"INDEX"` //当前版本 + Uuid string //数据集id + DatasetName string + VersionCount int //任务的当前版本数量,不包括删除的 + IsLatestVersion string //是否是最新版本,1是,0否 + CommitID string //提交的仓库代码id + PreVersionName string //父版本名称 + ComputeResource string //计算资源,例如npu + EngineID int64 //引擎id + + TrainUrl string //输出模型的obs路径 + BranchName string //分支名称 + Parameters string //传给modelarts的param参数 + BootFile string //启动文件 + DataUrl string //数据集的obs路径 + LogUrl string //日志输出的obs路径 + PreVersionId int64 //父版本的版本id + FlavorCode string //modelarts上的规格id + Description string `xorm:"varchar(256)"` //描述 + WorkServerNumber int //节点数 + FlavorName string //规格名称 + EngineName string //引擎名称 + TotalVersionCount int //任务的所有版本数量,包括删除的 + + LabelName string //标签名称 + ModelName string //模型名称 + ModelVersion string //模型版本 + CkptName string //权重文件名称 + ResultUrl string //推理结果的obs路径 + + User *User `xorm:"-"` + Repo *Repository `xorm:"-"` + BenchmarkType string `xorm:"-"` //算法评测,模型评测 + BenchmarkTypeName string `xorm:"-"` + BenchmarkTypeRankLink string `xorm:"-"` + StartTime timeutil.TimeStamp + EndTime timeutil.TimeStamp +} + func (task *Cloudbrain) ComputeAndSetDuration() { var d int64 if task.StartTime == 0 { @@ -1844,9 +1910,51 @@ func CloudbrainAllStatic(opts *CloudbrainsOptions) ([]*CloudbrainInfo, int64, er func GetStartedCloudbrainTaskByUpdatedUnix(startTime, endTime time.Time) ([]Cloudbrain, error) { r := make([]Cloudbrain, 0) - err := x.Where("updated_unix >= ? and updated_unix <= ? and start_time > 0", startTime.Unix(), endTime.Unix()).Find(&r) + err := x.Where("updated_unix >= ? and updated_unix <= ? and start_time > 0", startTime.Unix(), endTime.Unix()).Unscoped().Find(&r) if err != nil { return nil, err } return r, nil } + +func GetCloudbrainByIds(ids []int64) ([]Cloudbrain, error) { + if len(ids) == 0 { + return nil, nil + } + cloudbrains := make([]Cloudbrain, 0) + err := x.In("id", ids).Unscoped().Find(&cloudbrains) + if err != nil { + return nil, err + } + return cloudbrains, nil +} + +var ( + DebugResourceSpecs *ResourceSpecs + TrainResourceSpecs *ResourceSpecs +) + +func GetResourceSpec(jobType string, resourceSpecId int) *ResourceSpec { + if jobType == string(JobTypeTrain) { + if TrainResourceSpecs == nil { + json.Unmarshal([]byte(setting.TrainResourceSpecs), &TrainResourceSpecs) + } + for _, spec := range TrainResourceSpecs.ResourceSpec { + if resourceSpecId == spec.Id { + return spec + } + } + } else { + if DebugResourceSpecs == nil { + json.Unmarshal([]byte(setting.ResourceSpecs), &DebugResourceSpecs) + } + for _, spec := range DebugResourceSpecs.ResourceSpec { + if resourceSpecId == spec.Id { + return spec + } + } + + } + return nil + +} diff --git a/models/reward_admin_log.go b/models/reward_admin_log.go index 5e4258682..b1a55af13 100644 --- a/models/reward_admin_log.go +++ b/models/reward_admin_log.go @@ -2,6 +2,7 @@ package models import ( "code.gitea.io/gitea/modules/timeutil" + "strings" ) const ( @@ -44,3 +45,17 @@ func UpdateRewardAdminLogStatus(logId string, oldStatus, newStatus int) error { } return nil } + +func GetRewardAdminLogByLogIds(logIds []string) ([]RewardAdminLog, error) { + if len(logIds) == 0 { + return nil, nil + } + adminLogs := make([]RewardAdminLog, 0) + err := x.SQL("select rdl.id,rdl.log_id,rdl.amount,rdl.reward_type,rdl.remark,rdl.status,rdl.target_user_id,rdl.creator_id,u.name as creator_name "+ + "from reward_admin_log rdl left join public.user u on rdl.creator_id = u.id "+ + "where rdl.log_id in (?)", strings.Join(logIds, ",")).Find(&adminLogs) + if err != nil { + return nil, err + } + return adminLogs, nil +} diff --git a/models/reward_operate_record.go b/models/reward_operate_record.go index 394fba1cf..04f43a8bd 100644 --- a/models/reward_operate_record.go +++ b/models/reward_operate_record.go @@ -2,6 +2,8 @@ package models import ( "code.gitea.io/gitea/modules/timeutil" + "fmt" + "strconv" "strings" "xorm.io/builder" ) @@ -112,21 +114,120 @@ const ( RewardOrderByIDDesc RewardOperateOrderBy = "id desc" ) +type RewardRecordList []*RewardOperateRecord +type RewardRecordShowList []*RewardOperateRecordShow + +func (l *RewardRecordList) ToShow() (RewardRecordShowList, error) { + actionMap, err := l.GetRewardRecordAction() + adminLogMap, err := l.GetRewardRecordAdminLog() + CloudbrainMap, err := l.GetRewardRecordCloudbrainTask() + if err != nil { + return nil, err + } + result := make([]*RewardOperateRecordShow, 0) + for _, v := range *l { + temp := v.ToShow() + switch v.SourceType { + case SourceTypeAccomplishTask.Name(): + temp.Action = actionMap[v.SourceId] + case SourceTypeAdminOperate.Name(): + temp.AdminLog = adminLogMap[v.SourceId] + case SourceTypeRunCloudbrainTask.Name(): + temp.Cloudbrain = CloudbrainMap[v.SourceId] + } + result = append(result, &temp) + } + + return result, nil +} + +func (l *RewardRecordList) GetRewardRecordAction() (map[string]Action, error) { + if len(*l) == 0 { + return nil, nil + } + actionIds := make([]int64, 0) + for _, r := range *l { + if r.SourceType != SourceTypeAccomplishTask.Name() { + continue + } + i, _ := strconv.ParseInt(r.SourceId, 10, 64) + actionIds = append(actionIds, i) + } + actions, err := GetActionByIds(actionIds) + if err != nil { + return nil, err + } + result := make(map[string]Action, 0) + for _, v := range actions { + result[fmt.Sprint(v.ID)] = *v + } + return result, nil + +} + +func (l *RewardRecordList) GetRewardRecordAdminLog() (map[string]RewardAdminLog, error) { + if len(*l) == 0 { + return nil, nil + } + logIds := make([]string, 0) + for _, r := range *l { + if r.SourceType != SourceTypeAdminOperate.Name() { + continue + } + logIds = append(logIds, r.SourceId) + } + logs, err := GetRewardAdminLogByLogIds(logIds) + if err != nil { + return nil, err + } + result := make(map[string]RewardAdminLog, 0) + for _, v := range logs { + result[fmt.Sprint(v.LogId)] = v + } + return result, nil + +} + +func (l *RewardRecordList) GetRewardRecordCloudbrainTask() (map[string]Cloudbrain, error) { + if len(*l) == 0 { + return nil, nil + } + cloudbrainIds := make([]int64, 0) + for _, r := range *l { + if r.SourceType != SourceTypeRunCloudbrainTask.Name() { + continue + } + i, _ := strconv.ParseInt(r.SourceId, 10, 64) + cloudbrainIds = append(cloudbrainIds, i) + } + cloudbrains, err := GetCloudbrainByIds(cloudbrainIds) + if err != nil { + return nil, err + } + result := make(map[string]Cloudbrain, 0) + for _, v := range cloudbrains { + result[fmt.Sprint(v.ID)] = v + } + return result, nil + +} + type RewardOperateRecord struct { - ID int64 `xorm:"pk autoincr"` - SerialNo string `xorm:"INDEX NOT NULL"` - UserId int64 `xorm:"INDEX NOT NULL"` - Amount int64 `xorm:"NOT NULL"` - Tittle string - RewardType string `xorm:"NOT NULL"` - SourceType string `xorm:"NOT NULL"` - SourceId string `xorm:"INDEX NOT NULL"` - RequestId string `xorm:"INDEX NOT NULL"` - OperateType string `xorm:"NOT NULL"` - Status string `xorm:"NOT NULL"` - Remark string - CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"` - UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"` + ID int64 `xorm:"pk autoincr"` + SerialNo string `xorm:"INDEX NOT NULL"` + UserId int64 `xorm:"INDEX NOT NULL"` + Amount int64 `xorm:"NOT NULL"` + Tittle string + RewardType string `xorm:"NOT NULL"` + SourceType string `xorm:"NOT NULL"` + SourceId string `xorm:"INDEX NOT NULL"` + RequestId string `xorm:"INDEX NOT NULL"` + OperateType string `xorm:"NOT NULL"` + Status string `xorm:"NOT NULL"` + Remark string + CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"` + UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"` + FinishedUnix timeutil.TimeStamp `xorm:"INDEX"` } type AdminRewardOperateReq struct { @@ -144,6 +245,8 @@ func (r RewardOperateRecord) ToShow() RewardOperateRecordShow { OperateType: r.OperateType, Amount: r.Amount, Remark: r.Remark, + Status: r.Status, + SourceType: r.SourceType, } } @@ -153,10 +256,11 @@ type RewardOperateRecordShow struct { Status string OperateType string Amount int64 + Remark string + SourceType string Action Action Cloudbrain Cloudbrain - SourceType SourceType - Remark string + AdminLog RewardAdminLog } func getPointOperateRecord(tl *RewardOperateRecord) (*RewardOperateRecord, error) { @@ -189,11 +293,12 @@ func InsertRewardOperateRecord(tl *RewardOperateRecord) (int64, error) { return x.Insert(tl) } -func UpdateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus string) (int64, error) { +func UpdateRewardRecordToFinalStatus(sourceType, requestId, newStatus string) (int64, error) { r := &RewardOperateRecord{ - Status: newStatus, + Status: newStatus, + FinishedUnix: timeutil.TimeStampNow(), } - return x.Cols("status").Where("source_type=? and request_id=? and status=?", sourceType, requestId, oldStatus).Update(r) + return x.Cols("status", "finished_unix").Where("source_type=? and request_id=? and status=?", sourceType, requestId, OperateStatusOperating).Update(r) } func SumRewardAmountInTaskPeriod(rewardType string, sourceType string, userId int64, period *PeriodResult) (int64, error) { @@ -252,7 +357,7 @@ type RewardRecordListOpts struct { OrderBy RewardOperateOrderBy } -func GetRewardRecordList(opts RewardRecordListOpts) ([]RewardOperateRecord, int64, error) { +func GetRewardRecordList(opts RewardRecordListOpts) (RewardRecordList, int64, error) { if opts.Page <= 0 { opts.Page = 1 } @@ -261,7 +366,7 @@ func GetRewardRecordList(opts RewardRecordListOpts) ([]RewardOperateRecord, int6 opts.OrderBy = RewardOrderByIDDesc } - r := make([]RewardOperateRecord, 0) + r := make([]*RewardOperateRecord, 0) cond := builder.NewCond() if opts.UserId > 0 { cond = cond.And(builder.Eq{"user_id": opts.UserId}) diff --git a/models/reward_periodic_task.go b/models/reward_periodic_task.go index 5db5301b5..a859676d6 100644 --- a/models/reward_periodic_task.go +++ b/models/reward_periodic_task.go @@ -77,7 +77,7 @@ func IncrRewardTaskSuccessCount(t RewardPeriodicTask, count int64, nextTime time sess.Rollback() return err } - _, err = sess.Exec("update reward_operate_record set amount = amount + ? ,updated_unix = ? where serial_no = ?", count*t.Amount, timeutil.TimeStampNow(), t.OperateSerialNo) + _, err = sess.Exec("update reward_operate_record set amount = amount + ? ,updated_unix = ? where serial_no = ?", t.Amount, timeutil.TimeStampNow(), t.OperateSerialNo) if err != nil { sess.Rollback() return err diff --git a/models/task_accomplish_log.go b/models/task_accomplish_log.go index a1edb71ee..75494bfa2 100644 --- a/models/task_accomplish_log.go +++ b/models/task_accomplish_log.go @@ -7,7 +7,6 @@ import ( type TaskAccomplishLog struct { ID int64 `xorm:"pk autoincr"` - LogId string `xorm:"INDEX NOT NULL"` ConfigId int64 `xorm:"NOT NULL"` TaskCode string `xorm:"NOT NULL"` UserId int64 `xorm:"INDEX NOT NULL"` diff --git a/modules/cron/tasks_basic.go b/modules/cron/tasks_basic.go index 5892699eb..7d6c7df33 100755 --- a/modules/cron/tasks_basic.go +++ b/modules/cron/tasks_basic.go @@ -251,6 +251,6 @@ func initBasicTasks() { registerSyncCloudbrainStatus() registerHandleOrgStatistic() - registerRewardPeriodTask() + //registerRewardPeriodTask() registerCloudbrainPointDeductTask() } diff --git a/routers/repo/cloudbrain.go b/routers/repo/cloudbrain.go index f628a6f0a..29c8b97bb 100755 --- a/routers/repo/cloudbrain.go +++ b/routers/repo/cloudbrain.go @@ -2,7 +2,7 @@ package repo import ( "bufio" - "code.gitea.io/gitea/services/reward" + "code.gitea.io/gitea/services/reward/point/account" "encoding/json" "errors" "fmt" @@ -230,7 +230,7 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) { command = commandTrain } - if !reward.IsPointBalanceEnough(ctx.User.ID, jobType, resourceSpecId) { + if !account.IsPointBalanceEnough(ctx.User.ID, jobType, resourceSpecId) { log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, jobType, resourceSpecId) cloudBrainNewDataPrepare(ctx) ctx.RenderWithErr(models.ErrInsufficientPointsBalance{}.Error(), tpl, &form) @@ -318,7 +318,7 @@ func CloudBrainRestart(ctx *context.Context) { var status = string(models.JobWaiting) task := ctx.Cloudbrain for { - if !reward.IsPointBalanceEnough(ctx.User.ID, task.JobType, task.ResourceSpecId) { + if !account.IsPointBalanceEnough(ctx.User.ID, task.JobType, task.ResourceSpecId) { log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, task.JobType, task.ResourceSpecId) resultCode = "-1" errorMsg = models.ErrInsufficientPointsBalance{}.Error() @@ -1870,7 +1870,7 @@ func BenchMarkAlgorithmCreate(ctx *context.Context, form auth.CreateCloudBrainFo repo := ctx.Repo.Repository - if !reward.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeBenchmark), resourceSpecId) { + if !account.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeBenchmark), resourceSpecId) { log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, string(models.JobTypeBenchmark), resourceSpecId) cloudBrainNewDataPrepare(ctx) ctx.RenderWithErr(models.ErrInsufficientPointsBalance{}.Error(), tplCloudBrainBenchmarkNew, &form) @@ -2032,7 +2032,7 @@ func ModelBenchmarkCreate(ctx *context.Context, form auth.CreateCloudBrainForm) tpl := tplCloudBrainBenchmarkNew command := cloudbrain.Command - if !reward.IsPointBalanceEnough(ctx.User.ID, jobType, resourceSpecId) { + if !account.IsPointBalanceEnough(ctx.User.ID, jobType, resourceSpecId) { log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, jobType, resourceSpecId) cloudBrainNewDataPrepare(ctx) ctx.RenderWithErr(models.ErrInsufficientPointsBalance{}.Error(), tpl, &form) diff --git a/routers/repo/modelarts.go b/routers/repo/modelarts.go index 1fbf6c622..bff9ec525 100755 --- a/routers/repo/modelarts.go +++ b/routers/repo/modelarts.go @@ -2,7 +2,7 @@ package repo import ( "archive/zip" - "code.gitea.io/gitea/services/reward" + "code.gitea.io/gitea/services/reward/point/account" "encoding/json" "errors" "fmt" @@ -207,7 +207,7 @@ func Notebook2Create(ctx *context.Context, form auth.CreateModelArtsNotebookForm repo := ctx.Repo.Repository resourceSpecId := form.ResourceSpecId - if !reward.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeDebug), resourceSpecId) { + if !account.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeDebug), resourceSpecId) { log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, string(models.JobTypeBenchmark), resourceSpecId) cloudBrainNewDataPrepare(ctx) ctx.RenderWithErr(models.ErrInsufficientPointsBalance{}.Error(), tplModelArtsNotebookNew, &form) @@ -426,7 +426,7 @@ func NotebookManage(ctx *context.Context) { errorMsg = "you have no right to restart the job" break } - if !reward.IsPointBalanceEnough(ctx.User.ID, task.JobType, task.ResourceSpecId) { + if !account.IsPointBalanceEnough(ctx.User.ID, task.JobType, task.ResourceSpecId) { log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, task.JobType, task.ResourceSpecId) resultCode = "-1" errorMsg = models.ErrInsufficientPointsBalance{}.Error() @@ -1002,7 +1002,7 @@ func TrainJobCreate(ctx *context.Context, form auth.CreateModelArtsTrainJobForm) EngineName := form.EngineName resourceSpecId := form.ResourceSpecId - if !reward.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeTrain), resourceSpecId) { + if !account.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeTrain), resourceSpecId) { log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, string(models.JobTypeBenchmark), resourceSpecId) cloudBrainNewDataPrepare(ctx) ctx.RenderWithErr(models.ErrInsufficientPointsBalance{}.Error(), tplModelArtsTrainJobNew, &form) @@ -1851,7 +1851,7 @@ func InferenceJobCreate(ctx *context.Context, form auth.CreateModelArtsInference ckptUrl := form.TrainUrl + form.CkptName - if !reward.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeInference), resourceSpecId) { + if !account.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeInference), resourceSpecId) { log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, string(models.JobTypeBenchmark), resourceSpecId) inferenceJobErrorNewDataPrepare(ctx, form) ctx.RenderWithErr(models.ErrInsufficientPointsBalance{}.Error(), tplModelArtsInferenceJobNew, &form) diff --git a/routers/reward/point/point.go b/routers/reward/point/point.go index edf41cd72..fa5e31afa 100644 --- a/routers/reward/point/point.go +++ b/routers/reward/point/point.go @@ -13,7 +13,6 @@ import ( const tplPoint base.TplName = "/reward/point" type AccountResponse struct { - AccountCode string Balance int64 TotalEarned int64 TotalConsumed int64 diff --git a/services/reward/cloubrain_deduct.go b/services/reward/cloubrain_deduct.go index ce23e2dc7..1e547a8a1 100644 --- a/services/reward/cloubrain_deduct.go +++ b/services/reward/cloubrain_deduct.go @@ -4,8 +4,6 @@ import ( "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" - "code.gitea.io/gitea/services/reward/point/account" - "encoding/json" "fmt" "time" ) @@ -17,34 +15,17 @@ var ( const RUN_CLOUDBRAIN_TASK_TITTLE = "运行云脑任务" -//IsPointBalanceEnough check whether the user's point balance is bigger than task unit price -func IsPointBalanceEnough(targetUserId int64, jobType string, resourceSpecId int) bool { +func StartAndGetCloudBrainPointDeductTask(task models.Cloudbrain) (*models.RewardPeriodicTask, error) { if !setting.CloudBrainPaySwitch { - return true - } - spec := getResourceSpec(jobType, resourceSpecId) - if spec == nil { - return true - } - a, error := account.GetAccount(targetUserId) - if error != nil { - return false - } - return a.Balance >= spec.UnitPrice - -} - -func StartCloudBrainPointDeductTask(task models.Cloudbrain) { - if !setting.CloudBrainPaySwitch { - return + return nil, nil } - spec := getResourceSpec(task.JobType, task.ResourceSpecId) + spec := models.GetResourceSpec(task.JobType, task.ResourceSpecId) if spec == nil || spec.UnitPrice == 0 { - return + return nil, nil } - StartPeriodicTask(&models.StartPeriodicTaskOpts{ + return StartAndGetPeriodicTask(&models.StartPeriodicTaskOpts{ SourceType: models.SourceTypeRunCloudbrainTask, SourceId: getCloudBrainPointTaskSourceId(task), TargetUserId: task.UserID, @@ -67,31 +48,6 @@ func getCloudBrainPointTaskSourceId(task models.Cloudbrain) string { return fmt.Sprint(task.ID) } -func getResourceSpec(jobType string, resourceSpecId int) *models.ResourceSpec { - if jobType == string(models.JobTypeTrain) { - if TrainResourceSpecs == nil { - json.Unmarshal([]byte(setting.TrainResourceSpecs), &TrainResourceSpecs) - } - for _, spec := range TrainResourceSpecs.ResourceSpec { - if resourceSpecId == spec.Id { - return spec - } - } - } else { - if ResourceSpecs == nil { - json.Unmarshal([]byte(setting.ResourceSpecs), &ResourceSpecs) - } - for _, spec := range ResourceSpecs.ResourceSpec { - if resourceSpecId == spec.Id { - return spec - } - } - - } - return nil - -} - var firstTimeFlag = true func StartCloudbrainPointDeductTask() { @@ -107,10 +63,9 @@ func StartCloudbrainPointDeductTask() { if firstTimeFlag { //When it is executed for the first time, it needs to process the tasks of the last 1 hours. //This is done to prevent the application from hanging for a long time - start = end.Add(-1 * time.Hour) + start = end.Add(-3 * time.Hour) firstTimeFlag = false } - taskList, err := models.GetStartedCloudbrainTaskByUpdatedUnix(start, end) if err != nil { log.Error("GetStartedCloudbrainTaskByUpdatedUnix error. %v", err) @@ -121,11 +76,30 @@ func StartCloudbrainPointDeductTask() { return } for _, t := range taskList { - if int64(t.StartTime) <= end.Unix() && int64(t.StartTime) >= start.Unix() { - StartCloudBrainPointDeductTask(t) + //初始化 period_task 和 operate_record + if int64(t.StartTime) > end.Unix() || int64(t.StartTime) < start.Unix() { + continue + } + + task, err := StartAndGetCloudBrainPointDeductTask(t) + if err != nil { + log.Error("run cloubrain point deduct task error,err=%v", err) + continue } + if task == nil { + continue + } + if task.Status == models.PeriodicTaskStatusFinished { + log.Info("Periodic task is finished") + continue + } + if int64(t.EndTime) <= end.Unix() && int64(t.EndTime) >= start.Unix() { - StopCloudBrainPointDeductTask(t) + endTime := time.Unix(int64(t.EndTime), 0) + RunRewardTask(*task, endTime) + models.StopPeriodicTask(task.ID, task.OperateSerialNo, endTime) + } else { + RunRewardTask(*task, end) } } } diff --git a/services/reward/operator.go b/services/reward/operator.go index 79f82acbd..fc51aa1c5 100644 --- a/services/reward/operator.go +++ b/services/reward/operator.go @@ -78,11 +78,11 @@ func Operate(ctx *models.RewardOperateContext) error { //operate if err := operator.Operate(ctx); err != nil { - updateAwardOperateRecordStatus(ctx.SourceType.Name(), ctx.RequestId, models.OperateStatusOperating, models.OperateStatusFailed) + UpdateRewardRecordToFinalStatus(ctx.SourceType.Name(), ctx.RequestId, models.OperateStatusFailed) return err } - updateAwardOperateRecordStatus(ctx.SourceType.Name(), ctx.RequestId, models.OperateStatusOperating, models.OperateStatusSucceeded) + UpdateRewardRecordToFinalStatus(ctx.SourceType.Name(), ctx.RequestId, models.OperateStatusSucceeded) NotifyRewardOperation(ctx.TargetUserId, ctx.Reward.Amount, ctx.Reward.Type, ctx.OperateType) return nil } @@ -160,8 +160,8 @@ func createPeriodicRewardOperateRecord(ctx *models.StartPeriodicTaskOpts) (strin return record.SerialNo, nil } -func updateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus string) error { - _, err := models.UpdateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus) +func UpdateRewardRecordToFinalStatus(sourceType, requestId, newStatus string) error { + _, err := models.UpdateRewardRecordToFinalStatus(sourceType, requestId, newStatus) if err != nil { return err } @@ -169,10 +169,10 @@ func updateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus } func StartPeriodicTaskAsyn(opts *models.StartPeriodicTaskOpts) { - go StartPeriodicTask(opts) + go StartAndGetPeriodicTask(opts) } -func StartPeriodicTask(opts *models.StartPeriodicTaskOpts) error { +func StartAndGetPeriodicTask(opts *models.StartPeriodicTaskOpts) (*models.RewardPeriodicTask, error) { defer func() { if err := recover(); err != nil { combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2)) @@ -183,35 +183,46 @@ func StartPeriodicTask(opts *models.StartPeriodicTaskOpts) error { var rewardLock = redis_lock.NewDistributeLock(redis_key.RewardOperateLock(opts.RequestId, opts.SourceType.Name(), opts.OperateType.Name())) isOk, err := rewardLock.Lock(3 * time.Second) if err != nil { - return err + return nil, err } if !isOk { log.Info("duplicated operate request,targetUserId=%d requestId=%s", opts.TargetUserId, opts.RequestId) - return nil + return nil, nil } defer rewardLock.UnLock() - //is handled before? - isHandled, err := isHandled(opts.SourceType.Name(), opts.RequestId, opts.OperateType.Name()) - if err != nil { - log.Error("operate is handled error,%v", err) - return err + _, err = models.GetPointOperateRecordBySourceTypeAndRequestId(opts.SourceType.Name(), opts.RequestId, opts.OperateType.Name()) + if err == nil { + task, err := models.GetPeriodicTaskBySourceIdAndType(opts.SourceType, opts.SourceId, opts.OperateType) + if err != nil { + log.Error("GetPeriodicTaskBySourceIdAndType error,%v", err) + return nil, err + } + return task, nil } - if isHandled { - log.Info("operate has been handled,opts=%+v", opts) - return nil + + if err != nil && !models.IsErrRecordNotExist(err) { + log.Error("operate is handled error,%v", err) + return nil, err } + //new reward operate record recordId, err := createPeriodicRewardOperateRecord(opts) if err != nil { - return err + return nil, err } if err = NewRewardPeriodicTask(recordId, opts); err != nil { - updateAwardOperateRecordStatus(opts.SourceType.Name(), opts.RequestId, models.OperateStatusOperating, models.OperateStatusFailed) - return err + UpdateRewardRecordToFinalStatus(opts.SourceType.Name(), opts.RequestId, models.OperateStatusFailed) + return nil, err } - return nil + + task, err := models.GetPeriodicTaskBySourceIdAndType(opts.SourceType, opts.SourceId, opts.OperateType) + if err != nil { + log.Error("GetPeriodicTaskBySourceIdAndType error,%v", err) + return nil, err + } + return task, nil } func StopPeriodicTaskAsyn(sourceType models.SourceType, sourceId string, operateType models.RewardOperateType) { diff --git a/services/reward/period_task.go b/services/reward/period_task.go index 3fa416dab..c2808c4c0 100644 --- a/services/reward/period_task.go +++ b/services/reward/period_task.go @@ -6,6 +6,8 @@ import ( "code.gitea.io/gitea/modules/redis/redis_key" "code.gitea.io/gitea/modules/redis/redis_lock" "code.gitea.io/gitea/modules/timeutil" + "code.gitea.io/gitea/routers/repo" + "errors" "fmt" "time" ) @@ -46,33 +48,33 @@ func StartRewardTask() { } } -func RunRewardTask(t models.RewardPeriodicTask, now time.Time) { +func RunRewardTask(t models.RewardPeriodicTask, now time.Time) error { lock := redis_lock.NewDistributeLock(redis_key.RewardTaskRunningLock(t.ID)) isOk, _ := lock.LockWithWait(3*time.Second, 3*time.Second) if !isOk { log.Error("get RewardTaskRunningLock failed,t=%+v", t) - return + return errors.New("get RewardTaskRunningLock failed") } defer lock.UnLock() record, err := models.GetPointOperateRecordBySerialNo(t.OperateSerialNo) if err != nil { log.Error("RunRewardTask. GetPointOperateRecordBySerialNo error. %v", err) - return + return errors.New("GetPointOperateRecordBySerialNo error") } if record.Status != models.OperateStatusOperating { log.Info("RunRewardTask. operate record is finished,record=%+v", record) - return + return nil } n, _ := countExecuteTimes(t, now) if n == 0 { - return + return nil } //get operator operator := GetOperator(models.GetRewardTypeInstance(record.RewardType)) if operator == nil { log.Error("RunRewardTask. operator of reward type is not exist") - return + return errors.New("operator of reward type is not exist") } nextTime := t.NextExecuteTime for i := 0; int64(i) <= n; i++ { @@ -89,14 +91,20 @@ func RunRewardTask(t models.RewardPeriodicTask, now time.Time) { if err != nil { log.Error("RunRewardTask.operator operate error.%v", err) if models.IsErrInsufficientPointsBalance(err) { - StopCloudbrainTask(record) - return + task, err := models.GetCloudbrainByID(record.SourceId) + if err != nil { + log.Error("RunRewardTask GetCloudbrainByID error. %v", err) + return err + } + repo.StopJobs([]*models.Cloudbrain{task}) + return nil } - return + return nil } - models.IncrRewardTaskSuccessCount(t, n, nextTime) + models.IncrRewardTaskSuccessCount(t, 1, nextTime) nextTime = timeutil.TimeStamp(int64(nextTime) + t.IntervalSeconds) } + return nil } @@ -111,8 +119,3 @@ func countExecuteTimes(t models.RewardPeriodicTask, now time.Time) (int64, timeu newNextTime := timeutil.TimeStamp(nextTime + n*interval) return n, newNextTime } - -func StopCloudbrainTask(r *models.RewardOperateRecord) { - //todo - -} diff --git a/services/reward/point/account/point_account.go b/services/reward/point/account/point_account.go index ea127e162..693694c76 100644 --- a/services/reward/point/account/point_account.go +++ b/services/reward/point/account/point_account.go @@ -5,6 +5,7 @@ import ( "code.gitea.io/gitea/modules/redis/redis_client" "code.gitea.io/gitea/modules/redis/redis_key" "code.gitea.io/gitea/modules/redis/redis_lock" + "code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/util" "encoding/json" "time" @@ -60,3 +61,20 @@ func InitAccount(userId int64) (*models.PointAccount, error) { return nil, nil } + +//IsPointBalanceEnough check whether the user's point balance is bigger than task unit price +func IsPointBalanceEnough(targetUserId int64, jobType string, resourceSpecId int) bool { + if !setting.CloudBrainPaySwitch { + return true + } + spec := models.GetResourceSpec(jobType, resourceSpecId) + if spec == nil { + return true + } + a, error := GetAccount(targetUserId) + if error != nil { + return false + } + return a.Balance >= spec.UnitPrice + +} diff --git a/services/reward/record.go b/services/reward/record.go index 157e53b53..b1ac86876 100644 --- a/services/reward/record.go +++ b/services/reward/record.go @@ -1,9 +1,11 @@ package reward -import "code.gitea.io/gitea/models" +import ( + "code.gitea.io/gitea/models" +) type RecordResponse struct { - Records []models.RewardOperateRecordShow + Records []*models.RewardOperateRecordShow Total int64 PageSize int Page int @@ -14,9 +16,13 @@ func GetRewardRecordList(opts models.RewardRecordListOpts) (*RecordResponse, err if err != nil { return nil, err } - r := make([]models.RewardOperateRecordShow, 0) - for _, v := range l { - r = append(r, v.ToShow()) + if len(l) == 0 { + return &RecordResponse{Records: make([]*models.RewardOperateRecordShow, 0), Total: n, Page: opts.Page, PageSize: opts.PageSize}, nil } - return &RecordResponse{Records: r, Total: n, Page: opts.Page, PageSize: opts.PageSize}, nil + result, err := l.ToShow() + if err != nil { + return nil, err + } + + return &RecordResponse{Records: result, Total: n, Page: opts.Page, PageSize: opts.PageSize}, nil } diff --git a/services/task/task.go b/services/task/task.go index 0dfc38b6c..e5b57ac3d 100644 --- a/services/task/task.go +++ b/services/task/task.go @@ -3,7 +3,6 @@ package task import ( "code.gitea.io/gitea/models" "code.gitea.io/gitea/modules/log" - "code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/services/reward" "code.gitea.io/gitea/services/reward/limiter" "fmt" @@ -55,9 +54,7 @@ func accomplish(action models.Action) error { } //add log - logId := util.UUID() _, err = models.InsertTaskAccomplishLog(&models.TaskAccomplishLog{ - LogId: logId, ConfigId: config.ID, TaskCode: config.TaskCode, UserId: userId, @@ -70,14 +67,14 @@ func accomplish(action models.Action) error { //reward reward.Operate(&models.RewardOperateContext{ SourceType: models.SourceTypeAccomplishTask, - SourceId: logId, + SourceId: fmt.Sprint(action.ID), Tittle: config.Tittle, Reward: models.Reward{ Amount: config.AwardAmount, Type: models.GetRewardTypeInstance(config.AwardType), }, TargetUserId: userId, - RequestId: logId, + RequestId: fmt.Sprint(action.ID), OperateType: models.OperateTypeIncrease, RejectPolicy: models.FillUp, })