Browse Source

调整目录结构

gitlink
Sydonian 7 months ago
parent
commit
f0015ed3e4
34 changed files with 1967 additions and 1591 deletions
  1. +149
    -0
      client/internal/db/bucket.go
  2. +105
    -0
      client/internal/db/cache.go
  3. +21
    -0
      client/internal/db/config.go
  4. +38
    -0
      client/internal/db/db2.go
  5. +60
    -0
      client/internal/db/hub.go
  6. +37
    -0
      client/internal/db/hub_connectivity.go
  7. +5
    -8
      client/internal/db/location.go
  8. +511
    -0
      client/internal/db/object.go
  9. +116
    -0
      client/internal/db/object_access_stat.go
  10. +122
    -0
      client/internal/db/object_block.go
  11. +209
    -0
      client/internal/db/package.go
  12. +79
    -0
      client/internal/db/package_access_stat.go
  13. +122
    -0
      client/internal/db/pinned_object.go
  14. +120
    -0
      client/internal/db/storage.go
  15. +44
    -0
      client/internal/db/union_serializer.go
  16. +44
    -0
      client/internal/db/user.go
  17. +36
    -0
      client/internal/db/user_bucket.go
  18. +34
    -0
      client/internal/db/user_hub.go
  19. +34
    -0
      client/internal/db/user_storage.go
  20. +81
    -0
      client/internal/db/utils.go
  21. +0
    -140
      common/pkgs/db/bucket.go
  22. +0
    -125
      common/pkgs/db/cache.go
  23. +0
    -68
      common/pkgs/db/db.go
  24. +0
    -43
      common/pkgs/db/node.go
  25. +0
    -42
      common/pkgs/db/node_connectivity.go
  26. +0
    -376
      common/pkgs/db/object.go
  27. +0
    -123
      common/pkgs/db/object_access_stat.go
  28. +0
    -138
      common/pkgs/db/object_block.go
  29. +0
    -207
      common/pkgs/db/package.go
  30. +0
    -86
      common/pkgs/db/package_access_stat.go
  31. +0
    -119
      common/pkgs/db/storage_package.go
  32. +0
    -23
      common/pkgs/db/user.go
  33. +0
    -16
      common/pkgs/db/user_bucket.go
  34. +0
    -77
      common/pkgs/db/utils.go

+ 149
- 0
client/internal/db/bucket.go View File

@@ -0,0 +1,149 @@
package db2

import (
"errors"
"fmt"
"time"

"gorm.io/gorm"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
)

type BucketDB struct {
*DB
}

func (db *DB) Bucket() *BucketDB {
return &BucketDB{DB: db}
}

func (db *BucketDB) GetByID(ctx SQLContext, bucketID cdssdk.BucketID) (cdssdk.Bucket, error) {
var ret cdssdk.Bucket
err := ctx.Table("Bucket").Where("BucketID = ?", bucketID).First(&ret).Error
return ret, err
}

func (db *BucketDB) GetByName(ctx SQLContext, bucketName string) (cdssdk.Bucket, error) {
var ret cdssdk.Bucket
err := ctx.Table("Bucket").Where("Name = ?", bucketName).First(&ret).Error
return ret, err
}

// GetIDByName 根据BucketName查询BucketID
func (db *BucketDB) GetIDByName(ctx SQLContext, bucketName string) (int64, error) {
var result struct {
BucketID int64 `gorm:"column:BucketID"`
BucketName string `gorm:"column:BucketName"`
}

err := ctx.Table("Bucket").Select("BucketID, BucketName").Where("BucketName = ?", bucketName).Scan(&result).Error
if err != nil {
return 0, err
}

return result.BucketID, nil
}

func (*BucketDB) GetAll(ctx SQLContext) ([]cdssdk.Bucket, error) {
var ret []cdssdk.Bucket
err := ctx.Table("Bucket").Find(&ret).Error
return ret, err
}

// IsAvailable 判断用户是否有指定Bucekt的权限
func (db *BucketDB) IsAvailable(ctx SQLContext, bucketID cdssdk.BucketID, userID cdssdk.UserID) (bool, error) {
_, err := db.GetUserBucket(ctx, userID, bucketID)
if errors.Is(err, gorm.ErrRecordNotFound) {
return false, nil
}

if err != nil {
return false, fmt.Errorf("find bucket failed, err: %w", err)
}

return true, nil
}

func (*BucketDB) GetUserBucket(ctx SQLContext, userID cdssdk.UserID, bucketID cdssdk.BucketID) (model.Bucket, error) {
var ret model.Bucket
err := ctx.Table("UserBucket").
Select("Bucket.*").
Joins("JOIN Bucket ON UserBucket.BucketID = Bucket.BucketID").
Where("UserBucket.UserID = ? AND Bucket.BucketID = ?", userID, bucketID).
First(&ret).Error
return ret, err
}

func (*BucketDB) GetUserBucketByName(ctx SQLContext, userID cdssdk.UserID, bucketName string) (model.Bucket, error) {
var ret model.Bucket
err := ctx.Table("UserBucket").
Select("Bucket.*").
Joins("JOIN Bucket ON UserBucket.BucketID = Bucket.BucketID").
Where("UserBucket.UserID = ? AND Bucket.Name = ?", userID, bucketName).
First(&ret).Error
return ret, err
}

func (*BucketDB) GetUserBuckets(ctx SQLContext, userID cdssdk.UserID) ([]model.Bucket, error) {
var ret []model.Bucket
err := ctx.Table("UserBucket").
Select("Bucket.*").
Joins("JOIN Bucket ON UserBucket.BucketID = Bucket.BucketID").
Where("UserBucket.UserID = ?", userID).
Find(&ret).Error
return ret, err
}

func (db *BucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketName string, createTime time.Time) (cdssdk.Bucket, error) {
var bucketID int64
err := ctx.Table("UserBucket").
Select("Bucket.BucketID").
Joins("JOIN Bucket ON UserBucket.BucketID = Bucket.BucketID").
Where("UserBucket.UserID = ? AND Bucket.Name = ?", userID, bucketName).
Scan(&bucketID).Error

if err != nil {
return cdssdk.Bucket{}, err
}

if bucketID > 0 {
return cdssdk.Bucket{}, gorm.ErrDuplicatedKey
}

newBucket := cdssdk.Bucket{Name: bucketName, CreateTime: createTime, CreatorID: userID}
if err := ctx.Table("Bucket").Create(&newBucket).Error; err != nil {
return cdssdk.Bucket{}, fmt.Errorf("insert bucket failed, err: %w", err)
}

err = ctx.Table("UserBucket").Create(&model.UserBucket{UserID: userID, BucketID: newBucket.BucketID}).Error
if err != nil {
return cdssdk.Bucket{}, fmt.Errorf("insert user bucket: %w", err)
}

return newBucket, nil
}

func (db *BucketDB) Rename(ctx SQLContext, bucketID cdssdk.BucketID, bucketName string) error {
return ctx.Table("Bucket").Where("BucketID = ?", bucketID).Update("Name", bucketName).Error
}

func (db *BucketDB) Delete(ctx SQLContext, bucketID cdssdk.BucketID) error {
return ctx.Delete(&cdssdk.Bucket{}, "BucketID = ?", bucketID).Error
}

func (db *BucketDB) DeleteComplete(tx SQLContext, bucketID cdssdk.BucketID) error {
pkgs, err := db.Package().GetBucketPackages(tx, bucketID)
if err != nil {
return err
}

for _, pkg := range pkgs {
err := db.Package().DeleteComplete(tx, pkg.PackageID)
if err != nil {
return err
}
}
return db.Bucket().Delete(tx, bucketID)
}

+ 105
- 0
client/internal/db/cache.go View File

@@ -0,0 +1,105 @@
package db2

import (
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
"gorm.io/gorm/clause"
)

type CacheDB struct {
*DB
}

func (db *DB) Cache() *CacheDB {
return &CacheDB{DB: db}
}

func (*CacheDB) Get(ctx SQLContext, fileHash cdssdk.FileHash, stgID cdssdk.StorageID) (model.Cache, error) {
var ret model.Cache
err := ctx.Table("Cache").Where("FileHash = ? AND StorageID = ?", fileHash, stgID).First(&ret).Error
return ret, err
}

func (*CacheDB) BatchGetAllFileHashes(ctx SQLContext, start int, count int) ([]string, error) {
var ret []string
err := ctx.Table("Cache").Distinct("FileHash").Offset(start).Limit(count).Pluck("FileHash", &ret).Error
return ret, err
}

func (*CacheDB) GetByStorageID(ctx SQLContext, stgID cdssdk.StorageID) ([]model.Cache, error) {
var ret []model.Cache
err := ctx.Table("Cache").Where("StorageID = ?", stgID).Find(&ret).Error
return ret, err
}

// Create 创建一条缓存记录,如果已有则不进行操作
func (*CacheDB) Create(ctx SQLContext, fileHash cdssdk.FileHash, stgID cdssdk.StorageID, priority int) error {
cache := model.Cache{FileHash: fileHash, StorageID: stgID, CreateTime: time.Now(), Priority: priority}
return ctx.Where(cache).Attrs(cache).FirstOrCreate(&cache).Error
}

// 批量创建缓存记录
func (*CacheDB) BatchCreate(ctx SQLContext, caches []model.Cache) error {
if len(caches) == 0 {
return nil
}

return ctx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "FileHash"}, {Name: "StorageID"}},
DoUpdates: clause.AssignmentColumns([]string{"CreateTime", "Priority"}),
}).Create(&caches).Error
}

func (db *CacheDB) BatchCreateOnSameStorage(ctx SQLContext, fileHashes []cdssdk.FileHash, stgID cdssdk.StorageID, priority int) error {
if len(fileHashes) == 0 {
return nil
}

var caches []model.Cache
var nowTime = time.Now()
for _, hash := range fileHashes {
caches = append(caches, model.Cache{
FileHash: hash,
StorageID: stgID,
CreateTime: nowTime,
Priority: priority,
})
}

return db.BatchCreate(ctx, caches)
}

func (*CacheDB) StorageBatchDelete(ctx SQLContext, stgID cdssdk.StorageID, fileHashes []cdssdk.FileHash) error {
if len(fileHashes) == 0 {
return nil
}

return ctx.Table("Cache").Where("StorageID = ? AND FileHash IN (?)", stgID, fileHashes).Delete(&model.Cache{}).Error
}

// GetCachingFileStorages 查找缓存了指定文件的存储服务
func (*CacheDB) GetCachingFileStorages(ctx SQLContext, fileHash cdssdk.FileHash) ([]cdssdk.Storage, error) {
var stgs []cdssdk.Storage
err := ctx.Table("Cache").Select("Storage.*").
Joins("JOIN Storage ON Cache.StorageID = Storage.StorageID").
Where("Cache.FileHash = ?", fileHash).
Find(&stgs).Error
return stgs, err
}

// DeleteStorageAll 删除一个存储服务所有的记录
func (*CacheDB) DeleteStorageAll(ctx SQLContext, StorageID cdssdk.StorageID) error {
return ctx.Where("StorageID = ?", StorageID).Delete(&model.Cache{}).Error
}

// FindCachingFileUserStorages 在缓存表中查询指定数据所在的节点
func (*CacheDB) FindCachingFileUserStorages(ctx SQLContext, userID cdssdk.UserID, fileHash string) ([]cdssdk.Storage, error) {
var stgs []cdssdk.Storage
err := ctx.Table("Cache").Select("Storage.*").
Joins("JOIN UserStorage ON Cache.StorageID = UserStorage.StorageID").
Where("Cache.FileHash = ? AND UserStorage.UserID = ?", fileHash, userID).
Find(&stgs).Error
return stgs, err
}

+ 21
- 0
client/internal/db/config.go View File

@@ -0,0 +1,21 @@
package config

import "fmt"

type Config struct {
Address string `json:"address"`
Account string `json:"account"`
Password string `json:"password"`
DatabaseName string `json:"databaseName"`
}

func (cfg *Config) MakeSourceString() string {
return fmt.Sprintf(
"%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=%s",
cfg.Account,
cfg.Password,
cfg.Address,
cfg.DatabaseName,
"Asia%2FShanghai",
)
}

+ 38
- 0
client/internal/db/db2.go View File

@@ -0,0 +1,38 @@
package db2

