Browse Source

decompress

tags/v1.21.12.1
yuyuanshifu 5 years ago
parent
commit
12953de55a
8 changed files with 199 additions and 6 deletions
  1. +2
    -0
      main.go
  2. +23
    -5
      models/attachment.go
  3. +10
    -0
      modules/setting/setting.go
  4. +21
    -0
      modules/timer/timer.go
  5. +34
    -0
      modules/worker/task.go
  6. +29
    -0
      modules/worker/worker.go
  7. +76
    -1
      routers/repo/attachment.go
  8. +4
    -0
      routers/routes/routes.go

+ 2
- 0
main.go View File

@@ -21,6 +21,8 @@ import (
_ "code.gitea.io/gitea/modules/markup/csv"
_ "code.gitea.io/gitea/modules/markup/markdown"
_ "code.gitea.io/gitea/modules/markup/orgmode"
_ "code.gitea.io/gitea/modules/timer"
_ "code.gitea.io/gitea/modules/worker"

"github.com/urfave/cli"
)


+ 23
- 5
models/attachment.go View File

@@ -19,6 +19,13 @@ import (
"xorm.io/xorm"
)

const (
DecompressStateInit int32 = iota
DecompressStateDone
DecompressStateIng
DecompressStateFailed
)

// Attachment represent a attachment of issue/comment/release.
type Attachment struct {
ID int64 `xorm:"pk autoincr"`
@@ -32,6 +39,7 @@ type Attachment struct {
DownloadCount int64 `xorm:"DEFAULT 0"`
Size int64 `xorm:"DEFAULT 0"`
IsPrivate bool `xorm:"DEFAULT false"`
DecompressState int32 `xorm:"DEFAULT 0"`
CreatedUnix timeutil.TimeStamp `xorm:"created"`
}

@@ -134,9 +142,9 @@ func GetAttachmentByID(id int64) (*Attachment, error) {
}

func getAttachmentByID(e Engine, id int64) (*Attachment, error) {
attach := &Attachment{ID: id}
attach := new(Attachment)

if has, err := e.Get(attach); err != nil {
if has, err := e.Where("id = ?", id).Get(attach); err != nil {
return nil, err
} else if !has {
return nil, ErrAttachmentNotExist{ID: id, UUID: ""}
@@ -145,8 +153,8 @@ func getAttachmentByID(e Engine, id int64) (*Attachment, error) {
}

func getAttachmentByUUID(e Engine, uuid string) (*Attachment, error) {
attach := &Attachment{UUID: uuid}
has, err := e.Get(attach)
attach := new(Attachment)
has, err := e.Where("uuid = ?", uuid).Get(attach)
if err != nil {
return nil, err
} else if !has {
@@ -279,7 +287,7 @@ func updateAttachment(e Engine, atta *Attachment) error {
// Use uuid only if id is not set and uuid is set
sess = e.Where("uuid = ?", atta.UUID)
}
_, err := sess.Cols("name", "issue_id", "release_id", "comment_id", "download_count", "is_private").Update(atta)
_, err := sess.Cols("name", "issue_id", "release_id", "comment_id", "download_count", "is_private", "decompress_state").Update(atta)
return err
}

@@ -328,3 +336,13 @@ func InsertAttachment(attach *Attachment) (_ *Attachment, err error) {

return attach, nil
}

//GetUnDecompressAttachments query the attachments unDecompressed
func GetUnDecompressAttachments() ([]*Attachment, error) {
return getUnDecompressAttachments(x)
}

func getUnDecompressAttachments(e Engine) ([]*Attachment, error) {
attachments := make([]*Attachment, 0, 10)
return attachments, e.Where("decompress_state = ? and dataset_id != 0 and name like '%.zip'", DecompressStateInit).Find(&attachments)
}

+ 10
- 0
modules/setting/setting.go View File

@@ -418,6 +418,11 @@ var (
// UILocation is the location on the UI, so that we can display the time on UI.
// Currently only show the default time.Local, it could be added to app.ini after UI is ready
UILocation = time.Local

//Machinery config
Broker string
DefaultQueue string
ResultBackend string
)

// DateLang transforms standard language locale name to corresponding value in datetime plugin.
@@ -1074,6 +1079,11 @@ func NewContext() {
for _, reaction := range UI.Reactions {
UI.ReactionsMap[reaction] = true
}

sec = Cfg.Section("machinery")
Broker = sec.Key("BROKER").MustString("redis://localhost:6379")
DefaultQueue = sec.Key("DEFAULT_QUEUE").MustString("DecompressTasksQueue")
ResultBackend = sec.Key("RESULT_BACKEND").MustString("redis://localhost:6379")
}

func loadInternalToken(sec *ini.Section) string {


+ 21
- 0
modules/timer/timer.go View File

@@ -0,0 +1,21 @@
package timer

import (
"time"

"code.gitea.io/gitea/routers/repo"
)

const (
DecompressTimer = time.Minute * 10
)

func init() {
ticker := time.NewTicker(DecompressTimer)
go func() {
for {
<- ticker.C
repo.HandleUnDecompressAttachment()
}
} ()
}

+ 34
- 0
modules/worker/task.go View File

@@ -0,0 +1,34 @@
package worker

import (
"context"

"code.gitea.io/gitea/modules/log"

"github.com/RichardKnop/machinery/v1/tasks"
)

// 方法名
const (
DecompressTaskName = "Decompress"
)

func SendDecompressTask(ctx context.Context, uuid string) error{
args := []tasks.Arg{{Name: "uuid", Type: "string", Value: uuid}}
task, err := tasks.NewSignature(DecompressTaskName, args)
if err != nil {
log.Error("NewSignature failed:", err.Error())
return err
}

task.RetryCount = 3
_,err = AsyncTaskCenter.SendTaskWithContext(ctx, task)
if err != nil {
log.Error("SendTaskWithContext failed:", err.Error())
return err
}

log.Info("SendDecompressTask(%s) success", uuid)

return nil
}

+ 29
- 0
modules/worker/worker.go View File

@@ -0,0 +1,29 @@
package worker

import (
"github.com/RichardKnop/machinery/v1"
mchConf "github.com/RichardKnop/machinery/v1/config"
)

var (
AsyncTaskCenter *machinery.Server
)

func init() {
tc, err := NewTaskCenter()
if err != nil {
panic(err)
}
AsyncTaskCenter = tc
}

func NewTaskCenter() (*machinery.Server, error) {
cnf := &mchConf.Config{
Broker: "redis://localhost:6379",
DefaultQueue: "DecompressTasksQueue",
ResultBackend: "redis://localhost:6379",
}
// Create server instance
return machinery.NewServer(cnf)
}


+ 76
- 1
routers/repo/attachment.go View File

@@ -5,6 +5,7 @@
package repo

import (
contexExt "context"
"fmt"
"net/http"
"strconv"
@@ -16,10 +17,17 @@ import (
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/storage"
"code.gitea.io/gitea/modules/upload"
"code.gitea.io/gitea/modules/worker"

gouuid "github.com/satori/go.uuid"
)

const (
//result of decompress
DecompressSuccess = "0"
DecompressFailed = "1"
)

func RenderAttachmentSettings(ctx *context.Context) {
renderAttachmentSettings(ctx)
}
@@ -258,7 +266,7 @@ func AddAttachment(ctx *context.Context) {
return
}

_, err = models.InsertAttachment(&models.Attachment{
attachment, err := models.InsertAttachment(&models.Attachment{
UUID: uuid,
UploaderID: ctx.User.ID,
IsPrivate: true,
@@ -272,7 +280,74 @@ func AddAttachment(ctx *context.Context) {
return
}

if attachment.DatasetID != 0 {
if strings.HasSuffix(attachment.Name, ".zip") {
err = worker.SendDecompressTask(contexExt.Background(), uuid)
if err != nil {
log.Error("SendDecompressTask(%s) failed:%s", uuid, err.Error())
} else {
attachment.DecompressState = models.DecompressStateIng
err = models.UpdateAttachment(attachment)
if err != nil {
log.Error("UpdateAttachment state(%s) failed:%s", uuid, err.Error())
}
}
}
}

ctx.JSON(200, map[string]string{
"result_code": "0",
})
}

func UpdateAttachmentDecompressState(ctx *context.Context) {
uuid := ctx.Query("uuid")
result := ctx.Query("result")
attach, err := models.GetAttachmentByUUID(uuid)
if err != nil {
log.Error("GetAttachmentByUUID(%s) failed:%s", uuid, err.Error())
return
}

if result == DecompressSuccess {
attach.DecompressState = models.DecompressStateDone
} else if result == DecompressFailed {
attach.DecompressState = models.DecompressStateFailed
} else {
log.Error("result is error:", result)
return
}

err = models.UpdateAttachment(attach)
if err != nil {
log.Error("UpdateAttachment(%s) failed:%s", uuid, err.Error())
return
}

ctx.JSON(200, map[string]string{
"result_code": "0",
})
}

func HandleUnDecompressAttachment() {
attachs,err := models.GetUnDecompressAttachments()
if err != nil {
log.Error("GetUnDecompressAttachments failed:", err.Error())
return
}

for _,attach := range attachs {
err = worker.SendDecompressTask(contexExt.Background(), attach.UUID)
if err != nil {
log.Error("SendDecompressTask(%s) failed:%s", attach.UUID, err.Error())
} else {
attach.DecompressState = models.DecompressStateIng
err = models.UpdateAttachment(attach)
if err != nil {
log.Error("UpdateAttachment state(%s) failed:%s", attach.UUID, err.Error())
}
}
}

return
}

+ 4
- 0
routers/routes/routes.go View File

@@ -523,6 +523,10 @@ func RegisterRoutes(m *macaron.Macaron) {
m.Post("/private", repo.UpdatePublicAttachment)
}, reqSignIn)

m.Group("/attachments", func() {
m.Post("/decompress_done_notify", repo.UpdateAttachmentDecompressState)
})

m.Group("/:username", func() {
m.Post("/action/:action", user.Action)
}, reqSignIn)


Loading…
Cancel
Save