Browse Source

增加缓存移动功能

gitlink
Sydonian 2 years ago
parent
commit
72c1db7e46
7 changed files with 165 additions and 52 deletions
  1. +20
    -0
      pkgs/db/cache.go
  2. +1
    -0
      pkgs/db/db.go
  3. +108
    -0
      pkgs/mq/agent/cache.go
  4. +0
    -51
      pkgs/mq/agent/ipfs.go
  5. +1
    -1
      pkgs/mq/agent/server.go
  6. +33
    -0
      pkgs/mq/coordinator/cache.go
  7. +2
    -0
      pkgs/mq/coordinator/server.go

+ 20
- 0
pkgs/db/cache.go View File

@@ -50,6 +50,26 @@ func (*CacheDB) CreatePinned(ctx SQLContext, fileHash string, nodeID int64, prio
return err
}

func (*CacheDB) BatchCreatePinned(ctx SQLContext, fileHashes []string, nodeID int64, priority int) error {
var caches []model.Cache
var nowTime = time.Now()
for _, hash := range fileHashes {
caches = append(caches, model.Cache{
FileHash: hash,
NodeID: nodeID,
State: consts.CacheStatePinned,
CacheTime: nowTime,
Priority: priority,
})
}

_, err := sqlx.NamedExec(ctx, "insert into Cache(FileHash,NodeID,State,CacheTime,Priority) values(:FileHash,:NodeID,:State,:CacheTime,:Priority)"+
" on duplicate key update State=values(State), CacheTime=values(CacheTime), Priority=values(Priority)",
caches,
)
return err
}