import (
_ "github.com/go-sql-driver/mysql"
"github.com/sirupsen/logrus"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/config"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)

type DB struct {
db *gorm.DB
}

func NewDB(cfg *config.Config) (*DB, error) {
mydb, err := gorm.Open(mysql.Open(cfg.MakeSourceString()), &gorm.Config{})
if err != nil {
logrus.Fatalf("failed to connect to database: %v", err)
}

return &DB{
db: mydb,
}, nil
}

func (db *DB) DoTx(do func(tx SQLContext) error) error {
return db.db.Transaction(func(tx *gorm.DB) error {
return do(SQLContext{tx})
})
}

type SQLContext struct {
*gorm.DB
}

func (db *DB) DefCtx() SQLContext {
return SQLContext{db.db}
}

+ 60
- 0
client/internal/db/hub.go View File

@@ -0,0 +1,60 @@
package db2

import (
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

type HubDB struct {
*DB
}

func (db *DB) Hub() *HubDB {
return &HubDB{DB: db}
}

func (*HubDB) GetAllHubs(ctx SQLContext) ([]cdssdk.Hub, error) {
var ret []cdssdk.Hub

err := ctx.Table("Hub").Find(&ret).Error
return ret, err
}

func (*HubDB) GetByID(ctx SQLContext, hubID cdssdk.HubID) (cdssdk.Hub, error) {
var ret cdssdk.Hub
err := ctx.Table("Hub").Where("HubID = ?", hubID).Find(&ret).Error

return ret, err
}

func (*HubDB) BatchGetByID(ctx SQLContext, hubIDs []cdssdk.HubID) ([]cdssdk.Hub, error) {
var ret []cdssdk.Hub
err := ctx.Table("Hub").Where("HubID IN (?)", hubIDs).Find(&ret).Error

return ret, err
}

// GetUserHubs 根据用户id查询可用hub
func (*HubDB) GetUserHubs(ctx SQLContext, userID cdssdk.UserID) ([]cdssdk.Hub, error) {
var hubs []cdssdk.Hub
err := ctx.
Table("Hub").
Select("Hub.*").
Joins("JOIN UserHub ON UserHub.HubID = Hub.HubID").
Where("UserHub.UserID = ?", userID).
Find(&hubs).Error
return hubs, err
}

// UpdateState 更新状态,并且设置上次上报时间为现在
func (*HubDB) UpdateState(ctx SQLContext, hubID cdssdk.HubID, state string) error {
err := ctx.
Model(&cdssdk.Hub{}).
Where("HubID = ?", hubID).
Updates(map[string]interface{}{
"State": state,
"LastReportTime": time.Now(),
}).Error
return err
}

+ 37
- 0
client/internal/db/hub_connectivity.go View File

@@ -0,0 +1,37 @@
package db2

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
"gorm.io/gorm/clause"
)

type HubConnectivityDB struct {
*DB
}

func (db *DB) HubConnectivity() *HubConnectivityDB {
return &HubConnectivityDB{DB: db}
}

func (db *HubConnectivityDB) BatchGetByFromHub(ctx SQLContext, fromHubIDs []cdssdk.HubID) ([]model.HubConnectivity, error) {
if len(fromHubIDs) == 0 {
return nil, nil
}

var ret []model.HubConnectivity

err := ctx.Table("HubConnectivity").Where("FromHubID IN (?)", fromHubIDs).Find(&ret).Error
return ret, err
}

func (db *HubConnectivityDB) BatchUpdateOrCreate(ctx SQLContext, cons []model.HubConnectivity) error {
if len(cons) == 0 {
return nil
}

// 使用 GORM 的批量插入或更新
return ctx.Table("HubConnectivity").Clauses(clause.OnConflict{
UpdateAll: true,
}).Create(&cons).Error
}

common/pkgs/db/location.go → client/internal/db/location.go View File

@@ -1,11 +1,9 @@
package db
package db2

/*
import (
"fmt"

"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
)

type LocationDB struct {
@@ -18,15 +16,15 @@ func (db *DB) Location() *LocationDB {

func (*LocationDB) GetByID(ctx SQLContext, id int64) (model.Location, error) {
var ret model.Location
err := sqlx.Get(ctx, &ret, "select * from Location where LocationID = ?", id)
err := ctx.First(&ret, id).Error
return ret, err
}

func (db *LocationDB) FindLocationByExternalIP(ctx SQLContext, ip string) (model.Location, error) {
var locID int64
err := sqlx.Get(ctx, &locID, "select LocationID from Node where ExternalIP = ?", ip)
err := ctx.Table("Hub").Select("LocationID").Where("ExternalIP = ?", ip).Scan(&locID).Error
if err != nil {
return model.Location{}, fmt.Errorf("finding node by external ip: %w", err)
return model.Location{}, fmt.Errorf("finding hub by external ip: %w", err)
}

loc, err := db.GetByID(ctx, locID)
@@ -36,4 +34,3 @@ func (db *LocationDB) FindLocationByExternalIP(ctx SQLContext, ip string) (model

return loc, nil
}
*/

+ 511
- 0
client/internal/db/object.go View File

@@ -0,0 +1,511 @@
package db2

import (
"fmt"
"strings"
"time"

"gorm.io/gorm"
"gorm.io/gorm/clause"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
)

type ObjectDB struct {
*DB
}

func (db *DB) Object() *ObjectDB {
return &ObjectDB{DB: db}
}

func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (cdssdk.Object, error) {
var ret cdssdk.Object
err := ctx.Table("Object").Where("ObjectID = ?", objectID).First(&ret).Error
return ret, err
}

func (db *ObjectDB) GetByPath(ctx SQLContext, packageID cdssdk.PackageID, path string) (cdssdk.Object, error) {
var ret cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ? AND Path = ?", packageID, path).First(&ret).Error
return ret, err
}

func (db *ObjectDB) GetByFullPath(ctx SQLContext, bktName string, pkgName string, path string) (cdssdk.Object, error) {
var ret cdssdk.Object
err := ctx.Table("Object").
Joins("join Package on Package.PackageID = Object.PackageID and Package.Name = ?", pkgName).
Joins("join Bucket on Bucket.BucketID = Package.BucketID and Bucket.Name = ?", bktName).
Where("Object.Path = ?", path).First(&ret).Error
return ret, err
}

func (db *ObjectDB) GetWithPathPrefix(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) ([]cdssdk.Object, error) {
var ret []cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", packageID, escapeLike("", "%", pathPrefix)).Order("ObjectID ASC").Find(&ret).Error
return ret, err
}

// 查询结果将按照Path升序,而不是ObjectID升序
func (db *ObjectDB) GetWithPathPrefixPaged(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string, startPath string, limit int) ([]cdssdk.Object, error) {
var ret []cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ? AND Path > ? AND Path LIKE ?", packageID, startPath, pathPrefix+"%").Order("Path ASC").Limit(limit).Find(&ret).Error
return ret, err
}

func (db *ObjectDB) GetByPrefixGrouped(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) (objs []cdssdk.Object, commonPrefixes []string, err error) {
type ObjectOrDir struct {
cdssdk.Object
IsObject bool `gorm:"IsObject"`
Prefix string `gorm:"Prefix"`
}

sepCnt := strings.Count(pathPrefix, cdssdk.ObjectPathSeparator) + 1

prefixStatm := fmt.Sprintf("Substring_Index(Path, '%s', %d)", cdssdk.ObjectPathSeparator, sepCnt)
grouping := ctx.Table("Object").
Select(fmt.Sprintf("%s as Prefix, Max(ObjectID) as ObjectID, %s = Path as IsObject", prefixStatm, prefixStatm)).
Where("PackageID = ?", packageID).
Where("Path like ?", pathPrefix+"%").
Group("Prefix, IsObject").
Order("Prefix ASC")

var ret []ObjectOrDir
err = ctx.Table("Object").
Select("Grouped.IsObject, Grouped.Prefix, Object.*").
Joins("right join (?) as Grouped on Object.ObjectID = Grouped.ObjectID and Grouped.IsObject = 1", grouping).
Find(&ret).Error
if err != nil {
return
}

for _, o := range ret {
if o.IsObject {
objs = append(objs, o.Object)
} else {
commonPrefixes = append(commonPrefixes, o.Prefix+cdssdk.ObjectPathSeparator)
}
}

return
}

func (db *ObjectDB) GetByPrefixGroupedPaged(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string, startPath string, limit int) (objs []cdssdk.Object, commonPrefixes []string, nextStartPath string, err error) {
type ObjectOrDir struct {
cdssdk.Object
IsObject bool `gorm:"IsObject"`
Prefix string `gorm:"Prefix"`
}

sepCnt := strings.Count(pathPrefix, cdssdk.ObjectPathSeparator) + 1

prefixStatm := fmt.Sprintf("Substring_Index(Path, '%s', %d)", cdssdk.ObjectPathSeparator, sepCnt)
grouping := ctx.Table("Object").
Select(fmt.Sprintf("%s as Prefix, Max(ObjectID) as ObjectID, %s = Path as IsObject", prefixStatm, prefixStatm)).
Where("PackageID = ?", packageID).
Where("Path like ?", pathPrefix+"%").
Group("Prefix, IsObject").
Having("Prefix > ?", startPath).
Limit(limit).
Order("Prefix ASC")

var ret []ObjectOrDir
err = ctx.Table("Object").
Select("Grouped.IsObject, Grouped.Prefix, Object.*").
Joins("right join (?) as Grouped on Object.ObjectID = Grouped.ObjectID and Grouped.IsObject = 1", grouping).
Find(&ret).Error
if err != nil {
return
}

for _, o := range ret {
if o.IsObject {
objs = append(objs, o.Object)
} else {
commonPrefixes = append(commonPrefixes, o.Prefix+cdssdk.ObjectPathSeparator)
}
nextStartPath = o.Prefix
}

return
}

func (db *ObjectDB) HasObjectWithPrefix(ctx SQLContext, packageID cdssdk.PackageID, pathPrefix string) (bool, error) {
var obj cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", packageID, escapeLike("", "%", pathPrefix)).First(&obj).Error
if err == nil {
return true, nil
}

if err == gorm.ErrRecordNotFound {
return false, nil
}

return false, err
}

func (db *ObjectDB) BatchTestObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) (map[cdssdk.ObjectID]bool, error) {
if len(objectIDs) == 0 {
return make(map[cdssdk.ObjectID]bool), nil
}

var avaiIDs []cdssdk.ObjectID
err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Pluck("ObjectID", &avaiIDs).Error
if err != nil {
return nil, err
}

avaiIDMap := make(map[cdssdk.ObjectID]bool)
for _, pkgID := range avaiIDs {
avaiIDMap[pkgID] = true
}

return avaiIDMap, nil
}

func (db *ObjectDB) BatchGet(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]cdssdk.Object, error) {
if len(objectIDs) == 0 {
return nil, nil
}

var objs []cdssdk.Object
err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&objs).Error
if err != nil {
return nil, err
}

return objs, nil
}

func (db *ObjectDB) BatchGetByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) {
if len(pathes) == 0 {
return nil, nil
}

var objs []cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ? AND Path IN ?", pkgID, pathes).Find(&objs).Error
if err != nil {
return nil, err
}

return objs, nil
}

func (db *ObjectDB) GetDetail(ctx SQLContext, objectID cdssdk.ObjectID) (stgmod.ObjectDetail, error) {
var obj cdssdk.Object
err := ctx.Table("Object").Where("ObjectID = ?", objectID).First(&obj).Error
if err != nil {
return stgmod.ObjectDetail{}, fmt.Errorf("getting object: %w", err)
}

// 获取所有的 ObjectBlock
var allBlocks []stgmod.ObjectBlock
err = ctx.Table("ObjectBlock").Where("ObjectID = ?", objectID).Order("`Index` ASC").Find(&allBlocks).Error
if err != nil {
return stgmod.ObjectDetail{}, fmt.Errorf("getting all object blocks: %w", err)
}

// 获取所有的 PinnedObject
var allPinnedObjs []cdssdk.PinnedObject
err = ctx.Table("PinnedObject").Where("ObjectID = ?", objectID).Order("ObjectID ASC").Find(&allPinnedObjs).Error
if err != nil {
return stgmod.ObjectDetail{}, fmt.Errorf("getting all pinned objects: %w", err)
}

pinnedAt := make([]cdssdk.StorageID, len(allPinnedObjs))
for i, po := range allPinnedObjs {
pinnedAt[i] = po.StorageID
}

return stgmod.ObjectDetail{
Object: obj,
Blocks: allBlocks,
PinnedAt: pinnedAt,
}, nil
}

// 仅返回查询到的对象
func (db *ObjectDB) BatchGetDetails(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]stgmod.ObjectDetail, error) {
var objs []cdssdk.Object

err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&objs).Error
if err != nil {
return nil, err
}

// 获取所有的 ObjectBlock
var allBlocks []stgmod.ObjectBlock
err = ctx.Table("ObjectBlock").Where("ObjectID IN ?", objectIDs).Order("ObjectID, `Index` ASC").Find(&allBlocks).Error
if err != nil {
return nil, err
}

// 获取所有的 PinnedObject
var allPinnedObjs []cdssdk.PinnedObject
err = ctx.Table("PinnedObject").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&allPinnedObjs).Error
if err != nil {
return nil, err
}

details := make([]stgmod.ObjectDetail, len(objs))
for i, obj := range objs {
details[i] = stgmod.ObjectDetail{
Object: obj,
}
}

stgmod.DetailsFillObjectBlocks(details, allBlocks)
stgmod.DetailsFillPinnedAt(details, allPinnedObjs)
return details, nil
}

func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, error) {
err := ctx.Table("Object").Create(&obj).Error
if err != nil {
return 0, fmt.Errorf("insert object failed, err: %w", err)
}
return obj.ObjectID, nil
}

// 批量创建对象,创建完成后会填充ObjectID。
func (db *ObjectDB) BatchCreate(ctx SQLContext, objs *[]cdssdk.Object) error {
if len(*objs) == 0 {
return nil
}

return ctx.Table("Object").Create(objs).Error
}

// 批量更新对象所有属性,objs中的对象必须包含ObjectID
func (db *ObjectDB) BatchUpdate(ctx SQLContext, objs []cdssdk.Object) error {
if len(objs) == 0 {
return nil
}

return ctx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "ObjectID"}},
UpdateAll: true,
}).Create(objs).Error
}

// 批量更新对象指定属性,objs中的对象只需设置需要更新的属性即可,但:
// 1. 必须包含ObjectID
// 2. 日期类型属性不能设置为0值
func (db *ObjectDB) BatchUpdateColumns(ctx SQLContext, objs []cdssdk.Object, columns []string) error {
if len(objs) == 0 {
return nil
}

return ctx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "ObjectID"}},
DoUpdates: clause.AssignmentColumns(columns),
}).Create(objs).Error
}

func (db *ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]cdssdk.Object, error) {
var ret []cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ?", packageID).Order("ObjectID ASC").Find(&ret).Error
return ret, err
}

func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectDetail, error) {
var objs []cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ?", packageID).Order("ObjectID ASC").Find(&objs).Error
if err != nil {
return nil, fmt.Errorf("getting objects: %w", err)
}

// 获取所有的 ObjectBlock
var allBlocks []stgmod.ObjectBlock
err = ctx.Table("ObjectBlock").
Select("ObjectBlock.*").
Joins("JOIN Object ON ObjectBlock.ObjectID = Object.ObjectID").
Where("Object.PackageID = ?", packageID).
Order("ObjectBlock.ObjectID, `Index` ASC").
Find(&allBlocks).Error
if err != nil {
return nil, fmt.Errorf("getting all object blocks: %w", err)
}

// 获取所有的 PinnedObject
var allPinnedObjs []cdssdk.PinnedObject
err = ctx.Table("PinnedObject").
Select("PinnedObject.*").
Joins("JOIN Object ON PinnedObject.ObjectID = Object.ObjectID").
Where("Object.PackageID = ?", packageID).
Order("PinnedObject.ObjectID").
Find(&allPinnedObjs).Error
if err != nil {
return nil, fmt.Errorf("getting all pinned objects: %w", err)
}

details := make([]stgmod.ObjectDetail, len(objs))
for i, obj := range objs {
details[i] = stgmod.ObjectDetail{
Object: obj,
}
}

stgmod.DetailsFillObjectBlocks(details, allBlocks)
stgmod.DetailsFillPinnedAt(details, allPinnedObjs)
return details, nil
}

func (db *ObjectDB) GetObjectsIfAnyBlockOnStorage(ctx SQLContext, stgID cdssdk.StorageID) ([]cdssdk.Object, error) {
var objs []cdssdk.Object
err := ctx.Table("Object").Where("ObjectID IN (SELECT ObjectID FROM ObjectBlock WHERE StorageID = ?)", stgID).Order("ObjectID ASC").Find(&objs).Error
if err != nil {
return nil, fmt.Errorf("getting objects: %w", err)
}

return objs, nil
}

