| @@ -46,6 +46,9 @@ func (*CacheDB) Create(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID, pr | |||
| // 批量创建缓存记录 | |||
| func (*CacheDB) BatchCreate(ctx SQLContext, caches []model.Cache) error { | |||
| if len(caches) == 0 { | |||
| return nil | |||
| } | |||
| return BatchNamedExec( | |||
| ctx, | |||
| "insert into Cache(FileHash,NodeID,CreateTime,Priority) values(:FileHash,:NodeID,:CreateTime,:Priority)"+ | |||
| @@ -57,6 +60,10 @@ func (*CacheDB) BatchCreate(ctx SQLContext, caches []model.Cache) error { | |||
| } | |||
| func (*CacheDB) BatchCreateOnSameNode(ctx SQLContext, fileHashes []string, nodeID cdssdk.NodeID, priority int) error { | |||
| if len(fileHashes) == 0 { | |||
| return nil | |||
| } | |||
| var caches []model.Cache | |||
| var nowTime = time.Now() | |||
| for _, hash := range fileHashes { | |||
| @@ -78,6 +85,10 @@ func (*CacheDB) BatchCreateOnSameNode(ctx SQLContext, fileHashes []string, nodeI | |||
| } | |||
| func (*CacheDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, fileHashes []string) error { | |||
| if len(fileHashes) == 0 { | |||
| return nil | |||
| } | |||
| // TODO in语句有长度限制 | |||
| query, args, err := sqlx.In("delete from Cache where NodeID = ? and FileHash in (?)", nodeID, fileHashes) | |||
| if err != nil { | |||
| @@ -15,6 +15,10 @@ func (db *DB) NodeConnectivity() *NodeConnectivityDB { | |||
| } | |||
| func (db *NodeConnectivityDB) BatchGetByFromNode(ctx SQLContext, nodeIDs []cdssdk.NodeID) ([]model.NodeConnectivity, error) { | |||
| if len(nodeIDs) == 0 { | |||
| return nil, nil | |||
| } | |||
| var ret []model.NodeConnectivity | |||
| sql, args, err := sqlx.In("select * from NodeConnectivity where NodeID in (?)", nodeIDs) | |||
| @@ -26,6 +30,10 @@ func (db *NodeConnectivityDB) BatchGetByFromNode(ctx SQLContext, nodeIDs []cdssd | |||
| } | |||
| func (db *NodeConnectivityDB) BatchUpdateOrCreate(ctx SQLContext, cons []model.NodeConnectivity) error { | |||
| if len(cons) == 0 { | |||
| return nil | |||
| } | |||
| return BatchNamedExec(ctx, | |||
| "insert into NodeConnectivity(FromNodeID, ToNodeID, Delay, TestTime) values(:FromNodeID, :ToNodeID, :Delay, :TestTime) as new"+ | |||
| " on duplicate key update Delay = new.Delay, TestTime = new.TestTime", 4, cons, nil) | |||
| @@ -27,6 +27,10 @@ func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Obj | |||
| } | |||
| func (db *ObjectDB) BatchGetPackageObjectIDs(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.ObjectID, error) { | |||
| if len(pathes) == 0 { | |||
| return nil, nil | |||
| } | |||
| // TODO In语句 | |||
| stmt, args, err := sqlx.In("select ObjectID from Object force index(PackagePath) where PackageID=? and Path in (?)", pkgID, pathes) | |||
| if err != nil { | |||
| @@ -63,6 +67,10 @@ func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, | |||
| // 用于创建时,需要额外检查PackageID+Path的唯一性 | |||
| // 用于更新时,需要额外检查现存的PackageID+Path对应的ObjectID是否与待更新的ObjectID相同。不会更新CreateTime。 | |||
| func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) error { | |||
| if len(objs) == 0 { | |||
| return nil | |||
| } | |||
| sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy, CreateTime ,UpdateTime)" + | |||
| " values(:PackageID,:Path,:Size,:FileHash,:Redundancy, :CreateTime, :UpdateTime) as new" + | |||
| " on duplicate key update Size = new.Size, FileHash = new.FileHash, Redundancy = new.Redundancy, UpdateTime = new.UpdateTime" | |||
| @@ -129,6 +137,10 @@ func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.Pac | |||
| } | |||
| func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.ObjectID, error) { | |||
| if len(adds) == 0 { | |||
| return nil, nil | |||
| } | |||
| objs := make([]cdssdk.Object, 0, len(adds)) | |||
| for _, add := range adds { | |||
| objs = append(objs, cdssdk.Object{ | |||
| @@ -175,7 +187,6 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] | |||
| FileHash: add.FileHash, | |||
| }) | |||
| } | |||
| err = db.ObjectBlock().BatchCreate(ctx, objBlocks) | |||
| if err != nil { | |||
| return nil, fmt.Errorf("batch create object blocks: %w", err) | |||
| @@ -190,7 +201,6 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] | |||
| Priority: 0, | |||
| }) | |||
| } | |||
| err = db.Cache().BatchCreate(ctx, caches) | |||
| if err != nil { | |||
| return nil, fmt.Errorf("batch create caches: %w", err) | |||
| @@ -200,6 +210,11 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] | |||
| } | |||
| func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeObjectRedundancyEntry) error { | |||
| if len(objs) == 0 { | |||
| return nil | |||
| } | |||
| nowTime := time.Now() | |||
| objIDs := make([]cdssdk.ObjectID, 0, len(objs)) | |||
| dummyObjs := make([]cdssdk.Object, 0, len(objs)) | |||
| for _, obj := range objs { | |||
| @@ -207,14 +222,16 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeOb | |||
| dummyObjs = append(dummyObjs, cdssdk.Object{ | |||
| ObjectID: obj.ObjectID, | |||
| Redundancy: obj.Redundancy, | |||
| CreateTime: nowTime, | |||
| UpdateTime: nowTime, | |||
| }) | |||
| } | |||
| // 目前只能使用这种方式来同时更新大量数据 | |||
| err := BatchNamedExec(ctx, | |||
| "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy, UpdateTime)"+ | |||
| " values(:ObjectID, :PackageID, :Path, :Size, :FileHash, :Redundancy, :UpdateTime) as new"+ | |||
| " on duplicate key update Redundancy=new.Redundancy", 7, dummyObjs, nil) | |||
| "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy, CreateTime, UpdateTime)"+ | |||
| " values(:ObjectID, :PackageID, :Path, :Size, :FileHash, :Redundancy, :CreateTime, :UpdateTime) as new"+ | |||
| " on duplicate key update Redundancy=new.Redundancy", 8, dummyObjs, nil) | |||
| if err != nil { | |||
| return fmt.Errorf("batch update object redundancy: %w", err) | |||
| } | |||
| @@ -275,6 +292,10 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeOb | |||
| } | |||
| func (*ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error { | |||
| if len(ids) == 0 { | |||
| return nil | |||
| } | |||
| query, args, err := sqlx.In("delete from Object where ObjectID in (?)", ids) | |||
| if err != nil { | |||
| return err | |||
| @@ -30,6 +30,10 @@ func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index | |||
| } | |||
| func (db *ObjectBlockDB) BatchCreate(ctx SQLContext, blocks []stgmod.ObjectBlock) error { | |||
| if len(blocks) == 0 { | |||
| return nil | |||
| } | |||
| return BatchNamedExec(ctx, | |||
| "insert ignore into ObjectBlock(ObjectID, `Index`, NodeID, FileHash) values(:ObjectID, :Index, :NodeID, :FileHash)", | |||
| 4, | |||
| @@ -44,6 +48,10 @@ func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.Object | |||
| } | |||
| func (db *ObjectBlockDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error { | |||
| if len(objectIDs) == 0 { | |||
| return nil | |||
| } | |||
| // TODO in语句有长度限制 | |||
| query, args, err := sqlx.In("delete from ObjectBlock where ObjectID in (?)", objectIDs) | |||
| if err != nil { | |||
| @@ -59,6 +67,10 @@ func (db *ObjectBlockDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.Packag | |||
| } | |||
| func (db *ObjectBlockDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, fileHashes []string) error { | |||
| if len(fileHashes) == 0 { | |||
| return nil | |||
| } | |||
| query, args, err := sqlx.In("delete from ObjectBlock where NodeID = ? and FileHash in (?)", nodeID, fileHashes) | |||
| if err != nil { | |||
| return err | |||
| @@ -40,6 +40,10 @@ func (*PinnedObjectDB) TryCreate(ctx SQLContext, nodeID cdssdk.NodeID, objectID | |||
| } | |||
| func (*PinnedObjectDB) BatchTryCreate(ctx SQLContext, pinneds []cdssdk.PinnedObject) error { | |||
| if len(pinneds) == 0 { | |||
| return nil | |||
| } | |||
| return BatchNamedExec(ctx, "insert ignore into PinnedObject values(:NodeID,:ObjectID,:CreateTime)", 3, pinneds, nil) | |||
| } | |||
| @@ -54,6 +58,10 @@ func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.Packag | |||
| } | |||
| func (db *PinnedObjectDB) ObjectBatchCreate(ctx SQLContext, objectID cdssdk.ObjectID, nodeIDs []cdssdk.NodeID) error { | |||
| if len(nodeIDs) == 0 { | |||
| return nil | |||
| } | |||
| for _, id := range nodeIDs { | |||
| err := db.TryCreate(ctx, id, objectID, time.Now()) | |||
| if err != nil { | |||
| @@ -74,6 +82,10 @@ func (*PinnedObjectDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID | |||
| } | |||
| func (*PinnedObjectDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error { | |||
| if len(objectIDs) == 0 { | |||
| return nil | |||
| } | |||
| // TODO in语句有长度限制 | |||
| query, args, err := sqlx.In("delete from PinnedObject where ObjectID in (?)", objectIDs) | |||
| if err != nil { | |||
| @@ -94,7 +106,11 @@ func (*PinnedObjectDB) DeleteInPackageAtNode(ctx SQLContext, packageID cdssdk.Pa | |||
| } | |||
| func (*PinnedObjectDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, objectIDs []cdssdk.ObjectID) error { | |||
| query, args, err := sqlx.In("delete from PinnedObject where NodeID = ? and ObjectID in (?)", objectIDs) | |||
| if len(objectIDs) == 0 { | |||
| return nil | |||
| } | |||
| query, args, err := sqlx.In("delete from PinnedObject where NodeID = ? and ObjectID in (?)", nodeID, objectIDs) | |||
| if err != nil { | |||
| return err | |||
| } | |||
| @@ -31,7 +31,7 @@ func BatchNamedExec[T any](ctx SQLContext, sql string, argCnt int, arr []T, call | |||
| ret, err := ctx.NamedExec(sql, arr[:curBatchSize]) | |||
| if err != nil { | |||
| return nil | |||
| return err | |||
| } | |||
| if callback != nil && !callback(ret) { | |||
| return nil | |||
| @@ -63,7 +63,7 @@ func BatchNamedQuery[T any](ctx SQLContext, sql string, argCnt int, arr []T, cal | |||
| ret, err := ctx.NamedQuery(sql, arr[:curBatchSize]) | |||
| if err != nil { | |||
| return nil | |||
| return err | |||
| } | |||
| if callback != nil && !callback(ret) { | |||
| return nil | |||