Browse Source

#1249

update limiters
tags/v1.22.9.2^2
chenyifan01 3 years ago
parent
commit
c5a35c5982
14 changed files with 185 additions and 102 deletions
  1. +28
    -10
      models/limit_config.go
  2. +7
    -3
      models/reward_operate_record.go
  3. +1
    -10
      models/task_accomplish_log.go
  4. +18
    -2
      models/task_config.go
  5. +4
    -0
      modules/notification/action/action.go
  6. +1
    -0
      modules/notification/base/notifier.go
  7. +4
    -0
      modules/notification/base/null.go
  8. +7
    -0
      modules/notification/notification.go
  9. +37
    -15
      modules/notification/task/task.go
  10. +8
    -8
      modules/redis/redis_key/limit_redis_key.go
  11. +63
    -8
      services/reward/limiter/limiter.go
  12. +1
    -1
      services/reward/operator.go
  13. +1
    -1
      services/reward/point/point_operate.go
  14. +5
    -44
      services/task/task.go

+ 28
- 10
models/limit_config.go View File

@@ -5,16 +5,34 @@ import "code.gitea.io/gitea/modules/timeutil"
type LimitType string

const (
LimitTypeTask LimitType = "TASK"
LimitTypeReward LimitType = "REWARD"
LimitTypeTask LimitType = "TASK"
LimitTypeRewardPoint LimitType = "REWARD_POINT"
)

func (l LimitType) Name() string {
switch l {
case LimitTypeTask:
return "TASK"
case LimitTypeReward:
return "REWARD"
case LimitTypeRewardPoint:
return "REWARD_POINT"
default:
return ""
}
}

type LimitScope string

const (
LimitScopeAllUsers LimitScope = "ALL_USERS"
LimitScopeSingleUser LimitScope = "SINGLE_USER"
)