func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.Object, error) {
if len(adds) == 0 {
return nil, nil
}

// 收集所有路径
pathes := make([]string, 0, len(adds))
for _, add := range adds {
pathes = append(pathes, add.Path)
}

// 先查询要更新的对象,不存在也没关系
existsObjs, err := db.BatchGetByPackagePath(ctx, packageID, pathes)
if err != nil {
return nil, fmt.Errorf("batch get object by path: %w", err)
}

existsObjsMap := make(map[string]cdssdk.Object)
for _, obj := range existsObjs {
existsObjsMap[obj.Path] = obj
}

var updatingObjs []cdssdk.Object
var addingObjs []cdssdk.Object
for i := range adds {
o := cdssdk.Object{
PackageID: packageID,
Path: adds[i].Path,
Size: adds[i].Size,
FileHash: adds[i].FileHash,
Redundancy: cdssdk.NewNoneRedundancy(), // 首次上传默认使用不分块的none模式
CreateTime: adds[i].UploadTime,
UpdateTime: adds[i].UploadTime,
}

e, ok := existsObjsMap[adds[i].Path]
if ok {
o.ObjectID = e.ObjectID
o.CreateTime = e.CreateTime
updatingObjs = append(updatingObjs, o)

} else {
addingObjs = append(addingObjs, o)
}
}

// 先进行更新
err = db.BatchUpdate(ctx, updatingObjs)
if err != nil {
return nil, fmt.Errorf("batch update objects: %w", err)
}

// 再执行插入,Create函数插入后会填充ObjectID
err = db.BatchCreate(ctx, &addingObjs)
if err != nil {
return nil, fmt.Errorf("batch create objects: %w", err)
}

// 按照add参数的顺序返回结果
affectedObjsMp := make(map[string]cdssdk.Object)
for _, o := range updatingObjs {
affectedObjsMp[o.Path] = o
}
for _, o := range addingObjs {
affectedObjsMp[o.Path] = o
}
affectedObjs := make([]cdssdk.Object, 0, len(affectedObjsMp))
affectedObjIDs := make([]cdssdk.ObjectID, 0, len(affectedObjsMp))
for i := range adds {
obj := affectedObjsMp[adds[i].Path]
affectedObjs = append(affectedObjs, obj)
affectedObjIDs = append(affectedObjIDs, obj.ObjectID)
}

if len(affectedObjIDs) > 0 {
// 批量删除 ObjectBlock
if err := db.ObjectBlock().BatchDeleteByObjectID(ctx, affectedObjIDs); err != nil {
return nil, fmt.Errorf("batch delete object blocks: %w", err)
}

// 批量删除 PinnedObject
if err := db.PinnedObject().BatchDeleteByObjectID(ctx, affectedObjIDs); err != nil {
return nil, fmt.Errorf("batch delete pinned objects: %w", err)
}
}

// 创建 ObjectBlock
objBlocks := make([]stgmod.ObjectBlock, 0, len(adds))
for i, add := range adds {
for _, stgID := range add.StorageIDs {
objBlocks = append(objBlocks, stgmod.ObjectBlock{
ObjectID: affectedObjIDs[i],
Index: 0,
StorageID: stgID,
FileHash: add.FileHash,
Size: add.Size,
})
}
}
if err := db.ObjectBlock().BatchCreate(ctx, objBlocks); err != nil {
return nil, fmt.Errorf("batch create object blocks: %w", err)
}

// 创建 Cache
caches := make([]model.Cache, 0, len(adds))
for _, add := range adds {
for _, stgID := range add.StorageIDs {
caches = append(caches, model.Cache{
FileHash: add.FileHash,
StorageID: stgID,
CreateTime: time.Now(),
Priority: 0,
})
}
}
if err := db.Cache().BatchCreate(ctx, caches); err != nil {
return nil, fmt.Errorf("batch create caches: %w", err)
}

return affectedObjs, nil
}

func (db *ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error {
if len(ids) == 0 {
return nil
}

return ctx.Table("Object").Where("ObjectID IN ?", ids).Delete(&cdssdk.Object{}).Error
}

func (db *ObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
return ctx.Table("Object").Where("PackageID = ?", packageID).Delete(&cdssdk.Object{}).Error
}

func (db *ObjectDB) DeleteByPath(ctx SQLContext, packageID cdssdk.PackageID, path string) error {
return ctx.Table("Object").Where("PackageID = ? AND Path = ?", packageID, path).Delete(&cdssdk.Object{}).Error
}

func (db *ObjectDB) MoveByPrefix(ctx SQLContext, oldPkgID cdssdk.PackageID, oldPrefix string, newPkgID cdssdk.PackageID, newPrefix string) error {
return ctx.Table("Object").Where("PackageID = ? AND Path LIKE ?", oldPkgID, escapeLike("", "%", oldPrefix)).
Updates(map[string]any{
"PackageID": newPkgID,
"Path": gorm.Expr("concat(?, substring(Path, ?))", newPrefix, len(oldPrefix)+1),
}).Error
}

+ 116
- 0
client/internal/db/object_access_stat.go View File

@@ -0,0 +1,116 @@
package db2

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

type ObjectAccessStatDB struct {
*DB
}

func (db *DB) ObjectAccessStat() *ObjectAccessStatDB {
return &ObjectAccessStatDB{db}
}

func (*ObjectAccessStatDB) Get(ctx SQLContext, objID cdssdk.ObjectID, stgID cdssdk.StorageID) (stgmod.ObjectAccessStat, error) {
var ret stgmod.ObjectAccessStat
err := ctx.Table("ObjectAccessStat").
Where("ObjectID = ? AND StorageID = ?", objID, stgID).
First(&ret).Error
return ret, err
}

func (*ObjectAccessStatDB) GetByObjectID(ctx SQLContext, objID cdssdk.ObjectID) ([]stgmod.ObjectAccessStat, error) {
var ret []stgmod.ObjectAccessStat
err := ctx.Table("ObjectAccessStat").
Where("ObjectID = ?", objID).
Find(&ret).Error
return ret, err
}

func (*ObjectAccessStatDB) BatchGetByObjectID(ctx SQLContext, objIDs []cdssdk.ObjectID) ([]stgmod.ObjectAccessStat, error) {
if len(objIDs) == 0 {
return nil, nil
}

var ret []stgmod.ObjectAccessStat
err := ctx.Table("ObjectAccessStat").
Where("ObjectID IN ?", objIDs).
Find(&ret).Error
return ret, err
}

func (*ObjectAccessStatDB) BatchGetByObjectIDOnStorage(ctx SQLContext, objIDs []cdssdk.ObjectID, stgID cdssdk.StorageID) ([]stgmod.ObjectAccessStat, error) {
if len(objIDs) == 0 {
return nil, nil
}

var ret []stgmod.ObjectAccessStat
err := ctx.Table("ObjectAccessStat").
Where("ObjectID IN ? AND StorageID = ?", objIDs, stgID).
Find(&ret).Error
return ret, err
}

func (*ObjectAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.AddAccessStatEntry) error {
if len(entries) == 0 {
return nil
}

for _, entry := range entries {
acc := stgmod.ObjectAccessStat{
ObjectID: entry.ObjectID,
StorageID: entry.StorageID,
Counter: entry.Counter,
}

err := ctx.Table("ObjectAccessStat").
Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "ObjectID"}, {Name: "StorageID"}},
DoUpdates: clause.Assignments(map[string]any{
"Counter": gorm.Expr("Counter + values(Counter)"),
}),
}).Create(&acc).Error
if err != nil {
return err
}
}
return nil
}

func (*ObjectAccessStatDB) BatchUpdateAmountInPackage(ctx SQLContext, pkgIDs []cdssdk.PackageID, historyWeight float64) error {
if len(pkgIDs) == 0 {
return nil
}

err := ctx.Exec("UPDATE ObjectAccessStat AS o INNER JOIN Object AS obj ON o.ObjectID = obj.ObjectID SET o.Amount = o.Amount * ? + o.Counter * (1 - ?), o.Counter = 0 WHERE obj.PackageID IN ?", historyWeight, historyWeight, pkgIDs).Error
return err
}

func (*ObjectAccessStatDB) UpdateAllAmount(ctx SQLContext, historyWeight float64) error {
err := ctx.Exec("UPDATE ObjectAccessStat SET Amount = Amount * ? + Counter * (1 - ?), Counter = 0", historyWeight, historyWeight).Error
return err
}

func (*ObjectAccessStatDB) DeleteByObjectID(ctx SQLContext, objID cdssdk.ObjectID) error {
err := ctx.Table("ObjectAccessStat").Where("ObjectID = ?", objID).Delete(nil).Error
return err
}

func (*ObjectAccessStatDB) BatchDeleteByObjectID(ctx SQLContext, objIDs []cdssdk.ObjectID) error {
if len(objIDs) == 0 {
return nil
}

err := ctx.Table("ObjectAccessStat").Where("ObjectID IN ?", objIDs).Delete(nil).Error
return err
}

func (*ObjectAccessStatDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
err := ctx.Exec("DELETE o FROM ObjectAccessStat o INNER JOIN Object obj ON o.ObjectID = obj.ObjectID WHERE obj.PackageID = ?", packageID).Error
return err
}

+ 122
- 0
client/internal/db/object_block.go View File

@@ -0,0 +1,122 @@
package db2

import (
"strconv"
"strings"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gorm.io/gorm/clause"
)

type ObjectBlockDB struct {
*DB
}

func (db *DB) ObjectBlock() *ObjectBlockDB {
return &ObjectBlockDB{DB: db}
}

func (db *ObjectBlockDB) GetByStorageID(ctx SQLContext, stgID cdssdk.StorageID) ([]stgmod.ObjectBlock, error) {
var rets []stgmod.ObjectBlock
err := ctx.Table("ObjectBlock").Where("StorageID = ?", stgID).Find(&rets).Error
return rets, err
}

func (db *ObjectBlockDB) BatchGetByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]stgmod.ObjectBlock, error) {
if len(objectIDs) == 0 {
return nil, nil
}

var blocks []stgmod.ObjectBlock
err := ctx.Table("ObjectBlock").Where("ObjectID IN (?)", objectIDs).Order("ObjectID, `Index` ASC").Find(&blocks).Error
return blocks, err
}

func (*ObjectBlockDB) GetInPackageID(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectBlock, error) {
var rets []stgmod.ObjectBlock
err := ctx.Table("ObjectBlock").
Joins("INNER JOIN Object ON ObjectBlock.ObjectID = Object.ObjectID").
Where("Object.PackageID = ?", packageID).
Order("ObjectBlock.ObjectID, ObjectBlock.`Index` ASC").
Find(&rets).Error
return rets, err
}

func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index int, stgID cdssdk.StorageID, fileHash cdssdk.FileHash, size int64) error {
block := stgmod.ObjectBlock{ObjectID: objectID, Index: index, StorageID: stgID, FileHash: fileHash, Size: size}
return ctx.Table("ObjectBlock").Create(&block).Error
}

func (db *ObjectBlockDB) BatchCreate(ctx SQLContext, blocks []stgmod.ObjectBlock) error {
if len(blocks) == 0 {
return nil
}

return ctx.Clauses(clause.Insert{Modifier: "ignore"}).Create(&blocks).Error
}

func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error {
return ctx.Table("ObjectBlock").Where("ObjectID = ?", objectID).Delete(&stgmod.ObjectBlock{}).Error
}

func (db *ObjectBlockDB) DeleteByObjectIDIndex(ctx SQLContext, objectID cdssdk.ObjectID, index int) error {
return ctx.Table("ObjectBlock").Where("ObjectID = ? AND `Index` = ?", objectID, index).Delete(&stgmod.ObjectBlock{}).Error
}

func (db *ObjectBlockDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error {
if len(objectIDs) == 0 {
return nil
}

return ctx.Table("ObjectBlock").Where("ObjectID IN (?)", objectIDs).Delete(&stgmod.ObjectBlock{}).Error
}

func (db *ObjectBlockDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
return ctx.Table("ObjectBlock").Where("ObjectID IN (SELECT ObjectID FROM Object WHERE PackageID = ?)", packageID).Delete(&stgmod.ObjectBlock{}).Error
}

func (db *ObjectBlockDB) StorageBatchDelete(ctx SQLContext, stgID cdssdk.StorageID, fileHashes []cdssdk.FileHash) error {
if len(fileHashes) == 0 {
return nil
}

return ctx.Table("ObjectBlock").Where("StorageID = ? AND FileHash IN (?)", stgID, fileHashes).Delete(&stgmod.ObjectBlock{}).Error
}

func (db *ObjectBlockDB) CountBlockWithHash(ctx SQLContext, fileHash string) (int, error) {
var cnt int64
err := ctx.Table("ObjectBlock").
Select("COUNT(FileHash)").
Joins("INNER JOIN Object ON ObjectBlock.ObjectID = Object.ObjectID").
Joins("INNER JOIN Package ON Object.PackageID = Package.PackageID").
Where("FileHash = ? AND Package.State = ?", fileHash, cdssdk.PackageStateNormal).
Scan(&cnt).Error

if err != nil {
return 0, err
}

return int(cnt), nil
}

// 按逗号切割字符串,并将每一个部分解析为一个int64的ID。
// 注:需要外部保证分隔的每一个部分都是正确的10进制数字格式
func splitConcatedHubID(idStr string) []cdssdk.HubID {
idStrs := strings.Split(idStr, ",")
ids := make([]cdssdk.HubID, 0, len(idStrs))

for _, str := range idStrs {
// 假设传入的ID是正确的数字格式
id, _ := strconv.ParseInt(str, 10, 64)
ids = append(ids, cdssdk.HubID(id))
}

return ids
}

// 按逗号切割字符串
func splitConcatedFileHash(idStr string) []string {
idStrs := strings.Split(idStr, ",")
return idStrs
}

+ 209
- 0
client/internal/db/package.go View File

@@ -0,0 +1,209 @@
package db2

import (
"fmt"
"time"

"gorm.io/gorm"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
)

type PackageDB struct {
*DB
}

func (db *DB) Package() *PackageDB {
return &PackageDB{DB: db}
}

func (db *PackageDB) GetByID(ctx SQLContext, packageID cdssdk.PackageID) (model.Package, error) {
var ret model.Package
err := ctx.Table("Package").Where("PackageID = ?", packageID).First(&ret).Error
return ret, err
}

func (db *PackageDB) GetByName(ctx SQLContext, bucketID cdssdk.BucketID, name string) (model.Package, error) {
var ret model.Package
err := ctx.Table("Package").Where("BucketID = ? AND Name = ?", bucketID, name).First(&ret).Error
return ret, err
}

func (db *PackageDB) BatchTestPackageID(ctx SQLContext, pkgIDs []cdssdk.PackageID) (map[cdssdk.PackageID]bool, error) {
if len(pkgIDs) == 0 {
return make(map[cdssdk.PackageID]bool), nil
}

var avaiIDs []cdssdk.PackageID
err := ctx.Table("Package").
Select("PackageID").
Where("PackageID IN ?", pkgIDs).
Find(&avaiIDs).Error
if err != nil {
return nil, err
}

avaiIDMap := make(map[cdssdk.PackageID]bool)
for _, pkgID := range avaiIDs {
avaiIDMap[pkgID] = true
}

return avaiIDMap, nil
}

func (*PackageDB) BatchGetAllPackageIDs(ctx SQLContext, start int, count int) ([]cdssdk.PackageID, error) {
var ret []cdssdk.PackageID
err := ctx.Table("Package").Select("PackageID").Limit(count).Offset(start).Find(&ret).Error
return ret, err
}

func (db *PackageDB) GetUserBucketPackages(ctx SQLContext, userID cdssdk.UserID, bucketID cdssdk.BucketID) ([]model.Package, error) {
var ret []model.Package
err := ctx.Table("UserBucket").
Select("Package.*").
Joins("JOIN Package ON UserBucket.BucketID = Package.BucketID").
Where("UserBucket.UserID = ? AND UserBucket.BucketID = ?", userID, bucketID).
Find(&ret).Error
return ret, err
}

func (db *PackageDB) GetBucketPackages(ctx SQLContext, bucketID cdssdk.BucketID) ([]model.Package, error) {
var ret []model.Package
err := ctx.Table("Package").
Select("Package.*").
Where("BucketID = ?", bucketID).
Find(&ret).Error
return ret, err
}

func (db *PackageDB) GetBucketPackagesByName(ctx SQLContext, bucketName string) ([]model.Package, error) {
var ret []model.Package
err := ctx.Table("Package").
Select("Package.*").
Joins("JOIN Bucket ON Package.BucketID = Bucket.BucketID").
Where("Bucket.Name = ?", bucketName).
Find(&ret).Error
return ret, err
}

// IsAvailable 判断一个用户是否拥有指定对象
func (db *PackageDB) IsAvailable(ctx SQLContext, userID cdssdk.UserID, packageID cdssdk.PackageID) (bool, error) {
var pkgID cdssdk.PackageID
err := ctx.Table("Package").
Select("Package.PackageID").
Joins("JOIN UserBucket ON Package.BucketID = UserBucket.BucketID").
Where("Package.PackageID = ? AND UserBucket.UserID = ?", packageID, userID).
Scan(&pkgID).Error

if err == gorm.ErrRecordNotFound {
return false, nil
}

if err != nil {
return false, fmt.Errorf("find package failed, err: %w", err)
}

return true, nil
}

// GetUserPackage 获得Package,如果用户没有权限访问,则不会获得结果
func (db *PackageDB) GetUserPackage(ctx SQLContext, userID cdssdk.UserID, packageID cdssdk.PackageID) (model.Package, error) {
var ret model.Package
err := ctx.Table("Package").
Select("Package.*").
Joins("JOIN UserBucket ON Package.BucketID = UserBucket.BucketID").
Where("Package.PackageID = ? AND UserBucket.UserID = ?", packageID, userID).
First(&ret).Error
return ret, err
}

// 在指定名称的Bucket中查找指定名称的Package
func (*PackageDB) GetUserPackageByName(ctx SQLContext, userID cdssdk.UserID, bucketName string, packageName string) (model.Package, error) {
var ret model.Package
err := ctx.Table("Package").
Select("Package.*").
Joins("JOIN Bucket ON Package.BucketID = Bucket.BucketID").
Joins("JOIN UserBucket ON Bucket.BucketID = UserBucket.BucketID").
Where("Package.Name = ? AND Bucket.Name = ? AND UserBucket.UserID = ?", packageName, bucketName, userID).
First(&ret).Error
return ret, err
}

func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name string) (cdssdk.Package, error) {
var packageID int64
err := ctx.Table("Package").
Select("PackageID").
Where("Name = ? AND BucketID = ?", name, bucketID).
Scan(&packageID).Error

if err != nil {
return cdssdk.Package{}, err
}
if packageID != 0 {
return cdssdk.Package{}, gorm.ErrDuplicatedKey
}

newPackage := cdssdk.Package{Name: name, BucketID: bucketID, CreateTime: time.Now(), State: cdssdk.PackageStateNormal}
if err := ctx.Create(&newPackage).Error; err != nil {
return cdssdk.Package{}, fmt.Errorf("insert package failed, err: %w", err)
}

return newPackage, nil
}

