Browse Source

修复coordinator端代码

gitlink
Sydonian 7 months ago
parent
commit
e779120618
25 changed files with 495 additions and 1921 deletions
  1. +6
    -19
      coordinator/internal/cmd/migrate.go
  2. +10
    -11
      coordinator/internal/cmd/serve.go
  3. +1
    -1
      coordinator/internal/config/config.go
  4. +21
    -0
      coordinator/internal/db/config.go
  5. +77
    -0
      coordinator/internal/db/db.go
  6. +48
    -0
      coordinator/internal/db/hub.go
  7. +36
    -0
      coordinator/internal/db/hub_connectivity.go
  8. +36
    -0
      coordinator/internal/db/location.go
  9. +43
    -0
      coordinator/internal/db/storage.go
  10. +44
    -0
      coordinator/internal/db/union_serializer.go
  11. +44
    -0
      coordinator/internal/db/user.go
  12. +0
    -1
      coordinator/internal/mq/agent.go
  13. +0
    -143
      coordinator/internal/mq/bucket.go
  14. +0
    -66
      coordinator/internal/mq/cache.go
  15. +6
    -87
      coordinator/internal/mq/hub.go
  16. +0
    -891
      coordinator/internal/mq/object.go
  17. +0
    -337
      coordinator/internal/mq/package.go
  18. +4
    -7
      coordinator/internal/mq/service.go
  19. +1
    -123
      coordinator/internal/mq/storage.go
  20. +0
    -52
      coordinator/internal/mq/temp.go
  21. +0
    -127
      coordinator/internal/mq/user.go
  22. +0
    -23
      coordinator/internal/mq/utils.go
  23. +6
    -33
      coordinator/types/storage.go
  24. +92
    -0
      coordinator/types/storage_credential.go
  25. +20
    -0
      coordinator/types/types.go

+ 6
- 19
coordinator/internal/cmd/migrate.go View File

@@ -5,10 +5,8 @@ import (
"os" "os"


"github.com/spf13/cobra" "github.com/spf13/cobra"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
"gitlink.org.cn/cloudream/storage2/coordinator/internal/config" "gitlink.org.cn/cloudream/storage2/coordinator/internal/config"
cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
"gorm.io/driver/mysql" "gorm.io/driver/mysql"
"gorm.io/gorm" "gorm.io/gorm"
) )
@@ -42,22 +40,11 @@ func migrate(configPath string) {
} }
db = db.Set("gorm:table_options", "CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci") db = db.Set("gorm:table_options", "CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci")


migrateOne(db, cdssdk.Bucket{})
migrateOne(db, model.Cache{})
migrateOne(db, model.Location{})
migrateOne(db, model.HubConnectivity{})
migrateOne(db, cdssdk.Hub{})
migrateOne(db, stgmod.ObjectAccessStat{})
migrateOne(db, stgmod.ObjectBlock{})
migrateOne(db, cdssdk.Object{})
migrateOne(db, stgmod.PackageAccessStat{})
migrateOne(db, cdssdk.Package{})
migrateOne(db, cdssdk.PinnedObject{})
migrateOne(db, cdssdk.Storage{})
migrateOne(db, model.UserStorage{})
migrateOne(db, model.UserBucket{})
migrateOne(db, cdssdk.User{})
migrateOne(db, model.UserHub{})
migrateOne(db, cortypes.HubConnectivity{})
migrateOne(db, cortypes.Hub{})
migrateOne(db, cortypes.Location{})
migrateOne(db, cortypes.Storage{})
migrateOne(db, cortypes.User{})


fmt.Println("migrate success") fmt.Println("migrate success")
} }


+ 10
- 11
coordinator/internal/cmd/serve.go View File

@@ -7,11 +7,10 @@ import (


"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
"gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent" "gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent"
"gitlink.org.cn/cloudream/storage2/coordinator/internal/config" "gitlink.org.cn/cloudream/storage2/coordinator/internal/config"
"gitlink.org.cn/cloudream/storage2/coordinator/internal/db"
mymq "gitlink.org.cn/cloudream/storage2/coordinator/internal/mq" mymq "gitlink.org.cn/cloudream/storage2/coordinator/internal/mq"
) )


@@ -28,20 +27,20 @@ func serve(configPath string) {
os.Exit(1) os.Exit(1)
} }


db2, err := db2.NewDB(&config.Cfg().DB)
db2, err := db.NewDB(&config.Cfg().DB)
if err != nil { if err != nil {
logger.Fatalf("new db2 failed, err: %s", err.Error()) logger.Fatalf("new db2 failed, err: %s", err.Error())
} }


// 初始化系统事件发布器 // 初始化系统事件发布器
evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &stgmod.SourceCoordinator{})
if err != nil {
logger.Errorf("new sysevent publisher: %v", err)
os.Exit(1)
}
go servePublisher(evtPub)
coorSvr, err := coormq.NewServer(mymq.NewService(db2, evtPub), config.Cfg().RabbitMQ)
// evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &cortypes.SourceCoordinator{})
// if err != nil {
// logger.Errorf("new sysevent publisher: %v", err)
// os.Exit(1)
// }
// go servePublisher(evtPub)
coorSvr, err := coormq.NewServer(mymq.NewService(db2), config.Cfg().RabbitMQ)
if err != nil { if err != nil {
logger.Fatalf("new coordinator server failed, err: %s", err.Error()) logger.Fatalf("new coordinator server failed, err: %s", err.Error())
} }


+ 1
- 1
coordinator/internal/config/config.go View File

@@ -4,7 +4,7 @@ import (
log "gitlink.org.cn/cloudream/common/pkgs/logger" log "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
c "gitlink.org.cn/cloudream/common/utils/config" c "gitlink.org.cn/cloudream/common/utils/config"
db "gitlink.org.cn/cloudream/storage2/common/pkgs/db2/config"
"gitlink.org.cn/cloudream/storage2/coordinator/internal/db"
) )


