Browse Source

#2225

update
tags/v1.22.9.2^2
chenyifan01 3 years ago
parent
commit
44586c567b
16 changed files with 386 additions and 136 deletions
  1. +15
    -0
      models/action.go
  2. +109
    -1
      models/cloudbrain.go
  3. +15
    -0
      models/reward_admin_log.go
  4. +126
    -21
      models/reward_operate_record.go
  5. +1
    -1
      models/reward_periodic_task.go
  6. +0
    -1
      models/task_accomplish_log.go
  7. +1
    -1
      modules/cron/tasks_basic.go
  8. +5
    -5
      routers/repo/cloudbrain.go
  9. +5
    -5
      routers/repo/modelarts.go
  10. +0
    -1
      routers/reward/point/point.go
  11. +28
    -54
      services/reward/cloubrain_deduct.go
  12. +31
    -20
      services/reward/operator.go
  13. +18
    -15
      services/reward/period_task.go
  14. +18
    -0
      services/reward/point/account/point_account.go
  15. +12
    -6
      services/reward/record.go
  16. +2
    -5
      services/task/task.go

+ 15
- 0
models/action.go View File

@@ -412,3 +412,18 @@ func GetUnTransformedActions() ([]*Action, error) {
Find(&actions)
return actions, err
}

func GetActionByIds(ids []int64) ([]*Action, error) {
if len(ids) == 0 {
return nil, nil
}
actions := make([]*Action, 0)
err := x.In("id", ids).Find(&actions)
if err != nil {
return nil, err
}
if err := ActionList(actions).LoadAttributes(); err != nil {
return nil, fmt.Errorf("ActionList loadAttributes: %v", err)
}
return actions, nil
}

+ 109
- 1
models/cloudbrain.go View File

@@ -168,6 +168,72 @@ type Cloudbrain struct {
EndTime timeutil.TimeStamp
}

type CloudbrainShow struct {
JobID string `xorm:"INDEX NOT NULL"`
JobType string `xorm:"INDEX NOT NULL DEFAULT 'DEBUG'"`
JobName string
DisplayJobName string
Status string
UserID int64 `xorm:"INDEX NOT NULL"`
RepoID int64 `xorm:"INDEX NOT NULL"`
SubTaskName string
ContainerID string
ContainerIp string
CreatedUnix timeutil.TimeStamp `xorm:"INDEX"`
UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"`
Duration int64 `xorm:"DEFAULT 0"` //运行时长 单位秒
TrainJobDuration string `xorm:"DEFAULT '00:00:00'"`
Image string //镜像名称
GpuQueue string //GPU类型即GPU队列
ResourceSpecId int //GPU规格id
DeletedAt time.Time `xorm:"deleted"`
CanDebug bool `xorm:"-"`
CanDel bool `xorm:"-"`
CanModify bool `xorm:"-"`
Type int
BenchmarkTypeID int
BenchmarkChildTypeID int

VersionID int64 //版本id
VersionName string `xorm:"INDEX"` //当前版本
Uuid string //数据集id
DatasetName string
VersionCount int //任务的当前版本数量,不包括删除的
IsLatestVersion string //是否是最新版本,1是,0否
CommitID string //提交的仓库代码id
PreVersionName string //父版本名称
ComputeResource string //计算资源,例如npu
EngineID int64 //引擎id

TrainUrl string //输出模型的obs路径
BranchName string //分支名称
Parameters string //传给modelarts的param参数
BootFile string //启动文件
DataUrl string //数据集的obs路径
LogUrl string //日志输出的obs路径
PreVersionId int64 //父版本的版本id
FlavorCode string //modelarts上的规格id
Description string `xorm:"varchar(256)"` //描述
WorkServerNumber int //节点数
FlavorName string //规格名称
EngineName string //引擎名称
TotalVersionCount int //任务的所有版本数量,包括删除的

LabelName string //标签名称
ModelName string //模型名称
ModelVersion string //模型版本
CkptName string //权重文件名称
ResultUrl string //推理结果的obs路径

User *User `xorm:"-"`
Repo *Repository `xorm:"-"`
BenchmarkType string `xorm:"-"` //算法评测,模型评测
BenchmarkTypeName string `xorm:"-"`
BenchmarkTypeRankLink string `xorm:"-"`
StartTime timeutil.TimeStamp
EndTime timeutil.TimeStamp
}