func (*PackageDB) Delete(ctx SQLContext, packageID cdssdk.PackageID) error {
err := ctx.Delete(&model.Package{}, "PackageID = ?", packageID).Error
return err
}

// 删除与Package相关的所有数据
func (db *PackageDB) DeleteComplete(ctx SQLContext, packageID cdssdk.PackageID) error {
if err := db.Package().Delete(ctx, packageID); err != nil {
return fmt.Errorf("delete package state: %w", err)
}

if err := db.ObjectAccessStat().DeleteInPackage(ctx, packageID); err != nil {
return fmt.Errorf("delete from object access stat: %w", err)
}

if err := db.ObjectBlock().DeleteInPackage(ctx, packageID); err != nil {
return fmt.Errorf("delete from object block failed, err: %w", err)
}

if err := db.PinnedObject().DeleteInPackage(ctx, packageID); err != nil {
return fmt.Errorf("deleting pinned objects in package: %w", err)
}

if err := db.Object().DeleteInPackage(ctx, packageID); err != nil {
return fmt.Errorf("deleting objects in package: %w", err)
}

if err := db.PackageAccessStat().DeleteByPackageID(ctx, packageID); err != nil {
return fmt.Errorf("deleting package access stat: %w", err)
}

return nil
}

func (*PackageDB) ChangeState(ctx SQLContext, packageID cdssdk.PackageID, state string) error {
err := ctx.Exec("UPDATE Package SET State = ? WHERE PackageID = ?", state, packageID).Error
return err
}

func (*PackageDB) HasPackageIn(ctx SQLContext, bucketID cdssdk.BucketID) (bool, error) {
var pkg cdssdk.Package
err := ctx.Table("Package").Where("BucketID = ?", bucketID).First(&pkg).Error
if err == gorm.ErrRecordNotFound {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
}

func (*PackageDB) Move(ctx SQLContext, packageID cdssdk.PackageID, newBktID cdssdk.BucketID, newName string) error {
err := ctx.Table("Package").Where("PackageID = ?", packageID).Update("BucketID", newBktID).Update("Name", newName).Error
return err
}

+ 79
- 0
client/internal/db/package_access_stat.go View File

@@ -0,0 +1,79 @@
package db2

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
coormq "gitlink.org.cn/cloudream/storage2/common/pkgs/mq/coordinator"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

type PackageAccessStatDB struct {
*DB
}

func (db *DB) PackageAccessStat() *PackageAccessStatDB {
return &PackageAccessStatDB{db}
}

func (*PackageAccessStatDB) Get(ctx SQLContext, pkgID cdssdk.PackageID, stgID cdssdk.StorageID) (stgmod.PackageAccessStat, error) {
var ret stgmod.PackageAccessStat
err := ctx.Table("PackageAccessStat").Where("PackageID = ? AND StorageID = ?", pkgID, stgID).First(&ret).Error
return ret, err
}

func (*PackageAccessStatDB) GetByPackageID(ctx SQLContext, pkgID cdssdk.PackageID) ([]stgmod.PackageAccessStat, error) {
var ret []stgmod.PackageAccessStat
err := ctx.Table("PackageAccessStat").Where("PackageID = ?", pkgID).Find(&ret).Error
return ret, err
}

func (*PackageAccessStatDB) BatchGetByPackageID(ctx SQLContext, pkgIDs []cdssdk.PackageID) ([]stgmod.PackageAccessStat, error) {
if len(pkgIDs) == 0 {
return nil, nil
}

var ret []stgmod.PackageAccessStat
err := ctx.Table("PackageAccessStat").Where("PackageID IN (?)", pkgIDs).Find(&ret).Error
return ret, err
}

func (*PackageAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.AddAccessStatEntry) error {
if len(entries) == 0 {
return nil
}

accs := make([]stgmod.PackageAccessStat, len(entries))
for i, e := range entries {
accs[i] = stgmod.PackageAccessStat{
PackageID: e.PackageID,
StorageID: e.StorageID,
Counter: e.Counter,
}
}

return ctx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "PackageID"}, {Name: "StorageID"}},
DoUpdates: clause.Assignments(map[string]any{
"Counter": gorm.Expr("Counter + values(Counter)"),
}),
}).Table("PackageAccessStat").Create(&accs).Error
}

func (*PackageAccessStatDB) BatchUpdateAmount(ctx SQLContext, pkgIDs []cdssdk.PackageID, historyWeight float64) error {
if len(pkgIDs) == 0 {
return nil
}

sql := "UPDATE PackageAccessStat SET Amount = Amount * ? + Counter * (1 - ?), Counter = 0 WHERE PackageID IN (?)"
return ctx.Exec(sql, historyWeight, historyWeight, pkgIDs).Error
}

func (*PackageAccessStatDB) UpdateAllAmount(ctx SQLContext, historyWeight float64) error {
sql := "UPDATE PackageAccessStat SET Amount = Amount * ? + Counter * (1 - ?), Counter = 0"
return ctx.Exec(sql, historyWeight, historyWeight).Error
}

func (*PackageAccessStatDB) DeleteByPackageID(ctx SQLContext, pkgID cdssdk.PackageID) error {
return ctx.Table("PackageAccessStat").Where("PackageID = ?", pkgID).Delete(&stgmod.PackageAccessStat{}).Error
}

+ 122
- 0
client/internal/db/pinned_object.go View File

@@ -0,0 +1,122 @@
package db2

import (
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gorm.io/gorm/clause"
)

type PinnedObjectDB struct {
*DB
}

func (db *DB) PinnedObject() *PinnedObjectDB {
return &PinnedObjectDB{DB: db}
}

func (*PinnedObjectDB) GetByStorageID(ctx SQLContext, stgID cdssdk.StorageID) ([]cdssdk.PinnedObject, error) {
var ret []cdssdk.PinnedObject
err := ctx.Table("PinnedObject").Find(&ret, "StorageID = ?", stgID).Error
return ret, err
}

func (*PinnedObjectDB) GetObjectsByStorageID(ctx SQLContext, stgID cdssdk.StorageID) ([]cdssdk.Object, error) {
var ret []cdssdk.Object
err := ctx.Table("Object").Joins("inner join PinnedObject on Object.ObjectID = PinnedObject.ObjectID").Where("StorageID = ?", stgID).Find(&ret).Error
return ret, err
}

func (*PinnedObjectDB) Create(ctx SQLContext, stgID cdssdk.StorageID, objectID cdssdk.ObjectID, createTime time.Time) error {
return ctx.Table("PinnedObject").Create(&cdssdk.PinnedObject{StorageID: stgID, ObjectID: objectID, CreateTime: createTime}).Error
}

func (*PinnedObjectDB) BatchGetByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]cdssdk.PinnedObject, error) {
if len(objectIDs) == 0 {
return nil, nil
}

var pinneds []cdssdk.PinnedObject
err := ctx.Table("PinnedObject").Where("ObjectID in (?)", objectIDs).Order("ObjectID asc").Find(&pinneds).Error
return pinneds, err
}

func (*PinnedObjectDB) TryCreate(ctx SQLContext, stgID cdssdk.StorageID, objectID cdssdk.ObjectID, createTime time.Time) error {
return ctx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "ObjectID"}, {Name: "StorageID"}},
DoUpdates: clause.AssignmentColumns([]string{"CreateTime"}),
}).Create(&cdssdk.PinnedObject{StorageID: stgID, ObjectID: objectID, CreateTime: createTime}).Error
}

func (*PinnedObjectDB) BatchTryCreate(ctx SQLContext, pinneds []cdssdk.PinnedObject) error {
if len(pinneds) == 0 {
return nil
}

return ctx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "ObjectID"}, {Name: "StorageID"}},
DoUpdates: clause.AssignmentColumns([]string{"CreateTime"}),
}).Create(&pinneds).Error
}

func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.PackageID, stgID cdssdk.StorageID) error {
err := ctx.Exec(
"insert ignore into PinnedObject(StorageID, ObjectID, CreateTime) select ? as StorageID, ObjectID, ? as CreateTime from Object where PackageID = ?",
stgID,
time.Now(),
packageID,
).Error
return err
}

func (db *PinnedObjectDB) ObjectBatchCreate(ctx SQLContext, objectID cdssdk.ObjectID, stgIDs []cdssdk.StorageID) error {
if len(stgIDs) == 0 {
return nil
}

for _, id := range stgIDs {
err := db.TryCreate(ctx, id, objectID, time.Now())
if err != nil {
return err
}
}
return nil
}

func (*PinnedObjectDB) Delete(ctx SQLContext, stgID cdssdk.StorageID, objectID cdssdk.ObjectID) error {
err := ctx.Exec("delete from PinnedObject where StorageID = ? and ObjectID = ?", stgID, objectID).Error
return err
}