type Config struct { type Config struct {


+ 21
- 0
coordinator/internal/db/config.go View File

@@ -0,0 +1,21 @@
package db

import "fmt"

type Config struct {
Address string `json:"address"`
Account string `json:"account"`
Password string `json:"password"`
DatabaseName string `json:"databaseName"`
}

func (cfg *Config) MakeSourceString() string {
return fmt.Sprintf(
"%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=%s",
cfg.Account,
cfg.Password,
cfg.Address,
cfg.DatabaseName,
"Asia%2FShanghai",
)
}

+ 77
- 0
coordinator/internal/db/db.go View File

@@ -0,0 +1,77 @@
package db

import (
_ "github.com/go-sql-driver/mysql"
"github.com/sirupsen/logrus"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)

type DB struct {
db *gorm.DB
}

func NewDB(cfg *Config) (*DB, error) {
mydb, err := gorm.Open(mysql.Open(cfg.MakeSourceString()), &gorm.Config{})
if err != nil {
logrus.Fatalf("failed to connect to database: %v", err)
}

return &DB{
db: mydb,
}, nil
}

func (db *DB) DoTx(do func(tx SQLContext) error) error {
return db.db.Transaction(func(tx *gorm.DB) error {
return do(SQLContext{tx})
})
}

func DoTx02[R any](db *DB, do func(tx SQLContext) (R, error)) (R, error) {
var ret R
err := db.db.Transaction(func(tx *gorm.DB) error {
var err error
ret, err = do(SQLContext{tx})
return err
})
return ret, err
}

func DoTx12[T any, R any](db *DB, do func(tx SQLContext, t T) (R, error), t T) (R, error) {
var ret R
err := db.db.Transaction(func(tx *gorm.DB) error {
var err error
ret, err = do(SQLContext{tx}, t)
return err
})
return ret, err
}

func DoTx22[T1 any, T2 any, R any](db *DB, do func(tx SQLContext, t1 T1, t2 T2) (R, error), t1 T1, t2 T2) (R, error) {
var ret R
err := db.db.Transaction(func(tx *gorm.DB) error {
var err error
ret, err = do(SQLContext{tx}, t1, t2)
return err
})
return ret, err
}

func DoTx32[T1 any, T2 any, T3 any, R any](db *DB, do func(tx SQLContext, t1 T1, t2 T2, t3 T3) (R, error), t1 T1, t2 T2, t3 T3) (R, error) {
var ret R
err := db.db.Transaction(func(tx *gorm.DB) error {
var err error
ret, err = do(SQLContext{tx}, t1, t2, t3)
return err
})
return ret, err
}

type SQLContext struct {
*gorm.DB
}

func (db *DB) DefCtx() SQLContext {
return SQLContext{db.db}
}

+ 48
- 0
coordinator/internal/db/hub.go View File

@@ -0,0 +1,48 @@
package db

import (
"time"

cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
)

type HubDB struct {
*DB
}

func (db *DB) Hub() *HubDB {
return &HubDB{DB: db}
}

func (*HubDB) GetAllHubs(ctx SQLContext) ([]cortypes.Hub, error) {
var ret []cortypes.Hub

err := ctx.Table("Hub").Find(&ret).Error
return ret, err
}

func (*HubDB) GetByID(ctx SQLContext, hubID cortypes.HubID) (cortypes.Hub, error) {
var ret cortypes.Hub
err := ctx.Table("Hub").Where("HubID = ?", hubID).Find(&ret).Error

return ret, err
}

func (*HubDB) BatchGetByID(ctx SQLContext, hubIDs []cortypes.HubID) ([]cortypes.Hub, error) {
var ret []cortypes.Hub
err := ctx.Table("Hub").Where("HubID IN (?)", hubIDs).Find(&ret).Error

return ret, err
}

// UpdateState 更新状态,并且设置上次上报时间为现在
func (*HubDB) UpdateState(ctx SQLContext, hubID cortypes.HubID, state string) error {
err := ctx.
Model(&cortypes.Hub{}).
Where("HubID = ?", hubID).
Updates(map[string]interface{}{
"State": state,
"LastReportTime": time.Now(),
}).Error
return err
}

+ 36
- 0
coordinator/internal/db/hub_connectivity.go View File

@@ -0,0 +1,36 @@
package db

import (
cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
"gorm.io/gorm/clause"
)

type HubConnectivityDB struct {
*DB
}

func (db *DB) HubConnectivity() *HubConnectivityDB {
return &HubConnectivityDB{DB: db}
}

func (db *HubConnectivityDB) BatchGetByFromHub(ctx SQLContext, fromHubIDs []cortypes.HubID) ([]cortypes.HubConnectivity, error) {
if len(fromHubIDs) == 0 {
return nil, nil
}

var ret []cortypes.HubConnectivity

err := ctx.Table("HubConnectivity").Where("FromHubID IN (?)", fromHubIDs).Find(&ret).Error
return ret, err
}

func (db *HubConnectivityDB) BatchUpdateOrCreate(ctx SQLContext, cons []cortypes.HubConnectivity) error {
if len(cons) == 0 {
return nil
}

// 使用 GORM 的批量插入或更新
return ctx.Table("HubConnectivity").Clauses(clause.OnConflict{
UpdateAll: true,
}).Create(&cons).Error
}

+ 36
- 0
coordinator/internal/db/location.go View File

@@ -0,0 +1,36 @@
package db

import (
"fmt"

cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
)

type LocationDB struct {
*DB
}

func (db *DB) Location() *LocationDB {
return &LocationDB{DB: db}
}

func (*LocationDB) GetByID(ctx SQLContext, id int64) (cortypes.Location, error) {
var ret cortypes.Location
err := ctx.First(&ret, id).Error
return ret, err
}

func (db *LocationDB) FindLocationByExternalIP(ctx SQLContext, ip string) (cortypes.Location, error) {
var locID int64
err := ctx.Table("Hub").Select("LocationID").Where("ExternalIP = ?", ip).Scan(&locID).Error
if err != nil {
return cortypes.Location{}, fmt.Errorf("finding hub by external ip: %w", err)
}

loc, err := db.GetByID(ctx, locID)
if err != nil {
return cortypes.Location{}, fmt.Errorf("getting location by id: %w", err)
}

return loc, nil
}

+ 43
- 0
coordinator/internal/db/storage.go View File

@@ -0,0 +1,43 @@
package db

import (
cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
)

type StorageDB struct {
*DB
}

func (db *DB) Storage() *StorageDB {
return &StorageDB{DB: db}
}

func (db *StorageDB) GetByID(ctx SQLContext, stgID cortypes.StorageID) (cortypes.Storage, error) {
var stg cortypes.Storage
err := ctx.Table("Storage").First(&stg, stgID).Error
return stg, err
}

func (StorageDB) GetAllIDs(ctx SQLContext) ([]cortypes.StorageID, error) {
var stgs []cortypes.StorageID
err := ctx.Table("Storage").Select("StorageID").Find(&stgs).Error
return stgs, err
}

func (db *StorageDB) BatchGetByID(ctx SQLContext, stgIDs []cortypes.StorageID) ([]cortypes.Storage, error) {
var stgs []cortypes.Storage
err := ctx.Table("Storage").Find(&stgs, "StorageID IN (?)", stgIDs).Error
return stgs, err
}

func (db *StorageDB) BatchGetAllStorageIDs(ctx SQLContext, start int, count int) ([]cortypes.StorageID, error) {
var ret []cortypes.StorageID
err := ctx.Table("Storage").Select("StorageID").Find(&ret).Limit(count).Offset(start).Error
return ret, err
}

func (db *StorageDB) GetHubStorages(ctx SQLContext, hubID cortypes.HubID) ([]cortypes.Storage, error) {
var stgs []cortypes.Storage
err := ctx.Table("Storage").Select("Storage.*").Find(&stgs, "MasterHub = ?", hubID).Error
return stgs, err
}

+ 44
- 0
coordinator/internal/db/union_serializer.go View File

@@ -0,0 +1,44 @@
package db

import (
"context"
"fmt"
"reflect"

"gitlink.org.cn/cloudream/common/utils/serder"
"gorm.io/gorm/schema"
)

type UnionSerializer struct {
}

func (UnionSerializer) Scan(ctx context.Context, field *schema.Field, dst reflect.Value, dbValue interface{}) error {
fieldValue := reflect.New(field.FieldType)
if dbValue != nil {
var data []byte
switch v := dbValue.(type) {
case []byte:
data = v
case string:
data = []byte(v)
default:
return fmt.Errorf("failed to unmarshal JSONB value: %#v", dbValue)
}

err := serder.JSONToObjectExRaw(data, fieldValue.Interface())
if err != nil {
return err
}
}

field.ReflectValueOf(ctx, dst).Set(fieldValue.Elem())
return nil
}

func (UnionSerializer) Value(ctx context.Context, field *schema.Field, dst reflect.Value, fieldValue interface{}) (interface{}, error) {
return serder.ObjectToJSONEx(fieldValue)
}

func init() {
schema.RegisterSerializer("union", UnionSerializer{})
}

+ 44
- 0
coordinator/internal/db/user.go View File

@@ -0,0 +1,44 @@
package db

import (
cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
"gorm.io/gorm"
)

type UserDB struct {
*DB
}

func (db *DB) User() *UserDB {
return &UserDB{DB: db}
}

func (db *UserDB) GetByID(ctx SQLContext, userID cortypes.UserID) (cortypes.User, error) {
var ret cortypes.User
err := ctx.Table("User").Where("UserID = ?", userID).First(&ret).Error
return ret, err
}

func (db *UserDB) GetByName(ctx SQLContext, name string) (cortypes.User, error) {
var ret cortypes.User
err := ctx.Table("User").Where("Name = ?", name).First(&ret).Error
return ret, err
}

func (db *UserDB) Create(ctx SQLContext, name string) (cortypes.User, error) {
_, err := db.GetByName(ctx, name)
if err == nil {
return cortypes.User{}, gorm.ErrDuplicatedKey
}
if err != gorm.ErrRecordNotFound {
return cortypes.User{}, err
}

user := cortypes.User{Name: name}
err = ctx.Table("User").Create(&user).Error
return user, err
}

func (*UserDB) Delete(ctx SQLContext, userID cortypes.UserID) error {
return ctx.Table("User").Delete(&cortypes.User{UserID: userID}).Error
}

+ 0
- 1
coordinator/internal/mq/agent.go View File

@@ -1 +0,0 @@
package mq

+ 0
- 143
coordinator/internal/mq/bucket.go View File

@@ -1,143 +0,0 @@
package mq

import (
"errors"
"fmt"
"time"

stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2"
"gorm.io/gorm"

"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
)

func (svc *Service) GetBucket(userID cdssdk.UserID, bucketID cdssdk.BucketID) (model.Bucket, error) {
// TODO
panic("not implement yet")
}

func (svc *Service) GetBucketByName(msg *coormq.GetBucketByName) (*coormq.GetBucketByNameResp, *mq.CodeMessage) {
bucket, err := svc.db2.Bucket().GetUserBucketByName(svc.db2.DefCtx(), msg.UserID, msg.Name)
if err != nil {
logger.WithField("UserID", msg.UserID).
WithField("Name", msg.Name).
Warnf("getting bucket by name: %s", err.Error())

if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, mq.Failed(errorcode.DataNotFound, "bucket not found")
}

return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}

return mq.ReplyOK(coormq.RespGetBucketByName(bucket))
}

func (svc *Service) GetUserBuckets(msg *coormq.GetUserBuckets) (*coormq.GetUserBucketsResp, *mq.CodeMessage) {
buckets, err := svc.db2.Bucket().GetUserBuckets(svc.db2.DefCtx(), msg.UserID)

if err != nil {
logger.WithField("UserID", msg.UserID).
Warnf("get user buckets failed, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}

return mq.ReplyOK(coormq.NewGetUserBucketsResp(buckets))
}

func (svc *Service) GetBucketPackages(msg *coormq.GetBucketPackages) (*coormq.GetBucketPackagesResp, *mq.CodeMessage) {
packages, err := svc.db2.Package().GetUserBucketPackages(svc.db2.DefCtx(), msg.UserID, msg.BucketID)

if err != nil {
logger.WithField("UserID", msg.UserID).
WithField("BucketID", msg.BucketID).
Warnf("get bucket packages failed, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get bucket packages failed")
}

return mq.ReplyOK(coormq.NewGetBucketPackagesResp(packages))
}

func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucketResp, *mq.CodeMessage) {
var bucket cdssdk.Bucket
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
_, err := svc.db2.User().GetByID(tx, msg.UserID)
if err != nil {
return fmt.Errorf("getting user by id: %w", err)
}

bucket, err = svc.db2.Bucket().Create(tx, msg.UserID, msg.BucketName, time.Now())
if err != nil {
return fmt.Errorf("creating bucket: %w", err)
}

return nil
})
if err != nil {
logger.WithField("UserID", msg.UserID).
WithField("BucketName", msg.BucketName).
Warn(err.Error())

if errors.Is(err, gorm.ErrDuplicatedKey) {
return nil, mq.Failed(errorcode.DataExists, "bucket name already exists")
}

return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}

svc.evtPub.Publish(&stgmod.BodyNewBucket{
Info: bucket,
})

return mq.ReplyOK(coormq.NewCreateBucketResp(bucket))
}

func (svc *Service) DeleteBucket(msg *coormq.DeleteBucket) (*coormq.DeleteBucketResp, *mq.CodeMessage) {
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
isAvai, _ := svc.db2.Bucket().IsAvailable(tx, msg.BucketID, msg.UserID)
if !isAvai {
return fmt.Errorf("bucket is not avaiable to the user")
}

if err := svc.db2.UserBucket().DeleteByBucketID(tx, msg.BucketID); err != nil {
return fmt.Errorf("deleting user bucket: %w", err)
}

pkgs, err := svc.db2.Package().GetBucketPackages(tx, msg.BucketID)
if err != nil {
return fmt.Errorf("getting bucket packages: %w", err)
}

for _, pkg := range pkgs {
err := svc.db2.Package().DeleteComplete(tx, pkg.PackageID)
if err != nil {
return fmt.Errorf("deleting package %v: %w", pkg.PackageID, err)
}
}

err = svc.db2.Bucket().Delete(tx, msg.BucketID)
if err != nil {
return fmt.Errorf("deleting bucket: %w", err)
}

return nil
})
if err != nil {
logger.WithField("UserID", msg.UserID).
WithField("BucketID", msg.BucketID).
Warn(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "delete bucket failed")
}

svc.evtPub.Publish(&stgmod.BodyBucketDeleted{
BucketID: msg.BucketID,
})

return mq.ReplyOK(coormq.NewDeleteBucketResp())
}

+ 0
- 66
coordinator/internal/mq/cache.go View File

@@ -1,66 +0,0 @@
package mq

import (
"fmt"

"gitlink.org.cn/cloudream/storage2/common/pkgs/db2"

"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
)

func (svc *Service) CachePackageMoved(msg *coormq.CachePackageMoved) (*coormq.CachePackageMovedResp, *mq.CodeMessage) {
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
_, err := svc.db2.Package().GetByID(tx, msg.PackageID)
if err != nil {
return fmt.Errorf("getting package by id: %w", err)
}

_, err = svc.db2.Storage().GetByID(tx, msg.StorageID)
if err != nil {
return fmt.Errorf("getting storage by id: %w", err)
}

err = svc.db2.PinnedObject().CreateFromPackage(tx, msg.PackageID, msg.StorageID)
if err != nil {
return fmt.Errorf("creating pinned objects from package: %w", err)
}

return nil
})
if err != nil {
logger.WithField("PackageID", msg.PackageID).WithField("HubID", msg.StorageID).Warn(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "create package pinned objects failed")
}

return mq.ReplyOK(coormq.NewCachePackageMovedResp())
}