func (task *Cloudbrain) ComputeAndSetDuration() {
var d int64
if task.StartTime == 0 {
@@ -1844,9 +1910,51 @@ func CloudbrainAllStatic(opts *CloudbrainsOptions) ([]*CloudbrainInfo, int64, er

func GetStartedCloudbrainTaskByUpdatedUnix(startTime, endTime time.Time) ([]Cloudbrain, error) {
r := make([]Cloudbrain, 0)
err := x.Where("updated_unix >= ? and updated_unix <= ? and start_time > 0", startTime.Unix(), endTime.Unix()).Find(&r)
err := x.Where("updated_unix >= ? and updated_unix <= ? and start_time > 0", startTime.Unix(), endTime.Unix()).Unscoped().Find(&r)
if err != nil {
return nil, err
}
return r, nil
}

func GetCloudbrainByIds(ids []int64) ([]Cloudbrain, error) {
if len(ids) == 0 {
return nil, nil
}
cloudbrains := make([]Cloudbrain, 0)
err := x.In("id", ids).Unscoped().Find(&cloudbrains)
if err != nil {
return nil, err
}
return cloudbrains, nil
}

var (
DebugResourceSpecs *ResourceSpecs
TrainResourceSpecs *ResourceSpecs
)

func GetResourceSpec(jobType string, resourceSpecId int) *ResourceSpec {
if jobType == string(JobTypeTrain) {
if TrainResourceSpecs == nil {
json.Unmarshal([]byte(setting.TrainResourceSpecs), &TrainResourceSpecs)
}
for _, spec := range TrainResourceSpecs.ResourceSpec {
if resourceSpecId == spec.Id {
return spec
}
}
} else {
if DebugResourceSpecs == nil {
json.Unmarshal([]byte(setting.ResourceSpecs), &DebugResourceSpecs)
}
for _, spec := range DebugResourceSpecs.ResourceSpec {
if resourceSpecId == spec.Id {
return spec
}
}

}
return nil

}

+ 15
- 0
models/reward_admin_log.go View File

@@ -2,6 +2,7 @@ package models

import (
"code.gitea.io/gitea/modules/timeutil"
"strings"
)

const (
@@ -44,3 +45,17 @@ func UpdateRewardAdminLogStatus(logId string, oldStatus, newStatus int) error {
}
return nil
}

func GetRewardAdminLogByLogIds(logIds []string) ([]RewardAdminLog, error) {
if len(logIds) == 0 {
return nil, nil
}
adminLogs := make([]RewardAdminLog, 0)
err := x.SQL("select rdl.id,rdl.log_id,rdl.amount,rdl.reward_type,rdl.remark,rdl.status,rdl.target_user_id,rdl.creator_id,u.name as creator_name "+
"from reward_admin_log rdl left join public.user u on rdl.creator_id = u.id "+
"where rdl.log_id in (?)", strings.Join(logIds, ",")).Find(&adminLogs)
if err != nil {
return nil, err
}
return adminLogs, nil
}

+ 126
- 21
models/reward_operate_record.go View File

@@ -2,6 +2,8 @@ package models

import (
"code.gitea.io/gitea/modules/timeutil"
"fmt"
"strconv"
"strings"
"xorm.io/builder"
)
@@ -112,21 +114,120 @@ const (
RewardOrderByIDDesc RewardOperateOrderBy = "id desc"
)

type RewardRecordList []*RewardOperateRecord
type RewardRecordShowList []*RewardOperateRecordShow

func (l *RewardRecordList) ToShow() (RewardRecordShowList, error) {
actionMap, err := l.GetRewardRecordAction()
adminLogMap, err := l.GetRewardRecordAdminLog()
CloudbrainMap, err := l.GetRewardRecordCloudbrainTask()
if err != nil {
return nil, err
}
result := make([]*RewardOperateRecordShow, 0)
for _, v := range *l {
temp := v.ToShow()
switch v.SourceType {
case SourceTypeAccomplishTask.Name():
temp.Action = actionMap[v.SourceId]
case SourceTypeAdminOperate.Name():
temp.AdminLog = adminLogMap[v.SourceId]
case SourceTypeRunCloudbrainTask.Name():
temp.Cloudbrain = CloudbrainMap[v.SourceId]
}
result = append(result, &temp)
}

return result, nil
}

func (l *RewardRecordList) GetRewardRecordAction() (map[string]Action, error) {
if len(*l) == 0 {
return nil, nil
}
actionIds := make([]int64, 0)
for _, r := range *l {
if r.SourceType != SourceTypeAccomplishTask.Name() {
continue
}
i, _ := strconv.ParseInt(r.SourceId, 10, 64)
actionIds = append(actionIds, i)
}
actions, err := GetActionByIds(actionIds)
if err != nil {
return nil, err
}
result := make(map[string]Action, 0)
for _, v := range actions {
result[fmt.Sprint(v.ID)] = *v
}
return result, nil

}

func (l *RewardRecordList) GetRewardRecordAdminLog() (map[string]RewardAdminLog, error) {
if len(*l) == 0 {
return nil, nil
}
logIds := make([]string, 0)
for _, r := range *l {
if r.SourceType != SourceTypeAdminOperate.Name() {
continue
}
logIds = append(logIds, r.SourceId)
}
logs, err := GetRewardAdminLogByLogIds(logIds)
if err != nil {
return nil, err
}
result := make(map[string]RewardAdminLog, 0)
for _, v := range logs {
result[fmt.Sprint(v.LogId)] = v
}
return result, nil

}

func (l *RewardRecordList) GetRewardRecordCloudbrainTask() (map[string]Cloudbrain, error) {
if len(*l) == 0 {
return nil, nil
}
cloudbrainIds := make([]int64, 0)
for _, r := range *l {
if r.SourceType != SourceTypeRunCloudbrainTask.Name() {
continue
}
i, _ := strconv.ParseInt(r.SourceId, 10, 64)
cloudbrainIds = append(cloudbrainIds, i)
}
cloudbrains, err := GetCloudbrainByIds(cloudbrainIds)
if err != nil {
return nil, err
}
result := make(map[string]Cloudbrain, 0)
for _, v := range cloudbrains {
result[fmt.Sprint(v.ID)] = v
}
return result, nil

}

type RewardOperateRecord struct {
ID int64 `xorm:"pk autoincr"`
SerialNo string `xorm:"INDEX NOT NULL"`
UserId int64 `xorm:"INDEX NOT NULL"`
Amount int64 `xorm:"NOT NULL"`
Tittle string
RewardType string `xorm:"NOT NULL"`
SourceType string `xorm:"NOT NULL"`
SourceId string `xorm:"INDEX NOT NULL"`
RequestId string `xorm:"INDEX NOT NULL"`
OperateType string `xorm:"NOT NULL"`
Status string `xorm:"NOT NULL"`
Remark string
CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"`
UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"`
ID int64 `xorm:"pk autoincr"`
SerialNo string `xorm:"INDEX NOT NULL"`
UserId int64 `xorm:"INDEX NOT NULL"`
Amount int64 `xorm:"NOT NULL"`
Tittle string
RewardType string `xorm:"NOT NULL"`
SourceType string `xorm:"NOT NULL"`
SourceId string `xorm:"INDEX NOT NULL"`
RequestId string `xorm:"INDEX NOT NULL"`
OperateType string `xorm:"NOT NULL"`
Status string `xorm:"NOT NULL"`
Remark string
CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"`
UpdatedUnix timeutil.TimeStamp `xorm:"INDEX updated"`
FinishedUnix timeutil.TimeStamp `xorm:"INDEX"`
}

type AdminRewardOperateReq struct {
@@ -144,6 +245,8 @@ func (r RewardOperateRecord) ToShow() RewardOperateRecordShow {
OperateType: r.OperateType,
Amount: r.Amount,
Remark: r.Remark,
Status: r.Status,
SourceType: r.SourceType,
}
}

@@ -153,10 +256,11 @@ type RewardOperateRecordShow struct {
Status string
OperateType string
Amount int64
Remark string
SourceType string
Action Action
Cloudbrain Cloudbrain
SourceType SourceType
Remark string
AdminLog RewardAdminLog
}

func getPointOperateRecord(tl *RewardOperateRecord) (*RewardOperateRecord, error) {
@@ -189,11 +293,12 @@ func InsertRewardOperateRecord(tl *RewardOperateRecord) (int64, error) {
return x.Insert(tl)
}

func UpdateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus string) (int64, error) {
func UpdateRewardRecordToFinalStatus(sourceType, requestId, newStatus string) (int64, error) {
r := &RewardOperateRecord{
Status: newStatus,
Status: newStatus,
FinishedUnix: timeutil.TimeStampNow(),
}
return x.Cols("status").Where("source_type=? and request_id=? and status=?", sourceType, requestId, oldStatus).Update(r)
return x.Cols("status", "finished_unix").Where("source_type=? and request_id=? and status=?", sourceType, requestId, OperateStatusOperating).Update(r)
}

func SumRewardAmountInTaskPeriod(rewardType string, sourceType string, userId int64, period *PeriodResult) (int64, error) {
@@ -252,7 +357,7 @@ type RewardRecordListOpts struct {
OrderBy RewardOperateOrderBy
}

func GetRewardRecordList(opts RewardRecordListOpts) ([]RewardOperateRecord, int64, error) {
func GetRewardRecordList(opts RewardRecordListOpts) (RewardRecordList, int64, error) {
if opts.Page <= 0 {
opts.Page = 1
}
@@ -261,7 +366,7 @@ func GetRewardRecordList(opts RewardRecordListOpts) ([]RewardOperateRecord, int6
opts.OrderBy = RewardOrderByIDDesc
}

r := make([]RewardOperateRecord, 0)
r := make([]*RewardOperateRecord, 0)
cond := builder.NewCond()
if opts.UserId > 0 {
cond = cond.And(builder.Eq{"user_id": opts.UserId})


+ 1
- 1
models/reward_periodic_task.go View File

@@ -77,7 +77,7 @@ func IncrRewardTaskSuccessCount(t RewardPeriodicTask, count int64, nextTime time
sess.Rollback()
return err
}
_, err = sess.Exec("update reward_operate_record set amount = amount + ? ,updated_unix = ? where serial_no = ?", count*t.Amount, timeutil.TimeStampNow(), t.OperateSerialNo)
_, err = sess.Exec("update reward_operate_record set amount = amount + ? ,updated_unix = ? where serial_no = ?", t.Amount, timeutil.TimeStampNow(), t.OperateSerialNo)
if err != nil {
sess.Rollback()
return err


+ 0
- 1
models/task_accomplish_log.go View File

@@ -7,7 +7,6 @@ import (

type TaskAccomplishLog struct {
ID int64 `xorm:"pk autoincr"`
LogId string `xorm:"INDEX NOT NULL"`
ConfigId int64 `xorm:"NOT NULL"`
TaskCode string `xorm:"NOT NULL"`
UserId int64 `xorm:"INDEX NOT NULL"`


+ 1
- 1
modules/cron/tasks_basic.go View File

@@ -251,6 +251,6 @@ func initBasicTasks() {
registerSyncCloudbrainStatus()
registerHandleOrgStatistic()

registerRewardPeriodTask()
//registerRewardPeriodTask()
registerCloudbrainPointDeductTask()
}

+ 5
- 5
routers/repo/cloudbrain.go View File

@@ -2,7 +2,7 @@ package repo

import (
"bufio"
"code.gitea.io/gitea/services/reward"
"code.gitea.io/gitea/services/reward/point/account"
"encoding/json"
"errors"
"fmt"
@@ -230,7 +230,7 @@ func CloudBrainCreate(ctx *context.Context, form auth.CreateCloudBrainForm) {
command = commandTrain
}

if !reward.IsPointBalanceEnough(ctx.User.ID, jobType, resourceSpecId) {
if !account.IsPointBalanceEnough(ctx.User.ID, jobType, resourceSpecId) {
log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, jobType, resourceSpecId)
cloudBrainNewDataPrepare(ctx)
ctx.RenderWithErr(models.ErrInsufficientPointsBalance{}.Error(), tpl, &form)
@@ -318,7 +318,7 @@ func CloudBrainRestart(ctx *context.Context) {
var status = string(models.JobWaiting)
task := ctx.Cloudbrain
for {
if !reward.IsPointBalanceEnough(ctx.User.ID, task.JobType, task.ResourceSpecId) {
if !account.IsPointBalanceEnough(ctx.User.ID, task.JobType, task.ResourceSpecId) {
log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, task.JobType, task.ResourceSpecId)
resultCode = "-1"
errorMsg = models.ErrInsufficientPointsBalance{}.Error()
@@ -1870,7 +1870,7 @@ func BenchMarkAlgorithmCreate(ctx *context.Context, form auth.CreateCloudBrainFo

repo := ctx.Repo.Repository

if !reward.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeBenchmark), resourceSpecId) {
if !account.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeBenchmark), resourceSpecId) {
log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, string(models.JobTypeBenchmark), resourceSpecId)
cloudBrainNewDataPrepare(ctx)
ctx.RenderWithErr(models.ErrInsufficientPointsBalance{}.Error(), tplCloudBrainBenchmarkNew, &form)
@@ -2032,7 +2032,7 @@ func ModelBenchmarkCreate(ctx *context.Context, form auth.CreateCloudBrainForm)
tpl := tplCloudBrainBenchmarkNew
command := cloudbrain.Command

if !reward.IsPointBalanceEnough(ctx.User.ID, jobType, resourceSpecId) {
if !account.IsPointBalanceEnough(ctx.User.ID, jobType, resourceSpecId) {
log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, jobType, resourceSpecId)
cloudBrainNewDataPrepare(ctx)
ctx.RenderWithErr(models.ErrInsufficientPointsBalance{}.Error(), tpl, &form)


+ 5
- 5
routers/repo/modelarts.go View File

@@ -2,7 +2,7 @@ package repo

import (
"archive/zip"
"code.gitea.io/gitea/services/reward"
"code.gitea.io/gitea/services/reward/point/account"
"encoding/json"
"errors"
"fmt"
@@ -207,7 +207,7 @@ func Notebook2Create(ctx *context.Context, form auth.CreateModelArtsNotebookForm
repo := ctx.Repo.Repository
resourceSpecId := form.ResourceSpecId

if !reward.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeDebug), resourceSpecId) {
if !account.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeDebug), resourceSpecId) {
log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, string(models.JobTypeBenchmark), resourceSpecId)
cloudBrainNewDataPrepare(ctx)
ctx.RenderWithErr(models.ErrInsufficientPointsBalance{}.Error(), tplModelArtsNotebookNew, &form)
@@ -426,7 +426,7 @@ func NotebookManage(ctx *context.Context) {
errorMsg = "you have no right to restart the job"
break
}
if !reward.IsPointBalanceEnough(ctx.User.ID, task.JobType, task.ResourceSpecId) {
if !account.IsPointBalanceEnough(ctx.User.ID, task.JobType, task.ResourceSpecId) {
log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, task.JobType, task.ResourceSpecId)
resultCode = "-1"
errorMsg = models.ErrInsufficientPointsBalance{}.Error()
@@ -1002,7 +1002,7 @@ func TrainJobCreate(ctx *context.Context, form auth.CreateModelArtsTrainJobForm)
EngineName := form.EngineName
resourceSpecId := form.ResourceSpecId

if !reward.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeTrain), resourceSpecId) {
if !account.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeTrain), resourceSpecId) {
log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, string(models.JobTypeBenchmark), resourceSpecId)
cloudBrainNewDataPrepare(ctx)
ctx.RenderWithErr(models.ErrInsufficientPointsBalance{}.Error(), tplModelArtsTrainJobNew, &form)
@@ -1851,7 +1851,7 @@ func InferenceJobCreate(ctx *context.Context, form auth.CreateModelArtsInference

ckptUrl := form.TrainUrl + form.CkptName

if !reward.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeInference), resourceSpecId) {
if !account.IsPointBalanceEnough(ctx.User.ID, string(models.JobTypeInference), resourceSpecId) {
log.Error("point balance is not enough,userId=%d jobType=%s resourceSpecId=%d", ctx.User.ID, string(models.JobTypeBenchmark), resourceSpecId)
inferenceJobErrorNewDataPrepare(ctx, form)
ctx.RenderWithErr(models.ErrInsufficientPointsBalance{}.Error(), tplModelArtsInferenceJobNew, &form)


+ 0
- 1
routers/reward/point/point.go View File

@@ -13,7 +13,6 @@ import (
const tplPoint base.TplName = "/reward/point"

type AccountResponse struct {
AccountCode string
Balance int64
TotalEarned int64
TotalConsumed int64


+ 28
- 54
services/reward/cloubrain_deduct.go View File

@@ -4,8 +4,6 @@ import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/services/reward/point/account"
"encoding/json"
"fmt"
"time"
)
@@ -17,34 +15,17 @@ var (

const RUN_CLOUDBRAIN_TASK_TITTLE = "运行云脑任务"

//IsPointBalanceEnough check whether the user's point balance is bigger than task unit price
func IsPointBalanceEnough(targetUserId int64, jobType string, resourceSpecId int) bool {
func StartAndGetCloudBrainPointDeductTask(task models.Cloudbrain) (*models.RewardPeriodicTask, error) {
if !setting.CloudBrainPaySwitch {
return true
}
spec := getResourceSpec(jobType, resourceSpecId)
if spec == nil {
return true
}
a, error := account.GetAccount(targetUserId)
if error != nil {
return false
}
return a.Balance >= spec.UnitPrice

}

func StartCloudBrainPointDeductTask(task models.Cloudbrain) {
if !setting.CloudBrainPaySwitch {
return
return nil, nil
}

spec := getResourceSpec(task.JobType, task.ResourceSpecId)
spec := models.GetResourceSpec(task.JobType, task.ResourceSpecId)
if spec == nil || spec.UnitPrice == 0 {
return
return nil, nil
}

StartPeriodicTask(&models.StartPeriodicTaskOpts{
return StartAndGetPeriodicTask(&models.StartPeriodicTaskOpts{
SourceType: models.SourceTypeRunCloudbrainTask,
SourceId: getCloudBrainPointTaskSourceId(task),
TargetUserId: task.UserID,
@@ -67,31 +48,6 @@ func getCloudBrainPointTaskSourceId(task models.Cloudbrain) string {
return fmt.Sprint(task.ID)
}

func getResourceSpec(jobType string, resourceSpecId int) *models.ResourceSpec {
if jobType == string(models.JobTypeTrain) {
if TrainResourceSpecs == nil {
json.Unmarshal([]byte(setting.TrainResourceSpecs), &TrainResourceSpecs)
}
for _, spec := range TrainResourceSpecs.ResourceSpec {
if resourceSpecId == spec.Id {
return spec
}
}
} else {
if ResourceSpecs == nil {
json.Unmarshal([]byte(setting.ResourceSpecs), &ResourceSpecs)
}
for _, spec := range ResourceSpecs.ResourceSpec {
if resourceSpecId == spec.Id {
return spec
}
}

}
return nil

}

var firstTimeFlag = true

func StartCloudbrainPointDeductTask() {
@@ -107,10 +63,9 @@ func StartCloudbrainPointDeductTask() {
if firstTimeFlag {
//When it is executed for the first time, it needs to process the tasks of the last 1 hours.
//This is done to prevent the application from hanging for a long time
start = end.Add(-1 * time.Hour)
start = end.Add(-3 * time.Hour)
firstTimeFlag = false
}

taskList, err := models.GetStartedCloudbrainTaskByUpdatedUnix(start, end)
if err != nil {
log.Error("GetStartedCloudbrainTaskByUpdatedUnix error. %v", err)
@@ -121,11 +76,30 @@ func StartCloudbrainPointDeductTask() {
return
}
for _, t := range taskList {
if int64(t.StartTime) <= end.Unix() && int64(t.StartTime) >= start.Unix() {
StartCloudBrainPointDeductTask(t)
//初始化 period_task 和 operate_record
if int64(t.StartTime) > end.Unix() || int64(t.StartTime) < start.Unix() {
continue
}

task, err := StartAndGetCloudBrainPointDeductTask(t)
if err != nil {
log.Error("run cloubrain point deduct task error,err=%v", err)
continue
}
if task == nil {
continue
}
if task.Status == models.PeriodicTaskStatusFinished {
log.Info("Periodic task is finished")
continue
}

if int64(t.EndTime) <= end.Unix() && int64(t.EndTime) >= start.Unix() {
StopCloudBrainPointDeductTask(t)
endTime := time.Unix(int64(t.EndTime), 0)
RunRewardTask(*task, endTime)
models.StopPeriodicTask(task.ID, task.OperateSerialNo, endTime)
} else {
RunRewardTask(*task, end)
}
}
}

+ 31
- 20
services/reward/operator.go View File

@@ -78,11 +78,11 @@ func Operate(ctx *models.RewardOperateContext) error {

//operate
if err := operator.Operate(ctx); err != nil {
updateAwardOperateRecordStatus(ctx.SourceType.Name(), ctx.RequestId, models.OperateStatusOperating, models.OperateStatusFailed)
UpdateRewardRecordToFinalStatus(ctx.SourceType.Name(), ctx.RequestId, models.OperateStatusFailed)
return err
}

updateAwardOperateRecordStatus(ctx.SourceType.Name(), ctx.RequestId, models.OperateStatusOperating, models.OperateStatusSucceeded)
UpdateRewardRecordToFinalStatus(ctx.SourceType.Name(), ctx.RequestId, models.OperateStatusSucceeded)
NotifyRewardOperation(ctx.TargetUserId, ctx.Reward.Amount, ctx.Reward.Type, ctx.OperateType)
return nil
}
@@ -160,8 +160,8 @@ func createPeriodicRewardOperateRecord(ctx *models.StartPeriodicTaskOpts) (strin
return record.SerialNo, nil
}

func updateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus string) error {
_, err := models.UpdateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus)
func UpdateRewardRecordToFinalStatus(sourceType, requestId, newStatus string) error {
_, err := models.UpdateRewardRecordToFinalStatus(sourceType, requestId, newStatus)
if err != nil {
return err
}
@@ -169,10 +169,10 @@ func updateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus
}

func StartPeriodicTaskAsyn(opts *models.StartPeriodicTaskOpts) {
go StartPeriodicTask(opts)
go StartAndGetPeriodicTask(opts)
}

func StartPeriodicTask(opts *models.StartPeriodicTaskOpts) error {
func StartAndGetPeriodicTask(opts *models.StartPeriodicTaskOpts) (*models.RewardPeriodicTask, error) {
defer func() {
if err := recover(); err != nil {
combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2))
@@ -183,35 +183,46 @@ func StartPeriodicTask(opts *models.StartPeriodicTaskOpts) error {
var rewardLock = redis_lock.NewDistributeLock(redis_key.RewardOperateLock(opts.RequestId, opts.SourceType.Name(), opts.OperateType.Name()))
isOk, err := rewardLock.Lock(3 * time.Second)
if err != nil {
return err
return nil, err
}
if !isOk {
log.Info("duplicated operate request,targetUserId=%d requestId=%s", opts.TargetUserId, opts.RequestId)
return nil
return nil, nil
}
defer rewardLock.UnLock()

//is handled before?
isHandled, err := isHandled(opts.SourceType.Name(), opts.RequestId, opts.OperateType.Name())
if err != nil {
log.Error("operate is handled error,%v", err)
return err
_, err = models.GetPointOperateRecordBySourceTypeAndRequestId(opts.SourceType.Name(), opts.RequestId, opts.OperateType.Name())
if err == nil {
task, err := models.GetPeriodicTaskBySourceIdAndType(opts.SourceType, opts.SourceId, opts.OperateType)
if err != nil {
log.Error("GetPeriodicTaskBySourceIdAndType error,%v", err)
return nil, err
}
return task, nil
}
if isHandled {
log.Info("operate has been handled,opts=%+v", opts)
return nil

if err != nil && !models.IsErrRecordNotExist(err) {
log.Error("operate is handled error,%v", err)
return nil, err
}

//new reward operate record
recordId, err := createPeriodicRewardOperateRecord(opts)
if err != nil {
return err
return nil, err
}

if err = NewRewardPeriodicTask(recordId, opts); err != nil {
updateAwardOperateRecordStatus(opts.SourceType.Name(), opts.RequestId, models.OperateStatusOperating, models.OperateStatusFailed)
return err
UpdateRewardRecordToFinalStatus(opts.SourceType.Name(), opts.RequestId, models.OperateStatusFailed)
return nil, err
}
return nil

task, err := models.GetPeriodicTaskBySourceIdAndType(opts.SourceType, opts.SourceId, opts.OperateType)
if err != nil {
log.Error("GetPeriodicTaskBySourceIdAndType error,%v", err)
return nil, err
}
return task, nil
}

func StopPeriodicTaskAsyn(sourceType models.SourceType, sourceId string, operateType models.RewardOperateType) {


+ 18
- 15
services/reward/period_task.go View File

@@ -6,6 +6,8 @@ import (
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"code.gitea.io/gitea/modules/timeutil"
"code.gitea.io/gitea/routers/repo"
"errors"
"fmt"
"time"
)
@@ -46,33 +48,33 @@ func StartRewardTask() {
}
}

func RunRewardTask(t models.RewardPeriodicTask, now time.Time) {
func RunRewardTask(t models.RewardPeriodicTask, now time.Time) error {
lock := redis_lock.NewDistributeLock(redis_key.RewardTaskRunningLock(t.ID))
isOk, _ := lock.LockWithWait(3*time.Second, 3*time.Second)
if !isOk {
log.Error("get RewardTaskRunningLock failed,t=%+v", t)
return
return errors.New("get RewardTaskRunningLock failed")
}
defer lock.UnLock()
record, err := models.GetPointOperateRecordBySerialNo(t.OperateSerialNo)
if err != nil {
log.Error("RunRewardTask. GetPointOperateRecordBySerialNo error. %v", err)
return
return errors.New("GetPointOperateRecordBySerialNo error")
}
if record.Status != models.OperateStatusOperating {
log.Info("RunRewardTask. operate record is finished,record=%+v", record)
return
return nil
}
n, _ := countExecuteTimes(t, now)
if n == 0 {
return
return nil
}

//get operator
operator := GetOperator(models.GetRewardTypeInstance(record.RewardType))
if operator == nil {
log.Error("RunRewardTask. operator of reward type is not exist")
return
return errors.New("operator of reward type is not exist")
}
nextTime := t.NextExecuteTime
for i := 0; int64(i) <= n; i++ {
@@ -89,14 +91,20 @@ func RunRewardTask(t models.RewardPeriodicTask, now time.Time) {
if err != nil {
log.Error("RunRewardTask.operator operate error.%v", err)
if models.IsErrInsufficientPointsBalance(err) {
StopCloudbrainTask(record)
return
task, err := models.GetCloudbrainByID(record.SourceId)
if err != nil {
log.Error("RunRewardTask GetCloudbrainByID error. %v", err)
return err
}
repo.StopJobs([]*models.Cloudbrain{task})
return nil
}
return
return nil
}
models.IncrRewardTaskSuccessCount(t, n, nextTime)
models.IncrRewardTaskSuccessCount(t, 1, nextTime)
nextTime = timeutil.TimeStamp(int64(nextTime) + t.IntervalSeconds)
}
return nil

}

@@ -111,8 +119,3 @@ func countExecuteTimes(t models.RewardPeriodicTask, now time.Time) (int64, timeu
newNextTime := timeutil.TimeStamp(nextTime + n*interval)
return n, newNextTime
}

func StopCloudbrainTask(r *models.RewardOperateRecord) {
//todo

}

+ 18
- 0
services/reward/point/account/point_account.go View File

@@ -5,6 +5,7 @@ import (
"code.gitea.io/gitea/modules/redis/redis_client"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util"
"encoding/json"
"time"
@@ -60,3 +61,20 @@ func InitAccount(userId int64) (*models.PointAccount, error) {
return nil, nil

}

//IsPointBalanceEnough check whether the user's point balance is bigger than task unit price
func IsPointBalanceEnough(targetUserId int64, jobType string, resourceSpecId int) bool {
if !setting.CloudBrainPaySwitch {
return true
}
spec := models.GetResourceSpec(jobType, resourceSpecId)
if spec == nil {
return true
}
a, error := GetAccount(targetUserId)
if error != nil {
return false
}
return a.Balance >= spec.UnitPrice

}

+ 12
- 6
services/reward/record.go View File

@@ -1,9 +1,11 @@
package reward

import "code.gitea.io/gitea/models"
import (
"code.gitea.io/gitea/models"
)

type RecordResponse struct {
Records []models.RewardOperateRecordShow
Records []*models.RewardOperateRecordShow
Total int64
PageSize int
Page int
@@ -14,9 +16,13 @@ func GetRewardRecordList(opts models.RewardRecordListOpts) (*RecordResponse, err
if err != nil {
return nil, err
}
r := make([]models.RewardOperateRecordShow, 0)
for _, v := range l {
r = append(r, v.ToShow())
if len(l) == 0 {
return &RecordResponse{Records: make([]*models.RewardOperateRecordShow, 0), Total: n, Page: opts.Page, PageSize: opts.PageSize}, nil
}
return &RecordResponse{Records: r, Total: n, Page: opts.Page, PageSize: opts.PageSize}, nil
result, err := l.ToShow()
if err != nil {
return nil, err
}

return &RecordResponse{Records: result, Total: n, Page: opts.Page, PageSize: opts.PageSize}, nil
}

+ 2
- 5
services/task/task.go View File

@@ -3,7 +3,6 @@ package task
import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/util"
"code.gitea.io/gitea/services/reward"
"code.gitea.io/gitea/services/reward/limiter"
"fmt"
@@ -55,9 +54,7 @@ func accomplish(action models.Action) error {
}

//add log
logId := util.UUID()
_, err = models.InsertTaskAccomplishLog(&models.TaskAccomplishLog{
LogId: logId,
ConfigId: config.ID,
TaskCode: config.TaskCode,
UserId: userId,
@@ -70,14 +67,14 @@ func accomplish(action models.Action) error {
//reward
reward.Operate(&models.RewardOperateContext{
SourceType: models.SourceTypeAccomplishTask,
SourceId: logId,
SourceId: fmt.Sprint(action.ID),
Tittle: config.Tittle,
Reward: models.Reward{
Amount: config.AwardAmount,
Type: models.GetRewardTypeInstance(config.AwardType),
},
TargetUserId: userId,
RequestId: logId,
RequestId: fmt.Sprint(action.ID),
OperateType: models.OperateTypeIncrease,
RejectPolicy: models.FillUp,
})


Loading…
Cancel
Save