func (*PinnedObjectDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error {
err := ctx.Exec("delete from PinnedObject where ObjectID = ?", objectID).Error
return err
}

func (*PinnedObjectDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error {
if len(objectIDs) == 0 {
return nil
}

err := ctx.Table("PinnedObject").Where("ObjectID in (?)", objectIDs).Delete(&cdssdk.PinnedObject{}).Error
return err
}

func (*PinnedObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
err := ctx.Table("PinnedObject").Where("ObjectID in (select ObjectID from Object where PackageID = ?)", packageID).Delete(&cdssdk.PinnedObject{}).Error
return err
}

func (*PinnedObjectDB) DeleteInPackageAtStorage(ctx SQLContext, packageID cdssdk.PackageID, stgID cdssdk.StorageID) error {
err := ctx.Exec("delete PinnedObject from PinnedObject inner join Object on PinnedObject.ObjectID = Object.ObjectID where PackageID = ? and StorageID = ?", packageID, stgID).Error
return err
}

func (*PinnedObjectDB) StorageBatchDelete(ctx SQLContext, stgID cdssdk.StorageID, objectIDs []cdssdk.ObjectID) error {
if len(objectIDs) == 0 {
return nil
}

err := ctx.Table("PinnedObject").Where("StorageID = ? and ObjectID in (?)", stgID, objectIDs).Delete(&cdssdk.PinnedObject{}).Error
return err
}

+ 120
- 0
client/internal/db/storage.go View File

@@ -0,0 +1,120 @@
package db2

import (
"fmt"

"gitlink.org.cn/cloudream/common/pkgs/logger"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage2/common/models"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
"gorm.io/gorm"
)

type StorageDB struct {
*DB
}

func (db *DB) Storage() *StorageDB {
return &StorageDB{DB: db}
}

func (db *StorageDB) GetByID(ctx SQLContext, stgID cdssdk.StorageID) (model.Storage, error) {
var stg model.Storage
err := ctx.Table("Storage").First(&stg, stgID).Error
return stg, err
}

func (StorageDB) GetAllIDs(ctx SQLContext) ([]cdssdk.StorageID, error) {
var stgs []cdssdk.StorageID
err := ctx.Table("Storage").Select("StorageID").Find(&stgs).Error
return stgs, err
}

func (db *StorageDB) BatchGetByID(ctx SQLContext, stgIDs []cdssdk.StorageID) ([]model.Storage, error) {
var stgs []model.Storage
err := ctx.Table("Storage").Find(&stgs, "StorageID IN (?)", stgIDs).Error
return stgs, err
}

func (db *StorageDB) GetUserStorages(ctx SQLContext, userID cdssdk.UserID) ([]model.Storage, error) {
var stgs []model.Storage
err := ctx.Table("Storage").Select("Storage.*").
Joins("inner join UserStorage on Storage.StorageID = UserStorage.StorageID").
Where("UserID = ?", userID).Find(&stgs).Error
return stgs, err
}

func (db *StorageDB) BatchGetAllStorageIDs(ctx SQLContext, start int, count int) ([]cdssdk.StorageID, error) {
var ret []cdssdk.StorageID
err := ctx.Table("Storage").Select("StorageID").Find(&ret).Limit(count).Offset(start).Error
return ret, err
}

func (db *StorageDB) IsAvailable(ctx SQLContext, userID cdssdk.UserID, storageID cdssdk.StorageID) (bool, error) {
rows, err := ctx.Table("Storage").Select("Storage.StorageID").
Joins("inner join UserStorage on Storage.StorageID = UserStorage.StorageID").
Where("UserID = ? and Storage.StorageID = ?", userID, storageID).Rows()
if err != nil {
return false, fmt.Errorf("execute sql: %w", err)
}
defer rows.Close()

return rows.Next(), nil
}

func (db *StorageDB) GetUserStorage(ctx SQLContext, userID cdssdk.UserID, storageID cdssdk.StorageID) (model.Storage, error) {
var stg model.Storage
err := ctx.Table("Storage").Select("Storage.*").
Joins("inner join UserStorage on Storage.StorageID = UserStorage.StorageID").
Where("UserID = ? and Storage.StorageID = ?", userID, storageID).First(&stg).Error

return stg, err
}

func (db *StorageDB) GetUserStorageByName(ctx SQLContext, userID cdssdk.UserID, name string) (model.Storage, error) {
var stg model.Storage
err := ctx.Table("Storage").Select("Storage.*").
Joins("inner join UserStorage on Storage.StorageID = UserStorage.StorageID").
Where("UserID = ? and Name = ?", userID, name).First(&stg).Error

return stg, err
}

func (db *StorageDB) GetHubStorages(ctx SQLContext, hubID cdssdk.HubID) ([]model.Storage, error) {
var stgs []model.Storage
err := ctx.Table("Storage").Select("Storage.*").Find(&stgs, "MasterHub = ?", hubID).Error
return stgs, err
}

func (db *StorageDB) FillDetails(ctx SQLContext, details []stgmod.StorageDetail) error {
stgsMp := make(map[cdssdk.StorageID]*stgmod.StorageDetail)
var masterHubIDs []cdssdk.HubID
for i := range details {
stgsMp[details[i].Storage.StorageID] = &details[i]
masterHubIDs = append(masterHubIDs, details[i].Storage.MasterHub)
}

// 获取监护Hub信息
masterHubs, err := db.Hub().BatchGetByID(ctx, masterHubIDs)
if err != nil && err != gorm.ErrRecordNotFound {
return fmt.Errorf("getting master hub: %w", err)
}
masterHubMap := make(map[cdssdk.HubID]cdssdk.Hub)
for _, hub := range masterHubs {
masterHubMap[hub.HubID] = hub
}
for _, stg := range stgsMp {
if stg.Storage.MasterHub != 0 {
hub, ok := masterHubMap[stg.Storage.MasterHub]
if !ok {
logger.Warnf("master hub %v of storage %v not found, this storage will not be add to result", stg.Storage.MasterHub, stg.Storage)
delete(stgsMp, stg.Storage.StorageID)
continue
}

stg.MasterHub = &hub
}
}

return nil
}

+ 44
- 0
client/internal/db/union_serializer.go View File

@@ -0,0 +1,44 @@
package db2

import (
"context"
"fmt"
"reflect"

"gitlink.org.cn/cloudream/common/utils/serder"
"gorm.io/gorm/schema"
)

type UnionSerializer struct {
}

func (UnionSerializer) Scan(ctx context.Context, field *schema.Field, dst reflect.Value, dbValue interface{}) error {
fieldValue := reflect.New(field.FieldType)
if dbValue != nil {
var data []byte
switch v := dbValue.(type) {
case []byte:
data = v
case string:
data = []byte(v)
default:
return fmt.Errorf("failed to unmarshal JSONB value: %#v", dbValue)
}

err := serder.JSONToObjectExRaw(data, fieldValue.Interface())
if err != nil {
return err
}
}

field.ReflectValueOf(ctx, dst).Set(fieldValue.Elem())
return nil
}

func (UnionSerializer) Value(ctx context.Context, field *schema.Field, dst reflect.Value, fieldValue interface{}) (interface{}, error) {
return serder.ObjectToJSONEx(fieldValue)
}

func init() {
schema.RegisterSerializer("union", UnionSerializer{})
}

+ 44
- 0
client/internal/db/user.go View File

@@ -0,0 +1,44 @@
package db2

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gorm.io/gorm"
)

type UserDB struct {
*DB
}

func (db *DB) User() *UserDB {
return &UserDB{DB: db}
}

func (db *UserDB) GetByID(ctx SQLContext, userID cdssdk.UserID) (cdssdk.User, error) {
var ret cdssdk.User
err := ctx.Table("User").Where("UserID = ?", userID).First(&ret).Error
return ret, err
}

func (db *UserDB) GetByName(ctx SQLContext, name string) (cdssdk.User, error) {
var ret cdssdk.User
err := ctx.Table("User").Where("Name = ?", name).First(&ret).Error
return ret, err
}

func (db *UserDB) Create(ctx SQLContext, name string) (cdssdk.User, error) {
_, err := db.GetByName(ctx, name)
if err == nil {
return cdssdk.User{}, gorm.ErrDuplicatedKey
}
if err != gorm.ErrRecordNotFound {
return cdssdk.User{}, err
}

user := cdssdk.User{Name: name}
err = ctx.Table("User").Create(&user).Error
return user, err
}

func (*UserDB) Delete(ctx SQLContext, userID cdssdk.UserID) error {
return ctx.Table("User").Delete(&cdssdk.User{UserID: userID}).Error
}

+ 36
- 0
client/internal/db/user_bucket.go View File

@@ -0,0 +1,36 @@
package db2

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
)

type UserBucketDB struct {
*DB
}

func (db *DB) UserBucket() *UserBucketDB {
return &UserBucketDB{DB: db}
}

func (*UserBucketDB) GetByUserID(ctx SQLContext, userID cdssdk.UserID) ([]model.UserBucket, error) {
var userBuckets []model.UserBucket
err := ctx.Table("UserBucket").Where("UserID = ?", userID).Find(&userBuckets).Error
return userBuckets, err
}

func (*UserBucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketID cdssdk.BucketID) error {
userBucket := model.UserBucket{
UserID: userID,
BucketID: bucketID,
}
return ctx.Table("UserBucket").Create(&userBucket).Error
}

func (*UserBucketDB) DeleteByBucketID(ctx SQLContext, bucketID cdssdk.BucketID) error {
return ctx.Table("UserBucket").Where("BucketID = ?", bucketID).Delete(&model.UserBucket{}).Error
}

func (*UserBucketDB) DeleteByUserID(ctx SQLContext, userID cdssdk.UserID) error {
return ctx.Table("UserBucket").Where("UserID = ?", userID).Delete(&model.UserBucket{}).Error
}

+ 34
- 0
client/internal/db/user_hub.go View File

@@ -0,0 +1,34 @@
package db2

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
)

type UserHubDB struct {
*DB
}

func (db *DB) UserHub() *UserHubDB {
return &UserHubDB{db}
}

func (*UserHubDB) GetByUserID(ctx SQLContext, userID cdssdk.UserID) ([]model.UserHub, error) {
var userHubs []model.UserHub
if err := ctx.Table("UserHub").Where("UserID = ?", userID).Find(&userHubs).Error; err != nil {
return nil, err
}

return userHubs, nil
}

func (*UserHubDB) Create(ctx SQLContext, userID cdssdk.UserID, hubID cdssdk.HubID) error {
return ctx.Table("UserHub").Create(&model.UserHub{
UserID: userID,
HubID: hubID,
}).Error
}

func (*UserHubDB) DeleteByUserID(ctx SQLContext, userID cdssdk.UserID) error {
return ctx.Table("UserHub").Delete(&model.UserHub{}, "UserID = ?", userID).Error
}

+ 34
- 0
client/internal/db/user_storage.go View File

@@ -0,0 +1,34 @@
package db2

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage2/common/pkgs/db2/model"
)

type UserStorageDB struct {
*DB
}

func (db *DB) UserStorage() *UserStorageDB {
return &UserStorageDB{db}
}

func (*UserStorageDB) GetByUserID(ctx SQLContext, userID cdssdk.UserID) ([]model.UserStorage, error) {
var userStgs []model.UserStorage
if err := ctx.Table("UserStorage").Where("UserID = ?", userID).Find(&userStgs).Error; err != nil {
return nil, err
}

return userStgs, nil
}

func (*UserStorageDB) Create(ctx SQLContext, userID cdssdk.UserID, stgID cdssdk.StorageID) error {
return ctx.Table("UserStorage").Create(&model.UserStorage{
UserID: userID,
StorageID: stgID,
}).Error
}

func (*UserStorageDB) DeleteByUserID(ctx SQLContext, userID cdssdk.UserID) error {
return ctx.Table("UserStorage").Delete(&model.UserStorage{}, "UserID = ?", userID).Error
}

+ 81
- 0
client/internal/db/utils.go View File

@@ -0,0 +1,81 @@
package db2

import (
"strings"

"gorm.io/gorm"
)

const (
maxPlaceholderCount = 65535
)

func BatchNamedExec[T any](ctx SQLContext, sql string, argCnt int, arr []T, callback func(result *gorm.DB) bool) error {
if argCnt == 0 {
result := ctx.Exec(sql, toInterfaceSlice(arr)...)
if result.Error != nil {
return result.Error
}

if callback != nil {
callback(result)
}

return nil
}

batchSize := maxPlaceholderCount / argCnt
for len(arr) > 0 {
curBatchSize := min(batchSize, len(arr))

result := ctx.Exec(sql, toInterfaceSlice(arr[:curBatchSize])...)
if result.Error != nil {
return result.Error
}
if callback != nil && !callback(result) {
return nil
}

arr = arr[curBatchSize:]
}

return nil
}

// 将 []T 转换为 []interface{}
func toInterfaceSlice[T any](arr []T) []interface{} {
interfaceSlice := make([]interface{}, len(arr))
for i, v := range arr {
interfaceSlice[i] = v
}
return interfaceSlice
}

func min(a, b int) int {
if a < b {
return a
}
return b
}

func escapeLike(left, right, word string) string {
var n int
for i := range word {
if c := word[i]; c == '%' || c == '_' || c == '\\' {
n++
}
}
// No characters to escape.
if n == 0 {
return left + word + right
}
var b strings.Builder
b.Grow(len(word) + n)
for _, c := range word {
if c == '%' || c == '_' || c == '\\' {
b.WriteByte('\\')
}
b.WriteRune(c)
}
return left + b.String() + right
}

+ 0
- 140
common/pkgs/db/bucket.go View File

@@ -1,140 +0,0 @@
package db

/*
import (
"database/sql"
"errors"
"fmt"

"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

type BucketDB struct {
*DB
}

func (db *DB) Bucket() *BucketDB {
return &BucketDB{DB: db}
}

func (db *BucketDB) GetByID(ctx SQLContext, bucketID cdssdk.BucketID) (cdssdk.Bucket, error) {
var ret cdssdk.Bucket
err := sqlx.Get(ctx, &ret, "select * from Bucket where BucketID = ?", bucketID)
return ret, err
}

// GetIDByName 根据BucketName查询BucketID
func (db *BucketDB) GetIDByName(bucketName string) (int64, error) {
//桶结构体
var result struct {
BucketID int64 `db:"BucketID"`
BucketName string `db:"BucketName"`
}

sql := "select BucketID, BucketName from Bucket where BucketName=? "
if err := db.d.Get(&result, sql, bucketName); err != nil {
return 0, err
}

return result.BucketID, nil
}

// IsAvailable 判断用户是否有指定Bucekt的权限
func (db *BucketDB) IsAvailable(ctx SQLContext, bucketID cdssdk.BucketID, userID cdssdk.UserID) (bool, error) {
_, err := db.GetUserBucket(ctx, userID, bucketID)
if errors.Is(err, sql.ErrNoRows) {
return false, nil
}

if err != nil {
return false, fmt.Errorf("find bucket failed, err: %w", err)
}

return true, nil
}

func (*BucketDB) GetUserBucket(ctx SQLContext, userID cdssdk.UserID, bucketID cdssdk.BucketID) (model.Bucket, error) {
var ret model.Bucket
err := sqlx.Get(ctx, &ret,
"select Bucket.* from UserBucket, Bucket where UserID = ? and"+
" UserBucket.BucketID = Bucket.BucketID and"+
" Bucket.BucketID = ?", userID, bucketID)
return ret, err
}

func (*BucketDB) GetUserBucketByName(ctx SQLContext, userID cdssdk.UserID, bucketName string) (model.Bucket, error) {
var ret model.Bucket
err := sqlx.Get(ctx, &ret,
"select Bucket.* from UserBucket, Bucket where UserID = ? and"+
" UserBucket.BucketID = Bucket.BucketID and"+
" Bucket.Name = ?", userID, bucketName)
return ret, err
}

func (*BucketDB) GetUserBuckets(ctx SQLContext, userID cdssdk.UserID) ([]model.Bucket, error) {
var ret []model.Bucket
err := sqlx.Select(ctx, &ret, "select Bucket.* from UserBucket, Bucket where UserID = ? and UserBucket.BucketID = Bucket.BucketID", userID)
return ret, err
}

func (db *BucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketName string) (cdssdk.BucketID, error) {
var bucketID int64
err := sqlx.Get(ctx, &bucketID, "select Bucket.BucketID from UserBucket, Bucket where UserBucket.UserID = ? and UserBucket.BucketID = Bucket.BucketID and Bucket.Name = ?", userID, bucketName)
if err == nil {
return 0, fmt.Errorf("bucket name exsits")
}

if err != sql.ErrNoRows {
return 0, err
}

ret, err := ctx.Exec("insert into Bucket(Name,CreatorID) values(?,?)", bucketName, userID)
if err != nil {
return 0, fmt.Errorf("insert bucket failed, err: %w", err)
}

bucketID, err = ret.LastInsertId()
if err != nil {
return 0, fmt.Errorf("get inserted bucket id failed, err: %w", err)
}

_, err = ctx.Exec("insert into UserBucket(UserID,BucketID) values(?,?)", userID, bucketID)
if err != nil {
return 0, fmt.Errorf("insert into user bucket failed, err: %w", err)
}

return cdssdk.BucketID(bucketID), err
}

func (db *BucketDB) Delete(ctx SQLContext, bucketID cdssdk.BucketID) error {
_, err := ctx.Exec("delete from UserBucket where BucketID = ?", bucketID)
if err != nil {
return fmt.Errorf("delete user bucket failed, err: %w", err)
}

_, err = ctx.Exec("delete from Bucket where BucketID = ?", bucketID)
if err != nil {
return fmt.Errorf("delete bucket failed, err: %w", err)
}

// 删除Bucket内的Package
var pkgIDs []cdssdk.PackageID
err = sqlx.Select(ctx, &pkgIDs, "select PackageID from Package where BucketID = ?", bucketID)
if err != nil {
return fmt.Errorf("query package failed, err: %w", err)
}

for _, pkgID := range pkgIDs {
err = db.Package().SoftDelete(ctx, pkgID)
if err != nil {
return fmt.Errorf("set package seleted failed, err: %w", err)
}

// 失败也没关系,会有定时任务再次尝试
db.Package().DeleteUnused(ctx, pkgID)
}
return nil
}
*/

+ 0
- 125
common/pkgs/db/cache.go View File

@@ -1,125 +0,0 @@
package db