func (svc *Service) CacheRemovePackage(msg *coormq.CacheRemovePackage) (*coormq.CacheRemovePackageResp, *mq.CodeMessage) {
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
_, err := svc.db2.Package().GetByID(tx, msg.PackageID)
if err != nil {
return fmt.Errorf("getting package by id: %w", err)
}

_, err = svc.db2.Storage().GetByID(tx, msg.StorageID)
if err != nil {
return fmt.Errorf("getting storage by id: %w", err)
}

err = svc.db2.PinnedObject().DeleteInPackageAtStorage(tx, msg.PackageID, msg.StorageID)
if err != nil {
return fmt.Errorf("delete pinned objects in package at storage: %w", err)
}

return nil
})
if err != nil {
logger.WithField("PackageID", msg.PackageID).WithField("HubID", msg.StorageID).Warn(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "remove pinned package failed")
}

return mq.ReplyOK(coormq.RespCacheRemovePackage())
}

+ 6
- 87
coordinator/internal/mq/hub.go View File

@@ -3,64 +3,18 @@ package mq
import ( import (
"fmt" "fmt"


stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2"

"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
cortypes "gitlink.org.cn/cloudream/storage2/coordinator/types"
) )


func (svc *Service) GetHubConfig(msg *coormq.GetHubConfig) (*coormq.GetHubConfigResp, *mq.CodeMessage) {
log := logger.WithField("HubID", msg.HubID)

hub, err := svc.db2.Hub().GetByID(svc.db2.DefCtx(), msg.HubID)
if err != nil {
log.Warnf("getting hub: %v", err)
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("getting hub: %v", err))
}

detailsMap := make(map[cdssdk.StorageID]*stgmod.StorageDetail)

stgs, err := svc.db2.Storage().GetHubStorages(svc.db2.DefCtx(), msg.HubID)
if err != nil {
log.Warnf("getting hub storages: %v", err)
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("getting hub storages: %v", err))
}

for _, stg := range stgs {
detailsMap[stg.StorageID] = &stgmod.StorageDetail{
Storage: stg,
MasterHub: &hub,
}
}

var details []stgmod.StorageDetail
for _, detail := range detailsMap {
details = append(details, *detail)
}

return mq.ReplyOK(coormq.RespGetHubConfig(hub, details))
}

