diff --git a/common/pkgs/db/cache.go b/common/pkgs/db/cache.go index 84ff1ca..4979228 100644 --- a/common/pkgs/db/cache.go +++ b/common/pkgs/db/cache.go @@ -46,6 +46,9 @@ func (*CacheDB) Create(ctx SQLContext, fileHash string, nodeID cdssdk.NodeID, pr // 批量创建缓存记录 func (*CacheDB) BatchCreate(ctx SQLContext, caches []model.Cache) error { + if len(caches) == 0 { + return nil + } return BatchNamedExec( ctx, "insert into Cache(FileHash,NodeID,CreateTime,Priority) values(:FileHash,:NodeID,:CreateTime,:Priority)"+ @@ -57,6 +60,10 @@ func (*CacheDB) BatchCreate(ctx SQLContext, caches []model.Cache) error { } func (*CacheDB) BatchCreateOnSameNode(ctx SQLContext, fileHashes []string, nodeID cdssdk.NodeID, priority int) error { + if len(fileHashes) == 0 { + return nil + } + var caches []model.Cache var nowTime = time.Now() for _, hash := range fileHashes { @@ -78,6 +85,10 @@ func (*CacheDB) BatchCreateOnSameNode(ctx SQLContext, fileHashes []string, nodeI } func (*CacheDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, fileHashes []string) error { + if len(fileHashes) == 0 { + return nil + } + // TODO in语句有长度限制 query, args, err := sqlx.In("delete from Cache where NodeID = ? and FileHash in (?)", nodeID, fileHashes) if err != nil { diff --git a/common/pkgs/db/node_connectivity.go b/common/pkgs/db/node_connectivity.go index ffc3307..8461563 100644 --- a/common/pkgs/db/node_connectivity.go +++ b/common/pkgs/db/node_connectivity.go @@ -15,6 +15,10 @@ func (db *DB) NodeConnectivity() *NodeConnectivityDB { } func (db *NodeConnectivityDB) BatchGetByFromNode(ctx SQLContext, nodeIDs []cdssdk.NodeID) ([]model.NodeConnectivity, error) { + if len(nodeIDs) == 0 { + return nil, nil + } + var ret []model.NodeConnectivity sql, args, err := sqlx.In("select * from NodeConnectivity where NodeID in (?)", nodeIDs) @@ -26,6 +30,10 @@ func (db *NodeConnectivityDB) BatchGetByFromNode(ctx SQLContext, nodeIDs []cdssd } func (db *NodeConnectivityDB) BatchUpdateOrCreate(ctx SQLContext, cons []model.NodeConnectivity) error { + if len(cons) == 0 { + return nil + } + return BatchNamedExec(ctx, "insert into NodeConnectivity(FromNodeID, ToNodeID, Delay, TestTime) values(:FromNodeID, :ToNodeID, :Delay, :TestTime) as new"+ " on duplicate key update Delay = new.Delay, TestTime = new.TestTime", 4, cons, nil) diff --git a/common/pkgs/db/object.go b/common/pkgs/db/object.go index 20c1afb..3655c98 100644 --- a/common/pkgs/db/object.go +++ b/common/pkgs/db/object.go @@ -27,6 +27,10 @@ func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Obj } func (db *ObjectDB) BatchGetPackageObjectIDs(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.ObjectID, error) { + if len(pathes) == 0 { + return nil, nil + } + // TODO In语句 stmt, args, err := sqlx.In("select ObjectID from Object force index(PackagePath) where PackageID=? and Path in (?)", pkgID, pathes) if err != nil { @@ -63,6 +67,10 @@ func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, // 用于创建时,需要额外检查PackageID+Path的唯一性 // 用于更新时,需要额外检查现存的PackageID+Path对应的ObjectID是否与待更新的ObjectID相同。不会更新CreateTime。 func (db *ObjectDB) BatchCreateOrUpdate(ctx SQLContext, objs []cdssdk.Object) error { + if len(objs) == 0 { + return nil + } + sql := "insert into Object(PackageID, Path, Size, FileHash, Redundancy, CreateTime ,UpdateTime)" + " values(:PackageID,:Path,:Size,:FileHash,:Redundancy, :CreateTime, :UpdateTime) as new" + " on duplicate key update Size = new.Size, FileHash = new.FileHash, Redundancy = new.Redundancy, UpdateTime = new.UpdateTime" @@ -129,6 +137,10 @@ func (db *ObjectDB) GetPackageObjectDetails(ctx SQLContext, packageID cdssdk.Pac } func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []coormq.AddObjectEntry) ([]cdssdk.ObjectID, error) { + if len(adds) == 0 { + return nil, nil + } + objs := make([]cdssdk.Object, 0, len(adds)) for _, add := range adds { objs = append(objs, cdssdk.Object{ @@ -175,7 +187,6 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] FileHash: add.FileHash, }) } - err = db.ObjectBlock().BatchCreate(ctx, objBlocks) if err != nil { return nil, fmt.Errorf("batch create object blocks: %w", err) @@ -190,7 +201,6 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] Priority: 0, }) } - err = db.Cache().BatchCreate(ctx, caches) if err != nil { return nil, fmt.Errorf("batch create caches: %w", err) @@ -200,6 +210,11 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] } func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeObjectRedundancyEntry) error { + if len(objs) == 0 { + return nil + } + + nowTime := time.Now() objIDs := make([]cdssdk.ObjectID, 0, len(objs)) dummyObjs := make([]cdssdk.Object, 0, len(objs)) for _, obj := range objs { @@ -207,14 +222,16 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeOb dummyObjs = append(dummyObjs, cdssdk.Object{ ObjectID: obj.ObjectID, Redundancy: obj.Redundancy, + CreateTime: nowTime, + UpdateTime: nowTime, }) } // 目前只能使用这种方式来同时更新大量数据 err := BatchNamedExec(ctx, - "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy, UpdateTime)"+ - " values(:ObjectID, :PackageID, :Path, :Size, :FileHash, :Redundancy, :UpdateTime) as new"+ - " on duplicate key update Redundancy=new.Redundancy", 7, dummyObjs, nil) + "insert into Object(ObjectID, PackageID, Path, Size, FileHash, Redundancy, CreateTime, UpdateTime)"+ + " values(:ObjectID, :PackageID, :Path, :Size, :FileHash, :Redundancy, :CreateTime, :UpdateTime) as new"+ + " on duplicate key update Redundancy=new.Redundancy", 8, dummyObjs, nil) if err != nil { return fmt.Errorf("batch update object redundancy: %w", err) } @@ -275,6 +292,10 @@ func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.ChangeOb } func (*ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error { + if len(ids) == 0 { + return nil + } + query, args, err := sqlx.In("delete from Object where ObjectID in (?)", ids) if err != nil { return err diff --git a/common/pkgs/db/object_block.go b/common/pkgs/db/object_block.go index f420bfb..a0638e8 100644 --- a/common/pkgs/db/object_block.go +++ b/common/pkgs/db/object_block.go @@ -30,6 +30,10 @@ func (db *ObjectBlockDB) Create(ctx SQLContext, objectID cdssdk.ObjectID, index } func (db *ObjectBlockDB) BatchCreate(ctx SQLContext, blocks []stgmod.ObjectBlock) error { + if len(blocks) == 0 { + return nil + } + return BatchNamedExec(ctx, "insert ignore into ObjectBlock(ObjectID, `Index`, NodeID, FileHash) values(:ObjectID, :Index, :NodeID, :FileHash)", 4, @@ -44,6 +48,10 @@ func (db *ObjectBlockDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.Object } func (db *ObjectBlockDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error { + if len(objectIDs) == 0 { + return nil + } + // TODO in语句有长度限制 query, args, err := sqlx.In("delete from ObjectBlock where ObjectID in (?)", objectIDs) if err != nil { @@ -59,6 +67,10 @@ func (db *ObjectBlockDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.Packag } func (db *ObjectBlockDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, fileHashes []string) error { + if len(fileHashes) == 0 { + return nil + } + query, args, err := sqlx.In("delete from ObjectBlock where NodeID = ? and FileHash in (?)", nodeID, fileHashes) if err != nil { return err diff --git a/common/pkgs/db/pinned_object.go b/common/pkgs/db/pinned_object.go index 6853315..d3c7d7b 100644 --- a/common/pkgs/db/pinned_object.go +++ b/common/pkgs/db/pinned_object.go @@ -40,6 +40,10 @@ func (*PinnedObjectDB) TryCreate(ctx SQLContext, nodeID cdssdk.NodeID, objectID } func (*PinnedObjectDB) BatchTryCreate(ctx SQLContext, pinneds []cdssdk.PinnedObject) error { + if len(pinneds) == 0 { + return nil + } + return BatchNamedExec(ctx, "insert ignore into PinnedObject values(:NodeID,:ObjectID,:CreateTime)", 3, pinneds, nil) } @@ -54,6 +58,10 @@ func (*PinnedObjectDB) CreateFromPackage(ctx SQLContext, packageID cdssdk.Packag } func (db *PinnedObjectDB) ObjectBatchCreate(ctx SQLContext, objectID cdssdk.ObjectID, nodeIDs []cdssdk.NodeID) error { + if len(nodeIDs) == 0 { + return nil + } + for _, id := range nodeIDs { err := db.TryCreate(ctx, id, objectID, time.Now()) if err != nil { @@ -74,6 +82,10 @@ func (*PinnedObjectDB) DeleteByObjectID(ctx SQLContext, objectID cdssdk.ObjectID } func (*PinnedObjectDB) BatchDeleteByObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) error { + if len(objectIDs) == 0 { + return nil + } + // TODO in语句有长度限制 query, args, err := sqlx.In("delete from PinnedObject where ObjectID in (?)", objectIDs) if err != nil { @@ -94,7 +106,11 @@ func (*PinnedObjectDB) DeleteInPackageAtNode(ctx SQLContext, packageID cdssdk.Pa } func (*PinnedObjectDB) NodeBatchDelete(ctx SQLContext, nodeID cdssdk.NodeID, objectIDs []cdssdk.ObjectID) error { - query, args, err := sqlx.In("delete from PinnedObject where NodeID = ? and ObjectID in (?)", objectIDs) + if len(objectIDs) == 0 { + return nil + } + + query, args, err := sqlx.In("delete from PinnedObject where NodeID = ? and ObjectID in (?)", nodeID, objectIDs) if err != nil { return err } diff --git a/common/pkgs/db/utils.go b/common/pkgs/db/utils.go index 5dde56f..2614355 100644 --- a/common/pkgs/db/utils.go +++ b/common/pkgs/db/utils.go @@ -31,7 +31,7 @@ func BatchNamedExec[T any](ctx SQLContext, sql string, argCnt int, arr []T, call ret, err := ctx.NamedExec(sql, arr[:curBatchSize]) if err != nil { - return nil + return err } if callback != nil && !callback(ret) { return nil @@ -63,7 +63,7 @@ func BatchNamedQuery[T any](ctx SQLContext, sql string, argCnt int, arr []T, cal ret, err := ctx.NamedQuery(sql, arr[:curBatchSize]) if err != nil { - return nil + return err } if callback != nil && !callback(ret) { return nil