/*
import (
"time"

"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

type CacheDB struct {
*DB
}

func (db *DB) Cache() *CacheDB {
return &CacheDB{DB: db}
}

func (*CacheDB) Get(ctx SQLContext, fileHash string, hubID cdssdk.HubID) (model.Cache, error) {
var ret model.Cache
err := sqlx.Get(ctx, &ret, "select * from Cache where FileHash = ? and HubID = ?", fileHash, hubID)
return ret, err
}

func (*CacheDB) BatchGetAllFileHashes(ctx SQLContext, start int, count int) ([]string, error) {
var ret []string
err := sqlx.Select(ctx, &ret, "select distinct FileHash from Cache limit ?, ?", start, count)
return ret, err
}

func (*CacheDB) GetByHubID(ctx SQLContext, hubID cdssdk.HubID) ([]model.Cache, error) {
var ret []model.Cache
err := sqlx.Select(ctx, &ret, "select * from Cache where HubID = ?", hubID)
return ret, err
}

// Create 创建一条的缓存记录,如果已有则不进行操作
func (*CacheDB) Create(ctx SQLContext, fileHash string, hubID cdssdk.HubID, priority int) error {
_, err := ctx.Exec("insert ignore into Cache values(?,?,?,?)", fileHash, hubID, time.Now(), priority)
if err != nil {
return err
}

return nil
}

// 批量创建缓存记录
func (*CacheDB) BatchCreate(ctx SQLContext, caches []model.Cache) error {
if len(caches) == 0 {
return nil
}
return BatchNamedExec(
ctx,
"insert into Cache(FileHash,HubID,CreateTime,Priority) values(:FileHash,:HubID,:CreateTime,:Priority)"+
" on duplicate key update CreateTime=values(CreateTime), Priority=values(Priority)",
4,
caches,
nil,
)
}

func (*CacheDB) BatchCreateOnSameNode(ctx SQLContext, fileHashes []string, hubID cdssdk.HubID, priority int) error {
if len(fileHashes) == 0 {
return nil
}

var caches []model.Cache
var nowTime = time.Now()
for _, hash := range fileHashes {
caches = append(caches, model.Cache{
FileHash: hash,
HubID: hubID,
CreateTime: nowTime,
Priority: priority,
})
}

return BatchNamedExec(ctx,
"insert into Cache(FileHash,HubID,CreateTime,Priority) values(:FileHash,:HubID,:CreateTime,:Priority)"+
" on duplicate key update CreateTime=values(CreateTime), Priority=values(Priority)",
4,
caches,
nil,
)
}

func (*CacheDB) NodeBatchDelete(ctx SQLContext, hubID cdssdk.HubID, fileHashes []string) error {
if len(fileHashes) == 0 {
return nil
}

// TODO in语句有长度限制
query, args, err := sqlx.In("delete from Cache where HubID = ? and FileHash in (?)", hubID, fileHashes)
if err != nil {
return err
}
_, err = ctx.Exec(query, args...)
return err
}

// GetCachingFileNodes 查找缓存了指定文件的节点
func (*CacheDB) GetCachingFileNodes(ctx SQLContext, fileHash string) ([]cdssdk.Node, error) {
var x []cdssdk.Node
err := sqlx.Select(ctx, &x,
"select Node.* from Cache, Node where Cache.FileHash=? and Cache.HubID = Node.HubID", fileHash)
return x, err
}

// DeleteNodeAll 删除一个节点所有的记录
func (*CacheDB) DeleteNodeAll(ctx SQLContext, hubID cdssdk.HubID) error {
_, err := ctx.Exec("delete from Cache where HubID = ?", hubID)
return err
}

// FindCachingFileUserNodes 在缓存表中查询指定数据所在的节点
func (*CacheDB) FindCachingFileUserNodes(ctx SQLContext, userID cdssdk.HubID, fileHash string) ([]cdssdk.Node, error) {
var x []cdssdk.Node
err := sqlx.Select(ctx, &x,
"select Node.* from Cache, UserNode, Node where"+
" Cache.FileHash=? and Cache.HubID = UserNode.HubID and"+
" UserNode.UserID = ? and UserNode.HubID = Node.HubID", fileHash, userID)
return x, err
}
*/

+ 0
- 68
common/pkgs/db/db.go View File

@@ -1,68 +0,0 @@
package db

/*
import (
"context"
"database/sql"
"fmt"

_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/config"
)

type DB struct {
d *sqlx.DB
}

type SQLContext interface {
sqlx.Queryer
sqlx.Execer
sqlx.Ext
sqlx.Preparer

NamedQuery(query string, arg interface{}) (*sqlx.Rows, error)
NamedExec(query string, arg interface{}) (sql.Result, error)
PrepareNamed(query string) (*sqlx.NamedStmt, error)
}

func NewDB(cfg *config.Config) (*DB, error) {
db, err := sqlx.Open("mysql", cfg.MakeSourceString())
if err != nil {
return nil, fmt.Errorf("open database connection failed, err: %w", err)
}

// 尝试连接一下数据库,如果数据库配置有错误在这里就能报出来
err = db.Ping()
if err != nil {
return nil, err
}

return &DB{
d: db,
}, nil
}

func (db *DB) DoTx(isolation sql.IsolationLevel, fn func(tx *sqlx.Tx) error) error {
tx, err := db.d.BeginTxx(context.Background(), &sql.TxOptions{Isolation: isolation})
if err != nil {
return err
}

if err := fn(tx); err != nil {
tx.Rollback()
return err
}

if err := tx.Commit(); err != nil {
tx.Rollback()
return err
}

return nil
}

func (db *DB) SQLCtx() SQLContext {
return db.d
}
*/

+ 0
- 43
common/pkgs/db/node.go View File

@@ -1,43 +0,0 @@
package db

/*
import (
"time"

"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
)

type NodeDB struct {
*DB
}

func (db *DB) Node() *NodeDB {
return &NodeDB{DB: db}
}

func (db *NodeDB) GetByID(ctx SQLContext, hubID cdssdk.HubID) (cdssdk.Node, error) {
var ret cdssdk.Node
err := sqlx.Get(ctx, &ret, "select * from Node where HubID = ?", hubID)
return ret, err
}

func (db *NodeDB) GetAllNodes(ctx SQLContext) ([]cdssdk.Node, error) {
var ret []cdssdk.Node
err := sqlx.Select(ctx, &ret, "select * from Node")
return ret, err
}

// GetUserNodes 根据用户id查询可用node
func (db *NodeDB) GetUserNodes(ctx SQLContext, userID cdssdk.UserID) ([]cdssdk.Node, error) {
var nodes []cdssdk.Node
err := sqlx.Select(ctx, &nodes, "select Node.* from UserNode, Node where UserNode.HubID = Node.HubID and UserNode.UserID=?", userID)
return nodes, err
}

// UpdateState 更新状态,并且设置上次上报时间为现在
func (db *NodeDB) UpdateState(ctx SQLContext, hubID cdssdk.HubID, state string) error {
_, err := ctx.Exec("update Node set State = ?, LastReportTime = ? where HubID = ?", state, time.Now(), hubID)
return err
}
*/

+ 0
- 42
common/pkgs/db/node_connectivity.go View File

@@ -1,42 +0,0 @@
package db

/*
import (
"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

type NodeConnectivityDB struct {
*DB
}

func (db *DB) NodeConnectivity() *NodeConnectivityDB {
return &NodeConnectivityDB{DB: db}
}

func (db *NodeConnectivityDB) BatchGetByFromNode(ctx SQLContext, fromHubIDs []cdssdk.HubID) ([]model.NodeConnectivity, error) {
if len(fromHubIDs) == 0 {
return nil, nil
}

var ret []model.NodeConnectivity

sql, args, err := sqlx.In("select * from NodeConnectivity where FromHubID in (?)", fromHubIDs)
if err != nil {
return nil, err
}

return ret, sqlx.Select(ctx, &ret, sql, args...)
}

func (db *NodeConnectivityDB) BatchUpdateOrCreate(ctx SQLContext, cons []model.NodeConnectivity) error {
if len(cons) == 0 {
return nil
}

return BatchNamedExec(ctx,
"insert into NodeConnectivity(FromHubID, ToHubID, Delay, TestTime) values(:FromHubID, :ToHubID, :Delay, :TestTime) as new"+
" on duplicate key update Delay = new.Delay, TestTime = new.TestTime", 4, cons, nil)
}
*/

+ 0
- 376
common/pkgs/db/object.go View File

@@ -1,376 +0,0 @@
package db

/*
import (
"fmt"
"strings"
"time"

"github.com/jmoiron/sqlx"
"github.com/samber/lo"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/sort2"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

type ObjectDB struct {
*DB
}

func (db *DB) Object() *ObjectDB {
return &ObjectDB{DB: db}
}

func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Object, error) {
var ret model.TempObject
err := sqlx.Get(ctx, &ret, "select * from Object where ObjectID = ?", objectID)
return ret.ToObject(), err
}

func (db *ObjectDB) BatchTestObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) (map[cdssdk.ObjectID]bool, error) {
if len(objectIDs) == 0 {
return make(map[cdssdk.ObjectID]bool), nil
}

stmt, args, err := sqlx.In("select ObjectID from Object where ObjectID in (?)", lo.Uniq(objectIDs))
if err != nil {
return nil, err
}

var avaiIDs []cdssdk.ObjectID
err = sqlx.Select(ctx, &avaiIDs, stmt, args...)
if err != nil {
return nil, err
}

avaiIDMap := make(map[cdssdk.ObjectID]bool)
for _, pkgID := range avaiIDs {
avaiIDMap[pkgID] = true
}

return avaiIDMap, nil
}

func (db *ObjectDB) BatchGet(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]model.Object, error) {
if len(objectIDs) == 0 {
return nil, nil
}

// TODO In语句
stmt, args, err := sqlx.In("select * from Object where ObjectID in (?) order by ObjectID asc", objectIDs)
if err != nil {
return nil, err
}
stmt = ctx.Rebind(stmt)

objs := make([]model.TempObject, 0, len(objectIDs))
err = sqlx.Select(ctx, &objs, stmt, args...)
if err != nil {
return nil, err
}

return lo.Map(objs, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), nil
}

func (db *ObjectDB) BatchGetByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) {
if len(pathes) == 0 {
return nil, nil
}

// TODO In语句
stmt, args, err := sqlx.In("select * from Object force index(PackagePath) where PackageID=? and Path in (?)", pkgID, pathes)
if err != nil {
return nil, err
}
stmt = ctx.Rebind(stmt)

objs := make([]model.TempObject, 0, len(pathes))
err = sqlx.Select(ctx, &objs, stmt, args...)
if err != nil {
return nil, err
}

return lo.Map(objs, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), nil
}

func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, error) {
sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy, CreateTime, UpdateTime) values(?,?,?,?,?,?,?)"

ret, err := ctx.Exec(sql, obj.PackageID, obj.Path, obj.Size, obj.FileHash, obj.Redundancy, obj.UpdateTime, obj.UpdateTime)
if err != nil {
return 0, fmt.Errorf("insert object failed, err: %w", err)
}

objectID, err := ret.LastInsertId()
if err != nil {
return 0, fmt.Errorf("get id of inserted object failed, err: %w", err)
}

return cdssdk.ObjectID(objectID), nil
}

// 可以用于批量创建或者更新记录。
// 用于创建时,需要额外检查PackageID+Path的唯一性。
// 用于更新时,需要额外检查现存的PackageID+Path对应的ObjectID是否与待更新的ObjectID相同。不会更新CreateTime。
func (db *ObjectDB) BatchUpsertByPackagePath(ctx SQLContext, objs []cdssdk.Object) error {
if len(objs) == 0 {
return nil
}

sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy, CreateTime ,UpdateTime)" +
" values(:PackageID,:Path,:Size,:FileHash,:Redundancy, :CreateTime, :UpdateTime) as new" +
" on duplicate key update Size = new.Size, FileHash = new.FileHash, Redundancy = new.Redundancy, UpdateTime = new.UpdateTime"

return BatchNamedExec(ctx, sql, 7, objs, nil)
}

func (db *ObjectDB) BatchUpert(ctx SQLContext, objs []cdssdk.Object) error {
if len(objs) == 0 {
return nil
}

sql := "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy, CreateTime ,UpdateTime)" +
" values(:ObjectID, :PackageID,:Path,:Size,:FileHash,:Redundancy, :CreateTime, :UpdateTime) as new" +
" on duplicate key update PackageID = new.PackageID, Path = new.Path, Size = new.Size, FileHash = new.FileHash, Redundancy = new.Redundancy, UpdateTime = new.UpdateTime"

return BatchNamedExec(ctx, sql, 8, objs, nil)
}

func (*ObjectDB) GetPackageObjects(ctx SQLContext, packageID cdssdk.PackageID) ([]model.Object, error) {
var ret []model.TempObject
err := sqlx.Select(ctx, &ret, "select * from Object where PackageID = ? order by ObjectID asc", packageID)
return lo.Map(ret, func(o model.TempObject, idx int) model.Object { return o.ToObject() }), err
}

func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.PackageID) ([]stgmod.ObjectDetail, error) {
var objs []model.TempObject
err := sqlx.Select(ctx, &objs, "select * from Object where PackageID = ? order by ObjectID asc", packageID)
if err != nil {
return nil, fmt.Errorf("getting objects: %w", err)
}

var allBlocks []stgmod.ObjectBlock
err = sqlx.Select(ctx, &allBlocks, "select ObjectBlock.* from ObjectBlock, Object where PackageID = ? and ObjectBlock.ObjectID = Object.ObjectID order by ObjectBlock.ObjectID, `Index` asc", packageID)
if err != nil {
return nil, fmt.Errorf("getting all object blocks: %w", err)
}

var allPinnedObjs []cdssdk.PinnedObject
err = sqlx.Select(ctx, &allPinnedObjs, "select PinnedObject.* from PinnedObject, Object where PackageID = ? and PinnedObject.ObjectID = Object.ObjectID order by PinnedObject.ObjectID", packageID)
if err != nil {
return nil, fmt.Errorf("getting all pinned objects: %w", err)
}

details := make([]stgmod.ObjectDetail, len(objs))
for i, obj := range objs {
details[i] = stgmod.ObjectDetail{
Object: obj.ToObject(),
}
}

stgmod.DetailsFillObjectBlocks(details, allBlocks)
stgmod.DetailsFillPinnedAt(details, allPinnedObjs)
return details, nil
}

func (*ObjectDB) GetObjectsIfAnyBlockOnNode(ctx SQLContext, hubID cdssdk.HubID) ([]cdssdk.Object, error) {
var temps []model.TempObject
err := sqlx.Select(ctx, &temps, "select * from Object where ObjectID in (select ObjectID from ObjectBlock where HubID = ?) order by ObjectID asc", hubID)
if err != nil {
return nil, fmt.Errorf("getting objects: %w", err)
}

objs := make([]cdssdk.Object, len(temps))
for i := range temps {
objs[i] = temps[i].ToObject()
}

return objs, nil
}

func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.Object, error) {
if len(adds) == 0 {
return nil, nil
}

objs := make([]cdssdk.Object, 0, len(adds))
for _, add := range adds {
objs = append(objs, cdssdk.Object{
PackageID: packageID,
Path: add.Path,
Size: add.Size,
FileHash: add.FileHash,
Redundancy: cdssdk.NewNoneRedundancy(), // 首次上传默认使用不分块的none模式
CreateTime: add.UploadTime,
UpdateTime: add.UploadTime,
})
}

err := db.BatchUpsertByPackagePath(ctx, objs)
if err != nil {
return nil, fmt.Errorf("batch create or update objects: %w", err)
}

pathes := make([]string, 0, len(adds))
for _, add := range adds {
pathes = append(pathes, add.Path)
}
// 这里可以不用检查查询结果是否与pathes的数量相同
addedObjs, err := db.BatchGetByPackagePath(ctx, packageID, pathes)
if err != nil {
return nil, fmt.Errorf("batch get object ids: %w", err)
}

// 所有需要按索引来一一对应的数据都需要进行排序
adds = sort2.Sort(adds, func(l, r coormq.AddObjectEntry) int { return strings.Compare(l.Path, r.Path) })
addedObjs = sort2.Sort(addedObjs, func(l, r cdssdk.Object) int { return strings.Compare(l.Path, r.Path) })

addedObjIDs := make([]cdssdk.ObjectID, len(addedObjs))
for i := range addedObjs {
addedObjIDs[i] = addedObjs[i].ObjectID
}

err = db.ObjectBlock().BatchDeleteByObjectID(ctx, addedObjIDs)
if err != nil {
return nil, fmt.Errorf("batch delete object blocks: %w", err)
}

err = db.PinnedObject().BatchDeleteByObjectID(ctx, addedObjIDs)
if err != nil {
return nil, fmt.Errorf("batch delete pinned objects: %w", err)
}

objBlocks := make([]stgmod.ObjectBlock, 0, len(adds))
for i, add := range adds {
objBlocks = append(objBlocks, stgmod.ObjectBlock{
ObjectID: addedObjIDs[i],
Index: 0,
HubID: add.HubID,
FileHash: add.FileHash,
})
}
err = db.ObjectBlock().BatchCreate(ctx, objBlocks)
if err != nil {
return nil, fmt.Errorf("batch create object blocks: %w", err)
}

caches := make([]model.Cache, 0, len(adds))
for _, add := range adds {
caches = append(caches, model.Cache{
FileHash: add.FileHash,
HubID: add.HubID,
CreateTime: time.Now(),
Priority: 0,
})
}
err = db.Cache().BatchCreate(ctx, caches)
if err != nil {
return nil, fmt.Errorf("batch create caches: %w", err)
}

return addedObjs, nil
}

func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.UpdatingObjectRedundancy) error {
if len(objs) == 0 {
return nil
}

nowTime := time.Now()
objIDs := make([]cdssdk.ObjectID, 0, len(objs))
dummyObjs := make([]cdssdk.Object, 0, len(objs))
for _, obj := range objs {
objIDs = append(objIDs, obj.ObjectID)
dummyObjs = append(dummyObjs, cdssdk.Object{
ObjectID: obj.ObjectID,
Redundancy: obj.Redundancy,
CreateTime: nowTime,
UpdateTime: nowTime,
})
}

// 目前只能使用这种方式来同时更新大量数据
err := BatchNamedExec(ctx,
"insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy, CreateTime, UpdateTime)"+
" values(:ObjectID, :PackageID, :Path, :Size, :FileHash, :Redundancy, :CreateTime, :UpdateTime) as new"+
" on duplicate key update Redundancy=new.Redundancy", 8, dummyObjs, nil)
if err != nil {
return fmt.Errorf("batch update object redundancy: %w", err)
}

// 删除原本所有的编码块记录,重新添加
err = db.ObjectBlock().BatchDeleteByObjectID(ctx, objIDs)
if err != nil {
return fmt.Errorf("batch delete object blocks: %w", err)
}

// 删除原本Pin住的Object。暂不考虑FileHash没有变化的情况
err = db.PinnedObject().BatchDeleteByObjectID(ctx, objIDs)
if err != nil {
return fmt.Errorf("batch delete pinned object: %w", err)
}

blocks := make([]stgmod.ObjectBlock, 0, len(objs))
for _, obj := range objs {
blocks = append(blocks, obj.Blocks...)
}
err = db.ObjectBlock().BatchCreate(ctx, blocks)
if err != nil {
return fmt.Errorf("batch create object blocks: %w", err)
}

caches := make([]model.Cache, 0, len(objs))
for _, obj := range objs {
for _, blk := range obj.Blocks {
caches = append(caches, model.Cache{
FileHash: blk.FileHash,
HubID: blk.HubID,
CreateTime: time.Now(),
Priority: 0,
})
}
}
err = db.Cache().BatchCreate(ctx, caches)
if err != nil {
return fmt.Errorf("batch create object caches: %w", err)
}

pinneds := make([]cdssdk.PinnedObject, 0, len(objs))
for _, obj := range objs {
for _, p := range obj.PinnedAt {
pinneds = append(pinneds, cdssdk.PinnedObject{
ObjectID: obj.ObjectID,
StorageID: p,
CreateTime: time.Now(),
})
}
}
err = db.PinnedObject().BatchTryCreate(ctx, pinneds)
if err != nil {
return fmt.Errorf("batch create pinned objects: %w", err)
}

return nil
}

func (*ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error {
if len(ids) == 0 {
return nil
}

query, args, err := sqlx.In("delete from Object where ObjectID in (?)", ids)
if err != nil {
return err
}

_, err = ctx.Exec(query, args...)
return err
}

func (*ObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
_, err := ctx.Exec("delete from Object where PackageID = ?", packageID)
return err
}
*/

