@@ -2,6 +2,24 @@ package models | |||
import "code.gitea.io/gitea/modules/timeutil" | |||
type LimitType string | |||
const ( | |||
LimitTypeTask LimitType = "TASK" | |||
LimitTypeReward LimitType = "REWARD" | |||
) | |||
func (l LimitType) Name() string { | |||
switch l { | |||
case LimitTypeTask: | |||
return "TASK" | |||
case LimitTypeReward: | |||
return "REWARD" | |||
default: | |||
return "" | |||
} | |||
} | |||
type LimitConfig struct { | |||
ID int64 `xorm:"pk autoincr"` | |||
Tittle string | |||
@@ -9,14 +27,15 @@ type LimitConfig struct { | |||
Scope string `xorm:"NOT NULL"` | |||
LimitNum int64 `xorm:"NOT NULL"` | |||
LimitCode string `xorm:"NOT NULL"` | |||
LimitType string `xorm:"NOT NULL"` | |||
Creator int64 `xorm:"NOT NULL"` | |||
CreatedUnix timeutil.TimeStamp `xorm:"created"` | |||
DeletedAt timeutil.TimeStamp `xorm:"deleted"` | |||
} | |||
func findLimitConfig(tl *LimitConfig) ([]LimitConfig, error) { | |||
func GetLimitConfigByLimitCode(limitCode string, limitType LimitType) ([]LimitConfig, error) { | |||
r := make([]LimitConfig, 0) | |||
err := x.Find(r, tl) | |||
err := x.Where("limit_code = ? and limit_type = ?", limitCode, limitType.Name()).Find(&r) | |||
if err != nil { | |||
return nil, err | |||
} else if len(r) == 0 { | |||
@@ -24,10 +43,3 @@ func findLimitConfig(tl *LimitConfig) ([]LimitConfig, error) { | |||
} | |||
return r, nil | |||
} | |||
func GetLimitConfigByLimitCode(limitCode string) ([]LimitConfig, error) { | |||
t := &LimitConfig{ | |||
LimitCode: limitCode, | |||
} | |||
return findLimitConfig(t) | |||
} |
@@ -144,6 +144,13 @@ func init() { | |||
new(WechatBindLog), | |||
new(OrgStatistic), | |||
new(SearchRecord), | |||
new(TaskConfig), | |||
new(TaskAccomplishLog), | |||
new(RewardOperateRecord), | |||
new(LimitConfig), | |||
new(PeriodicTask), | |||
new(PointAccountLog), | |||
new(PointAccount), | |||
) | |||
tablesStatistic = append(tablesStatistic, | |||
@@ -1,6 +1,8 @@ | |||
package models | |||
import "code.gitea.io/gitea/modules/timeutil" | |||
import ( | |||
"code.gitea.io/gitea/modules/timeutil" | |||
) | |||
type PointAccountStatus int | |||
@@ -87,7 +89,7 @@ func GetAccountByUserId(userId int64) (*PointAccount, error) { | |||
return nil, err | |||
} | |||
if !has { | |||
return nil, nil | |||
return nil, ErrRecordNotExist{} | |||
} | |||
return p, nil | |||
} | |||
@@ -287,7 +287,7 @@ func NotifyWatchers(actions ...*Action) error { | |||
func producer(actions ...*Action) { | |||
for _, action := range actions { | |||
if !action.IsPrivate{ | |||
if !action.IsPrivate { | |||
ActionChan <- action | |||
} | |||
} | |||
@@ -5,18 +5,12 @@ import ( | |||
"fmt" | |||
) | |||
type RewardSourceType string | |||
const ( | |||
SourceTypeAccomplishTask RewardSourceType = "ACCOMPLISH_TASK" | |||
SourceTypeAdminOperate RewardSourceType = "ADMIN_OPERATE" | |||
SourceTypeRunCloudbrainTask RewardSourceType = "RUN_CLOUBRAIN_TASK" | |||
SourceTypeAccomplishTask string = "ACCOMPLISH_TASK" | |||
SourceTypeAdminOperate = "ADMIN_OPERATE" | |||
SourceTypeRunCloudbrainTask = "RUN_CLOUBRAIN_TASK" | |||
) | |||
func (r *RewardSourceType) String() string { | |||
return fmt.Sprint(r) | |||
} | |||
type RewardType string | |||
const ( | |||
@@ -40,6 +34,7 @@ const ( | |||
type RewardOperateRecord struct { | |||
ID int64 `xorm:"pk autoincr"` | |||
RecordId string `xorm:"INDEX NOT NULL"` | |||
UserId int64 `xorm:"INDEX NOT NULL"` | |||
Amount int64 `xorm:"NOT NULL"` | |||
RewardType string `xorm:"NOT NULL"` | |||
@@ -80,5 +75,21 @@ func UpdateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus | |||
r := &RewardOperateRecord{ | |||
Status: newStatus, | |||
} | |||
return x.Cols("status").Where("source_type=? and requestId=? and status=?", sourceType, requestId, oldStatus).Update(r) | |||
return x.Cols("status").Where("source_type=? and request_id=? and status=?", sourceType, requestId, oldStatus).Update(r) | |||
} | |||
type RewardOperateContext struct { | |||
SourceType string | |||
SourceId string | |||
Remark string | |||
Reward Reward | |||
TargetUserId int64 | |||
RequestId string | |||
OperateType string | |||
CycleIntervalSeconds int64 | |||
} | |||
type Reward struct { | |||
Amount int64 | |||
Type string | |||
} |
@@ -2,19 +2,13 @@ package models | |||
import ( | |||
"code.gitea.io/gitea/modules/timeutil" | |||
"fmt" | |||
) | |||
type TaskType string | |||
const ( | |||
TaskTypeComment TaskType = "COMMENT" | |||
TaskTypeCreateIssueComment string = "CREATE_IS" | |||
TaskTypeNewIssue = "NEW_ISSUE" | |||
) | |||
func (t *TaskType) String() string { | |||
return fmt.Sprint(t) | |||
} | |||
const ( | |||
PeriodNotCycle = "NOT_CYCLE" | |||
PeriodDaily = "DAILY" | |||
@@ -23,11 +17,9 @@ const ( | |||
//PointTaskConfig Only add and delete are allowed, edit is not allowed | |||
//so if you want to edit config for some task code,please delete first and add new one | |||
type TaskConfig struct { | |||
ID int64 `xorm:"pk autoincr"` | |||
TaskCode string `xorm:"NOT NULL"` | |||
Tittle string `xorm:"NOT NULL"` | |||
RefreshRate string `xorm:"NOT NULL"` | |||
Times int64 `xorm:"NOT NULL"` | |||
ID int64 `xorm:"pk autoincr"` | |||
TaskCode string `xorm:"NOT NULL"` | |||
Tittle string | |||
AwardType string `xorm:"NOT NULL"` | |||
AwardAmount int64 `xorm:"NOT NULL"` | |||
Creator int64 `xorm:"NOT NULL"` | |||
@@ -26,14 +26,18 @@ func GetWechatAccessToken() string { | |||
} | |||
func refreshAccessToken() { | |||
if ok := accessTokenLock.Lock(3 * time.Second); ok { | |||
if ok, _ := accessTokenLock.Lock(3 * time.Second); ok { | |||
defer accessTokenLock.UnLock() | |||
callAccessTokenAndUpdateCache() | |||
} | |||
} | |||
func refreshAndGetAccessToken() string { | |||
if ok := accessTokenLock.LockWithWait(3*time.Second, 3*time.Second); ok { | |||
isOk, err := accessTokenLock.LockWithWait(3*time.Second, 3*time.Second) | |||
if err != nil { | |||
return "" | |||
} | |||
if isOk { | |||
defer accessTokenLock.UnLock() | |||
token, _ := redis_client.Get(redis_key.WechatAccessTokenKey()) | |||
if token != "" { | |||
@@ -10,6 +10,7 @@ import ( | |||
"code.gitea.io/gitea/modules/notification/base" | |||
"code.gitea.io/gitea/modules/notification/indexer" | |||
"code.gitea.io/gitea/modules/notification/mail" | |||
"code.gitea.io/gitea/modules/notification/task" | |||
"code.gitea.io/gitea/modules/notification/ui" | |||
"code.gitea.io/gitea/modules/notification/webhook" | |||
"code.gitea.io/gitea/modules/repository" | |||
@@ -35,6 +36,7 @@ func NewContext() { | |||
RegisterNotifier(indexer.NewNotifier()) | |||
RegisterNotifier(webhook.NewNotifier()) | |||
RegisterNotifier(action.NewNotifier()) | |||
RegisterNotifier(task.NewNotifier()) | |||
} | |||
// NotifyUploadAttachment notifies attachment upload message to notifiers | |||
@@ -0,0 +1,85 @@ | |||
package task | |||
import ( | |||
"code.gitea.io/gitea/models" | |||
"code.gitea.io/gitea/modules/notification/base" | |||
"code.gitea.io/gitea/modules/repository" | |||
"code.gitea.io/gitea/services/task" | |||
"fmt" | |||
) | |||
type taskNotifier struct { | |||
base.NullNotifier | |||
} | |||
var ( | |||
_ base.Notifier = &taskNotifier{} | |||
) | |||
// NewNotifier create a new actionNotifier notifier | |||
func NewNotifier() base.Notifier { | |||
return &taskNotifier{} | |||
} | |||
func (t *taskNotifier) NotifyNewIssue(issue *models.Issue) { | |||
task.Accomplish(issue.Poster.ID, models.TaskTypeNewIssue, fmt.Sprint(issue.ID)) | |||
} | |||
// NotifyIssueChangeStatus notifies close or reopen issue to notifiers | |||
func (t *taskNotifier) NotifyIssueChangeStatus(doer *models.User, issue *models.Issue, actionComment *models.Comment, closeOrReopen bool) { | |||
return | |||
} | |||
// NotifyCreateIssueComment notifies comment on an issue to notifiers | |||
func (t *taskNotifier) NotifyCreateIssueComment(doer *models.User, repo *models.Repository, | |||
issue *models.Issue, comment *models.Comment) { | |||
task.Accomplish(doer.ID, models.TaskTypeCreateIssueComment, fmt.Sprint(comment.ID)) | |||
} | |||
func (t *taskNotifier) NotifyNewPullRequest(pull *models.PullRequest) { | |||
task.Accomplish(pull.Issue.Poster.ID, models.TaskTypeCreateIssueComment, fmt.Sprint(pull.ID)) | |||
} | |||
func (t *taskNotifier) NotifyRenameRepository(doer *models.User, repo *models.Repository, oldRepoName string) { | |||
return | |||
} | |||
func (t *taskNotifier) NotifyAliasRepository(doer *models.User, repo *models.Repository, oldAlias string) { | |||
return | |||
} | |||
func (t *taskNotifier) NotifyTransferRepository(doer *models.User, repo *models.Repository, oldOwnerName string) { | |||
return | |||
} | |||
func (t *taskNotifier) NotifyCreateRepository(doer *models.User, u *models.User, repo *models.Repository) { | |||
return | |||
} | |||
func (t *taskNotifier) NotifyForkRepository(doer *models.User, oldRepo, repo *models.Repository) { | |||
return | |||
} | |||
func (t *taskNotifier) NotifyPullRequestReview(pr *models.PullRequest, review *models.Review, comment *models.Comment) { | |||
return | |||
} | |||
func (t *taskNotifier) NotifyMergePullRequest(pr *models.PullRequest, doer *models.User) { | |||
return | |||
} | |||
func (t *taskNotifier) NotifySyncPushCommits(pusher *models.User, repo *models.Repository, refName, oldCommitID, newCommitID string, commits *repository.PushCommits) { | |||
return | |||
} | |||
func (t *taskNotifier) NotifySyncCreateRef(doer *models.User, repo *models.Repository, refType, refFullName string) { | |||
return | |||
} | |||
func (t *taskNotifier) NotifySyncDeleteRef(doer *models.User, repo *models.Repository, refType, refFullName string) { | |||
return | |||
} | |||
func (t *taskNotifier) NotifyOtherTask(doer *models.User, repo *models.Repository, id string, name string, optype models.ActionType) { | |||
return | |||
} |
@@ -103,7 +103,7 @@ func Expire(key string, expireSeconds int64) error { | |||
redisClient := labelmsg.Get() | |||
defer redisClient.Close() | |||
_, err := redisClient.Do("EXPIRE ", key, expireSeconds) | |||
_, err := redisClient.Do("EXPIRE", key, expireSeconds) | |||
if err != nil { | |||
return err | |||
} | |||
@@ -8,8 +8,8 @@ func PointAccountOperateLock(accountCode string) string { | |||
return KeyJoin(ACCOUNT_REDIS_PREFIX, accountCode, "operate", "lock") | |||
} | |||
func PointAccountDetail(userId int64) string { | |||
return KeyJoin(ACCOUNT_REDIS_PREFIX, fmt.Sprint(userId), "detail") | |||
func PointAccountInfo(userId int64) string { | |||
return KeyJoin(ACCOUNT_REDIS_PREFIX, fmt.Sprint(userId), "info") | |||
} | |||
func PointAccountInitLock(userId int64) string { | |||
@@ -21,6 +21,6 @@ func LimitCount(userId int64, limitCode string, period *models.PeriodResult) str | |||
} | |||
func LimitConfig(limitCode string) string { | |||
return KeyJoin(LIMIT_REDIS_PREFIX, limitCode, "config") | |||
func LimitConfig(limitCode string, limitType models.LimitType) string { | |||
return KeyJoin(LIMIT_REDIS_PREFIX, limitCode, limitType.Name(), "config") | |||
} |
@@ -1,15 +1,11 @@ | |||
package redis_key | |||
import ( | |||
"code.gitea.io/gitea/models" | |||
) | |||
const TASK_REDIS_PREFIX = "task" | |||
func TaskAccomplishLock(sourceId string, taskType models.TaskType) string { | |||
return KeyJoin(TASK_REDIS_PREFIX, sourceId, taskType.String(), "accomplish") | |||
func TaskAccomplishLock(sourceId string, taskType string) string { | |||
return KeyJoin(TASK_REDIS_PREFIX, sourceId, taskType, "accomplish") | |||
} | |||
func TaskConfig(taskType models.TaskType) string { | |||
return KeyJoin(TASK_REDIS_PREFIX, "config", taskType.String()) | |||
func TaskConfig(taskType string) string { | |||
return KeyJoin(TASK_REDIS_PREFIX, "config", taskType) | |||
} |
@@ -13,26 +13,32 @@ func NewDistributeLock(lockKey string) *DistributeLock { | |||
return &DistributeLock{lockKey: lockKey} | |||
} | |||
func (lock *DistributeLock) Lock(expireTime time.Duration) bool { | |||
isOk, _ := redis_client.Setnx(lock.lockKey, "", expireTime) | |||
return isOk | |||
func (lock *DistributeLock) Lock(expireTime time.Duration) (bool, error) { | |||
isOk, err := redis_client.Setnx(lock.lockKey, "", expireTime) | |||
if err != nil { | |||
return false, err | |||
} | |||
return isOk, nil | |||
} | |||
func (lock *DistributeLock) LockWithWait(expireTime time.Duration, waitTime time.Duration) bool { | |||
func (lock *DistributeLock) LockWithWait(expireTime time.Duration, waitTime time.Duration) (bool, error) { | |||
start := time.Now().Unix() * 1000 | |||
duration := waitTime.Milliseconds() | |||
for { | |||
isOk, _ := redis_client.Setnx(lock.lockKey, "", expireTime) | |||
isOk, err := redis_client.Setnx(lock.lockKey, "", expireTime) | |||
if err != nil { | |||
return false, err | |||
} | |||
if isOk { | |||
return true | |||
return true, nil | |||
} | |||
if time.Now().Unix()*1000-start > duration { | |||
return false | |||
return false, nil | |||
} | |||
time.Sleep(50 * time.Millisecond) | |||
} | |||
return false | |||
return false, nil | |||
} | |||
func (lock *DistributeLock) UnLock() error { | |||
@@ -2,6 +2,7 @@ package limiter | |||
import ( | |||
"code.gitea.io/gitea/models" | |||
"code.gitea.io/gitea/modules/log" | |||
"code.gitea.io/gitea/modules/redis/redis_client" | |||
"code.gitea.io/gitea/modules/redis/redis_key" | |||
"code.gitea.io/gitea/services/task/period" | |||
@@ -17,25 +18,27 @@ type limiterRunner struct { | |||
userId int64 | |||
amount int64 | |||
limitCode string | |||
limitType models.LimitType | |||
} | |||
func newLimiterRunner(limitCode string, userId, amount int64) *limiterRunner { | |||
func newLimiterRunner(limitCode string, limitType models.LimitType, userId, amount int64) *limiterRunner { | |||
return &limiterRunner{ | |||
userId: userId, | |||
amount: amount, | |||
limitCode: limitCode, | |||
limitType: limitType, | |||
index: 0, | |||
} | |||
} | |||
func (l *limiterRunner) Run() error { | |||
if err := l.LoadLimiters(l.limitCode); err != nil { | |||
if err := l.LoadLimiters(); err != nil { | |||
return err | |||
} | |||
//todo 验证未配置的情况 | |||
for l.index <= len(l.limiters) { | |||
for l.index < len(l.limiters) { | |||
err := l.limit(l.limiters[l.index]) | |||
if err != nil { | |||
log.Info("limiter check failed,%v", err) | |||
return err | |||
} | |||
l.index += 1 | |||
@@ -62,32 +65,43 @@ func (l *limiterRunner) limit(r models.LimitConfig) error { | |||
return nil | |||
} | |||
func (l *limiterRunner) LoadLimiters(limitCode string) error { | |||
redisKey := redis_key.LimitConfig(limitCode) | |||
func (l *limiterRunner) LoadLimiters() error { | |||
limiters, err := GetLimiters(l.limitCode, l.limitType) | |||
if err != nil { | |||
return err | |||
} | |||
if limiters != nil { | |||
l.limiters = limiters | |||
} | |||
return nil | |||
} | |||
func CheckLimit(limitCode string, limitType models.LimitType, userId, amount int64) error { | |||
r := newLimiterRunner(limitCode, limitType, userId, amount) | |||
return r.Run() | |||
} | |||
func GetLimiters(limitCode string, limitType models.LimitType) ([]models.LimitConfig, error) { | |||
redisKey := redis_key.LimitConfig(limitCode, limitType) | |||
val, _ := redis_client.Get(redisKey) | |||
if val != "" { | |||
if val == redis_key.EMPTY_REDIS_VAL { | |||
return nil | |||
return nil, nil | |||
} | |||
limiters := make([]models.LimitConfig, 0) | |||
json.Unmarshal([]byte(val), limiters) | |||
return nil | |||
json.Unmarshal([]byte(val), &limiters) | |||
return limiters, nil | |||
} | |||
limiters, err := models.GetLimitConfigByLimitCode(limitCode) | |||
limiters, err := models.GetLimitConfigByLimitCode(limitCode, limitType) | |||
if err != nil { | |||
if models.IsErrRecordNotExist(err) { | |||
redis_client.Setex(redisKey, redis_key.EMPTY_REDIS_VAL, 5*time.Second) | |||
return nil | |||
return nil, nil | |||
} | |||
return err | |||
return nil, err | |||
} | |||
jsonStr, _ := json.Marshal(limiters) | |||
redis_client.Setex(redisKey, string(jsonStr), 30*24*time.Hour) | |||
return nil | |||
} | |||
func CheckLimit(limitCode string, userId, amount int64) error { | |||
r := newLimiterRunner(limitCode, userId, amount) | |||
return r.Run() | |||
return limiters, nil | |||
} |
@@ -5,6 +5,7 @@ import ( | |||
"code.gitea.io/gitea/modules/log" | |||
"code.gitea.io/gitea/modules/redis/redis_key" | |||
"code.gitea.io/gitea/modules/redis/redis_lock" | |||
"code.gitea.io/gitea/modules/util" | |||
"code.gitea.io/gitea/services/reward/point" | |||
"errors" | |||
"fmt" | |||
@@ -15,26 +16,25 @@ var RewardOperatorMap = map[string]RewardOperator{ | |||
fmt.Sprint(models.RewardTypePoint): new(point.PointOperator), | |||
} | |||
type RewardOperateContext struct { | |||
SourceType models.RewardSourceType | |||
SourceId string | |||
Remark string | |||
Reward Reward | |||
TargetUserId int64 | |||
RequestId string | |||
OperateType string | |||
CycleIntervalSeconds int64 | |||
} | |||
type RewardOperator interface { | |||
IsLimited(ctx RewardOperateContext) bool | |||
Operate(ctx RewardOperateContext) error | |||
IsLimited(ctx models.RewardOperateContext) bool | |||
Operate(ctx models.RewardOperateContext) error | |||
} | |||
func Send(ctx RewardOperateContext) error { | |||
func Send(ctx models.RewardOperateContext) error { | |||
defer func() { | |||
if err := recover(); err != nil { | |||
combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2)) | |||
log.Error("PANIC:%v", combinedErr) | |||
} | |||
}() | |||
//add lock | |||
var rewardLock = redis_lock.NewDistributeLock(redis_key.RewardSendLock(ctx.RequestId, ctx.SourceType.String())) | |||
if !rewardLock.Lock(3 * time.Second) { | |||
var rewardLock = redis_lock.NewDistributeLock(redis_key.RewardSendLock(ctx.RequestId, ctx.SourceType)) | |||
isOk, err := rewardLock.Lock(3 * time.Second) | |||
if err != nil { | |||
return err | |||
} | |||
if !isOk { | |||
log.Info("duplicated reward request,targetUserId=%d requestId=%s", ctx.TargetUserId, ctx.RequestId) | |||
return nil | |||
} | |||
@@ -63,19 +63,22 @@ func Send(ctx RewardOperateContext) error { | |||
} | |||
//new reward operate record | |||
if err := initAwardOperateRecord(ctx); err != nil { | |||
recordId, err := initAwardOperateRecord(ctx) | |||
if err != nil { | |||
return err | |||
} | |||
ctx.SourceId = recordId | |||
//operate | |||
if err := operator.Operate(ctx); err != nil { | |||
updateAwardOperateRecordStatus(ctx.SourceType.String(), ctx.RequestId, models.OperateStatusOperating, models.OperateStatusFailed) | |||
updateAwardOperateRecordStatus(ctx.SourceType, ctx.RequestId, models.OperateStatusOperating, models.OperateStatusFailed) | |||
return err | |||
} | |||
//if not a cycle operate,update status to success | |||
if ctx.CycleIntervalSeconds > 0 { | |||
updateAwardOperateRecordStatus(ctx.SourceType.String(), ctx.RequestId, models.OperateStatusOperating, models.OperateStatusSucceeded) | |||
if ctx.CycleIntervalSeconds == 0 { | |||
updateAwardOperateRecordStatus(ctx.SourceType, ctx.RequestId, models.OperateStatusOperating, models.OperateStatusSucceeded) | |||
} | |||
return nil | |||
} | |||
@@ -84,8 +87,8 @@ func GetOperator(rewardType string) RewardOperator { | |||
return RewardOperatorMap[rewardType] | |||
} | |||
func isHandled(sourceType models.RewardSourceType, requestId string) (bool, error) { | |||
_, err := models.GetPointOperateRecordBySourceTypeAndRequestId(sourceType.String(), requestId) | |||
func isHandled(sourceType string, requestId string) (bool, error) { | |||
_, err := models.GetPointOperateRecordBySourceTypeAndRequestId(sourceType, requestId) | |||
if err != nil { | |||
if models.IsErrRecordNotExist(err) { | |||
return false, nil | |||
@@ -96,23 +99,25 @@ func isHandled(sourceType models.RewardSourceType, requestId string) (bool, erro | |||
} | |||
func initAwardOperateRecord(ctx RewardOperateContext) error { | |||
_, err := models.InsertAwardOperateRecord(&models.RewardOperateRecord{ | |||
func initAwardOperateRecord(ctx models.RewardOperateContext) (string, error) { | |||
record := &models.RewardOperateRecord{ | |||
RecordId: util.UUID(), | |||
UserId: ctx.TargetUserId, | |||
Amount: ctx.Reward.Amount, | |||
RewardType: ctx.Reward.Type, | |||
SourceType: ctx.SourceType.String(), | |||
SourceType: ctx.SourceType, | |||
SourceId: ctx.SourceId, | |||
RequestId: ctx.RequestId, | |||
OperateType: ctx.OperateType, | |||
CycleIntervalSeconds: ctx.CycleIntervalSeconds, | |||
Status: models.OperateStatusOperating, | |||
Remark: ctx.Remark, | |||
}) | |||
} | |||
_, err := models.InsertAwardOperateRecord(record) | |||
if err != nil { | |||
return err | |||
return "", err | |||
} | |||
return nil | |||
return record.RecordId, nil | |||
} | |||
func updateAwardOperateRecordStatus(sourceType, requestId, oldStatus, newStatus string) error { | |||
@@ -11,7 +11,7 @@ import ( | |||
) | |||
func GetAccount(userId int64) (*models.PointAccount, error) { | |||
redisKey := redis_key.PointAccountDetail(userId) | |||
redisKey := redis_key.PointAccountInfo(userId) | |||
val, _ := redis_client.Get(redisKey) | |||
if val != "" { | |||
account := &models.PointAccount{} | |||
@@ -36,7 +36,11 @@ func GetAccount(userId int64) (*models.PointAccount, error) { | |||
func InitAccount(userId int64) (*models.PointAccount, error) { | |||
lock := redis_lock.NewDistributeLock(redis_key.PointAccountInitLock(userId)) | |||
if lock.LockWithWait(3*time.Second, 3*time.Second) { | |||
isOk, err := lock.LockWithWait(3*time.Second, 3*time.Second) | |||
if err != nil { | |||
return nil, err | |||
} | |||
if isOk { | |||
defer lock.UnLock() | |||
account, _ := models.GetAccountByUserId(userId) | |||
if account == nil { | |||
@@ -2,9 +2,9 @@ package point | |||
import ( | |||
"code.gitea.io/gitea/models" | |||
"code.gitea.io/gitea/modules/redis/redis_client" | |||
"code.gitea.io/gitea/modules/redis/redis_key" | |||
"code.gitea.io/gitea/modules/redis/redis_lock" | |||
"code.gitea.io/gitea/services/reward" | |||
"code.gitea.io/gitea/services/reward/limiter" | |||
"code.gitea.io/gitea/services/reward/point/account" | |||
"errors" | |||
@@ -14,28 +14,36 @@ import ( | |||
type PointOperator struct { | |||
} | |||
func (operator *PointOperator) IsLimited(ctx reward.RewardOperateContext) bool { | |||
if err := limiter.CheckLimit(ctx.Reward.Type, ctx.TargetUserId, ctx.Reward.Amount); err != nil { | |||
return false | |||
func (operator *PointOperator) IsLimited(ctx models.RewardOperateContext) bool { | |||
if err := limiter.CheckLimit(ctx.Reward.Type, models.LimitTypeReward, ctx.TargetUserId, ctx.Reward.Amount); err != nil { | |||
return true | |||
} | |||
return true | |||
return false | |||
} | |||
func (operator *PointOperator) Operate(ctx reward.RewardOperateContext) error { | |||
func (operator *PointOperator) Operate(ctx models.RewardOperateContext) error { | |||
a, err := account.GetAccount(ctx.TargetUserId) | |||
if err != nil || a == nil { | |||
return errors.New("get account error") | |||
} | |||
lock := redis_lock.NewDistributeLock(redis_key.PointAccountOperateLock(a.AccountCode)) | |||
if lock.LockWithWait(3*time.Second, 3*time.Second) { | |||
isOk, err := lock.LockWithWait(3*time.Second, 3*time.Second) | |||
if err != nil { | |||
return err | |||
} | |||
if isOk { | |||
defer lock.UnLock() | |||
na, _ := account.GetAccount(ctx.TargetUserId) | |||
if ctx.OperateType == models.OperateTypeIncrease { | |||
na.Increase(ctx.Reward.Amount, ctx.SourceId) | |||
err = na.Increase(ctx.Reward.Amount, ctx.SourceId) | |||
} else if ctx.OperateType == models.OperateTypeDecrease { | |||
na.Decrease(ctx.Reward.Amount, ctx.SourceId) | |||
err = na.Decrease(ctx.Reward.Amount, ctx.SourceId) | |||
} | |||
if err != nil { | |||
return err | |||
} | |||
redis_client.Del(redis_key.PointAccountInfo(ctx.TargetUserId)) | |||
} else { | |||
return errors.New("Get account operate lock failed") | |||
@@ -1,6 +0,0 @@ | |||
package reward | |||
type Reward struct { | |||
Amount int64 | |||
Type string | |||
} |
@@ -7,18 +7,30 @@ import ( | |||
"code.gitea.io/gitea/modules/redis/redis_lock" | |||
"code.gitea.io/gitea/modules/util" | |||
"code.gitea.io/gitea/services/reward" | |||
"code.gitea.io/gitea/services/task/period" | |||
"code.gitea.io/gitea/services/reward/limiter" | |||
"fmt" | |||
"time" | |||
) | |||
func Accomplish(userId int64, taskType models.TaskType, sourceId string) { | |||
func Accomplish(userId int64, taskType string, sourceId string) { | |||
go accomplish(userId, taskType, sourceId) | |||
} | |||
func accomplish(userId int64, taskType models.TaskType, sourceId string) error { | |||
func accomplish(userId int64, taskType string, sourceId string) error { | |||
defer func() { | |||
if err := recover(); err != nil { | |||
combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2)) | |||
log.Error("PANIC:%v", combinedErr) | |||
} | |||
}() | |||
//lock | |||
var taskLock = redis_lock.NewDistributeLock(redis_key.TaskAccomplishLock(sourceId, taskType)) | |||
if !taskLock.Lock(3 * time.Second) { | |||
isOk, err := taskLock.Lock(3 * time.Second) | |||
if err != nil { | |||
log.Error("get taskLock error. %v", err) | |||
return err | |||
} | |||
if !isOk { | |||
log.Info("duplicated task request,userId=%d taskType=%s sourceId=%s", userId, taskType, sourceId) | |||
return nil | |||
} | |||
@@ -47,12 +59,7 @@ func accomplish(userId int64, taskType models.TaskType, sourceId string) error { | |||
} | |||
//is limited? | |||
isLimited, err := IsLimited(userId, config) | |||
if err != nil { | |||
log.Error("get limited error,%v", err) | |||
return err | |||
} | |||
if isLimited { | |||
if isLimited(userId, config) { | |||
log.Info("task accomplish maximum times are reached,userId=%d taskType=%s sourceId=%s", userId, taskType, sourceId) | |||
return nil | |||
} | |||
@@ -71,22 +78,23 @@ func accomplish(userId int64, taskType models.TaskType, sourceId string) error { | |||
} | |||
//reward | |||
reward.Send(reward.RewardOperateContext{ | |||
reward.Send(models.RewardOperateContext{ | |||
SourceType: models.SourceTypeAccomplishTask, | |||
SourceId: sourceId, | |||
Reward: reward.Reward{ | |||
SourceId: logId, | |||
Reward: models.Reward{ | |||
Amount: config.AwardAmount, | |||
Type: config.AwardType, | |||
}, | |||
TargetUserId: userId, | |||
RequestId: logId, | |||
OperateType: models.OperateTypeIncrease, | |||
}) | |||
return nil | |||
} | |||
func isHandled(taskType models.TaskType, sourceId string) (bool, error) { | |||
_, err := models.GetTaskAccomplishLogBySourceIdAndTaskCode(sourceId, taskType.String()) | |||
func isHandled(taskType string, sourceId string) (bool, error) { | |||
_, err := models.GetTaskAccomplishLogBySourceIdAndTaskCode(sourceId, taskType) | |||
if err != nil { | |||
if models.IsErrRecordNotExist(err) { | |||
return false, nil | |||
@@ -97,15 +105,10 @@ func isHandled(taskType models.TaskType, sourceId string) (bool, error) { | |||
} | |||
func IsLimited(userId int64, config *models.TaskConfig) (bool, error) { | |||
p, err := period.GetPeriod(config.RefreshRate) | |||
if err != nil { | |||
return false, err | |||
} | |||
n, err := models.CountInTaskPeriod(config.ID, userId, p) | |||
if err != nil { | |||
return false, err | |||
func isLimited(userId int64, config *models.TaskConfig) bool { | |||
if err := limiter.CheckLimit(config.TaskCode, models.LimitTypeTask, userId, 1); err != nil { | |||
return true | |||
} | |||
return n >= config.Times, nil | |||
return false | |||
} |
@@ -10,7 +10,7 @@ import ( | |||
//GetTaskConfig get task config from redis cache first | |||
// if not exist in redis, find in db and refresh the redis key | |||
func GetTaskConfig(taskType models.TaskType) (*models.TaskConfig, error) { | |||
func GetTaskConfig(taskType string) (*models.TaskConfig, error) { | |||
redisKey := redis_key.TaskConfig(taskType) | |||
configStr, _ := redis_client.Get(redisKey) | |||
if configStr != "" { | |||
@@ -21,7 +21,7 @@ func GetTaskConfig(taskType models.TaskType) (*models.TaskConfig, error) { | |||
json.Unmarshal([]byte(configStr), config) | |||
return config, nil | |||
} | |||
config, err := models.GetTaskConfigByTaskCode(taskType.String()) | |||
config, err := models.GetTaskConfigByTaskCode(taskType) | |||
if err != nil { | |||
if models.IsErrRecordNotExist(err) { | |||
redis_client.Setex(redisKey, redis_key.EMPTY_REDIS_VAL, 5*time.Second) | |||