You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

user_space.go 7.9 kB

5 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
6 months ago
6 months ago
7 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262
  1. package services
  2. import (
  3. "context"
  4. "fmt"
  5. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  6. clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
  7. "gorm.io/gorm"
  8. "gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
  9. "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy"
  10. cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1"
  11. "gitlink.org.cn/cloudream/jcs-pub/common/ecode"
  12. stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
  13. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
  14. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser"
  15. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock/reqbuilder"
  16. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory"
  17. )
  18. type UserSpaceService struct {
  19. *Service
  20. }
  21. func (svc *Service) UserSpaceSvc() *UserSpaceService {
  22. return &UserSpaceService{Service: svc}
  23. }
  24. func (svc *UserSpaceService) Get(userspaceID clitypes.UserSpaceID) (clitypes.UserSpace, error) {
  25. return svc.DB.UserSpace().GetByID(svc.DB.DefCtx(), userspaceID)
  26. }
  27. func (svc *UserSpaceService) GetByName(name string) (clitypes.UserSpace, error) {
  28. return svc.DB.UserSpace().GetByName(svc.DB.DefCtx(), name)
  29. }
  30. func (svc *UserSpaceService) GetAll() ([]clitypes.UserSpace, error) {
  31. return svc.DB.UserSpace().GetAll(svc.DB.DefCtx())
  32. }
  33. func (svc *UserSpaceService) Create(req cliapi.UserSpaceCreate) (*cliapi.UserSpaceCreateResp, *ecode.CodeError) {
  34. db2 := svc.DB
  35. space, err := db.DoTx01(db2, func(tx db.SQLContext) (clitypes.UserSpace, error) {
  36. space, err := db2.UserSpace().GetByName(tx, req.Name)
  37. if err == nil {
  38. return clitypes.UserSpace{}, gorm.ErrDuplicatedKey
  39. }
  40. if err != gorm.ErrRecordNotFound {
  41. return clitypes.UserSpace{}, err
  42. }
  43. space = clitypes.UserSpace{
  44. Name: req.Name,
  45. Storage: req.Storage,
  46. Credential: req.Credential,
  47. ShardStore: req.ShardStore,
  48. Features: req.Features,
  49. WorkingDir: clitypes.PathFromJcsPathString(req.WorkingDir),
  50. Revision: 0,
  51. }
  52. err = db2.UserSpace().Create(tx, &space)
  53. if err != nil {
  54. return clitypes.UserSpace{}, err
  55. }
  56. return space, nil
  57. })
  58. if err == gorm.ErrDuplicatedKey {
  59. return nil, ecode.New(ecode.DataExists, "user space name already exists")
  60. }
  61. if err != nil {
  62. return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
  63. }
  64. return &cliapi.UserSpaceCreateResp{UserSpace: space}, nil
  65. }
  66. func (svc *UserSpaceService) Update(req cliapi.UserSpaceUpdate) (*cliapi.UserSpaceUpdateResp, *ecode.CodeError) {
  67. db2 := svc.DB
  68. space, err := db.DoTx01(db2, func(tx db.SQLContext) (clitypes.UserSpace, error) {
  69. space, err := db2.UserSpace().GetByID(tx, req.UserSpaceID)
  70. if err != nil {
  71. return clitypes.UserSpace{}, err
  72. }
  73. if space.Name != req.Name {
  74. _, err = db2.UserSpace().GetByName(tx, req.Name)
  75. if err == nil {
  76. return clitypes.UserSpace{}, gorm.ErrDuplicatedKey
  77. }
  78. if err != gorm.ErrRecordNotFound {
  79. return clitypes.UserSpace{}, err
  80. }
  81. }
  82. space.Name = req.Name
  83. space.Credential = req.Credential
  84. space.Features = req.Features
  85. space.Revision += 1
  86. return space, db2.UserSpace().UpdateColumns(tx, space, "Name", "Credential", "Features", "Revision")
  87. })
  88. if err == gorm.ErrDuplicatedKey {
  89. return nil, ecode.New(ecode.DataExists, "user space name already exists")
  90. }
  91. if err != nil {
  92. return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
  93. }
  94. // 通知元数据缓存无效
  95. svc.UserSpaceMeta.Drop([]clitypes.UserSpaceID{req.UserSpaceID})
  96. // 通知存储服务组件池停止组件。TODO 对于在Hub上运行的组件,需要一个机制去定时清理
  97. svc.StgPool.Drop(stgglb.UserID, space.UserSpaceID)
  98. // TODO 考虑加锁再进行操作
  99. return &cliapi.UserSpaceUpdateResp{UserSpace: space}, nil
  100. }
  101. func (svc *UserSpaceService) Delete(req cliapi.UserSpaceDelete) (*cliapi.UserSpaceDeleteResp, *ecode.CodeError) {
  102. db2 := svc.DB
  103. err := db2.DoTx(func(tx db.SQLContext) error {
  104. err := db2.UserSpace().DeleteByID(tx, req.UserSpaceID)
  105. if err != nil {
  106. return err
  107. }
  108. err = db2.ObjectBlock().DeleteByUserSpaceID(tx, req.UserSpaceID)
  109. if err != nil {
  110. return err
  111. }
  112. err = db2.PinnedObject().DeleteByUserSpaceID(tx, req.UserSpaceID)
  113. if err != nil {
  114. return err
  115. }
  116. err = db2.ObjectAccessStat().DeleteByUserSpaceID(tx, req.UserSpaceID)
  117. if err != nil {
  118. return err
  119. }
  120. err = db2.PackageAccessStat().DeleteByUserSpaceID(tx, req.UserSpaceID)
  121. if err != nil {
  122. return err
  123. }
  124. return nil
  125. })
  126. if err != nil {
  127. return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
  128. }
  129. // 通知元数据缓存无效
  130. svc.UserSpaceMeta.Drop([]clitypes.UserSpaceID{req.UserSpaceID})
  131. // 通知存储服务组件池停止组件。TODO 对于在Hub上运行的组件,需要一个机制去定时清理
  132. svc.StgPool.Drop(stgglb.UserID, req.UserSpaceID)
  133. // TODO 考虑加锁再进行操作,并且增加机制打断已经在进行的操作。
  134. return &cliapi.UserSpaceDeleteResp{}, nil
  135. }
  136. func (svc *UserSpaceService) Test(req cliapi.UserSpaceTest) (*cliapi.UserSpaceTestResp, *ecode.CodeError) {
  137. detail := clitypes.UserSpaceDetail{
  138. UserID: stgglb.UserID,
  139. UserSpace: clitypes.UserSpace{
  140. Name: "test",
  141. Storage: req.Storage,
  142. Credential: req.Credential,
  143. WorkingDir: clitypes.PathFromJcsPathString(req.WorikingDir),
  144. },
  145. }
  146. blder := factory.GetBuilder(&detail)
  147. baseStore, err := blder.CreateBaseStore(false)
  148. if err != nil {
  149. return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
  150. }
  151. err = baseStore.Test()
  152. if err != nil {
  153. return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
  154. }
  155. return &cliapi.UserSpaceTestResp{}, nil
  156. }
  157. func (svc *UserSpaceService) DownloadPackage(packageID clitypes.PackageID, userspaceID clitypes.UserSpaceID, rootPath string) error {
  158. destStg := svc.UserSpaceMeta.Get(userspaceID)
  159. if destStg == nil {
  160. return fmt.Errorf("userspace not found: %d", userspaceID)
  161. }
  162. details, err := db.DoTx11(svc.DB, svc.DB.Object().GetPackageObjectDetails, packageID)
  163. if err != nil {
  164. return err
  165. }
  166. rootJPath := clitypes.PathFromJcsPathString(rootPath)
  167. var pinned []clitypes.ObjectID
  168. plans := exec.NewPlanBuilder()
  169. for _, obj := range details {
  170. strg, err := svc.StrategySelector.Select(strategy.Request{
  171. Detail: obj,
  172. DestLocation: destStg.UserSpace.Storage.GetLocation(),
  173. })
  174. if err != nil {
  175. return fmt.Errorf("select download strategy: %w", err)
  176. }
  177. ft := ioswitch2.NewFromTo()
  178. switch strg := strg.(type) {
  179. case *strategy.DirectStrategy:
  180. ft.AddFrom(ioswitch2.NewFromShardstore(strg.Detail.Object.FileHash, strg.UserSpace, ioswitch2.RawStream()))
  181. case *strategy.ECReconstructStrategy:
  182. for i, b := range strg.Blocks {
  183. ft.AddFrom(ioswitch2.NewFromShardstore(b.FileHash, strg.UserSpaces[i], ioswitch2.ECStream(b.Index)))
  184. ft.ECParam = &strg.Redundancy
  185. }
  186. default:
  187. return fmt.Errorf("unsupported download strategy: %T", strg)
  188. }
  189. objPath := clitypes.PathFromJcsPathString(obj.Object.Path)
  190. dstPath := rootJPath.ConcatNew(objPath)
  191. ft.AddTo(ioswitch2.NewToBaseStore(*destStg, dstPath))
  192. // 顺便保存到同存储服务的分片存储中
  193. if destStg.UserSpace.ShardStore != nil {
  194. ft.AddTo(ioswitch2.NewToShardStore(*destStg, ioswitch2.RawStream(), ""))
  195. pinned = append(pinned, obj.Object.ObjectID)
  196. }
  197. err = parser.Parse(ft, plans)
  198. if err != nil {
  199. return fmt.Errorf("parse plan: %w", err)
  200. }
  201. }
  202. mutex, err := reqbuilder.NewBuilder().
  203. UserSpace().Buzy(userspaceID).
  204. MutexLock(svc.PubLock)
  205. if err != nil {
  206. return fmt.Errorf("acquire locks failed, err: %w", err)
  207. }
  208. defer mutex.Unlock()
  209. // 记录访问统计
  210. for _, obj := range details {
  211. svc.AccessStat.AddAccessCounter(obj.Object.ObjectID, packageID, userspaceID, 1)
  212. }
  213. exeCtx := exec.NewExecContext()
  214. exec.SetValueByType(exeCtx, svc.StgPool)
  215. drv := plans.Execute(exeCtx)
  216. _, err = drv.Wait(context.Background())
  217. if err != nil {
  218. return err
  219. }
  220. return nil
  221. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。