+ 0
- 123
common/pkgs/db/object_access_stat.go View File

@@ -1,123 +0,0 @@
package db

/*
import (
"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

type ObjectAccessStatDB struct {
*DB
}

func (db *DB) ObjectAccessStat() *ObjectAccessStatDB {
return &ObjectAccessStatDB{db}
}

func (*ObjectAccessStatDB) Get(ctx SQLContext, objID cdssdk.ObjectID, hubID cdssdk.HubID) (stgmod.ObjectAccessStat, error) {
var ret stgmod.ObjectAccessStat
err := sqlx.Get(ctx, &ret, "select * from ObjectAccessStat where ObjectID=? and HubID=?", objID, hubID)
return ret, err
}

func (*ObjectAccessStatDB) GetByObjectID(ctx SQLContext, objID cdssdk.ObjectID) ([]stgmod.ObjectAccessStat, error) {
var ret []stgmod.ObjectAccessStat
err := sqlx.Select(ctx, &ret, "select * from ObjectAccessStat where ObjectID=?", objID)
return ret, err
}

func (*ObjectAccessStatDB) BatchGetByObjectID(ctx SQLContext, objIDs []cdssdk.ObjectID) ([]stgmod.ObjectAccessStat, error) {
if len(objIDs) == 0 {
return nil, nil
}

var ret []stgmod.ObjectAccessStat
stmt, args, err := sqlx.In("select * from ObjectAccessStat where ObjectID in (?)", objIDs)
if err != nil {
return ret, err
}

err = sqlx.Select(ctx, &ret, stmt, args...)
return ret, err
}

func (*ObjectAccessStatDB) BatchGetByObjectIDOnNode(ctx SQLContext, objIDs []cdssdk.ObjectID, hubID cdssdk.HubID) ([]stgmod.ObjectAccessStat, error) {
if len(objIDs) == 0 {
return nil, nil
}

var ret []stgmod.ObjectAccessStat
stmt, args, err := sqlx.In("select * from ObjectAccessStat where ObjectID in (?) and HubID=?", objIDs, hubID)
if err != nil {
return ret, err
}

err = sqlx.Select(ctx, &ret, stmt, args...)
return ret, err
}

func (*ObjectAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.AddAccessStatEntry) error {
if len(entries) == 0 {
return nil
}

sql := "insert into ObjectAccessStat(ObjectID, HubID, Counter, Amount) " +
" values(:ObjectID, :HubID, :Counter, 0) as new" +
" on duplicate key update ObjectAccessStat.Counter=ObjectAccessStat.Counter+new.Counter"
err := BatchNamedExec(ctx, sql, 4, entries, nil)
return err
}

func (*ObjectAccessStatDB) BatchUpdateAmountInPackage(ctx SQLContext, pkgIDs []cdssdk.PackageID, historyWeight float64) error {
if len(pkgIDs) == 0 {
return nil
}

stmt, args, err := sqlx.In("update ObjectAccessStat inner join Object"+
" on ObjectAccessStat.ObjectID = Object.ObjectID"+
" set Amount=Amount*?+Counter*(1-?), Counter = 0"+
" where PackageID in (?)", historyWeight, historyWeight, pkgIDs)
if err != nil {
return err
}

_, err = ctx.Exec(stmt, args...)
return err
}

func (*ObjectAccessStatDB) UpdateAllAmount(ctx SQLContext, historyWeight float64) error {
stmt, args, err := sqlx.In("update ObjectAccessStat set Amount=Amount*?+Counter*(1-?), Counter = 0", historyWeight, historyWeight)
if err != nil {
return err
}

_, err = ctx.Exec(stmt, args...)
return err
}

func (*ObjectAccessStatDB) DeleteByObjectID(ctx SQLContext, objID cdssdk.ObjectID) error {
_, err := ctx.Exec("delete from ObjectAccessStat where ObjectID=?", objID)
return err
}

func (*ObjectAccessStatDB) BatchDeleteByObjectID(ctx SQLContext, objIDs []cdssdk.ObjectID) error {
if len(objIDs) == 0 {
return nil
}

stmt, args, err := sqlx.In("delete from ObjectAccessStat where ObjectID in (?)", objIDs)
if err != nil {
return err
}

_, err = ctx.Exec(stmt, args...)
return err
}

func (*ObjectAccessStatDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
_, err := ctx.Exec("delete ObjectAccessStat from ObjectAccessStat inner join Object on ObjectAccessStat.ObjectID = Object.ObjectID where PackageID = ?", packageID)
return err
}
*/

+ 0
- 138
common/pkgs/db/object_block.go View File

@@ -1,138 +0,0 @@
package db

/*
import (
"database/sql"
"strconv"
"strings"

"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
)

type ObjectBlockDB struct {
*DB
}

func (db *DB) ObjectBlock() *ObjectBlockDB {
return &ObjectBlockDB{DB: db}
}

func (db *ObjectBlockDB) GetByHubID(ctx SQLContext, hubID cdssdk.HubID) ([]stgmod.ObjectBlock, error) {
var rets []stgmod.ObjectBlock
err := sqlx.Select(ctx, &rets, "select * from ObjectBlock where HubID = ?", hubID)
return rets, err
}

func (db *ObjectBlockDB) BatchGetByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]stgmod.ObjectBlock, error) {
if len(objectIDs) == 0 {
return nil, nil
}

stmt, args, err := sqlx.In("select * from ObjectBlock where ObjectID in (?) order by ObjectID, `Index` asc", objectIDs)
if err != nil {
return nil, err
}
stmt = ctx.Rebind(stmt)

var blocks []stgmod.ObjectBlock
err = sqlx.Select(ctx, &blocks, stmt, args...)
if err != nil {
return nil, err
}

return blocks, nil
}

func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index int, hubID cdssdk.HubID, fileHash string) error {
_, err := ctx.Exec("insert into ObjectBlock values(?,?,?,?)", objectID, index, hubID, fileHash)
return err
}

func (db *ObjectBlockDB) BatchCreate(ctx SQLContext, blocks []stgmod.ObjectBlock) error {
if len(blocks) == 0 {
return nil
}

return BatchNamedExec(ctx,
"insert ignore into ObjectBlock(ObjectID, `Index`, HubID, FileHash) values(:ObjectID, :Index, :HubID, :FileHash)",
4,
blocks,
nil,
)
}

func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID) error {
_, err := ctx.Exec("delete from ObjectBlock where ObjectID = ?", objectID)
return err
}

func (db *ObjectBlockDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error {
if len(objectIDs) == 0 {
return nil
}

// TODO in语句有长度限制
query, args, err := sqlx.In("delete from ObjectBlock where ObjectID in (?)", objectIDs)
if err != nil {
return err
}
_, err = ctx.Exec(query, args...)
return err
}

func (db *ObjectBlockDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
_, err := ctx.Exec("delete ObjectBlock from ObjectBlock inner join Object on ObjectBlock.ObjectID = Object.ObjectID where PackageID = ?", packageID)
return err
}

func (db *ObjectBlockDB) NodeBatchDelete(ctx SQLContext, hubID cdssdk.HubID, fileHashes []string) error {
if len(fileHashes) == 0 {
return nil
}

query, args, err := sqlx.In("delete from ObjectBlock where HubID = ? and FileHash in (?)", hubID, fileHashes)
if err != nil {
return err
}

_, err = ctx.Exec(query, args...)
return err
}

func (db *ObjectBlockDB) CountBlockWithHash(ctx SQLContext, fileHash string) (int, error) {
var cnt int
err := sqlx.Get(ctx, &cnt,
"select count(FileHash) from ObjectBlock, Object, Package where FileHash = ? and"+
" ObjectBlock.ObjectID = Object.ObjectID and"+
" Object.PackageID = Package.PackageID and"+
" Package.State = ?", fileHash, cdssdk.PackageStateNormal)
if err == sql.ErrNoRows {
return 0, nil
}

return cnt, err
}

// 按逗号切割字符串,并将每一个部分解析为一个int64的ID。
// 注:需要外部保证分隔的每一个部分都是正确的10进制数字格式
func splitConcatedHubID(idStr string) []cdssdk.HubID {
idStrs := strings.Split(idStr, ",")
ids := make([]cdssdk.HubID, 0, len(idStrs))

for _, str := range idStrs {
// 假设传入的ID是正确的数字格式
id, _ := strconv.ParseInt(str, 10, 64)
ids = append(ids, cdssdk.HubID(id))
}

return ids
}

// 按逗号切割字符串
func splitConcatedFileHash(idStr string) []string {
idStrs := strings.Split(idStr, ",")
return idStrs
}
*/

+ 0
- 207
common/pkgs/db/package.go View File

@@ -1,207 +0,0 @@
package db