func (svc *Service) GetUserHubs(msg *coormq.GetUserHubs) (*coormq.GetUserHubsResp, *mq.CodeMessage) {
hubs, err := svc.db2.Hub().GetUserHubs(svc.db2.DefCtx(), msg.UserID)
if err != nil {
logger.WithField("UserID", msg.UserID).
Warnf("query user hubs failed, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "query user hubs failed")
}

return mq.ReplyOK(coormq.NewGetUserHubsResp(hubs))
}

func (svc *Service) GetHubs(msg *coormq.GetHubs) (*coormq.GetHubsResp, *mq.CodeMessage) { func (svc *Service) GetHubs(msg *coormq.GetHubs) (*coormq.GetHubsResp, *mq.CodeMessage) {
var hubs []*cdssdk.Hub
var hubs []*cortypes.Hub


if msg.HubIDs == nil { if msg.HubIDs == nil {
get, err := svc.db2.Hub().GetAllHubs(svc.db2.DefCtx())
get, err := svc.db.Hub().GetAllHubs(svc.db.DefCtx())
if err != nil { if err != nil {
logger.Warnf("getting all hubs: %s", err.Error()) logger.Warnf("getting all hubs: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get all hub failed") return nil, mq.Failed(errorcode.OperationFailed, "get all hub failed")
@@ -72,13 +26,13 @@ func (svc *Service) GetHubs(msg *coormq.GetHubs) (*coormq.GetHubsResp, *mq.CodeM


} else { } else {
// 可以不用事务 // 可以不用事务
get, err := svc.db2.Hub().BatchGetByID(svc.db2.DefCtx(), msg.HubIDs)
get, err := svc.db.Hub().BatchGetByID(svc.db.DefCtx(), msg.HubIDs)
if err != nil { if err != nil {
logger.Warnf("batch get hubs by id: %s", err.Error()) logger.Warnf("batch get hubs by id: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("batch get hubs by id: %v", err)) return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("batch get hubs by id: %v", err))
} }


getMp := make(map[cdssdk.HubID]cdssdk.Hub)
getMp := make(map[cortypes.HubID]cortypes.Hub)
for _, hub := range get { for _, hub := range get {
getMp[hub.HubID] = hub getMp[hub.HubID] = hub
} }
@@ -97,7 +51,7 @@ func (svc *Service) GetHubs(msg *coormq.GetHubs) (*coormq.GetHubsResp, *mq.CodeM
} }


func (svc *Service) GetHubConnectivities(msg *coormq.GetHubConnectivities) (*coormq.GetHubConnectivitiesResp, *mq.CodeMessage) { func (svc *Service) GetHubConnectivities(msg *coormq.GetHubConnectivities) (*coormq.GetHubConnectivitiesResp, *mq.CodeMessage) {
cons, err := svc.db2.HubConnectivity().BatchGetByFromHub(svc.db2.DefCtx(), msg.HubIDs)
cons, err := svc.db.HubConnectivity().BatchGetByFromHub(svc.db.DefCtx(), msg.HubIDs)
if err != nil { if err != nil {
logger.Warnf("batch get hub connectivities by from hub: %s", err.Error()) logger.Warnf("batch get hub connectivities by from hub: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "batch get hub connectivities by from hub failed") return nil, mq.Failed(errorcode.OperationFailed, "batch get hub connectivities by from hub failed")
@@ -105,38 +59,3 @@ func (svc *Service) GetHubConnectivities(msg *coormq.GetHubConnectivities) (*coo


return mq.ReplyOK(coormq.RespGetHubConnectivities(cons)) return mq.ReplyOK(coormq.RespGetHubConnectivities(cons))
} }

func (svc *Service) UpdateHubConnectivities(msg *coormq.UpdateHubConnectivities) (*coormq.UpdateHubConnectivitiesResp, *mq.CodeMessage) {
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
// 只有发起节点和目的节点都存在,才能插入这条记录到数据库
allHubs, err := svc.db2.Hub().GetAllHubs(tx)
if err != nil {
return fmt.Errorf("getting all hubs: %w", err)
}

allHubID := make(map[cdssdk.HubID]bool)
for _, hub := range allHubs {
allHubID[hub.HubID] = true
}

var avaiCons []cdssdk.HubConnectivity
for _, con := range msg.Connectivities {
if allHubID[con.FromHubID] && allHubID[con.ToHubID] {
avaiCons = append(avaiCons, con)
}
}

err = svc.db2.HubConnectivity().BatchUpdateOrCreate(tx, avaiCons)
if err != nil {
return fmt.Errorf("batch update or create hub connectivities: %s", err)
}

return nil
})
if err != nil {
logger.Warn(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}

return mq.ReplyOK(coormq.RespUpdateHubConnectivities())
}

+ 0
- 891
coordinator/internal/mq/object.go View File

@@ -1,891 +0,0 @@
package mq

import (
"errors"
"fmt"
"time"

"gitlink.org.cn/cloudream/storage2/common/pkgs/db2"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
"gorm.io/gorm"

"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
"gitlink.org.cn/cloudream/common/utils/sort2"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
)

func (svc *Service) GetObjects(msg *coormq.GetObjects) (*coormq.GetObjectsResp, *mq.CodeMessage) {
var ret []*cdssdk.Object
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
// TODO 应该检查用户是否有每一个Object所在Package的权限
objs, err := svc.db2.Object().BatchGet(tx, msg.ObjectIDs)
if err != nil {
return err
}

objMp := make(map[cdssdk.ObjectID]cdssdk.Object)
for _, obj := range objs {
objMp[obj.ObjectID] = obj
}

for _, objID := range msg.ObjectIDs {
o, ok := objMp[objID]
if ok {
ret = append(ret, &o)
} else {
ret = append(ret, nil)
}
}

return err
})
if err != nil {
logger.WithField("UserID", msg.UserID).
Warn(err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "get objects failed")
}

return mq.ReplyOK(coormq.RespGetObjects(ret))
}

func (svc *Service) ListObjectsByPath(msg *coormq.ListObjectsByPath) (*coormq.ListObjectsByPathResp, *mq.CodeMessage) {
var coms []string
var objs []cdssdk.Object
var conToken string

maxKeys := 1000
if msg.MaxKeys > 0 {
maxKeys = msg.MaxKeys
}

err := svc.db2.DoTx(func(tx db2.SQLContext) error {
var err error

_, err = svc.db2.Package().GetUserPackage(tx, msg.UserID, msg.PackageID)
if err != nil {
return fmt.Errorf("getting package by id: %w", err)
}

if !msg.IsPrefix {
obj, err := svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path)
if err != nil {
return fmt.Errorf("getting object by path: %w", err)
}
objs = append(objs, obj)

return nil
}

if !msg.NoRecursive {
objs, err = svc.db2.Object().GetWithPathPrefixPaged(tx, msg.PackageID, msg.Path, msg.ContinuationToken, maxKeys)
if err != nil {
return fmt.Errorf("getting objects with prefix: %w", err)
}

if len(objs) > 0 {
conToken = objs[len(objs)-1].Path
}

return nil
}

objs, coms, conToken, err = svc.db2.Object().GetByPrefixGroupedPaged(tx, msg.PackageID, msg.Path, msg.ContinuationToken, maxKeys)
return err
})
if err != nil {
logger.WithField("PathPrefix", msg.Path).Warn(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get objects with prefix failed")
}

return mq.ReplyOK(coormq.RespListObjectsByPath(cdsapi.ObjectListByPathResp{
CommonPrefixes: coms,
Objects: objs,
IsTruncated: len(coms)+len(objs) >= maxKeys,
NextContinuationToken: conToken,
}))
}

func (svc *Service) GetPackageObjects(msg *coormq.GetPackageObjects) (*coormq.GetPackageObjectsResp, *mq.CodeMessage) {
var objs []cdssdk.Object
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
_, err := svc.db2.Package().GetUserPackage(tx, msg.UserID, msg.PackageID)
if err != nil {
return fmt.Errorf("getting package by id: %w", err)
}

objs, err = svc.db2.Object().GetPackageObjects(tx, msg.PackageID)
if err != nil {
return fmt.Errorf("getting package objects: %w", err)
}

return nil
})
if err != nil {
logger.WithField("UserID", msg.UserID).WithField("PackageID", msg.PackageID).
Warn(err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "get package objects failed")
}

return mq.ReplyOK(coormq.RespGetPackageObjects(objs))
}

func (svc *Service) GetPackageObjectDetails(msg *coormq.GetPackageObjectDetails) (*coormq.GetPackageObjectDetailsResp, *mq.CodeMessage) {
var details []stgmod.ObjectDetail
// 必须放在事务里进行,因为GetPackageBlockDetails是由多次数据库操作组成,必须保证数据的一致性
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
var err error
_, err = svc.db2.Package().GetByID(tx, msg.PackageID)
if err != nil {
return fmt.Errorf("getting package by id: %w", err)
}

details, err = svc.db2.Object().GetPackageObjectDetails(tx, msg.PackageID)
if err != nil {
return fmt.Errorf("getting package block details: %w", err)
}

return nil
})

if err != nil {
logger.WithField("PackageID", msg.PackageID).Warn(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get package object block details failed")
}

return mq.ReplyOK(coormq.RespPackageObjectDetails(details))
}

func (svc *Service) GetObjectDetails(msg *coormq.GetObjectDetails) (*coormq.GetObjectDetailsResp, *mq.CodeMessage) {
detailsMp := make(map[cdssdk.ObjectID]*stgmod.ObjectDetail)

err := svc.db2.DoTx(func(tx db2.SQLContext) error {
var err error

msg.ObjectIDs = sort2.SortAsc(msg.ObjectIDs)

// 根据ID依次查询Object,ObjectBlock,PinnedObject,并根据升序的特点进行合并
objs, err := svc.db2.Object().BatchGet(tx, msg.ObjectIDs)
if err != nil {
return fmt.Errorf("batch get objects: %w", err)
}
for _, obj := range objs {
detailsMp[obj.ObjectID] = &stgmod.ObjectDetail{
Object: obj,
}
}

// 查询合并
blocks, err := svc.db2.ObjectBlock().BatchGetByObjectID(tx, msg.ObjectIDs)
if err != nil {
return fmt.Errorf("batch get object blocks: %w", err)
}
for _, block := range blocks {
d := detailsMp[block.ObjectID]
d.Blocks = append(d.Blocks, block)
}

// 查询合并
pinneds, err := svc.db2.PinnedObject().BatchGetByObjectID(tx, msg.ObjectIDs)
if err != nil {
return fmt.Errorf("batch get pinned objects: %w", err)
}
for _, pinned := range pinneds {
d := detailsMp[pinned.ObjectID]
d.PinnedAt = append(d.PinnedAt, pinned.StorageID)
}

return nil
})

if err != nil {
logger.Warn(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get object details failed")
}

details := make([]*stgmod.ObjectDetail, len(msg.ObjectIDs))
for i, objID := range msg.ObjectIDs {
details[i] = detailsMp[objID]
}

return mq.ReplyOK(coormq.RespGetObjectDetails(details))
}

func (svc *Service) UpdateObjectRedundancy(msg *coormq.UpdateObjectRedundancy) (*coormq.UpdateObjectRedundancyResp, *mq.CodeMessage) {
err := svc.db2.DoTx(func(ctx db2.SQLContext) error {
db := svc.db2
objs := msg.Updatings

nowTime := time.Now()
objIDs := make([]cdssdk.ObjectID, 0, len(objs))
for _, obj := range objs {
objIDs = append(objIDs, obj.ObjectID)
}

avaiIDs, err := db.Object().BatchTestObjectID(ctx, objIDs)
if err != nil {
return fmt.Errorf("batch test object id: %w", err)
}

// 过滤掉已经不存在的对象。
// 注意,objIDs没有被过滤,因为后续逻辑不过滤也不会出错
objs = lo.Filter(objs, func(obj coormq.UpdatingObjectRedundancy, _ int) bool {
return avaiIDs[obj.ObjectID]
})

dummyObjs := make([]cdssdk.Object, 0, len(objs))
for _, obj := range objs {
dummyObjs = append(dummyObjs, cdssdk.Object{
ObjectID: obj.ObjectID,
FileHash: obj.FileHash,
Size: obj.Size,
Redundancy: obj.Redundancy,
CreateTime: nowTime, // 实际不会更新,只因为不能是0值
UpdateTime: nowTime,
})
}

err = db.Object().BatchUpdateColumns(ctx, dummyObjs, []string{"FileHash", "Size", "Redundancy", "UpdateTime"})
if err != nil {
return fmt.Errorf("batch update object redundancy: %w", err)
}

// 删除原本所有的编码块记录,重新添加
err = db.ObjectBlock().BatchDeleteByObjectID(ctx, objIDs)
if err != nil {
return fmt.Errorf("batch delete object blocks: %w", err)
}

// 删除原本Pin住的Object。暂不考虑FileHash没有变化的情况
err = db.PinnedObject().BatchDeleteByObjectID(ctx, objIDs)
if err != nil {
return fmt.Errorf("batch delete pinned object: %w", err)
}

blocks := make([]stgmod.ObjectBlock, 0, len(objs))
for _, obj := range objs {
blocks = append(blocks, obj.Blocks...)
}
err = db.ObjectBlock().BatchCreate(ctx, blocks)
if err != nil {
return fmt.Errorf("batch create object blocks: %w", err)
}

caches := make([]model.Cache, 0, len(objs))
for _, obj := range objs {
for _, blk := range obj.Blocks {
caches = append(caches, model.Cache{
FileHash: blk.FileHash,
StorageID: blk.StorageID,
CreateTime: nowTime,
Priority: 0,
})
}
}
err = db.Cache().BatchCreate(ctx, caches)
if err != nil {
return fmt.Errorf("batch create object caches: %w", err)
}

pinneds := make([]cdssdk.PinnedObject, 0, len(objs))
for _, obj := range objs {
for _, p := range obj.PinnedAt {
pinneds = append(pinneds, cdssdk.PinnedObject{
ObjectID: obj.ObjectID,
StorageID: p,
CreateTime: nowTime,
})
}
}
err = db.PinnedObject().BatchTryCreate(ctx, pinneds)
if err != nil {
return fmt.Errorf("batch create pinned objects: %w", err)
}

return nil
})
if err != nil {
logger.Warnf("batch updating redundancy: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "batch update redundancy failed")
}

return mq.ReplyOK(coormq.RespUpdateObjectRedundancy())
}

func (svc *Service) UpdateObjectInfos(msg *coormq.UpdateObjectInfos) (*coormq.UpdateObjectInfosResp, *mq.CodeMessage) {
var sucs []cdssdk.ObjectID
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
msg.Updatings = sort2.Sort(msg.Updatings, func(o1, o2 cdsapi.UpdatingObject) int {
return sort2.Cmp(o1.ObjectID, o2.ObjectID)
})

objIDs := make([]cdssdk.ObjectID, len(msg.Updatings))
for i, obj := range msg.Updatings {
objIDs[i] = obj.ObjectID
}

oldObjs, err := svc.db2.Object().BatchGet(tx, objIDs)
if err != nil {
return fmt.Errorf("batch getting objects: %w", err)
}
oldObjIDs := make([]cdssdk.ObjectID, len(oldObjs))
for i, obj := range oldObjs {
oldObjIDs[i] = obj.ObjectID
}

avaiUpdatings, notExistsObjs := pickByObjectIDs(msg.Updatings, oldObjIDs, func(obj cdsapi.UpdatingObject) cdssdk.ObjectID { return obj.ObjectID })
if len(notExistsObjs) > 0 {
// TODO 部分对象已经不存在
}

newObjs := make([]cdssdk.Object, len(avaiUpdatings))
for i := range newObjs {
newObjs[i] = oldObjs[i]
avaiUpdatings[i].ApplyTo(&newObjs[i])
}

err = svc.db2.Object().BatchUpdate(tx, newObjs)
if err != nil {
return fmt.Errorf("batch create or update: %w", err)
}

sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID })
return nil
})

if err != nil {
logger.Warnf("batch updating objects: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "batch update objects failed")
}

return mq.ReplyOK(coormq.RespUpdateObjectInfos(sucs))
}

// 根据objIDs从objs中挑选Object。
// len(objs) >= len(objIDs)
func pickByObjectIDs[T any](objs []T, objIDs []cdssdk.ObjectID, getID func(T) cdssdk.ObjectID) (picked []T, notFound []T) {
objIdx := 0
idIdx := 0

for idIdx < len(objIDs) && objIdx < len(objs) {
if getID(objs[objIdx]) < objIDs[idIdx] {
notFound = append(notFound, objs[objIdx])
objIdx++
continue
}

picked = append(picked, objs[objIdx])
objIdx++
idIdx++
}

return
}

func (svc *Service) MoveObjects(msg *coormq.MoveObjects) (*coormq.MoveObjectsResp, *mq.CodeMessage) {
var sucs []cdssdk.ObjectID
var evt []*stgmod.BodyObjectInfoUpdated

err := svc.db2.DoTx(func(tx db2.SQLContext) error {
msg.Movings = sort2.Sort(msg.Movings, func(o1, o2 cdsapi.MovingObject) int {
return sort2.Cmp(o1.ObjectID, o2.ObjectID)
})

objIDs := make([]cdssdk.ObjectID, len(msg.Movings))
for i, obj := range msg.Movings {
objIDs[i] = obj.ObjectID
}

oldObjs, err := svc.db2.Object().BatchGet(tx, objIDs)
if err != nil {
return fmt.Errorf("batch getting objects: %w", err)
}
oldObjIDs := make([]cdssdk.ObjectID, len(oldObjs))
for i, obj := range oldObjs {
oldObjIDs[i] = obj.ObjectID
}

// 找出仍在数据库的Object
avaiMovings, notExistsObjs := pickByObjectIDs(msg.Movings, oldObjIDs, func(obj cdsapi.MovingObject) cdssdk.ObjectID { return obj.ObjectID })
if len(notExistsObjs) > 0 {
// TODO 部分对象已经不存在
}

// 筛选出PackageID变化、Path变化的对象,这两种对象要检测改变后是否有冲突
var pkgIDChangedObjs []cdssdk.Object
var pathChangedObjs []cdssdk.Object
for i := range avaiMovings {
if avaiMovings[i].PackageID != oldObjs[i].PackageID {
newObj := oldObjs[i]
avaiMovings[i].ApplyTo(&newObj)
pkgIDChangedObjs = append(pkgIDChangedObjs, newObj)
} else if avaiMovings[i].Path != oldObjs[i].Path {
newObj := oldObjs[i]
avaiMovings[i].ApplyTo(&newObj)
pathChangedObjs = append(pathChangedObjs, newObj)
}
}

var newObjs []cdssdk.Object
// 对于PackageID发生变化的对象,需要检查目标Package内是否存在同Path的对象
checkedObjs, err := svc.checkPackageChangedObjects(tx, msg.UserID, pkgIDChangedObjs)
if err != nil {
return err
}
newObjs = append(newObjs, checkedObjs...)

// 对于只有Path发生变化的对象,则检查同Package内有没有同Path的对象
checkedObjs, err = svc.checkPathChangedObjects(tx, msg.UserID, pathChangedObjs)
if err != nil {
return err
}
newObjs = append(newObjs, checkedObjs...)

err = svc.db2.Object().BatchUpdate(tx, newObjs)
if err != nil {
return fmt.Errorf("batch create or update: %w", err)
}

sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID })
evt = lo.Map(newObjs, func(obj cdssdk.Object, _ int) *stgmod.BodyObjectInfoUpdated {
return &stgmod.BodyObjectInfoUpdated{
Object: obj,
}
})
return nil
})
if err != nil {
logger.Warn(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "move objects failed")
}

for _, e := range evt {
svc.evtPub.Publish(e)
}

return mq.ReplyOK(coormq.RespMoveObjects(sucs))
}

func (svc *Service) checkPackageChangedObjects(tx db2.SQLContext, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) {
if len(objs) == 0 {
return nil, nil
}

type PackageObjects struct {
PackageID cdssdk.PackageID
ObjectByPath map[string]*cdssdk.Object
}

packages := make(map[cdssdk.PackageID]*PackageObjects)
for _, obj := range objs {
pkg, ok := packages[obj.PackageID]
if !ok {
pkg = &PackageObjects{
PackageID: obj.PackageID,
ObjectByPath: make(map[string]*cdssdk.Object),
}
packages[obj.PackageID] = pkg
}

if pkg.ObjectByPath[obj.Path] == nil {
o := obj
pkg.ObjectByPath[obj.Path] = &o
} else {
// TODO 有两个对象移动到同一个路径,有冲突
}
}

var willUpdateObjs []cdssdk.Object
for _, pkg := range packages {
_, err := svc.db2.Package().GetUserPackage(tx, userID, pkg.PackageID)
if errors.Is(err, gorm.ErrRecordNotFound) {
continue
}
if err != nil {
return nil, fmt.Errorf("getting user package by id: %w", err)
}

existsObjs, err := svc.db2.Object().BatchGetByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.ObjectByPath))
if err != nil {
return nil, fmt.Errorf("batch getting objects by package path: %w", err)
}

// 标记冲突的对象
for _, obj := range existsObjs {
pkg.ObjectByPath[obj.Path] = nil
// TODO 目标Package内有冲突的对象
}

for _, obj := range pkg.ObjectByPath {
if obj == nil {
continue
}
willUpdateObjs = append(willUpdateObjs, *obj)
}
}

return willUpdateObjs, nil
}

func (svc *Service) checkPathChangedObjects(tx db2.SQLContext, userID cdssdk.UserID, objs []cdssdk.Object) ([]cdssdk.Object, error) {
if len(objs) == 0 {
return nil, nil
}

objByPath := make(map[string]*cdssdk.Object)
for _, obj := range objs {
if objByPath[obj.Path] == nil {
o := obj
objByPath[obj.Path] = &o
} else {
// TODO 有两个对象移动到同一个路径,有冲突
}

}

_, err := svc.db2.Package().GetUserPackage(tx, userID, objs[0].PackageID)
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("getting user package by id: %w", err)
}

existsObjs, err := svc.db2.Object().BatchGetByPackagePath(tx, objs[0].PackageID, lo.Map(objs, func(obj cdssdk.Object, idx int) string { return obj.Path }))
if err != nil {
return nil, fmt.Errorf("batch getting objects by package path: %w", err)
}

// 不支持两个对象交换位置的情况,因为数据库不支持
for _, obj := range existsObjs {
objByPath[obj.Path] = nil
}

var willMoveObjs []cdssdk.Object
for _, obj := range objByPath {
if obj == nil {
continue
}
willMoveObjs = append(willMoveObjs, *obj)
}

return willMoveObjs, nil
}

