| @@ -45,7 +45,7 @@ func (s *Server) initRouters() { | |||
| initTemp(rt, s) | |||
| s.routeV1(s.engine) | |||
| s.routeV1(s.engine, rt) | |||
| rt.GET(cdsapi.ObjectListPathByPath, s.Object().ListByPath) | |||
| rt.POST(cdsapi.ObjectListByIDsPath, s.Object().ListByIDs) | |||
| @@ -79,10 +79,9 @@ func (s *Server) initRouters() { | |||
| rt.POST(cdsapi.BucketCreatePath, s.Bucket().Create) | |||
| rt.POST(cdsapi.BucketDeletePath, s.Bucket().Delete) | |||
| rt.GET(cdsapi.BucketListUserBucketsPath, s.Bucket().ListUserBuckets) | |||
| } | |||
| func (s *Server) routeV1(eg *gin.Engine) { | |||
| func (s *Server) routeV1(eg *gin.Engine, rt gin.IRoutes) { | |||
| v1 := eg.Group("/v1") | |||
| v1.GET(cdsapi.ObjectListPathByPath, s.awsAuth.Auth, s.Object().ListByPath) | |||
| @@ -117,4 +116,7 @@ func (s *Server) routeV1(eg *gin.Engine) { | |||
| v1.POST(cdsapi.BucketCreatePath, s.awsAuth.Auth, s.Bucket().Create) | |||
| v1.POST(cdsapi.BucketDeletePath, s.awsAuth.Auth, s.Bucket().Delete) | |||
| v1.GET(cdsapi.BucketListUserBucketsPath, s.awsAuth.Auth, s.Bucket().ListUserBuckets) | |||
| rt.POST(cdsapi.UserCreatePath, s.User().Create) | |||
| rt.POST(cdsapi.UserDeletePath, s.User().Delete) | |||
| } | |||
| @@ -0,0 +1,57 @@ | |||
| package http | |||
| import ( | |||
| "net/http" | |||
| "github.com/gin-gonic/gin" | |||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" | |||
| ) | |||
| type UserService struct { | |||
| *Server | |||
| } | |||
| func (s *Server) User() *UserService { | |||
| return &UserService{ | |||
| Server: s, | |||
| } | |||
| } | |||
| func (s *UserService) Create(ctx *gin.Context) { | |||
| log := logger.WithField("HTTP", "User.Create") | |||
| var req cdsapi.UserCreate | |||
| if err := ctx.ShouldBindJSON(&req); err != nil { | |||
| log.Warnf("binding body: %s", err.Error()) | |||
| ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | |||
| return | |||
| } | |||
| user, err := s.svc.UserSvc().Create(req.Name) | |||
| if err != nil { | |||
| log.Warnf("create user: %s", err.Error()) | |||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, err.Error())) | |||
| return | |||
| } | |||
| ctx.JSON(http.StatusOK, OK(cdsapi.UserCreateResp{User: user})) | |||
| } | |||
| func (s *UserService) Delete(ctx *gin.Context) { | |||
| log := logger.WithField("HTTP", "User.Delete") | |||
| var req cdsapi.UserDelete | |||
| if err := ctx.ShouldBindJSON(&req); err != nil { | |||
| log.Warnf("binding body: %s", err.Error()) | |||
| ctx.JSON(http.StatusBadRequest, Failed(errorcode.BadArgument, "missing argument or invalid argument")) | |||
| return | |||
| } | |||
| if err := s.svc.UserSvc().Delete(req.UserID); err != nil { | |||
| log.Warnf("delete user: %s", err.Error()) | |||
| ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, err.Error())) | |||
| return | |||
| } | |||
| ctx.JSON(http.StatusOK, OK(nil)) | |||
| } | |||
| @@ -0,0 +1,43 @@ | |||
| package services | |||
| import ( | |||
| "fmt" | |||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||
| stgglb "gitlink.org.cn/cloudream/storage/common/globals" | |||
| coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | |||
| ) | |||
| type UserService struct { | |||
| *Service | |||
| } | |||
| func (svc *Service) UserSvc() *UserService { | |||
| return &UserService{Service: svc} | |||
| } | |||
| func (svc *UserService) Create(name string) (cdssdk.User, error) { | |||
| coorCli, err := stgglb.CoordinatorMQPool.Acquire() | |||
| if err != nil { | |||
| return cdssdk.User{}, fmt.Errorf("new coordinator client: %w", err) | |||
| } | |||
| defer stgglb.CoordinatorMQPool.Release(coorCli) | |||
| resp, err := coorCli.CreateUser(coormq.ReqCreateUser(name)) | |||
| if err != nil { | |||
| return cdssdk.User{}, err | |||
| } | |||
| return resp.User, nil | |||
| } | |||
| func (svc *UserService) Delete(userID cdssdk.UserID) error { | |||
| coorCli, err := stgglb.CoordinatorMQPool.Acquire() | |||
| if err != nil { | |||
| return fmt.Errorf("new coordinator client: %w", err) | |||
| } | |||
| defer stgglb.CoordinatorMQPool.Release(coorCli) | |||
| _, err = coorCli.DeleteUser(coormq.ReqDeleteUser(userID)) | |||
| return err | |||
| } | |||
| @@ -10,15 +10,6 @@ import ( | |||
| // TODO 可以考虑逐步迁移到cdssdk中。迁移思路:数据对象应该包含的字段都迁移到cdssdk中,内部使用的一些特殊字段则留在这里 | |||
| type Storage = cdssdk.Storage | |||
| type User struct { | |||
| UserID cdssdk.UserID `gorm:"column:UserID; primaryKey; type:bigint" json:"userID"` | |||
| Password string `gorm:"column:Password; type:varchar(255); not null" json:"password"` | |||
| } | |||
| func (User) TableName() string { | |||
| return "User" | |||
| } | |||
| type UserBucket struct { | |||
| UserID cdssdk.UserID `gorm:"column:UserID; primaryKey; type:bigint" json:"userID"` | |||
| BucketID cdssdk.BucketID `gorm:"column:BucketID; primaryKey; type:bigint" json:"bucketID"` | |||
| @@ -2,7 +2,7 @@ package db2 | |||
| import ( | |||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||
| "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model" | |||
| "gorm.io/gorm" | |||
| ) | |||
| type UserDB struct { | |||
| @@ -13,8 +13,32 @@ func (db *DB) User() *UserDB { | |||
| return &UserDB{DB: db} | |||
| } | |||
| func (db *UserDB) GetByID(ctx SQLContext, userID cdssdk.UserID) (model.User, error) { | |||
| var ret model.User | |||
| func (db *UserDB) GetByID(ctx SQLContext, userID cdssdk.UserID) (cdssdk.User, error) { | |||
| var ret cdssdk.User | |||
| err := ctx.Table("User").Where("UserID = ?", userID).First(&ret).Error | |||
| return ret, err | |||
| } | |||
| func (db *UserDB) GetByName(ctx SQLContext, name string) (cdssdk.User, error) { | |||
| var ret cdssdk.User | |||
| err := ctx.Table("User").Where("Name = ?", name).First(&ret).Error | |||
| return ret, err | |||
| } | |||
| func (db *UserDB) Create(ctx SQLContext, name string) (cdssdk.User, error) { | |||
| _, err := db.GetByName(ctx, name) | |||
| if err == nil { | |||
| return cdssdk.User{}, gorm.ErrDuplicatedKey | |||
| } | |||
| if err != gorm.ErrRecordNotFound { | |||
| return cdssdk.User{}, err | |||
| } | |||
| user := cdssdk.User{Name: name} | |||
| err = ctx.Table("User").Create(&user).Error | |||
| return user, err | |||
| } | |||
| func (*UserDB) Delete(ctx SQLContext, userID cdssdk.UserID) error { | |||
| return ctx.Table("User").Delete(&cdssdk.User{UserID: userID}).Error | |||
| } | |||
| @@ -13,6 +13,12 @@ func (db *DB) UserBucket() *UserBucketDB { | |||
| return &UserBucketDB{DB: db} | |||
| } | |||
| func (*UserBucketDB) GetByUserID(ctx SQLContext, userID cdssdk.UserID) ([]model.UserBucket, error) { | |||
| var userBuckets []model.UserBucket | |||
| err := ctx.Table("UserBucket").Where("UserID = ?", userID).Find(&userBuckets).Error | |||
| return userBuckets, err | |||
| } | |||
| func (*UserBucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketID cdssdk.BucketID) error { | |||
| userBucket := model.UserBucket{ | |||
| UserID: userID, | |||
| @@ -24,3 +30,7 @@ func (*UserBucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketID cdssd | |||
| func (*UserBucketDB) DeleteByBucketID(ctx SQLContext, bucketID cdssdk.BucketID) error { | |||
| return ctx.Table("UserBucket").Where("BucketID = ?", bucketID).Delete(&model.UserBucket{}).Error | |||
| } | |||
| func (*UserBucketDB) DeleteByUserID(ctx SQLContext, userID cdssdk.UserID) error { | |||
| return ctx.Table("UserBucket").Where("UserID = ?", userID).Delete(&model.UserBucket{}).Error | |||
| } | |||
| @@ -0,0 +1,34 @@ | |||
| package db2 | |||
| import ( | |||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||
| "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model" | |||
| ) | |||
| type UserHubDB struct { | |||
| *DB | |||
| } | |||
| func (db *DB) UserHub() *UserHubDB { | |||
| return &UserHubDB{db} | |||
| } | |||
| func (*UserHubDB) GetByUserID(ctx SQLContext, userID cdssdk.UserID) ([]model.UserHub, error) { | |||
| var userHubs []model.UserHub | |||
| if err := ctx.Table("UserHub").Where("UserID = ?", userID).Find(&userHubs).Error; err != nil { | |||
| return nil, err | |||
| } | |||
| return userHubs, nil | |||
| } | |||
| func (*UserHubDB) Create(ctx SQLContext, userID cdssdk.UserID, hubID cdssdk.HubID) error { | |||
| return ctx.Table("UserHub").Create(&model.UserHub{ | |||
| UserID: userID, | |||
| HubID: hubID, | |||
| }).Error | |||
| } | |||
| func (*UserHubDB) DeleteByUserID(ctx SQLContext, userID cdssdk.UserID) error { | |||
| return ctx.Table("UserHub").Delete(&model.UserHub{}, "UserID = ?", userID).Error | |||
| } | |||
| @@ -0,0 +1,34 @@ | |||
| package db2 | |||
| import ( | |||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||
| "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model" | |||
| ) | |||
| type UserStorageDB struct { | |||
| *DB | |||
| } | |||
| func (db *DB) UserStorage() *UserStorageDB { | |||
| return &UserStorageDB{db} | |||
| } | |||
| func (*UserStorageDB) GetByUserID(ctx SQLContext, userID cdssdk.UserID) ([]model.UserStorage, error) { | |||
| var userStgs []model.UserStorage | |||
| if err := ctx.Table("UserStorage").Where("UserID = ?", userID).Find(&userStgs).Error; err != nil { | |||
| return nil, err | |||
| } | |||
| return userStgs, nil | |||
| } | |||
| func (*UserStorageDB) Create(ctx SQLContext, userID cdssdk.UserID, stgID cdssdk.StorageID) error { | |||
| return ctx.Table("UserStorage").Create(&model.UserStorage{ | |||
| UserID: userID, | |||
| StorageID: stgID, | |||
| }).Error | |||
| } | |||
| func (*UserStorageDB) DeleteByUserID(ctx SQLContext, userID cdssdk.UserID) error { | |||
| return ctx.Table("UserStorage").Delete(&model.UserStorage{}, "UserID = ?", userID).Error | |||
| } | |||
| @@ -21,6 +21,8 @@ type Service interface { | |||
| PackageService | |||
| StorageService | |||
| UserService | |||
| } | |||
| type Server struct { | |||
| @@ -0,0 +1,67 @@ | |||
| package coordinator | |||
| import ( | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||
| ) | |||
| type UserService interface { | |||
| CreateUser(msg *CreateUser) (*CreateUserResp, *mq.CodeMessage) | |||
| DeleteUser(msg *DeleteUser) (*DeleteUserResp, *mq.CodeMessage) | |||
| } | |||
| // 创建用户 | |||
| var _ = Register(Service.CreateUser) | |||
| type CreateUser struct { | |||
| mq.MessageBodyBase | |||
| Name string `json:"name"` | |||
| } | |||
| type CreateUserResp struct { | |||
| mq.MessageBodyBase | |||
| User cdssdk.User `json:"user"` | |||
| } | |||
| func ReqCreateUser(name string) *CreateUser { | |||
| return &CreateUser{ | |||
| Name: name, | |||
| } | |||
| } | |||
| func RespCreateUser(user cdssdk.User) *CreateUserResp { | |||
| return &CreateUserResp{ | |||
| User: user, | |||
| } | |||
| } | |||
| func (c *Client) CreateUser(msg *CreateUser) (*CreateUserResp, error) { | |||
| return mq.Request(Service.CreateUser, c.rabbitCli, msg) | |||
| } | |||
| // 删除用户 | |||
| var _ = Register(Service.DeleteUser) | |||
| type DeleteUser struct { | |||
| mq.MessageBodyBase | |||
| UserID cdssdk.UserID `json:"userID"` | |||
| } | |||
| type DeleteUserResp struct { | |||
| mq.MessageBodyBase | |||
| } | |||
| func ReqDeleteUser(userID cdssdk.UserID) *DeleteUser { | |||
| return &DeleteUser{ | |||
| UserID: userID, | |||
| } | |||
| } | |||
| func RespDeleteUser() *DeleteUserResp { | |||
| return &DeleteUserResp{} | |||
| } | |||
| func (c *Client) DeleteUser(msg *DeleteUser) (*DeleteUserResp, error) { | |||
| return mq.Request(Service.DeleteUser, c.rabbitCli, msg) | |||
| } | |||
| @@ -55,7 +55,7 @@ func migrate(configPath string) { | |||
| migrateOne(db, cdssdk.Storage{}) | |||
| migrateOne(db, model.UserStorage{}) | |||
| migrateOne(db, model.UserBucket{}) | |||
| migrateOne(db, model.User{}) | |||
| migrateOne(db, cdssdk.User{}) | |||
| migrateOne(db, model.UserHub{}) | |||
| fmt.Println("migrate success") | |||
| @@ -0,0 +1,127 @@ | |||
| package mq | |||
| import ( | |||
| "errors" | |||
| "fmt" | |||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||
| "gitlink.org.cn/cloudream/storage/common/pkgs/db2" | |||
| coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" | |||
| "gorm.io/gorm" | |||
| ) | |||
| func (svc *Service) CreateUser(msg *coormq.CreateUser) (*coormq.CreateUserResp, *mq.CodeMessage) { | |||
| var user cdssdk.User | |||
| err := svc.db2.DoTx(func(tx db2.SQLContext) error { | |||
| var err error | |||
| user, err = svc.db2.User().Create(tx, msg.Name) | |||
| if err != nil { | |||
| return fmt.Errorf("creating user: %w", err) | |||
| } | |||
| // TODO 目前新建用户的权限与ID 1的相同 | |||
| hubs, err := svc.db2.UserHub().GetByUserID(tx, 1) | |||
| if err != nil { | |||
| return fmt.Errorf("getting user hubs: %w", err) | |||
| } | |||
| stgs, err := svc.db2.UserStorage().GetByUserID(tx, 1) | |||
| if err != nil { | |||
| return fmt.Errorf("getting user storages: %w", err) | |||
| } | |||
| for _, hub := range hubs { | |||
| err := svc.db2.UserHub().Create(tx, user.UserID, hub.HubID) | |||
| if err != nil { | |||
| return fmt.Errorf("creating user hub: %w", err) | |||
| } | |||
| } | |||
| for _, stg := range stgs { | |||
| err := svc.db2.UserStorage().Create(tx, user.UserID, stg.StorageID) | |||
| if err != nil { | |||
| return fmt.Errorf("creating user storage: %w", err) | |||
| } | |||
| } | |||
| return nil | |||
| }) | |||
| if err != nil { | |||
| logger.WithField("Name", msg.Name). | |||
| Warn(err.Error()) | |||
| if errors.Is(err, gorm.ErrDuplicatedKey) { | |||
| return nil, mq.Failed(errorcode.DataExists, "user name already exists") | |||
| } | |||
| return nil, mq.Failed(errorcode.OperationFailed, err.Error()) | |||
| } | |||
| return mq.ReplyOK(coormq.RespCreateUser(user)) | |||
| } | |||
| func (svc *Service) DeleteUser(msg *coormq.DeleteUser) (*coormq.DeleteUserResp, *mq.CodeMessage) { | |||
| // TODO 目前不能删除ID 1的用户 | |||
| if msg.UserID == 1 { | |||
| return nil, mq.Failed(errorcode.OperationFailed, "cannot delete the default user") | |||
| } | |||
| err := svc.db2.DoTx(func(tx db2.SQLContext) error { | |||
| err := svc.db2.User().Delete(tx, msg.UserID) | |||
| if err != nil { | |||
| return fmt.Errorf("deleting user: %w", err) | |||
| } | |||
| err = svc.db2.UserHub().DeleteByUserID(tx, msg.UserID) | |||
| if err != nil { | |||
| return fmt.Errorf("deleting user hubs: %w", err) | |||
| } | |||
| err = svc.db2.UserStorage().DeleteByUserID(tx, msg.UserID) | |||
| if err != nil { | |||
| return fmt.Errorf("deleting user storages: %w", err) | |||
| } | |||
| bkts, err := svc.db2.UserBucket().GetByUserID(tx, msg.UserID) | |||
| if err != nil { | |||
| return fmt.Errorf("getting user buckets: %w", err) | |||
| } | |||
| for _, bkt := range bkts { | |||
| pkgs, err := svc.db2.Package().GetBucketPackages(tx, bkt.BucketID) | |||
| if err != nil { | |||
| return fmt.Errorf("getting bucket packages: %w", err) | |||
| } | |||
| for _, pkg := range pkgs { | |||
| err := svc.db2.Package().DeleteComplete(tx, pkg.PackageID) | |||
| if err != nil { | |||
| return fmt.Errorf("deleting package %v: %w", pkg.PackageID, err) | |||
| } | |||
| } | |||
| err = svc.db2.Bucket().Delete(tx, bkt.BucketID) | |||
| if err != nil { | |||
| return fmt.Errorf("deleting bucket: %w", err) | |||
| } | |||
| } | |||
| err = svc.db2.UserBucket().DeleteByUserID(tx, msg.UserID) | |||
| if err != nil { | |||
| return fmt.Errorf("deleting user buckets: %w", err) | |||
| } | |||
| return nil | |||
| }) | |||
| if err != nil { | |||
| logger.WithField("UserID", msg.UserID). | |||
| Warn(err.Error()) | |||
| return nil, mq.Failed(errorcode.OperationFailed, err.Error()) | |||
| } | |||
| return mq.ReplyOK(coormq.RespDeleteUser()) | |||
| } | |||