/*
import (
"database/sql"
"errors"
"fmt"

"github.com/jmoiron/sqlx"
"github.com/samber/lo"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

type PackageDB struct {
*DB
}

func (db *DB) Package() *PackageDB {
return &PackageDB{DB: db}
}

func (db *PackageDB) GetByID(ctx SQLContext, packageID cdssdk.PackageID) (model.Package, error) {
var ret model.Package
err := sqlx.Get(ctx, &ret, "select * from Package where PackageID = ?", packageID)
return ret, err
}

func (db *PackageDB) GetByName(ctx SQLContext, bucketID cdssdk.BucketID, name string) (model.Package, error) {
var ret model.Package
err := sqlx.Get(ctx, &ret, "select * from Package where BucketID = ? and Name = ?", bucketID, name)
return ret, err
}

func (db *PackageDB) BatchTestPackageID(ctx SQLContext, pkgIDs []cdssdk.PackageID) (map[cdssdk.PackageID]bool, error) {
if len(pkgIDs) == 0 {
return make(map[cdssdk.PackageID]bool), nil
}

stmt, args, err := sqlx.In("select PackageID from Package where PackageID in (?)", lo.Uniq(pkgIDs))
if err != nil {
return nil, err
}

var avaiIDs []cdssdk.PackageID
err = sqlx.Select(ctx, &avaiIDs, stmt, args...)
if err != nil {
return nil, err
}

avaiIDMap := make(map[cdssdk.PackageID]bool)
for _, pkgID := range avaiIDs {
avaiIDMap[pkgID] = true
}

return avaiIDMap, nil
}

func (*PackageDB) BatchGetAllPackageIDs(ctx SQLContext, start int, count int) ([]cdssdk.PackageID, error) {
var ret []cdssdk.PackageID
err := sqlx.Select(ctx, &ret, "select PackageID from Package limit ?, ?", start, count)
return ret, err
}

func (db *PackageDB) GetBucketPackages(ctx SQLContext, userID cdssdk.UserID, bucketID cdssdk.BucketID) ([]model.Package, error) {
var ret []model.Package
err := sqlx.Select(ctx, &ret, "select Package.* from UserBucket, Package where UserID = ? and UserBucket.BucketID = ? and UserBucket.BucketID = Package.BucketID", userID, bucketID)
return ret, err
}

// IsAvailable 判断一个用户是否拥有指定对象
func (db *PackageDB) IsAvailable(ctx SQLContext, userID cdssdk.UserID, packageID cdssdk.PackageID) (bool, error) {
var pkgID cdssdk.PackageID
// 先根据PackageID找到Package,然后判断此Package所在的Bucket是不是归此用户所有
err := sqlx.Get(ctx, &pkgID,
"select Package.PackageID from Package, UserBucket where "+
"Package.PackageID = ? and "+
"Package.BucketID = UserBucket.BucketID and "+
"UserBucket.UserID = ?",
packageID, userID)

if err == sql.ErrNoRows {
return false, nil
}

if err != nil {
return false, fmt.Errorf("find package failed, err: %w", err)
}

return true, nil
}

// GetUserPackage 获得Package,如果用户没有权限访问,则不会获得结果
func (db *PackageDB) GetUserPackage(ctx SQLContext, userID cdssdk.UserID, packageID cdssdk.PackageID) (model.Package, error) {
var ret model.Package
err := sqlx.Get(ctx, &ret,
"select Package.* from Package, UserBucket where"+
" Package.PackageID = ? and"+
" Package.BucketID = UserBucket.BucketID and"+
" UserBucket.UserID = ?",
packageID, userID)
return ret, err
}

// 在指定名称的Bucket中查找指定名称的Package
func (*PackageDB) GetUserPackageByName(ctx SQLContext, userID cdssdk.UserID, bucketName string, packageName string) (cdssdk.Package, error) {
var ret model.Package
err := sqlx.Get(ctx, &ret,
"select Package.* from Package, Bucket, UserBucket where"+
" Package.Name = ? and"+
" Package.BucketID = Bucket.BucketID and"+
" Bucket.Name = ? and"+
" UserBucket.UserID = ? and"+
" UserBucket.BucketID = Bucket.BucketID",
packageName, bucketName, userID)
return ret, err
}

func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name string) (cdssdk.PackageID, error) {
// 根据packagename和bucketid查询,若不存在则插入,若存在则返回错误
var packageID int64
err := sqlx.Get(ctx, &packageID, "select PackageID from Package where Name = ? AND BucketID = ? for update", name, bucketID)
// 无错误代表存在记录
if err == nil {
return 0, fmt.Errorf("package with given Name and BucketID already exists")
}
// 错误不是记录不存在
if !errors.Is(err, sql.ErrNoRows) {
return 0, fmt.Errorf("query Package by PackageName and BucketID failed, err: %w", err)
}

sql := "insert into Package(Name, BucketID, State) values(?,?,?)"
r, err := ctx.Exec(sql, name, bucketID, cdssdk.PackageStateNormal)
if err != nil {
return 0, fmt.Errorf("insert package failed, err: %w", err)
}

packageID, err = r.LastInsertId()
if err != nil {
return 0, fmt.Errorf("get id of inserted package failed, err: %w", err)
}

return cdssdk.PackageID(packageID), nil
}

// SoftDelete 设置一个对象被删除,并将相关数据删除
func (db *PackageDB) SoftDelete(ctx SQLContext, packageID cdssdk.PackageID) error {
obj, err := db.GetByID(ctx, packageID)
if err != nil {
return fmt.Errorf("get package failed, err: %w", err)
}

// 不是正常状态的Package,则不删除
// TODO 未来可能有其他状态
if obj.State != cdssdk.PackageStateNormal {
return nil
}

err = db.ChangeState(ctx, packageID, cdssdk.PackageStateDeleted)
if err != nil {
return fmt.Errorf("change package state failed, err: %w", err)
}

err = db.ObjectAccessStat().DeleteInPackage(ctx, packageID)
if err != nil {
return fmt.Errorf("delete from object access stat: %w", err)
}

err = db.ObjectBlock().DeleteInPackage(ctx, packageID)
if err != nil {
return fmt.Errorf("delete from object rep failed, err: %w", err)
}

if err := db.PinnedObject().DeleteInPackage(ctx, packageID); err != nil {
return fmt.Errorf("deleting pinned objects in package: %w", err)
}

if err := db.Object().DeleteInPackage(ctx, packageID); err != nil {
return fmt.Errorf("deleting objects in package: %w", err)
}

_, err = db.StoragePackage().SetAllPackageDeleted(ctx, packageID)
if err != nil {
return fmt.Errorf("set storage package deleted failed, err: %w", err)
}

return nil
}

// DeleteUnused 删除一个已经是Deleted状态,且不再被使用的对象。目前可能被使用的地方只有StoragePackage
func (PackageDB) DeleteUnused(ctx SQLContext, packageID cdssdk.PackageID) error {
_, err := ctx.Exec("delete from Package where PackageID = ? and State = ? and "+
"not exists(select StorageID from StoragePackage where PackageID = ?)",
packageID,
cdssdk.PackageStateDeleted,
packageID,
)

return err
}

func (*PackageDB) ChangeState(ctx SQLContext, packageID cdssdk.PackageID, state string) error {
_, err := ctx.Exec("update Package set State = ? where PackageID = ?", state, packageID)
return err
}
*/

+ 0
- 86
common/pkgs/db/package_access_stat.go View File

@@ -1,86 +0,0 @@
package db

/*
import (
"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
)

type PackageAccessStatDB struct {
*DB
}

func (db *DB) PackageAccessStat() *PackageAccessStatDB {
return &PackageAccessStatDB{db}
}

func (*PackageAccessStatDB) Get(ctx SQLContext, pkgID cdssdk.PackageID, hubID cdssdk.HubID) (stgmod.PackageAccessStat, error) {
var ret stgmod.PackageAccessStat
err := sqlx.Get(ctx, &ret, "select * from PackageAccessStat where PackageID=? and HubID=?", pkgID, hubID)
return ret, err
}

func (*PackageAccessStatDB) GetByPackageID(ctx SQLContext, pkgID cdssdk.PackageID) ([]stgmod.PackageAccessStat, error) {
var ret []stgmod.PackageAccessStat
err := sqlx.Select(ctx, &ret, "select * from PackageAccessStat where PackageID=?", pkgID)
return ret, err
}

func (*PackageAccessStatDB) BatchGetByPackageID(ctx SQLContext, pkgIDs []cdssdk.PackageID) ([]stgmod.PackageAccessStat, error) {
if len(pkgIDs) == 0 {
return nil, nil
}

var ret []stgmod.PackageAccessStat
stmt, args, err := sqlx.In("select * from PackageAccessStat where PackageID in (?)", pkgIDs)
if err != nil {
return nil, err
}

err = sqlx.Select(ctx, &ret, stmt, args...)
return ret, err
}

func (*PackageAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.AddAccessStatEntry) error {
if len(entries) == 0 {
return nil
}

sql := "insert into PackageAccessStat(PackageID, HubID, Counter, Amount)" +
" values(:PackageID, :HubID, :Counter, 0) as new" +
" on duplicate key update Counter=Counter+new.Counter"
err := BatchNamedExec(ctx, sql, 4, entries, nil)
return err
}

func (*PackageAccessStatDB) BatchUpdateAmount(ctx SQLContext, pkgIDs []cdssdk.PackageID, historyWeight float64) error {
if len(pkgIDs) == 0 {
return nil
}

stmt, args, err := sqlx.In("update PackageAccessStat set Amount=Amount*?+Counter*(1-?), Counter = 0 where PackageID in (?)", historyWeight, historyWeight, pkgIDs)
if err != nil {
return err
}

_, err = ctx.Exec(stmt, args...)
return err
}

func (*PackageAccessStatDB) UpdateAllAmount(ctx SQLContext, historyWeight float64) error {
stmt, args, err := sqlx.In("update PackageAccessStat set Amount=Amount*?+Counter*(1-?), Counter = 0", historyWeight, historyWeight)
if err != nil {
return err
}

_, err = ctx.Exec(stmt, args...)
return err
}

func (*PackageAccessStatDB) DeleteByPackageID(ctx SQLContext, pkgID cdssdk.PackageID) error {
_, err := ctx.Exec("delete from PackageAccessStat where PackageID=?", pkgID)
return err
}
*/

+ 0
- 119
common/pkgs/db/storage_package.go View File

@@ -1,119 +0,0 @@
package db

/*
import (
"fmt"

"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

type StoragePackageDB struct {
*DB
}

func (db *DB) StoragePackage() *StoragePackageDB {
return &StoragePackageDB{DB: db}
}

func (*StoragePackageDB) Get(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) (model.StoragePackage, error) {
var ret model.StoragePackage
err := sqlx.Get(ctx, &ret, "select * from StoragePackage where StorageID = ? and PackageID = ? and UserID = ?", storageID, packageID, userID)
return ret, err
}

func (*StoragePackageDB) GetAllByStorageAndPackageID(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID) ([]model.StoragePackage, error) {
var ret []model.StoragePackage
err := sqlx.Select(ctx, &ret, "select * from StoragePackage where StorageID = ? and PackageID = ?", storageID, packageID)
return ret, err
}

func (*StoragePackageDB) GetAllByStorageID(ctx SQLContext, storageID cdssdk.StorageID) ([]model.StoragePackage, error) {
var ret []model.StoragePackage
err := sqlx.Select(ctx, &ret, "select * from StoragePackage where StorageID = ?", storageID)
return ret, err
}

func (*StoragePackageDB) CreateOrUpdate(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) error {
_, err := ctx.Exec("insert into StoragePackage values(?,?,?,?)"+
" on duplicate key update State=values(State)", storageID, packageID, userID, model.StoragePackageStateNormal)
return err
}

func (*StoragePackageDB) ChangeState(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID, state string) error {
_, err := ctx.Exec("update StoragePackage set State = ? where StorageID = ? and PackageID = ? and UserID = ?", state, storageID, packageID, userID)
return err
}

// SetStateNormal 将状态设置为Normal,如果记录状态是Deleted,则不进行操作
func (*StoragePackageDB) SetStateNormal(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) error {
_, err := ctx.Exec("update StoragePackage set State = ? where StorageID = ? and PackageID = ? and UserID = ? and State <> ?",
model.StoragePackageStateNormal,
storageID,
packageID,
userID,
model.StoragePackageStateDeleted,
)
return err
}

func (*StoragePackageDB) SetAllPackageState(ctx SQLContext, packageID cdssdk.PackageID, state string) (int64, error) {
ret, err := ctx.Exec(
"update StoragePackage set State = ? where PackageID = ?",
state,
packageID,
)
if err != nil {
return 0, err
}

cnt, err := ret.RowsAffected()
if err != nil {
return 0, fmt.Errorf("get affected rows failed, err: %w", err)
}

return cnt, nil
}

// SetAllPackageOutdated 将Storage中指定对象设置为已过期。
// 注:只会设置Normal状态的对象
func (*StoragePackageDB) SetAllPackageOutdated(ctx SQLContext, packageID cdssdk.PackageID) (int64, error) {
ret, err := ctx.Exec(
"update StoragePackage set State = ? where State = ? and PackageID = ?",
model.StoragePackageStateOutdated,
model.StoragePackageStateNormal,
packageID,
)
if err != nil {
return 0, err
}

cnt, err := ret.RowsAffected()
if err != nil {
return 0, fmt.Errorf("get affected rows failed, err: %w", err)
}

return cnt, nil
}

func (db *StoragePackageDB) SetAllPackageDeleted(ctx SQLContext, packageID cdssdk.PackageID) (int64, error) {
return db.SetAllPackageState(ctx, packageID, model.StoragePackageStateDeleted)
}

func (*StoragePackageDB) Delete(ctx SQLContext, storageID cdssdk.StorageID, packageID cdssdk.PackageID, userID cdssdk.UserID) error {
_, err := ctx.Exec("delete from StoragePackage where StorageID = ? and PackageID = ? and UserID = ?", storageID, packageID, userID)
return err
}

// FindPackageStorages 查询存储了指定对象的Storage
func (*StoragePackageDB) FindPackageStorages(ctx SQLContext, packageID cdssdk.PackageID) ([]model.Storage, error) {
var ret []model.Storage
err := sqlx.Select(ctx, &ret,
"select Storage.* from StoragePackage, Storage where PackageID = ? and"+
" StoragePackage.StorageID = Storage.StorageID",
packageID,
)
return ret, err
}
*/

+ 0
- 23
common/pkgs/db/user.go View File

@@ -1,23 +0,0 @@
package db

/*
import (
"github.com/jmoiron/sqlx"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
)

type UserDB struct {
*DB
}

func (db *DB) User() *UserDB {
return &UserDB{DB: db}
}

func (db *UserDB) GetByID(ctx SQLContext, userID cdssdk.UserID) (model.User, error) {
var ret model.User
err := sqlx.Get(ctx, &ret, "select * from User where UserID = ?", userID)
return ret, err
}
*/

+ 0
- 16
common/pkgs/db/user_bucket.go View File

@@ -1,16 +0,0 @@
package db

/*
type UserBucketDB struct {
*DB
}

func (db *DB) UserBucket() *UserBucketDB {
return &UserBucketDB{DB: db}
}

func (*UserBucketDB) Create(ctx SQLContext, userID int64, bucketID int64) error {
_, err := ctx.Exec("insert into UserBucket(UserID,BucketID) values(?,?)", userID, bucketID)
return err
}
*/

+ 0
- 77
common/pkgs/db/utils.go View File

@@ -1,77 +0,0 @@
package db

/*
import (
"database/sql"

"github.com/jmoiron/sqlx"
"gitlink.org.cn/cloudream/common/utils/math2"
)

const (
maxPlaceholderCount = 65535
)

func BatchNamedExec[T any](ctx SQLContext, sql string, argCnt int, arr []T, callback func(sql.Result) bool) error {
if argCnt == 0 {
ret, err := ctx.NamedExec(sql, arr)
if err != nil {
return err
}

if callback != nil {
callback(ret)
}

return nil
}

batchSize := maxPlaceholderCount / argCnt
for len(arr) > 0 {
curBatchSize := math2.Min(batchSize, len(arr))

ret, err := ctx.NamedExec(sql, arr[:curBatchSize])
if err != nil {
return err
}
if callback != nil && !callback(ret) {
return nil
}

arr = arr[curBatchSize:]
}

return nil
}

func BatchNamedQuery[T any](ctx SQLContext, sql string, argCnt int, arr []T, callback func(*sqlx.Rows) bool) error {
if argCnt == 0 {
ret, err := ctx.NamedQuery(sql, arr)
if err != nil {
return err
}

if callback != nil {
callback(ret)
}

return nil
}

batchSize := maxPlaceholderCount / argCnt
for len(arr) > 0 {
curBatchSize := math2.Min(batchSize, len(arr))

ret, err := ctx.NamedQuery(sql, arr[:curBatchSize])
if err != nil {
return err
}
if callback != nil && !callback(ret) {
return nil
}

arr = arr[curBatchSize:]
}
return nil
}
*/

Loading…
Cancel
Save