func (svc *Service) DeleteObjects(msg *coormq.DeleteObjects) (*coormq.DeleteObjectsResp, *mq.CodeMessage) {
var sucs []cdssdk.ObjectID
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
avaiIDs, err := svc.db2.Object().BatchTestObjectID(tx, msg.ObjectIDs)
if err != nil {
return fmt.Errorf("batch testing object id: %w", err)
}
sucs = lo.Keys(avaiIDs)

err = svc.db2.Object().BatchDelete(tx, msg.ObjectIDs)
if err != nil {
return fmt.Errorf("batch deleting objects: %w", err)
}

err = svc.db2.ObjectBlock().BatchDeleteByObjectID(tx, msg.ObjectIDs)
if err != nil {
return fmt.Errorf("batch deleting object blocks: %w", err)
}

err = svc.db2.PinnedObject().BatchDeleteByObjectID(tx, msg.ObjectIDs)
if err != nil {
return fmt.Errorf("batch deleting pinned objects: %w", err)
}

err = svc.db2.ObjectAccessStat().BatchDeleteByObjectID(tx, msg.ObjectIDs)
if err != nil {
return fmt.Errorf("batch deleting object access stats: %w", err)
}

return nil
})
if err != nil {
logger.Warnf("batch deleting objects: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "batch delete objects failed")
}

for _, objID := range sucs {
svc.evtPub.Publish(&stgmod.BodyObjectDeleted{
ObjectID: objID,
})
}

return mq.ReplyOK(coormq.RespDeleteObjects(sucs))
}

func (svc *Service) CloneObjects(msg *coormq.CloneObjects) (*coormq.CloneObjectsResp, *mq.CodeMessage) {
type CloningObject struct {
Cloning cdsapi.CloningObject
OrgIndex int
}
type PackageClonings struct {
PackageID cdssdk.PackageID
Clonings map[string]CloningObject
}

var evt []*stgmod.BodyNewOrUpdateObject

// TODO 要检查用户是否有Object、Package的权限
clonings := make(map[cdssdk.PackageID]*PackageClonings)
for i, cloning := range msg.Clonings {
pkg, ok := clonings[cloning.NewPackageID]
if !ok {
pkg = &PackageClonings{
PackageID: cloning.NewPackageID,
Clonings: make(map[string]CloningObject),
}
clonings[cloning.NewPackageID] = pkg
}
pkg.Clonings[cloning.NewPath] = CloningObject{
Cloning: cloning,
OrgIndex: i,
}
}

ret := make([]*cdssdk.Object, len(msg.Clonings))
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
// 剔除掉新路径已经存在的对象
for _, pkg := range clonings {
exists, err := svc.db2.Object().BatchGetByPackagePath(tx, pkg.PackageID, lo.Keys(pkg.Clonings))
if err != nil {
return fmt.Errorf("batch getting objects by package path: %w", err)
}

for _, obj := range exists {
delete(pkg.Clonings, obj.Path)
}
}

// 删除目的Package不存在的对象
newPkg, err := svc.db2.Package().BatchTestPackageID(tx, lo.Keys(clonings))
if err != nil {
return fmt.Errorf("batch testing package id: %w", err)
}
for _, pkg := range clonings {
if !newPkg[pkg.PackageID] {
delete(clonings, pkg.PackageID)
}
}

var avaiClonings []CloningObject
var avaiObjIDs []cdssdk.ObjectID
for _, pkg := range clonings {
for _, cloning := range pkg.Clonings {
avaiClonings = append(avaiClonings, cloning)
avaiObjIDs = append(avaiObjIDs, cloning.Cloning.ObjectID)
}
}

avaiDetails, err := svc.db2.Object().BatchGetDetails(tx, avaiObjIDs)
if err != nil {
return fmt.Errorf("batch getting object details: %w", err)
}

avaiDetailsMap := make(map[cdssdk.ObjectID]stgmod.ObjectDetail)
for _, detail := range avaiDetails {
avaiDetailsMap[detail.Object.ObjectID] = detail
}

oldAvaiClonings := avaiClonings
avaiClonings = nil

var newObjs []cdssdk.Object
for _, cloning := range oldAvaiClonings {
// 进一步剔除原始对象不存在的情况
detail, ok := avaiDetailsMap[cloning.Cloning.ObjectID]
if !ok {
continue
}

avaiClonings = append(avaiClonings, cloning)

newObj := detail.Object
newObj.ObjectID = 0
newObj.Path = cloning.Cloning.NewPath
newObj.PackageID = cloning.Cloning.NewPackageID
newObjs = append(newObjs, newObj)
}

// 先创建出新对象
err = svc.db2.Object().BatchCreate(tx, &newObjs)
if err != nil {
return fmt.Errorf("batch creating objects: %w", err)
}

// 创建了新对象就能拿到新对象ID,再创建新对象块
var newBlks []stgmod.ObjectBlock
for i, cloning := range avaiClonings {
oldBlks := avaiDetailsMap[cloning.Cloning.ObjectID].Blocks
for _, blk := range oldBlks {
newBlk := blk
newBlk.ObjectID = newObjs[i].ObjectID
newBlks = append(newBlks, newBlk)
}
}

err = svc.db2.ObjectBlock().BatchCreate(tx, newBlks)
if err != nil {
return fmt.Errorf("batch creating object blocks: %w", err)
}

for i, cloning := range avaiClonings {
ret[cloning.OrgIndex] = &newObjs[i]
}

for i, cloning := range avaiClonings {
var evtBlks []stgmod.BlockDistributionObjectInfo
blkType := getBlockTypeFromRed(newObjs[i].Redundancy)

oldBlks := avaiDetailsMap[cloning.Cloning.ObjectID].Blocks
for _, blk := range oldBlks {
evtBlks = append(evtBlks, stgmod.BlockDistributionObjectInfo{
BlockType: blkType,
Index: blk.Index,
StorageID: blk.StorageID,
})
}

evt = append(evt, &stgmod.BodyNewOrUpdateObject{
Info: newObjs[i],
BlockDistribution: evtBlks,
})
}
return nil
})

if err != nil {
logger.Warnf("cloning objects: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}

for _, e := range evt {
svc.evtPub.Publish(e)
}

return mq.ReplyOK(coormq.RespCloneObjects(ret))
}

func (svc *Service) NewMultipartUploadObject(msg *coormq.NewMultipartUploadObject) (*coormq.NewMultipartUploadObjectResp, *mq.CodeMessage) {
var obj cdssdk.Object
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
oldObj, err := svc.db2.Object().GetByPath(tx, msg.PackageID, msg.Path)
if err == nil {
obj = oldObj
err := svc.db2.ObjectBlock().DeleteByObjectID(tx, obj.ObjectID)
if err != nil {
return fmt.Errorf("delete object blocks: %w", err)
}

obj.FileHash = cdssdk.EmptyHash
obj.Size = 0
obj.Redundancy = cdssdk.NewMultipartUploadRedundancy()
obj.UpdateTime = time.Now()

err = svc.db2.Object().BatchUpdate(tx, []cdssdk.Object{obj})
if err != nil {
return fmt.Errorf("update object: %w", err)
}

return nil
}

obj = cdssdk.Object{
PackageID: msg.PackageID,
Path: msg.Path,
FileHash: cdssdk.EmptyHash,
Size: 0,
Redundancy: cdssdk.NewMultipartUploadRedundancy(),
CreateTime: time.Now(),
UpdateTime: time.Now(),
}
objID, err := svc.db2.Object().Create(tx, obj)
if err != nil {
return fmt.Errorf("create object: %w", err)
}

obj.ObjectID = objID
return nil
})
if err != nil {
logger.Warnf("new multipart upload object: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("new multipart upload object: %v", err))
}

return mq.ReplyOK(coormq.RespNewMultipartUploadObject(obj))
}