// Create 创建一条Temp状态的缓存记录,如果已存在则不产生效果
func (*CacheDB) CreateTemp(ctx SQLContext, fileHash string, nodeID int64) error {
_, err := ctx.Exec("insert ignore into Cache values(?,?,?,?)", fileHash, nodeID, consts.CacheStateTemp, time.Now())


+ 1
- 0
pkgs/db/db.go View File

@@ -17,6 +17,7 @@ type DB struct {
type SQLContext interface {
sqlx.Queryer
sqlx.Execer
sqlx.Ext
}

func NewDB(cfg *config.Config) (*DB, error) {


+ 108
- 0
pkgs/mq/agent/cache.go View File

@@ -0,0 +1,108 @@
package agent

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
)

type CacheService interface {
CheckCache(msg *CheckCache) (*CheckCacheResp, *mq.CodeMessage)

StartCacheMovePackage(msg *StartCacheMovePackage) (*StartCacheMovePackageResp, *mq.CodeMessage)
WaitCacheMovePackage(msg *WaitCacheMovePackage) (*WaitCacheMovePackageResp, *mq.CodeMessage)
}

// 检查节点上的IPFS
var _ = Register(CacheService.CheckCache)

const (
CHECK_IPFS_RESP_OP_DELETE_TEMP = "DeleteTemp"
CHECK_IPFS_RESP_OP_CREATE_TEMP = "CreateTemp"
)

type CheckCache struct {
IsComplete bool `json:"isComplete"`
Caches []model.Cache `json:"caches"`
}
type CheckCacheResp struct {
Entries []CheckIPFSRespEntry `json:"entries"`
}
type CheckIPFSRespEntry struct {
FileHash string `json:"fileHash"`
Operation string `json:"operation"`
}

func NewCheckCache(isComplete bool, caches []model.Cache) CheckCache {
return CheckCache{
IsComplete: isComplete,
Caches: caches,
}
}
func NewCheckCacheResp(entries []CheckIPFSRespEntry) CheckCacheResp {
return CheckCacheResp{
Entries: entries,
}
}
func NewCheckCacheRespEntry(fileHash string, op string) CheckIPFSRespEntry {
return CheckIPFSRespEntry{
FileHash: fileHash,
Operation: op,
}
}
func (client *Client) CheckCache(msg CheckCache, opts ...mq.RequestOption) (*CheckCacheResp, error) {
return mq.Request[CheckCacheResp](client.rabbitCli, msg, opts...)
}

// 将Package的缓存移动到这个节点
var _ = Register(CacheService.StartCacheMovePackage)

type StartCacheMovePackage struct {
UserID int64 `json:"userID"`
PackageID int64 `json:"packageID"`
}
type StartCacheMovePackageResp struct {
TaskID string `json:"taskID"`
}

func NewStartCacheMovePackage(userID int64, packageID int64) StartCacheMovePackage {
return StartCacheMovePackage{
UserID: userID,
PackageID: packageID,
}
}
func NewStartCacheMovePackageResp(taskID string) StartCacheMovePackageResp {
return StartCacheMovePackageResp{
TaskID: taskID,
}
}
func (client *Client) StartCacheMovePackage(msg StartCacheMovePackage, opts ...mq.RequestOption) (*StartCacheMovePackageResp, error) {
return mq.Request[StartCacheMovePackageResp](client.rabbitCli, msg, opts...)
}

// 将Package的缓存移动到这个节点
var _ = Register(CacheService.WaitCacheMovePackage)

type WaitCacheMovePackage struct {
TaskID string `json:"taskID"`
WaitTimeoutMs int64 `json:"waitTimeout"`
}
type WaitCacheMovePackageResp struct {
IsComplete bool `json:"isComplete"`
Error string `json:"error"`
}

func NewWaitCacheMovePackage(taskID string, waitTimeoutMs int64) WaitCacheMovePackage {
return WaitCacheMovePackage{
TaskID: taskID,
WaitTimeoutMs: waitTimeoutMs,
}
}
func NewWaitCacheMovePackageResp(isComplete bool, err string) WaitCacheMovePackageResp {
return WaitCacheMovePackageResp{
IsComplete: isComplete,
Error: err,
}
}
func (client *Client) WaitCacheMovePackage(msg WaitCacheMovePackage, opts ...mq.RequestOption) (*WaitCacheMovePackageResp, error) {
return mq.Request[WaitCacheMovePackageResp](client.rabbitCli, msg, opts...)
}

+ 0
- 51
pkgs/mq/agent/ipfs.go View File

@@ -1,51 +0,0 @@
package agent

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
)

type IPFSService interface {
CheckIPFS(msg *CheckIPFS) (*CheckIPFSResp, *mq.CodeMessage)
}

// 检查节点上的IPFS
var _ = Register(IPFSService.CheckIPFS)

const (
CHECK_IPFS_RESP_OP_DELETE_TEMP = "DeleteTemp"
CHECK_IPFS_RESP_OP_CREATE_TEMP = "CreateTemp"
)

type CheckIPFS struct {
IsComplete bool `json:"isComplete"`
Caches []model.Cache `json:"caches"`
}
type CheckIPFSResp struct {
Entries []CheckIPFSRespEntry `json:"entries"`
}
type CheckIPFSRespEntry struct {
FileHash string `json:"fileHash"`
Operation string `json:"operation"`
}

func NewCheckIPFS(isComplete bool, caches []model.Cache) CheckIPFS {
return CheckIPFS{
IsComplete: isComplete,
Caches: caches,
}
}
func NewCheckIPFSResp(entries []CheckIPFSRespEntry) CheckIPFSResp {
return CheckIPFSResp{
Entries: entries,
}
}
func NewCheckIPFSRespEntry(fileHash string, op string) CheckIPFSRespEntry {
return CheckIPFSRespEntry{
FileHash: fileHash,
Operation: op,
}
}
func (client *Client) CheckIPFS(msg CheckIPFS, opts ...mq.RequestOption) (*CheckIPFSResp, error) {
return mq.Request[CheckIPFSResp](client.rabbitCli, msg, opts...)
}

+ 1
- 1
pkgs/mq/agent/server.go View File

@@ -10,7 +10,7 @@ type Service interface {

StorageService

IPFSService
CacheService

AgentService
}


+ 33
- 0
pkgs/mq/coordinator/cache.go View File

@@ -0,0 +1,33 @@
package coordinator

import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
)

type CacheService interface {
CachePackageMoved(msg *CachePackageMoved) (*CachePackageMovedResp, *mq.CodeMessage)
}

// Package的Object移动到了节点的Cache中
var _ = Register(CacheService.CachePackageMoved)

type CachePackageMoved struct {
PackageID int64 `json:"packageID"`
NodeID int64 `json:"nodeID"`
FileHashes []string `json:"fileHashes"`
}
type CachePackageMovedResp struct{}

func NewCachePackageMoved(packageID int64, nodeID int64, fileHashes []string) CachePackageMoved {
return CachePackageMoved{
PackageID: packageID,
NodeID: nodeID,
FileHashes: fileHashes,
}
}
func NewCachePackageMovedResp() CachePackageMovedResp {
return CachePackageMovedResp{}
}
func (client *Client) CachePackageMoved(msg CachePackageMoved) (*CachePackageMovedResp, error) {
return mq.Request[CachePackageMovedResp](client.rabbitCli, msg)
}

+ 2
- 0
pkgs/mq/coordinator/server.go View File

@@ -11,6 +11,8 @@ type Service interface {

BucketService

CacheService

CommonService

NodeService


Loading…
Cancel
Save