func (l LimitScope) Name() string {
switch l {
case LimitScopeAllUsers:
return "ALL_USERS"
case LimitScopeSingleUser:
return "SINGLE_USER"
default:
return ""
}
@@ -23,19 +41,19 @@ func (l LimitType) Name() string {
type LimitConfig struct {
ID int64 `xorm:"pk autoincr"`
Tittle string
RefreshRate string `xorm:"NOT NULL"`
Scope string `xorm:"NOT NULL"`
LimitNum int64 `xorm:"NOT NULL"`
LimitCode string `xorm:"NOT NULL"`
RefreshRate string `xorm:"NOT NULL"`
Scope string `xorm:"NOT NULL"`
LimitNum int64 `xorm:"NOT NULL"`
LimitCode string
LimitType string `xorm:"NOT NULL"`
Creator int64 `xorm:"NOT NULL"`
CreatedUnix timeutil.TimeStamp `xorm:"created"`
DeletedAt timeutil.TimeStamp `xorm:"deleted"`
}

func GetLimitConfigByLimitCode(limitCode string, limitType LimitType) ([]LimitConfig, error) {
func GetLimitConfigByLimitType(limitType LimitType) ([]LimitConfig, error) {
r := make([]LimitConfig, 0)
err := x.Where("limit_code = ? and limit_type = ?", limitCode, limitType.Name()).Find(&r)
err := x.Where(" limit_type = ?", limitType.Name()).Find(&r)
if err != nil {
return nil, err
} else if len(r) == 0 {


+ 7
- 3
models/reward_operate_record.go View File

@@ -2,7 +2,6 @@ package models

import (
"code.gitea.io/gitea/modules/timeutil"
"fmt"
)

const (
@@ -17,8 +16,13 @@ const (
RewardTypePoint RewardType = "POINT"
)

func (r *RewardType) String() string {
return fmt.Sprint(r)
func (r RewardType) Name() string {
switch r {
case RewardTypePoint:
return "POINT"
default:
return ""
}
}

const (


+ 1
- 10
models/task_accomplish_log.go View File

@@ -11,7 +11,6 @@ type TaskAccomplishLog struct {
ConfigId int64 `xorm:"NOT NULL"`
TaskCode string `xorm:"NOT NULL"`
UserId int64 `xorm:"INDEX NOT NULL"`
SourceId string `xorm:"INDEX NOT NULL"`
CreatedUnix timeutil.TimeStamp `xorm:"INDEX created"`
}

@@ -31,15 +30,7 @@ func getTaskAccomplishLog(tl *TaskAccomplishLog) (*TaskAccomplishLog, error) {
return tl, nil
}

func GetTaskAccomplishLogBySourceIdAndTaskCode(sourceId, taskCode string) (*TaskAccomplishLog, error) {
t := &TaskAccomplishLog{
SourceId: sourceId,
TaskCode: taskCode,
}
return getTaskAccomplishLog(t)
}

func CountInTaskPeriod(configId int64, userId int64, period *PeriodResult) (int64, error) {
func CountTaskAccomplishLogInTaskPeriod(configId int64, userId int64, period *PeriodResult) (int64, error) {
if period == nil {
return x.Where("config_id = ? and user_id = ?", configId, userId).Count(&TaskAccomplishLog{})
} else {


+ 18
- 2
models/task_config.go View File

@@ -5,8 +5,24 @@ import (
)

const (
TaskTypeCreateIssueComment string = "CREATE_IS"
TaskTypeNewIssue = "NEW_ISSUE"
TaskTypeNewIssue = "NEW_ISSUE"
TaskTypeIssueChangeStatus = "ISSUE_CHANGE_STATUS"
TaskTypeCreateIssueComment = "CREATE_ISSUE_COMMENT"
TaskTypeNewPullRequest = "NEW_PULL_REQUEST"
TaskTypeRenameRepository = "RENAME_REPOSITORY"
TaskTypeAliasRepository = "ALIAS_REPOSITORY"
TaskTypeTransferRepository = "TRANSFER_REPOSITORY"
TaskTypeCreateRepository = "CREATE_REPOSITORY"
TaskTypeForkRepository = "FORK_REPOSITORY"
TaskTypePullRequestReview = "PULL_REQUEST_REVIEW"
TaskTypeCommentPull = "COMMENT_PULL"
TaskTypeApprovePullRequest = "APPROVE_PULL_REQUEST"
TaskTypeRejectPullRequest = "REJECT_PULL_REQUEST"
TaskTypeMergePullRequest = "MERGE_PULL_REQUEST"
TaskTypeSyncPushCommits = "SYNC_PUSH_COMMITS"
TaskTypeSyncCreateRef = "SYNC_CREATE_REF"
TaskTypeSyncDeleteRef = "SYNC_DELETE_REF"
TaskTypeBindWechat = "BIND_WECHAT"
)

const (


+ 4
- 0
modules/notification/action/action.go View File

@@ -345,3 +345,7 @@ func (a *actionNotifier) NotifyOtherTask(doer *models.User, repo *models.Reposit
log.Error("notifyWatchers: %v", err)
}
}

func (a *actionNotifier) NotifyWechatBind(doer *models.User) {
return
}

+ 1
- 0
modules/notification/base/notifier.go View File

@@ -56,4 +56,5 @@ type Notifier interface {
NotifySyncDeleteRef(doer *models.User, repo *models.Repository, refType, refFullName string)

NotifyOtherTask(doer *models.User, repo *models.Repository, id string, name string, optype models.ActionType)
NotifyWechatBind(doer *models.User)
}

+ 4
- 0
modules/notification/base/null.go View File

@@ -158,3 +158,7 @@ func (*NullNotifier) NotifySyncDeleteRef(doer *models.User, repo *models.Reposit
func (*NullNotifier) NotifyOtherTask(doer *models.User, repo *models.Repository, id string, name string, optype models.ActionType) {

}

func (*NullNotifier) NotifyWechatBind(doer *models.User) {

}

+ 7
- 0
modules/notification/notification.go View File

@@ -271,3 +271,10 @@ func NotifySyncDeleteRef(pusher *models.User, repo *models.Repository, refType,
notifier.NotifySyncDeleteRef(pusher, repo, refType, refFullName)
}
}

// NotifyWechatBind notifies wechat bind
func NotifyWechatBind(doer *models.User) {
for _, notifier := range notifiers {
notifier.NotifyWechatBind(doer)
}
}

+ 37
- 15
modules/notification/task/task.go View File

@@ -5,7 +5,7 @@ import (
"code.gitea.io/gitea/modules/notification/base"
"code.gitea.io/gitea/modules/repository"
"code.gitea.io/gitea/services/task"
"fmt"
"strings"
)

type taskNotifier struct {
@@ -22,64 +22,86 @@ func NewNotifier() base.Notifier {
}

func (t *taskNotifier) NotifyNewIssue(issue *models.Issue) {
task.Accomplish(issue.Poster.ID, models.TaskTypeNewIssue, fmt.Sprint(issue.ID))
task.Accomplish(issue.Poster.ID, models.TaskTypeNewIssue)
}

// NotifyIssueChangeStatus notifies close or reopen issue to notifiers
func (t *taskNotifier) NotifyIssueChangeStatus(doer *models.User, issue *models.Issue, actionComment *models.Comment, closeOrReopen bool) {
return
task.Accomplish(doer.ID, models.TaskTypeIssueChangeStatus)
}

// NotifyCreateIssueComment notifies comment on an issue to notifiers
func (t *taskNotifier) NotifyCreateIssueComment(doer *models.User, repo *models.Repository,
issue *models.Issue, comment *models.Comment) {
task.Accomplish(doer.ID, models.TaskTypeCreateIssueComment, fmt.Sprint(comment.ID))
task.Accomplish(doer.ID, models.TaskTypeCreateIssueComment)
}

func (t *taskNotifier) NotifyNewPullRequest(pull *models.PullRequest) {
task.Accomplish(pull.Issue.Poster.ID, models.TaskTypeCreateIssueComment, fmt.Sprint(pull.ID))
task.Accomplish(pull.Issue.Poster.ID, models.TaskTypeNewPullRequest)
}

func (t *taskNotifier) NotifyRenameRepository(doer *models.User, repo *models.Repository, oldRepoName string) {
return
task.Accomplish(doer.ID, models.TaskTypeRenameRepository)
}

func (t *taskNotifier) NotifyAliasRepository(doer *models.User, repo *models.Repository, oldAlias string) {
return
task.Accomplish(doer.ID, models.TaskTypeAliasRepository)
}

func (t *taskNotifier) NotifyTransferRepository(doer *models.User, repo *models.Repository, oldOwnerName string) {
return
task.Accomplish(doer.ID, models.TaskTypeTransferRepository)
}

func (t *taskNotifier) NotifyCreateRepository(doer *models.User, u *models.User, repo *models.Repository) {
return
task.Accomplish(doer.ID, models.TaskTypeCreateRepository)
}

func (t *taskNotifier) NotifyForkRepository(doer *models.User, oldRepo, repo *models.Repository) {
return
task.Accomplish(doer.ID, models.TaskTypeForkRepository)
}

func (t *taskNotifier) NotifyPullRequestReview(pr *models.PullRequest, review *models.Review, comment *models.Comment) {
return
for _, lines := range review.CodeComments {
for _, comments := range lines {
for _, _ = range comments {
task.Accomplish(review.Reviewer.ID, models.TaskTypePullRequestReview)
}
}
}
if review.Type != models.ReviewTypeComment || strings.TrimSpace(comment.Content) != "" {

switch review.Type {
case models.ReviewTypeApprove:
task.Accomplish(review.Reviewer.ID, models.TaskTypeApprovePullRequest)
case models.ReviewTypeReject:
task.Accomplish(review.Reviewer.ID, models.TaskTypeRejectPullRequest)
default:
task.Accomplish(review.Reviewer.ID, models.TaskTypeCommentPull)
}

}
}

func (t *taskNotifier) NotifyMergePullRequest(pr *models.PullRequest, doer *models.User) {
return
task.Accomplish(doer.ID, models.TaskTypeMergePullRequest)
}

func (t *taskNotifier) NotifySyncPushCommits(pusher *models.User, repo *models.Repository, refName, oldCommitID, newCommitID string, commits *repository.PushCommits) {
return
task.Accomplish(repo.OwnerID, models.TaskTypeSyncPushCommits)
}

func (t *taskNotifier) NotifySyncCreateRef(doer *models.User, repo *models.Repository, refType, refFullName string) {
return
task.Accomplish(repo.OwnerID, models.TaskTypeSyncCreateRef)
}

func (t *taskNotifier) NotifySyncDeleteRef(doer *models.User, repo *models.Repository, refType, refFullName string) {
return
task.Accomplish(repo.OwnerID, models.TaskTypeSyncDeleteRef)
}

func (t *taskNotifier) NotifyOtherTask(doer *models.User, repo *models.Repository, id string, name string, optype models.ActionType) {
return
}

func (t *taskNotifier) NotifyWechatBind(doer *models.User) {
task.Accomplish(doer.ID, models.TaskTypeSyncDeleteRef)
}

+ 8
- 8
modules/redis/redis_key/limit_redis_key.go View File

@@ -7,20 +7,20 @@ import (

const LIMIT_REDIS_PREFIX = "limit"

func LimitCount(userId int64, limitCode string, period *models.PeriodResult) string {
if userId == 0 {
func LimitCount(userId int64, limitCode string, limitType string, scope string, period *models.PeriodResult) string {
if scope == models.LimitScopeAllUsers.Name() {
if period == nil {
return KeyJoin(LIMIT_REDIS_PREFIX, limitCode, "count")
return KeyJoin(LIMIT_REDIS_PREFIX, limitCode, limitType, "count")
}
return KeyJoin(LIMIT_REDIS_PREFIX, limitCode, fmt.Sprint(period.StartTime.Unix()), fmt.Sprint(period.EndTime.Unix()), "count")
return KeyJoin(LIMIT_REDIS_PREFIX, limitCode, limitType, fmt.Sprint(period.StartTime.Unix()), fmt.Sprint(period.EndTime.Unix()), "count")
}
if period == nil {
return KeyJoin(LIMIT_REDIS_PREFIX, "uid", fmt.Sprint(userId), limitCode, "count")
return KeyJoin(LIMIT_REDIS_PREFIX, "uid", fmt.Sprint(userId), limitCode, limitType, "count")
}
return KeyJoin(LIMIT_REDIS_PREFIX, "uid", fmt.Sprint(userId), limitCode, fmt.Sprint(period.StartTime.Unix()), fmt.Sprint(period.EndTime.Unix()), "count")
return KeyJoin(LIMIT_REDIS_PREFIX, "uid", fmt.Sprint(userId), limitCode, limitType, fmt.Sprint(period.StartTime.Unix()), fmt.Sprint(period.EndTime.Unix()), "count")

}

func LimitConfig(limitCode string, limitType models.LimitType) string {
return KeyJoin(LIMIT_REDIS_PREFIX, limitCode, limitType.Name(), "config")
func LimitConfig(limitType models.LimitType) string {
return KeyJoin(LIMIT_REDIS_PREFIX, limitType.Name(), "config")
}

+ 63
- 8
services/reward/limiter/limiter.go View File

@@ -39,6 +39,7 @@ func (l *limiterRunner) Run() error {
err := l.limit(l.limiters[l.index])
if err != nil {
log.Info("limiter check failed,%v", err)
l.Rollback(l.index)
return err
}
l.index += 1
@@ -46,20 +47,50 @@ func (l *limiterRunner) Run() error {
return nil
}

//Rollback rollback the usedNum from limiters[0] to limiters[index]
func (l *limiterRunner) Rollback(index int) error {
for i := index; i >= 0; i-- {
l.rollback(l.limiters[i])
}
return nil
}

func (l *limiterRunner) rollback(r models.LimitConfig) error {
p, err := period.GetPeriod(r.RefreshRate)
if err != nil {
return err
}
redisKey := redis_key.LimitCount(l.userId, r.LimitCode, r.LimitType, r.Scope, p)
redis_client.IncrBy(redisKey, -1*l.amount)
return nil
}

func (l *limiterRunner) limit(r models.LimitConfig) error {
p, err := period.GetPeriod(r.RefreshRate)
if err != nil {
return err
}
redisKey := redis_key.LimitCount(l.userId, r.LimitCode, p)
redisKey := redis_key.LimitCount(l.userId, r.LimitCode, r.LimitType, r.Scope, p)
usedNum, err := redis_client.IncrBy(redisKey, l.amount)
//if it is the first time,set expire time
if usedNum == l.amount && p != nil {
//todo 验证浮点精确度
redis_client.Expire(redisKey, int64(p.LeftTime.Seconds()))
if err != nil {
return err
}
//if usedNum equals amount,it is the first operation in period or redis cache deleted
//count in database to distinguish the two cases
if usedNum == l.amount {
n, err := l.countInPeriod(r, p)
if err != nil {
return err
}
if n > 0 {
//means redis cache deleted,incr the cache with real value
usedNum, err = redis_client.IncrBy(redisKey, n)
}
if p != nil {
redis_client.Expire(redisKey, int64(p.LeftTime.Seconds()))
}
}
if usedNum > r.LimitNum {
redis_client.IncrBy(redisKey, -1*l.amount)
return errors.New(fmt.Sprintf("%s:over limit", r.Tittle))
}
return nil
@@ -76,13 +107,37 @@ func (l *limiterRunner) LoadLimiters() error {
return nil
}

func (l *limiterRunner) countInPeriod(r models.LimitConfig, p *models.PeriodResult) (int64, error) {
switch r.LimitType {
case models.LimitTypeTask.Name():
return models.CountTaskAccomplishLogInTaskPeriod(r.ID, l.userId, p)
default:
return 0, nil

}
}

func CheckLimit(limitCode string, limitType models.LimitType, userId, amount int64) error {
r := newLimiterRunner(limitCode, limitType, userId, amount)
return r.Run()
}

func GetLimiters(limitCode string, limitType models.LimitType) ([]models.LimitConfig, error) {
redisKey := redis_key.LimitConfig(limitCode, limitType)
limiters, err := GetLimitersByLimitType(limitType)
if err != nil {
return nil, err
}
result := make([]models.LimitConfig, 0)
for i, v := range limiters {
if v.LimitCode == "" || v.LimitCode == limitCode {
result = append(result, limiters[i])
}
}
return result, nil
}

func GetLimitersByLimitType(limitType models.LimitType) ([]models.LimitConfig, error) {
redisKey := redis_key.LimitConfig(limitType)
val, _ := redis_client.Get(redisKey)
if val != "" {
if val == redis_key.EMPTY_REDIS_VAL {
@@ -92,7 +147,7 @@ func GetLimiters(limitCode string, limitType models.LimitType) ([]models.LimitCo
json.Unmarshal([]byte(val), &limiters)
return limiters, nil
}
limiters, err := models.GetLimitConfigByLimitCode(limitCode, limitType)
limiters, err := models.GetLimitConfigByLimitType(limitType)
if err != nil {
if models.IsErrRecordNotExist(err) {
redis_client.Setex(redisKey, redis_key.EMPTY_REDIS_VAL, 5*time.Second)


+ 1
- 1
services/reward/operator.go View File

@@ -58,7 +58,7 @@ func Send(ctx models.RewardOperateContext) error {
}

//is limited?
if operator.IsLimited(ctx) {
if isLimited := operator.IsLimited(ctx); isLimited {
return nil
}



+ 1
- 1
services/reward/point/point_operate.go View File

@@ -15,7 +15,7 @@ type PointOperator struct {
}

func (operator *PointOperator) IsLimited(ctx models.RewardOperateContext) bool {
if err := limiter.CheckLimit(ctx.Reward.Type, models.LimitTypeReward, ctx.TargetUserId, ctx.Reward.Amount); err != nil {
if err := limiter.CheckLimit(ctx.SourceType, models.LimitTypeRewardPoint, ctx.TargetUserId, ctx.Reward.Amount); err != nil {
return true
}
return false


+ 5
- 44
services/task/task.go View File

@@ -3,49 +3,23 @@ package task
import (
"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/redis/redis_key"
"code.gitea.io/gitea/modules/redis/redis_lock"
"code.gitea.io/gitea/modules/util"
"code.gitea.io/gitea/services/reward"
"code.gitea.io/gitea/services/reward/limiter"
"fmt"
"time"
)

func Accomplish(userId int64, taskType string, sourceId string) {
go accomplish(userId, taskType, sourceId)
func Accomplish(userId int64, taskType string) {
go accomplish(userId, taskType)
}

func accomplish(userId int64, taskType string, sourceId string) error {
func accomplish(userId int64, taskType string) error {
defer func() {
if err := recover(); err != nil {
combinedErr := fmt.Errorf("%s\n%s", err, log.Stack(2))
log.Error("PANIC:%v", combinedErr)
}
}()
//lock
var taskLock = redis_lock.NewDistributeLock(redis_key.TaskAccomplishLock(sourceId, taskType))
isOk, err := taskLock.Lock(3 * time.Second)
if err != nil {
log.Error("get taskLock error. %v", err)
return err
}
if !isOk {
log.Info("duplicated task request,userId=%d taskType=%s sourceId=%s", userId, taskType, sourceId)
return nil
}
defer taskLock.UnLock()

//is handled before?
isHandled, err := isHandled(taskType, sourceId)
if err != nil {
log.Error("Get isHandled error,%v", err)
return err
}
if isHandled {
log.Info("task has been handled,userId=%d taskType=%s sourceId=%s", userId, taskType, sourceId)
return nil
}

//get task config
config, err := GetTaskConfig(taskType)
@@ -54,13 +28,13 @@ func accomplish(userId int64, taskType string, sourceId string) error {
return err
}
if config == nil {
log.Info("task config not exist,userId=%d taskType=%s sourceId=%s", userId, taskType, sourceId)
log.Info("task config not exist,userId=%d taskType=%s", userId, taskType)
return nil
}

//is limited?
if isLimited(userId, config) {
log.Info("task accomplish maximum times are reached,userId=%d taskType=%s sourceId=%s", userId, taskType, sourceId)
log.Info("task accomplish maximum times are reached,userId=%d taskType=%s", userId, taskType)
return nil
}

@@ -71,7 +45,6 @@ func accomplish(userId int64, taskType string, sourceId string) error {
ConfigId: config.ID,
TaskCode: config.TaskCode,
UserId: userId,
SourceId: sourceId,
})
if err != nil {
return err
@@ -93,18 +66,6 @@ func accomplish(userId int64, taskType string, sourceId string) error {
return nil
}

func isHandled(taskType string, sourceId string) (bool, error) {
_, err := models.GetTaskAccomplishLogBySourceIdAndTaskCode(sourceId, taskType)
if err != nil {
if models.IsErrRecordNotExist(err) {
return false, nil
}
return false, err
}
return true, nil

}

func isLimited(userId int64, config *models.TaskConfig) bool {
if err := limiter.CheckLimit(config.TaskCode, models.LimitTypeTask, userId, 1); err != nil {
return true


Loading…
Cancel
Save