func (svc *Service) AddMultipartUploadPart(msg *coormq.AddMultipartUploadPart) (*coormq.AddMultipartUploadPartResp, *mq.CodeMessage) {
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
obj, err := svc.db2.Object().GetByID(tx, msg.ObjectID)
if err != nil {
return fmt.Errorf("getting object by id: %w", err)
}

_, ok := obj.Redundancy.(*cdssdk.MultipartUploadRedundancy)
if !ok {
return fmt.Errorf("object is not a multipart upload object")
}

blks, err := svc.db2.ObjectBlock().BatchGetByObjectID(tx, []cdssdk.ObjectID{obj.ObjectID})
if err != nil {
return fmt.Errorf("batch getting object blocks: %w", err)
}

blks = lo.Reject(blks, func(blk stgmod.ObjectBlock, idx int) bool { return blk.Index == msg.Block.Index })
blks = append(blks, msg.Block)

blks = sort2.Sort(blks, func(a, b stgmod.ObjectBlock) int { return a.Index - b.Index })

totalSize := int64(0)
var hashes [][]byte
for _, blk := range blks {
totalSize += blk.Size
hashes = append(hashes, blk.FileHash.GetHashBytes())
}

newObjHash := cdssdk.CalculateCompositeHash(hashes)
obj.Size = totalSize
obj.FileHash = newObjHash
obj.UpdateTime = time.Now()

err = svc.db2.ObjectBlock().DeleteByObjectIDIndex(tx, msg.ObjectID, msg.Block.Index)
if err != nil {
return fmt.Errorf("delete object block: %w", err)
}

err = svc.db2.ObjectBlock().Create(tx, msg.ObjectID, msg.Block.Index, msg.Block.StorageID, msg.Block.FileHash, msg.Block.Size)
if err != nil {
return fmt.Errorf("create object block: %w", err)
}

err = svc.db2.Object().BatchUpdate(tx, []cdssdk.Object{obj})
if err != nil {
return fmt.Errorf("update object: %w", err)
}

return nil
})
if err != nil {
logger.Warnf("add multipart upload part: %s", err.Error())

code := errorcode.OperationFailed
if errors.Is(err, gorm.ErrRecordNotFound) {
code = errorcode.DataNotFound
}

return nil, mq.Failed(code, fmt.Sprintf("add multipart upload part: %v", err))
}

return mq.ReplyOK(coormq.RespAddMultipartUploadPart())
}

+ 0
- 337
coordinator/internal/mq/package.go View File

@@ -1,337 +0,0 @@
package mq

import (
"errors"
"fmt"
"sort"

stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2"
"gorm.io/gorm"

"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
)

func (svc *Service) GetPackage(msg *coormq.GetPackage) (*coormq.GetPackageResp, *mq.CodeMessage) {
pkg, err := svc.db2.Package().GetByID(svc.db2.DefCtx(), msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("get package: %s", err.Error())

if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, mq.Failed(errorcode.DataNotFound, "package not found")
}

return nil, mq.Failed(errorcode.OperationFailed, "get package failed")
}

return mq.ReplyOK(coormq.NewGetPackageResp(pkg))
}

func (svc *Service) GetPackageByName(msg *coormq.GetPackageByName) (*coormq.GetPackageByNameResp, *mq.CodeMessage) {
pkg, err := svc.db2.Package().GetUserPackageByName(svc.db2.DefCtx(), msg.UserID, msg.BucketName, msg.PackageName)
if err != nil {
logger.WithField("UserID", msg.UserID).
WithField("BucketName", msg.BucketName).
WithField("PackageName", msg.PackageName).
Warnf("get package by name: %s", err.Error())

if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, mq.Failed(errorcode.DataNotFound, "package not found")
}

return nil, mq.Failed(errorcode.OperationFailed, "get package by name failed")
}

return mq.ReplyOK(coormq.NewGetPackageByNameResp(pkg))
}

func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePackageResp, *mq.CodeMessage) {
var pkg cdssdk.Package
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
var err error

isAvai, _ := svc.db2.Bucket().IsAvailable(tx, msg.BucketID, msg.UserID)
if !isAvai {
return fmt.Errorf("bucket is not avaiable to the user")
}

pkg, err = svc.db2.Package().Create(tx, msg.BucketID, msg.Name)
if err != nil {
return fmt.Errorf("creating package: %w", err)
}

return nil
})
if err != nil {
logger.WithField("BucketID", msg.BucketID).
WithField("Name", msg.Name).
Warn(err.Error())

if errors.Is(err, gorm.ErrDuplicatedKey) {
return nil, mq.Failed(errorcode.DataExists, "package already exists")
}

return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}

svc.evtPub.Publish(&stgmod.BodyNewPackage{
Info: pkg,
})

return mq.ReplyOK(coormq.NewCreatePackageResp(pkg))
}

func (svc *Service) UpdatePackage(msg *coormq.UpdatePackage) (*coormq.UpdatePackageResp, *mq.CodeMessage) {
var added []cdssdk.Object
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
_, err := svc.db2.Package().GetByID(tx, msg.PackageID)
if err != nil {
return fmt.Errorf("getting package by id: %w", err)
}

ad, err := svc.db2.Object().BatchAdd(tx, msg.PackageID, msg.Adds)
if err != nil {
return fmt.Errorf("adding objects: %w", err)
}
added = ad

return nil
})
if err != nil {
logger.WithField("PackageID", msg.PackageID).Warn(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "update package failed")
}

addedMp := make(map[string]cdssdk.Object)
for _, obj := range added {
addedMp[obj.Path] = obj
}

for _, add := range msg.Adds {
var blks []stgmod.BlockDistributionObjectInfo
for _, stgID := range add.StorageIDs {
blks = append(blks, stgmod.BlockDistributionObjectInfo{
BlockType: stgmod.BlockTypeRaw,
StorageID: stgID,
})
}

svc.evtPub.Publish(&stgmod.BodyNewOrUpdateObject{
Info: addedMp[add.Path],
BlockDistribution: blks,
})
}

return mq.ReplyOK(coormq.NewUpdatePackageResp(added))
}

func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePackageResp, *mq.CodeMessage) {
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
isAvai, _ := svc.db2.Package().IsAvailable(tx, msg.UserID, msg.PackageID)
if !isAvai {
return fmt.Errorf("package is not available to the user")
}

err := svc.db2.Package().DeleteComplete(tx, msg.PackageID)
if err != nil {
return fmt.Errorf("deleting package: %w", err)
}

return nil
})
if err != nil {
logger.WithField("UserID", msg.UserID).
WithField("PackageID", msg.PackageID).
Warnf(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "delete package failed")
}

svc.evtPub.Publish(&stgmod.BodyPackageDeleted{
PackageID: msg.PackageID,
})

return mq.ReplyOK(coormq.NewDeletePackageResp())
}

func (svc *Service) ClonePackage(msg *coormq.ClonePackage) (*coormq.ClonePackageResp, *mq.CodeMessage) {
var pkg cdssdk.Package
var oldObjIDs []cdssdk.ObjectID
var newObjIDs []cdssdk.ObjectID
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
var err error

isAvai, _ := svc.db2.Bucket().IsAvailable(tx, msg.BucketID, msg.UserID)
if !isAvai {
return fmt.Errorf("bucket is not avaiable to the user")
}

pkg, err = svc.db2.Package().Create(tx, msg.BucketID, msg.Name)
if err != nil {
return fmt.Errorf("creating package: %w", err)
}

objs, err := svc.db2.Object().GetPackageObjects(tx, msg.PackageID)
if err != nil {
return fmt.Errorf("getting package objects: %w", err)
}

objBlks, err := svc.db2.ObjectBlock().GetInPackageID(tx, msg.PackageID)
if err != nil {
return fmt.Errorf("getting object blocks: %w", err)
}

clonedObjs := make([]cdssdk.Object, len(objs))
for i, obj := range objs {
clonedObjs[i] = obj
clonedObjs[i].ObjectID = 0
clonedObjs[i].PackageID = pkg.PackageID
}

err = svc.db2.Object().BatchCreate(tx, &clonedObjs)
if err != nil {
return fmt.Errorf("batch creating objects: %w", err)
}

oldToNew := make(map[cdssdk.ObjectID]cdssdk.ObjectID)
for i, obj := range clonedObjs {
oldToNew[objs[i].ObjectID] = obj.ObjectID

oldObjIDs = append(oldObjIDs, objs[i].ObjectID)
newObjIDs = append(newObjIDs, obj.ObjectID)
}

clonedBlks := make([]stgmod.ObjectBlock, len(objBlks))
for i, blk := range objBlks {
clonedBlks[i] = blk
clonedBlks[i].ObjectID = oldToNew[blk.ObjectID]
}

err = svc.db2.ObjectBlock().BatchCreate(tx, clonedBlks)
if err != nil {
return fmt.Errorf("batch creating object blocks: %w", err)
}

return nil
})
if err != nil {
if errors.Is(err, gorm.ErrDuplicatedKey) {
return nil, mq.Failed(errorcode.DataExists, "package already exists")
}

return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}

svc.evtPub.Publish(&stgmod.BodyPackageCloned{
SourcePackageID: msg.PackageID,
NewPackage: pkg,
SourceObjectIDs: oldObjIDs,
NewObjectIDs: newObjIDs,
})

return mq.ReplyOK(coormq.RespClonePackage(pkg))
}

