| @@ -79,7 +79,6 @@ func (s *Server) initRouters() { | |||||
| rt.POST(cdsapi.BucketCreatePath, s.Bucket().Create) | rt.POST(cdsapi.BucketCreatePath, s.Bucket().Create) | ||||
| rt.POST(cdsapi.BucketDeletePath, s.Bucket().Delete) | rt.POST(cdsapi.BucketDeletePath, s.Bucket().Delete) | ||||
| rt.GET(cdsapi.BucketListUserBucketsPath, s.Bucket().ListUserBuckets) | rt.GET(cdsapi.BucketListUserBucketsPath, s.Bucket().ListUserBuckets) | ||||
| } | } | ||||
| func (s *Server) routeV1(eg *gin.Engine) { | func (s *Server) routeV1(eg *gin.Engine) { | ||||
| @@ -117,4 +116,7 @@ func (s *Server) routeV1(eg *gin.Engine) { | |||||
| v1.POST(cdsapi.BucketCreatePath, s.awsAuth.Auth, s.Bucket().Create) | v1.POST(cdsapi.BucketCreatePath, s.awsAuth.Auth, s.Bucket().Create) | ||||
| v1.POST(cdsapi.BucketDeletePath, s.awsAuth.Auth, s.Bucket().Delete) | v1.POST(cdsapi.BucketDeletePath, s.awsAuth.Auth, s.Bucket().Delete) | ||||
| v1.GET(cdsapi.BucketListUserBucketsPath, s.awsAuth.Auth, s.Bucket().ListUserBuckets) | v1.GET(cdsapi.BucketListUserBucketsPath, s.awsAuth.Auth, s.Bucket().ListUserBuckets) | ||||
| v1.POST(cdsapi.UserCreatePath, s.User().Create) | |||||
| v1.POST(cdsapi.UserDeletePath, s.User().Delete) | |||||
| } | } | ||||
| @@ -0,0 +1,57 @@ | |||||
| package http | |||||
| import ( | |||||
| "net/http" | |||||
| "github.com/gin-gonic/gin" | |||||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" | |||||
| ) | |||||
| type UserService struct { | |||||
| *Server | |||||
| } | |||||
| func (s *Server) User() *UserService { | |||||
| return &UserService{ | |||||
| Server: s, | |||||
| } | |||||
| } | |||||
| func (s *UserService) Create(ctx *gin.Context) { | |||||
| log := logger.WithField("HTTP", "User.Create") | |||||
| var req cdsapi.UserCreate | |||||
| if err := ctx.ShouldBindJSON(&req); err != nil { | |||||
| log.Warnf("binding body: %s", err.Error()) | |||||
| ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | |||||
| return | |||||
| } | |||||
| user, err := s.svc.UserSvc().Create(req.Name) | |||||
| if err != nil { | |||||
| log.Warnf("create user: %s", err.Error()) | |||||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, err.Error())) | |||||
| return | |||||
| } | |||||
| ctx.JSON(http.StatusOK, OK(user)) | |||||
| } | |||||
| func (s *UserService) Delete(ctx *gin.Context) { | |||||
| log := logger.WithField("HTTP", "User.Delete") | |||||
| var req cdsapi.UserDelete | |||||
| if err := ctx.ShouldBindJSON(&req); err != nil { | |||||
| log.Warnf("binding body: %s", err.Error()) | |||||
| ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | |||||
| return | |||||
| } | |||||
| if err := s.svc.UserSvc().Delete(req.UserID); err != nil { | |||||
| log.Warnf("delete user: %s", err.Error()) | |||||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, err.Error())) | |||||
| return | |||||
| } | |||||
| ctx.JSON(http.StatusOK, OK(nil)) | |||||
| } | |||||
| @@ -0,0 +1,43 @@ | |||||
| package services | |||||
| import ( | |||||
| "fmt" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| stgglb "gitlink.org.cn/cloudream/storage/common/globals" | |||||
| coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | |||||
| ) | |||||
| type UserService struct { | |||||
| *Service | |||||
| } | |||||
| func (svc *Service) UserSvc() *UserService { | |||||
| return &UserService{Service: svc} | |||||
| } | |||||
| func (svc *UserService) Create(name string) (cdssdk.User, error) { | |||||
| coorCli, err := stgglb.CoordinatorMQPool.Acquire() | |||||
| if err != nil { | |||||
| return cdssdk.User{}, fmt.Errorf("new coordinator client: %w", err) | |||||
| } | |||||
| defer stgglb.CoordinatorMQPool.Release(coorCli) | |||||
| resp, err := coorCli.CreateUser(coormq.ReqCreateUser(name)) | |||||
| if err != nil { | |||||
| return cdssdk.User{}, err | |||||
| } | |||||
| return resp.User, nil | |||||
| } | |||||
| func (svc *UserService) Delete(userID cdssdk.UserID) error { | |||||
| coorCli, err := stgglb.CoordinatorMQPool.Acquire() | |||||
| if err != nil { | |||||
| return fmt.Errorf("new coordinator client: %w", err) | |||||
| } | |||||
| defer stgglb.CoordinatorMQPool.Release(coorCli) | |||||
| _, err = coorCli.DeleteUser(coormq.ReqDeleteUser(userID)) | |||||
| return err | |||||
| } | |||||
| @@ -10,15 +10,6 @@ import ( | |||||
| // TODO 可以考虑逐步迁移到cdssdk中。迁移思路:数据对象应该包含的字段都迁移到cdssdk中,内部使用的一些特殊字段则留在这里 | // TODO 可以考虑逐步迁移到cdssdk中。迁移思路:数据对象应该包含的字段都迁移到cdssdk中,内部使用的一些特殊字段则留在这里 | ||||
| type Storage = cdssdk.Storage | type Storage = cdssdk.Storage | ||||
| type User struct { | |||||
| UserID cdssdk.UserID `gorm:"column:UserID; primaryKey; type:bigint" json:"userID"` | |||||
| Password string `gorm:"column:Password; type:varchar(255); not null" json:"password"` | |||||
| } | |||||
| func (User) TableName() string { | |||||
| return "User" | |||||
| } | |||||
| type UserBucket struct { | type UserBucket struct { | ||||
| UserID cdssdk.UserID `gorm:"column:UserID; primaryKey; type:bigint" json:"userID"` | UserID cdssdk.UserID `gorm:"column:UserID; primaryKey; type:bigint" json:"userID"` | ||||
| BucketID cdssdk.BucketID `gorm:"column:BucketID; primaryKey; type:bigint" json:"bucketID"` | BucketID cdssdk.BucketID `gorm:"column:BucketID; primaryKey; type:bigint" json:"bucketID"` | ||||
| @@ -2,7 +2,7 @@ package db2 | |||||
| import ( | import ( | ||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | ||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model" | |||||
| "gorm.io/gorm" | |||||
| ) | ) | ||||
| type UserDB struct { | type UserDB struct { | ||||
| @@ -13,8 +13,32 @@ func (db *DB) User() *UserDB { | |||||
| return &UserDB{DB: db} | return &UserDB{DB: db} | ||||
| } | } | ||||
| func (db *UserDB) GetByID(ctx SQLContext, userID cdssdk.UserID) (model.User, error) { | |||||
| var ret model.User | |||||
| func (db *UserDB) GetByID(ctx SQLContext, userID cdssdk.UserID) (cdssdk.User, error) { | |||||
| var ret cdssdk.User | |||||
| err := ctx.Table("User").Where("UserID = ?", userID).First(&ret).Error | err := ctx.Table("User").Where("UserID = ?", userID).First(&ret).Error | ||||
| return ret, err | return ret, err | ||||
| } | } | ||||
| func (db *UserDB) GetByName(ctx SQLContext, name string) (cdssdk.User, error) { | |||||
| var ret cdssdk.User | |||||
| err := ctx.Table("User").Where("Name = ?", name).First(&ret).Error | |||||
| return ret, err | |||||
| } | |||||
| func (db *UserDB) Create(ctx SQLContext, name string) (cdssdk.User, error) { | |||||
| _, err := db.GetByName(ctx, name) | |||||
| if err == nil { | |||||
| return cdssdk.User{}, gorm.ErrDuplicatedKey | |||||
| } | |||||
| if err != gorm.ErrRecordNotFound { | |||||
| return cdssdk.User{}, err | |||||
| } | |||||
| user := cdssdk.User{Name: name} | |||||
| err = ctx.Table("User").Create(&user).Error | |||||
| return user, err | |||||
| } | |||||
| func (*UserDB) Delete(ctx SQLContext, userID cdssdk.UserID) error { | |||||
| return ctx.Table("User").Delete(&cdssdk.User{UserID: userID}).Error | |||||
| } | |||||
| @@ -13,6 +13,12 @@ func (db *DB) UserBucket() *UserBucketDB { | |||||
| return &UserBucketDB{DB: db} | return &UserBucketDB{DB: db} | ||||
| } | } | ||||
| func (*UserBucketDB) GetByUserID(ctx SQLContext, userID cdssdk.UserID) ([]model.UserBucket, error) { | |||||
| var userBuckets []model.UserBucket | |||||
| err := ctx.Table("UserBucket").Where("UserID = ?", userID).Find(&userBuckets).Error | |||||
| return userBuckets, err | |||||
| } | |||||
| func (*UserBucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketID cdssdk.BucketID) error { | func (*UserBucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketID cdssdk.BucketID) error { | ||||
| userBucket := model.UserBucket{ | userBucket := model.UserBucket{ | ||||
| UserID: userID, | UserID: userID, | ||||
| @@ -24,3 +30,7 @@ func (*UserBucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketID cdssd | |||||
| func (*UserBucketDB) DeleteByBucketID(ctx SQLContext, bucketID cdssdk.BucketID) error { | func (*UserBucketDB) DeleteByBucketID(ctx SQLContext, bucketID cdssdk.BucketID) error { | ||||
| return ctx.Table("UserBucket").Where("BucketID = ?", bucketID).Delete(&model.UserBucket{}).Error | return ctx.Table("UserBucket").Where("BucketID = ?", bucketID).Delete(&model.UserBucket{}).Error | ||||
| } | } | ||||
| func (*UserBucketDB) DeleteByUserID(ctx SQLContext, userID cdssdk.UserID) error { | |||||
| return ctx.Table("UserBucket").Where("UserID = ?", userID).Delete(&model.UserBucket{}).Error | |||||
| } | |||||
| @@ -0,0 +1,34 @@ | |||||
| package db2 | |||||
| import ( | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model" | |||||
| ) | |||||
| type UserHubDB struct { | |||||
| *DB | |||||
| } | |||||
| func (db *DB) UserHub() *UserHubDB { | |||||
| return &UserHubDB{db} | |||||
| } | |||||
| func (*UserHubDB) GetByUserID(ctx SQLContext, userID cdssdk.UserID) ([]model.UserHub, error) { | |||||
| var userHubs []model.UserHub | |||||
| if err := ctx.Table("UserHub").Where("UserID = ?", userID).Find(&userHubs).Error; err != nil { | |||||
| return nil, err | |||||
| } | |||||
| return userHubs, nil | |||||
| } | |||||
| func (*UserHubDB) Create(ctx SQLContext, userID cdssdk.UserID, hubID cdssdk.HubID) error { | |||||
| return ctx.Table("UserHub").Create(&model.UserHub{ | |||||
| UserID: userID, | |||||
| HubID: hubID, | |||||
| }).Error | |||||
| } | |||||
| func (*UserHubDB) DeleteByUserID(ctx SQLContext, userID cdssdk.UserID) error { | |||||
| return ctx.Table("UserHub").Delete(&model.UserHub{}, "UserID = ?", userID).Error | |||||
| } | |||||
| @@ -0,0 +1,34 @@ | |||||
| package db2 | |||||
| import ( | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model" | |||||
| ) | |||||
| type UserStorageDB struct { | |||||
| *DB | |||||
| } | |||||
| func (db *DB) UserStorage() *UserStorageDB { | |||||
| return &UserStorageDB{db} | |||||
| } | |||||
| func (*UserStorageDB) GetByUserID(ctx SQLContext, userID cdssdk.UserID) ([]model.UserStorage, error) { | |||||
| var userStgs []model.UserStorage | |||||
| if err := ctx.Table("UserStorage").Where("UserID = ?", userID).Find(&userStgs).Error; err != nil { | |||||
| return nil, err | |||||
| } | |||||
| return userStgs, nil | |||||
| } | |||||
| func (*UserStorageDB) Create(ctx SQLContext, userID cdssdk.UserID, stgID cdssdk.StorageID) error { | |||||
| return ctx.Table("UserStorage").Create(&model.UserStorage{ | |||||
| UserID: userID, | |||||
| StorageID: stgID, | |||||
| }).Error | |||||
| } | |||||
| func (*UserStorageDB) DeleteByUserID(ctx SQLContext, userID cdssdk.UserID) error { | |||||
| return ctx.Table("UserStorage").Delete(&model.UserStorage{}, "UserID = ?", userID).Error | |||||
| } | |||||
| @@ -21,6 +21,8 @@ type Service interface { | |||||
| PackageService | PackageService | ||||
| StorageService | StorageService | ||||
| UserService | |||||
| } | } | ||||
| type Server struct { | type Server struct { | ||||
| @@ -0,0 +1,67 @@ | |||||
| package coordinator | |||||
| import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| ) | |||||
| type UserService interface { | |||||
| CreateUser(msg *CreateUser) (*CreateUserResp, *mq.CodeMessage) | |||||
| DeleteUser(msg *DeleteUser) (*DeleteUserResp, *mq.CodeMessage) | |||||
| } | |||||
| // 创建用户 | |||||
| var _ = Register(Service.CreateUser) | |||||
| type CreateUser struct { | |||||
| mq.MessageBodyBase | |||||
| Name string `json:"name"` | |||||
| } | |||||
| type CreateUserResp struct { | |||||
| mq.MessageBodyBase | |||||
| User cdssdk.User `json:"user"` | |||||
| } | |||||
| func ReqCreateUser(name string) *CreateUser { | |||||
| return &CreateUser{ | |||||
| Name: name, | |||||
| } | |||||
| } | |||||
| func RespCreateUser(user cdssdk.User) *CreateUserResp { | |||||
| return &CreateUserResp{ | |||||
| User: user, | |||||
| } | |||||
| } | |||||
| func (c *Client) CreateUser(msg *CreateUser) (*CreateUserResp, error) { | |||||
| return mq.Request(Service.CreateUser, c.rabbitCli, msg) | |||||
| } | |||||
| // 删除用户 | |||||
| var _ = Register(Service.DeleteUser) | |||||
| type DeleteUser struct { | |||||
| mq.MessageBodyBase | |||||
| UserID cdssdk.UserID `json:"userID"` | |||||
| } | |||||
| type DeleteUserResp struct { | |||||
| mq.MessageBodyBase | |||||
| } | |||||
| func ReqDeleteUser(userID cdssdk.UserID) *DeleteUser { | |||||
| return &DeleteUser{ | |||||
| UserID: userID, | |||||
| } | |||||
| } | |||||
| func RespDeleteUser() *DeleteUserResp { | |||||
| return &DeleteUserResp{} | |||||
| } | |||||
| func (c *Client) DeleteUser(msg *DeleteUser) (*DeleteUserResp, error) { | |||||
| return mq.Request(Service.DeleteUser, c.rabbitCli, msg) | |||||
| } | |||||
| @@ -178,7 +178,7 @@ func (s *ShardStore) createWithAwsSha256(stream io.Reader) (types.FileInfo, erro | |||||
| key, fileName := s.createTempFile() | key, fileName := s.createTempFile() | ||||
| counter := io2.NewCounter(stream) | |||||
| counter := io2.Counter(stream) | |||||
| resp, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{ | resp, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{ | ||||
| Bucket: aws.String(s.Bucket), | Bucket: aws.String(s.Bucket), | ||||
| @@ -218,7 +218,7 @@ func (s *ShardStore) createWithCalcSha256(stream io.Reader) (types.FileInfo, err | |||||
| key, fileName := s.createTempFile() | key, fileName := s.createTempFile() | ||||
| hashStr := io2.NewReadHasher(sha256.New(), stream) | hashStr := io2.NewReadHasher(sha256.New(), stream) | ||||
| counter := io2.NewCounter(hashStr) | |||||
| counter := io2.Counter(hashStr) | |||||
| _, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{ | _, err := s.cli.PutObject(context.TODO(), &s3.PutObjectInput{ | ||||
| Bucket: aws.String(s.Bucket), | Bucket: aws.String(s.Bucket), | ||||
| @@ -55,7 +55,7 @@ func migrate(configPath string) { | |||||
| migrateOne(db, cdssdk.Storage{}) | migrateOne(db, cdssdk.Storage{}) | ||||
| migrateOne(db, model.UserStorage{}) | migrateOne(db, model.UserStorage{}) | ||||
| migrateOne(db, model.UserBucket{}) | migrateOne(db, model.UserBucket{}) | ||||
| migrateOne(db, model.User{}) | |||||
| migrateOne(db, cdssdk.User{}) | |||||
| migrateOne(db, model.UserHub{}) | migrateOne(db, model.UserHub{}) | ||||
| fmt.Println("migrate success") | fmt.Println("migrate success") | ||||
| @@ -0,0 +1,127 @@ | |||||
| package mq | |||||
| import ( | |||||
| "errors" | |||||
| "fmt" | |||||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||||
| "gitlink.org.cn/cloudream/storage/common/pkgs/db2" | |||||
| coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | |||||
| "gorm.io/gorm" | |||||
| ) | |||||
| func (svc *Service) CreateUser(msg *coormq.CreateUser) (*coormq.CreateUserResp, *mq.CodeMessage) { | |||||
| var user cdssdk.User | |||||
| err := svc.db2.DoTx(func(tx db2.SQLContext) error { | |||||
| var err error | |||||
| user, err = svc.db2.User().Create(tx, msg.Name) | |||||
| if err != nil { | |||||
| return fmt.Errorf("creating user: %w", err) | |||||
| } | |||||
| // TODO 目前新建用户的权限与ID 1的相同 | |||||
| hubs, err := svc.db2.UserHub().GetByUserID(tx, 1) | |||||
| if err != nil { | |||||
| return fmt.Errorf("getting user hubs: %w", err) | |||||
| } | |||||
| stgs, err := svc.db2.UserStorage().GetByUserID(tx, 1) | |||||
| if err != nil { | |||||
| return fmt.Errorf("getting user storages: %w", err) | |||||
| } | |||||
| for _, hub := range hubs { | |||||
| err := svc.db2.UserHub().Create(tx, user.UserID, hub.HubID) | |||||
| if err != nil { | |||||
| return fmt.Errorf("creating user hub: %w", err) | |||||
| } | |||||
| } | |||||
| for _, stg := range stgs { | |||||
| err := svc.db2.UserStorage().Create(tx, user.UserID, stg.StorageID) | |||||
| if err != nil { | |||||
| return fmt.Errorf("creating user storage: %w", err) | |||||
| } | |||||
| } | |||||
| return nil | |||||
| }) | |||||
| if err != nil { | |||||
| logger.WithField("Name", msg.Name). | |||||
| Warn(err.Error()) | |||||
| if errors.Is(err, gorm.ErrDuplicatedKey) { | |||||
| return nil, mq.Failed(errorcode.DataExists, "user name already exists") | |||||
| } | |||||
| return nil, mq.Failed(errorcode.OperationFailed, err.Error()) | |||||
| } | |||||
| return mq.ReplyOK(coormq.RespCreateUser(user)) | |||||
| } | |||||
| func (svc *Service) DeleteUser(msg *coormq.DeleteUser) (*coormq.DeleteUserResp, *mq.CodeMessage) { | |||||
| // TODO 目前不能删除ID 1的用户 | |||||
| if msg.UserID == 1 { | |||||
| return nil, mq.Failed(errorcode.OperationFailed, "cannot delete the default user") | |||||
| } | |||||
| err := svc.db2.DoTx(func(tx db2.SQLContext) error { | |||||
| err := svc.db2.User().Delete(tx, msg.UserID) | |||||
| if err != nil { | |||||
| return fmt.Errorf("deleting user: %w", err) | |||||
| } | |||||
| err = svc.db2.UserHub().DeleteByUserID(tx, msg.UserID) | |||||
| if err != nil { | |||||
| return fmt.Errorf("deleting user hubs: %w", err) | |||||
| } | |||||
| err = svc.db2.UserStorage().DeleteByUserID(tx, msg.UserID) | |||||
| if err != nil { | |||||
| return fmt.Errorf("deleting user storages: %w", err) | |||||
| } | |||||
| bkts, err := svc.db2.UserBucket().GetByUserID(tx, msg.UserID) | |||||
| if err != nil { | |||||
| return fmt.Errorf("getting user buckets: %w", err) | |||||
| } | |||||
| for _, bkt := range bkts { | |||||
| pkgs, err := svc.db2.Package().GetBucketPackages(tx, bkt.BucketID) | |||||
| if err != nil { | |||||
| return fmt.Errorf("getting bucket packages: %w", err) | |||||
| } | |||||
| for _, pkg := range pkgs { | |||||
| err := svc.db2.Package().DeleteComplete(tx, pkg.PackageID) | |||||
| if err != nil { | |||||
| return fmt.Errorf("deleting package %v: %w", pkg.PackageID, err) | |||||
| } | |||||
| } | |||||
| err = svc.db2.Bucket().Delete(tx, bkt.BucketID) | |||||
| if err != nil { | |||||
| return fmt.Errorf("deleting bucket: %w", err) | |||||
| } | |||||
| } | |||||
| err = svc.db2.UserBucket().DeleteByUserID(tx, msg.UserID) | |||||
| if err != nil { | |||||
| return fmt.Errorf("deleting user buckets: %w", err) | |||||
| } | |||||
| return nil | |||||
| }) | |||||
| if err != nil { | |||||
| logger.WithField("UserID", msg.UserID). | |||||
| Warn(err.Error()) | |||||
| return nil, mq.Failed(errorcode.OperationFailed, err.Error()) | |||||
| } | |||||
| return mq.ReplyOK(coormq.RespDeleteUser()) | |||||
| } | |||||