func (svc *Service) GetPackageCachedStorages(msg *coormq.GetPackageCachedStorages) (*coormq.GetPackageCachedStoragesResp, *mq.CodeMessage) {
isAva, err := svc.db2.Package().IsAvailable(svc.db2.DefCtx(), msg.UserID, msg.PackageID)
if err != nil {
logger.WithField("UserID", msg.UserID).
WithField("PackageID", msg.PackageID).
Warnf("check package available failed, err: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "check package available failed")
}
if !isAva {
logger.WithField("UserID", msg.UserID).
WithField("PackageID", msg.PackageID).
Warnf("package is not available to the user")
return nil, mq.Failed(errorcode.OperationFailed, "package is not available to the user")
}

// 这个函数只是统计哪些节点缓存了Package中的数据,不需要多么精确,所以可以不用事务
objDetails, err := svc.db2.Object().GetPackageObjectDetails(svc.db2.DefCtx(), msg.PackageID)
if err != nil {
logger.WithField("PackageID", msg.PackageID).
Warnf("get package block details: %s", err.Error())

return nil, mq.Failed(errorcode.OperationFailed, "get package block details failed")
}

var packageSize int64
stgInfoMap := make(map[cdssdk.StorageID]*cdssdk.StoragePackageCachingInfo)
for _, obj := range objDetails {
// 只要存了文件的一个块,就认为此节点存了整个文件
for _, block := range obj.Blocks {
info, ok := stgInfoMap[block.StorageID]
if !ok {
info = &cdssdk.StoragePackageCachingInfo{
StorageID: block.StorageID,
}
stgInfoMap[block.StorageID] = info

}

info.FileSize += obj.Object.Size
info.ObjectCount++
}
}

var stgInfos []cdssdk.StoragePackageCachingInfo
for _, stgInfo := range stgInfoMap {
stgInfos = append(stgInfos, *stgInfo)
}

sort.Slice(stgInfos, func(i, j int) bool {
return stgInfos[i].StorageID < stgInfos[j].StorageID
})
return mq.ReplyOK(coormq.ReqGetPackageCachedStoragesResp(stgInfos, packageSize))
}

func (svc *Service) AddAccessStat(msg *coormq.AddAccessStat) {
pkgIDs := make([]cdssdk.PackageID, len(msg.Entries))
objIDs := make([]cdssdk.ObjectID, len(msg.Entries))
for i, e := range msg.Entries {
pkgIDs[i] = e.PackageID
objIDs[i] = e.ObjectID
}

err := svc.db2.DoTx(func(tx db2.SQLContext) error {
avaiPkgIDs, err := svc.db2.Package().BatchTestPackageID(tx, pkgIDs)
if err != nil {
return fmt.Errorf("batch test package id: %w", err)
}

avaiObjIDs, err := svc.db2.Object().BatchTestObjectID(tx, objIDs)
if err != nil {
return fmt.Errorf("batch test object id: %w", err)
}

var willAdds []coormq.AddAccessStatEntry
for _, e := range msg.Entries {
if avaiPkgIDs[e.PackageID] && avaiObjIDs[e.ObjectID] {
willAdds = append(willAdds, e)
}
}

if len(willAdds) > 0 {
err := svc.db2.PackageAccessStat().BatchAddCounter(tx, willAdds)
if err != nil {
return fmt.Errorf("batch add package access stat counter: %w", err)
}

err = svc.db2.ObjectAccessStat().BatchAddCounter(tx, willAdds)
if err != nil {
return fmt.Errorf("batch add object access stat counter: %w", err)
}
}

return nil
})

if err != nil {
logger.Warn(err.Error())
}
}

+ 4
- 7
coordinator/internal/mq/service.go View File

@@ -1,18 +1,15 @@
package mq package mq


import ( import (
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2"
"gitlink.org.cn/cloudream/storage2/common/pkgs/sysevent"
"gitlink.org.cn/cloudream/storage2/coordinator/internal/db"
) )


type Service struct { type Service struct {
db2 *db2.DB
evtPub *sysevent.Publisher
db *db.DB
} }


func NewService(db2 *db2.DB, evtPub *sysevent.Publisher) *Service {
func NewService(db *db.DB) *Service {
return &Service{ return &Service{
db2: db2,
evtPub: evtPub,
db: db,
} }
} }

+ 1
- 123
coordinator/internal/mq/storage.go View File

@@ -1,23 +1,15 @@
package mq package mq


import ( import (
"errors"
"fmt"
"time"

"gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gorm.io/gorm"


"gitlink.org.cn/cloudream/common/pkgs/mq" "gitlink.org.cn/cloudream/common/pkgs/mq"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator" coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
) )


func (svc *Service) GetStorage(msg *coormq.GetStorage) (*coormq.GetStorageResp, *mq.CodeMessage) { func (svc *Service) GetStorage(msg *coormq.GetStorage) (*coormq.GetStorageResp, *mq.CodeMessage) {
stg, err := svc.db2.Storage().GetUserStorage(svc.db2.DefCtx(), msg.UserID, msg.StorageID)
stg, err := svc.db.Storage().GetByID(svc.db.DefCtx(), msg.StorageID)
if err != nil { if err != nil {
logger.Warnf("getting user storage: %s", err.Error()) logger.Warnf("getting user storage: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "get user storage failed") return nil, mq.Failed(errorcode.OperationFailed, "get user storage failed")
@@ -25,117 +17,3 @@ func (svc *Service) GetStorage(msg *coormq.GetStorage) (*coormq.GetStorageResp,


return mq.ReplyOK(coormq.RespGetStorage(stg)) return mq.ReplyOK(coormq.RespGetStorage(stg))
} }

func (svc *Service) GetStorageDetails(msg *coormq.GetStorageDetails) (*coormq.GetStorageDetailsResp, *mq.CodeMessage) {
stgsMp := make(map[cdssdk.StorageID]*stgmod.StorageDetail)

svc.db2.DoTx(func(tx db2.SQLContext) error {
stgs, err := svc.db2.Storage().BatchGetByID(tx, msg.StorageIDs)
if err != nil && err != gorm.ErrRecordNotFound {
return fmt.Errorf("getting storage: %w", err)
}

details := make([]stgmod.StorageDetail, len(stgs))
for i, stg := range stgs {
details[i] = stgmod.StorageDetail{
Storage: stg,
}
stgsMp[stg.StorageID] = &details[i]
}
err = svc.db2.Storage().FillDetails(tx, details)
if err != nil {
return err
}

return nil
})

ret := make([]*stgmod.StorageDetail, len(msg.StorageIDs))
for i, id := range msg.StorageIDs {
stg, ok := stgsMp[id]
if !ok {
ret[i] = nil
continue
}
ret[i] = stg
}

return mq.ReplyOK(coormq.RespGetStorageDetails(ret))
}

func (svc *Service) GetUserStorageDetails(msg *coormq.GetUserStorageDetails) (*coormq.GetUserStorageDetailsResp, *mq.CodeMessage) {
var ret []stgmod.StorageDetail

svc.db2.DoTx(func(tx db2.SQLContext) error {
stgs, err := svc.db2.Storage().GetUserStorages(tx, msg.UserID)
if err != nil && err != gorm.ErrRecordNotFound {
return fmt.Errorf("getting user storages: %w", err)
}

for _, stg := range stgs {
ret = append(ret, stgmod.StorageDetail{
Storage: stg,
})
}
err = svc.db2.Storage().FillDetails(tx, ret)
if err != nil {
return err
}

return nil
})

return mq.ReplyOK(coormq.RespGetUserStorageDetails(ret))
}

func (svc *Service) GetStorageByName(msg *coormq.GetStorageByName) (*coormq.GetStorageByNameResp, *mq.CodeMessage) {
stg, err := svc.db2.Storage().GetUserStorageByName(svc.db2.DefCtx(), msg.UserID, msg.Name)
if err != nil {
logger.Warnf("getting user storage by name: %s", err.Error())

if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, mq.Failed(errorcode.DataNotFound, "storage not found")
}

return nil, mq.Failed(errorcode.OperationFailed, "get user storage failed")
}

return mq.ReplyOK(coormq.RespGetStorageByNameResp(stg))
}

func (svc *Service) StoragePackageLoaded(msg *coormq.StoragePackageLoaded) (*coormq.StoragePackageLoadedResp, *mq.CodeMessage) {
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
// TODO 权限检查
exists, err := svc.db2.Object().BatchTestObjectID(tx, msg.PinnedObjects)
if err != nil {
return fmt.Errorf("testing object id: %w", err)
}

pinned := make([]cdssdk.PinnedObject, 0, len(msg.PinnedObjects))
for _, obj := range msg.PinnedObjects {
if exists[obj] {
pinned = append(pinned, cdssdk.PinnedObject{
StorageID: msg.StorageID,
ObjectID: obj,
CreateTime: time.Now(),
})
}
}

err = svc.db2.PinnedObject().BatchTryCreate(tx, pinned)
if err != nil {
return fmt.Errorf("batch creating pinned object: %w", err)
}

return nil
})
if err != nil {
logger.WithField("UserID", msg.UserID).
WithField("StorageID", msg.StorageID).
WithField("PackageID", msg.PackageID).
Warn(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "user load package to storage failed")
}

return mq.ReplyOK(coormq.RespStoragePackageLoaded())
}

+ 0
- 52
coordinator/internal/mq/temp.go View File

@@ -1,52 +0,0 @@
package mq

import (
"fmt"

"gitlink.org.cn/cloudream/storage2/common/pkgs/db2"

"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
)

func (svc *Service) GetDatabaseAll(msg *coormq.GetDatabaseAll) (*coormq.GetDatabaseAllResp, *mq.CodeMessage) {
var bkts []cdssdk.Bucket
var pkgs []cdssdk.Package
var objs []stgmod.ObjectDetail

err := svc.db2.DoTx(func(tx db2.SQLContext) error {
var err error
bkts, err = svc.db2.Bucket().GetUserBuckets(tx, msg.UserID)
if err != nil {
return fmt.Errorf("get user buckets: %w", err)
}

for _, bkt := range bkts {
ps, err := svc.db2.Package().GetUserBucketPackages(tx, msg.UserID, bkt.BucketID)
if err != nil {
return fmt.Errorf("get bucket packages: %w", err)
}
pkgs = append(pkgs, ps...)
}

for _, pkg := range pkgs {
os, err := svc.db2.Object().GetPackageObjectDetails(tx, pkg.PackageID)
if err != nil {
return fmt.Errorf("get package object details: %w", err)
}
objs = append(objs, os...)
}

return nil
})
if err != nil {
logger.Warnf("batch deleting objects: %s", err.Error())
return nil, mq.Failed(errorcode.OperationFailed, "batch delete objects failed")
}

return mq.ReplyOK(coormq.RespGetDatabaseAll(bkts, pkgs, objs))
}

+ 0
- 127
coordinator/internal/mq/user.go View File

@@ -1,127 +0,0 @@
package mq

import (
"errors"
"fmt"

"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/mq"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
"gorm.io/gorm"
)

func (svc *Service) CreateUser(msg *coormq.CreateUser) (*coormq.CreateUserResp, *mq.CodeMessage) {
var user cdssdk.User
err := svc.db2.DoTx(func(tx db2.SQLContext) error {
var err error

user, err = svc.db2.User().Create(tx, msg.Name)
if err != nil {
return fmt.Errorf("creating user: %w", err)
}

// TODO 目前新建用户的权限与ID 1的相同
hubs, err := svc.db2.UserHub().GetByUserID(tx, 1)
if err != nil {
return fmt.Errorf("getting user hubs: %w", err)
}

stgs, err := svc.db2.UserStorage().GetByUserID(tx, 1)
if err != nil {
return fmt.Errorf("getting user storages: %w", err)
}

for _, hub := range hubs {
err := svc.db2.UserHub().Create(tx, user.UserID, hub.HubID)
if err != nil {
return fmt.Errorf("creating user hub: %w", err)
}
}

for _, stg := range stgs {
err := svc.db2.UserStorage().Create(tx, user.UserID, stg.StorageID)
if err != nil {
return fmt.Errorf("creating user storage: %w", err)
}
}

return nil
})
if err != nil {
logger.WithField("Name", msg.Name).
Warn(err.Error())

if errors.Is(err, gorm.ErrDuplicatedKey) {
return nil, mq.Failed(errorcode.DataExists, "user name already exists")
}

return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}

return mq.ReplyOK(coormq.RespCreateUser(user))
}

func (svc *Service) DeleteUser(msg *coormq.DeleteUser) (*coormq.DeleteUserResp, *mq.CodeMessage) {
// TODO 目前不能删除ID 1的用户
if msg.UserID == 1 {
return nil, mq.Failed(errorcode.OperationFailed, "cannot delete the default user")
}

err := svc.db2.DoTx(func(tx db2.SQLContext) error {
err := svc.db2.User().Delete(tx, msg.UserID)
if err != nil {
return fmt.Errorf("deleting user: %w", err)
}

err = svc.db2.UserHub().DeleteByUserID(tx, msg.UserID)
if err != nil {
return fmt.Errorf("deleting user hubs: %w", err)
}

err = svc.db2.UserStorage().DeleteByUserID(tx, msg.UserID)
if err != nil {
return fmt.Errorf("deleting user storages: %w", err)
}

bkts, err := svc.db2.UserBucket().GetByUserID(tx, msg.UserID)
if err != nil {
return fmt.Errorf("getting user buckets: %w", err)
}

for _, bkt := range bkts {
pkgs, err := svc.db2.Package().GetBucketPackages(tx, bkt.BucketID)
if err != nil {
return fmt.Errorf("getting bucket packages: %w", err)
}

for _, pkg := range pkgs {
err := svc.db2.Package().DeleteComplete(tx, pkg.PackageID)
if err != nil {
return fmt.Errorf("deleting package %v: %w", pkg.PackageID, err)
}
}

err = svc.db2.Bucket().Delete(tx, bkt.BucketID)
if err != nil {
return fmt.Errorf("deleting bucket: %w", err)
}
}

err = svc.db2.UserBucket().DeleteByUserID(tx, msg.UserID)
if err != nil {
return fmt.Errorf("deleting user buckets: %w", err)
}

return nil
})
if err != nil {
logger.WithField("UserID", msg.UserID).
Warn(err.Error())
return nil, mq.Failed(errorcode.OperationFailed, err.Error())
}

return mq.ReplyOK(coormq.RespDeleteUser())
}

+ 0
- 23
coordinator/internal/mq/utils.go View File

@@ -1,23 +0,0 @@
package mq

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
)

func getBlockTypeFromRed(red cdssdk.Redundancy) string {
switch red.(type) {
case *cdssdk.NoneRedundancy:
return stgmod.BlockTypeRaw

case *cdssdk.ECRedundancy:
return stgmod.BlockTypeEC

case *cdssdk.LRCRedundancy:
return stgmod.BlockTypeEC

case *cdssdk.SegmentRedundancy:
return stgmod.BlockTypeSegment
}
return ""
}

+ 6
- 33
coordinator/types/storage.go View File

@@ -7,10 +7,6 @@ import (
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


type UserStorageConfig interface {
GetUserStorageConfigType() string
}

type Storage struct { type Storage struct {
StorageID StorageID `json:"storageID" gorm:"column:StorageID; primaryKey; type:bigint; autoIncrement;"` StorageID StorageID `json:"storageID" gorm:"column:StorageID; primaryKey; type:bigint; autoIncrement;"`
Name string `json:"name" gorm:"column:Name; type:varchar(256); not null"` Name string `json:"name" gorm:"column:Name; type:varchar(256); not null"`
@@ -51,7 +47,7 @@ var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[Storage
type MashupStorageType struct { type MashupStorageType struct {
serder.Metadata `union:"Mashup"` serder.Metadata `union:"Mashup"`
Type string `json:"type"` Type string `json:"type"`
Agent StorageType `json:"agent"` // 创建Agent时,使用的存储服务类型
Store StorageType `json:"store"` // 创建ShardStore或PublicStore时,使用的存储服务类型
Feature StorageType `json:"feature"` // 根据Feature创建组件时使用的存储服务类型 Feature StorageType `json:"feature"` // 根据Feature创建组件时使用的存储服务类型
} }


@@ -79,11 +75,6 @@ func (a *LocalStorageType) String() string {
type OSSType struct { type OSSType struct {
serder.Metadata `union:"OSS"` serder.Metadata `union:"OSS"`
Type string `json:"type"` Type string `json:"type"`
Region string `json:"region"`
AK string `json:"accessKeyId"`
SK string `json:"secretAccessKey"`
Endpoint string `json:"endpoint"`
Bucket string `json:"bucket"`
} }


func (a *OSSType) GetStorageType() string { func (a *OSSType) GetStorageType() string {
@@ -97,12 +88,6 @@ func (a *OSSType) String() string {
type OBSType struct { type OBSType struct {
serder.Metadata `union:"OBS"` serder.Metadata `union:"OBS"`
Type string `json:"type"` Type string `json:"type"`
Region string `json:"region"`
AK string `json:"accessKeyId"`
SK string `json:"secretAccessKey"`
Endpoint string `json:"endpoint"`
Bucket string `json:"bucket"`
ProjectID string `json:"projectID"`
} }


func (a *OBSType) GetStorageType() string { func (a *OBSType) GetStorageType() string {
@@ -116,11 +101,6 @@ func (a *OBSType) String() string {
type COSType struct { type COSType struct {
serder.Metadata `union:"COS"` serder.Metadata `union:"COS"`
Type string `json:"type"` Type string `json:"type"`
Region string `json:"region"`
AK string `json:"accessKeyId"`
SK string `json:"secretAccessKey"`
Endpoint string `json:"endpoint"`
Bucket string `json:"bucket"`
} }


func (a *COSType) GetStorageType() string { func (a *COSType) GetStorageType() string {
@@ -134,13 +114,6 @@ func (a *COSType) String() string {
type EFileType struct { type EFileType struct {
serder.Metadata `union:"EFile"` serder.Metadata `union:"EFile"`
Type string `json:"type"` Type string `json:"type"`
TokenURL string `json:"tokenURL"`
APIURL string `json:"apiURL"`
TokenExpire int `json:"tokenExpire"` // 单位秒
User string `json:"user"`
Password string `json:"password"`
OrgID string `json:"orgID"`
ClusterID string `json:"clusterID"`
} }


func (a *EFileType) GetStorageType() string { func (a *EFileType) GetStorageType() string {
@@ -155,11 +128,6 @@ func (a *EFileType) String() string {
type S3Type struct { type S3Type struct {
serder.Metadata `union:"S3"` serder.Metadata `union:"S3"`
Type string `json:"type"` Type string `json:"type"`
Region string `json:"region"`
AK string `json:"accessKeyId"`
SK string `json:"secretAccessKey"`
Endpoint string `json:"endpoint"`
Bucket string `json:"bucket"`
} }


func (a *S3Type) GetStorageType() string { func (a *S3Type) GetStorageType() string {
@@ -169,3 +137,8 @@ func (a *S3Type) GetStorageType() string {
func (a *S3Type) String() string { func (a *S3Type) String() string {
return "S3" return "S3"
} }

type ShardStoreUserConfig struct {
Root string `json:"root"`
MaxSize int64 `json:"maxSize"`
}

+ 92
- 0
coordinator/types/storage_credential.go View File

@@ -0,0 +1,92 @@
package types

import (
"gitlink.org.cn/cloudream/common/pkgs/types"
"gitlink.org.cn/cloudream/common/utils/serder"
)

type StorageCredential interface {
GetStorageCredentialType() string
}

var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[StorageCredential](
(*LocalCred)(nil),
(*MashupCred)(nil),
(*OBSCred)(nil),
(*OSSCred)(nil),
(*COSCred)(nil),
(*EFileCred)(nil),
(*S3Cred)(nil),
)), "type")

type LocalCred struct {
StorageCredential
serder.Metadata `union:"Local"`
Type string `json:"type"`
}

type MashupCred struct {
StorageCredential
serder.Metadata `union:"Mashup"`
Store StorageCredential `json:"store"`
Feature StorageCredential `json:"feature"`
}

type OSSCred struct {
StorageCredential
serder.Metadata `union:"OSS"`
Type string `json:"type"`
Region string `json:"region"`
AK string `json:"accessKeyId"`
SK string `json:"secretAccessKey"`
Endpoint string `json:"endpoint"`
Bucket string `json:"bucket"`
}

type OBSCred struct {
StorageCredential
serder.Metadata `union:"OBS"`
Type string `json:"type"`
Region string `json:"region"`
AK string `json:"accessKeyId"`
SK string `json:"secretAccessKey"`
Endpoint string `json:"endpoint"`
Bucket string `json:"bucket"`
ProjectID string `json:"projectID"`
}

type COSCred struct {
StorageCredential
serder.Metadata `union:"COS"`
Type string `json:"type"`
Region string `json:"region"`
AK string `json:"accessKeyId"`
SK string `json:"secretAccessKey"`
Endpoint string `json:"endpoint"`
Bucket string `json:"bucket"`
}

type EFileCred struct {
StorageCredential
serder.Metadata `union:"EFile"`
Type string `json:"type"`
TokenURL string `json:"tokenURL"`
APIURL string `json:"apiURL"`
TokenExpire int `json:"tokenExpire"` // 单位秒
User string `json:"user"`
Password string `json:"password"`
OrgID string `json:"orgID"`
ClusterID string `json:"clusterID"`
}

// 通用的S3协议的存储服务
type S3Cred struct {
StorageCredential
serder.Metadata `union:"S3"`
Type string `json:"type"`
Region string `json:"region"`
AK string `json:"accessKeyId"`
SK string `json:"secretAccessKey"`
Endpoint string `json:"endpoint"`
Bucket string `json:"bucket"`
}

+ 20
- 0
coordinator/types/types.go View File

@@ -14,6 +14,8 @@ type HubID int64


type LocationID int64 type LocationID int64


type UserID int64

type Hub struct { type Hub struct {
HubID HubID `gorm:"column:HubID; primaryKey; type:bigint; autoIncrement" json:"hubID"` HubID HubID `gorm:"column:HubID; primaryKey; type:bigint; autoIncrement" json:"hubID"`
Name string `gorm:"column:Name; type:varchar(255); not null" json:"name"` Name string `gorm:"column:Name; type:varchar(255); not null" json:"name"`
@@ -68,3 +70,21 @@ type HubConnectivity struct {
func (HubConnectivity) TableName() string { func (HubConnectivity) TableName() string {
return "HubConnectivity" return "HubConnectivity"
} }

type Location struct {
LocationID LocationID `gorm:"column:LocationID; primaryKey; type:bigint; autoIncrement" json:"locationID"`
Name string `gorm:"column:Name; type:varchar(255); not null" json:"name"`
}

func (Location) TableName() string {
return "Location"
}

type User struct {
UserID UserID `gorm:"column:UserID; primaryKey; type:bigint; autoIncrement" json:"userID"`
Name string `gorm:"column:Name; type:varchar(255); not null" json:"name"`
}

func (User) TableName() string {
return "User"
}

Loading…
Cancel
Save