You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

user_space.go 8.8 kB

5 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. package services
  import (
  	"context"
  	"errors"
  	"fmt"
  	"time"

  	"github.com/samber/lo"
  	"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  	"gitlink.org.cn/cloudream/common/pkgs/logger"
  	clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
  	"gorm.io/gorm"

  	"gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
  	"gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy"
  	cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1"
  	"gitlink.org.cn/cloudream/jcs-pub/common/ecode"
  	stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
  	"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
  	"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser"
  	"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock/reqbuilder"
  	"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory"
  )
  21. type UserSpaceService struct {
  22. *Service
  23. }
  24. func (svc *Service) UserSpaceSvc() *UserSpaceService {
  25. return &UserSpaceService{Service: svc}
  26. }
  27. func (svc *UserSpaceService) Get(userspaceID clitypes.UserSpaceID) (clitypes.UserSpace, error) {
  28. return svc.DB.UserSpace().GetByID(svc.DB.DefCtx(), userspaceID)
  29. }
  30. func (svc *UserSpaceService) GetByName(name string) (clitypes.UserSpace, error) {
  31. return svc.DB.UserSpace().GetByName(svc.DB.DefCtx(), name)
  32. }
  33. func (svc *UserSpaceService) GetAll() ([]clitypes.UserSpace, error) {
  34. return svc.DB.UserSpace().GetAll(svc.DB.DefCtx())
  35. }
  36. func (svc *UserSpaceService) Create(req cliapi.UserSpaceCreate) (*cliapi.UserSpaceCreateResp, *ecode.CodeError) {
  37. db2 := svc.DB
  38. space, err := db.DoTx01(db2, func(tx db.SQLContext) (clitypes.UserSpace, error) {
  39. space, err := db2.UserSpace().GetByName(tx, req.Name)
  40. if err == nil {
  41. return clitypes.UserSpace{}, gorm.ErrDuplicatedKey
  42. }
  43. if err != gorm.ErrRecordNotFound {
  44. return clitypes.UserSpace{}, err
  45. }
  46. space = clitypes.UserSpace{
  47. Name: req.Name,
  48. Storage: req.Storage,
  49. Credential: req.Credential,
  50. ShardStore: req.ShardStore,
  51. Features: req.Features,
  52. WorkingDir: clitypes.PathFromJcsPathString(req.WorkingDir),
  53. Revision: 0,
  54. }
  55. err = db2.UserSpace().Create(tx, &space)
  56. if err != nil {
  57. return clitypes.UserSpace{}, err
  58. }
  59. return space, nil
  60. })
  61. if err == gorm.ErrDuplicatedKey {
  62. return nil, ecode.New(ecode.DataExists, "user space name already exists")
  63. }
  64. if err != nil {
  65. return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
  66. }
  67. return &cliapi.UserSpaceCreateResp{UserSpace: space}, nil
  68. }
  69. func (svc *UserSpaceService) Update(req cliapi.UserSpaceUpdate) (*cliapi.UserSpaceUpdateResp, *ecode.CodeError) {
  70. db2 := svc.DB
  71. space, err := db.DoTx01(db2, func(tx db.SQLContext) (clitypes.UserSpace, error) {
  72. space, err := db2.UserSpace().GetByID(tx, req.UserSpaceID)
  73. if err != nil {
  74. return clitypes.UserSpace{}, err
  75. }
  76. if space.Name != req.Name {
  77. _, err = db2.UserSpace().GetByName(tx, req.Name)
  78. if err == nil {
  79. return clitypes.UserSpace{}, gorm.ErrDuplicatedKey
  80. }
  81. if err != gorm.ErrRecordNotFound {
  82. return clitypes.UserSpace{}, err
  83. }
  84. }
  85. space.Name = req.Name
  86. space.Credential = req.Credential
  87. space.Features = req.Features
  88. space.Revision += 1
  89. return space, db2.UserSpace().UpdateColumns(tx, space, "Name", "Credential", "Features", "Revision")
  90. })
  91. if err == gorm.ErrDuplicatedKey {
  92. return nil, ecode.New(ecode.DataExists, "user space name already exists")
  93. }
  94. if err != nil {
  95. return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
  96. }
  97. // 通知元数据缓存无效
  98. svc.UserSpaceMeta.Drop([]clitypes.UserSpaceID{req.UserSpaceID})
  99. // 通知存储服务组件池停止组件。TODO 对于在Hub上运行的组件,需要一个机制去定时清理
  100. svc.StgPool.Drop(stgglb.UserID, space.UserSpaceID)
  101. // TODO 考虑加锁再进行操作
  102. return &cliapi.UserSpaceUpdateResp{UserSpace: space}, nil
  103. }
  104. func (svc *UserSpaceService) Delete(req cliapi.UserSpaceDelete) (*cliapi.UserSpaceDeleteResp, *ecode.CodeError) {
  105. db2 := svc.DB
  106. err := db2.DoTx(func(tx db.SQLContext) error {
  107. err := db2.UserSpace().DeleteByID(tx, req.UserSpaceID)
  108. if err != nil {
  109. return err
  110. }
  111. err = db2.ObjectBlock().DeleteByUserSpaceID(tx, req.UserSpaceID)
  112. if err != nil {
  113. return err
  114. }
  115. err = db2.PinnedObject().DeleteByUserSpaceID(tx, req.UserSpaceID)
  116. if err != nil {
  117. return err
  118. }
  119. err = db2.ObjectAccessStat().DeleteByUserSpaceID(tx, req.UserSpaceID)
  120. if err != nil {
  121. return err
  122. }
  123. err = db2.PackageAccessStat().DeleteByUserSpaceID(tx, req.UserSpaceID)
  124. if err != nil {
  125. return err
  126. }
  127. return nil
  128. })
  129. if err != nil {
  130. return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
  131. }
  132. // 通知元数据缓存无效
  133. svc.UserSpaceMeta.Drop([]clitypes.UserSpaceID{req.UserSpaceID})
  134. // 通知存储服务组件池停止组件。TODO 对于在Hub上运行的组件,需要一个机制去定时清理
  135. svc.StgPool.Drop(stgglb.UserID, req.UserSpaceID)
  136. // TODO 考虑加锁再进行操作,并且增加机制打断已经在进行的操作。
  137. return &cliapi.UserSpaceDeleteResp{}, nil
  138. }
  139. func (svc *UserSpaceService) Test(req cliapi.UserSpaceTest) (*cliapi.UserSpaceTestResp, *ecode.CodeError) {
  140. detail := clitypes.UserSpaceDetail{
  141. UserID: stgglb.UserID,
  142. UserSpace: clitypes.UserSpace{
  143. Name: "test",
  144. Storage: req.Storage,
  145. Credential: req.Credential,
  146. WorkingDir: clitypes.PathFromJcsPathString(req.WorikingDir),
  147. },
  148. }
  149. blder := factory.GetBuilder(&detail)
  150. baseStore, err := blder.CreateBaseStore(false)
  151. if err != nil {
  152. return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
  153. }
  154. err = baseStore.Test()
  155. if err != nil {
  156. return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
  157. }
  158. return &cliapi.UserSpaceTestResp{}, nil
  159. }
  160. func (svc *UserSpaceService) DownloadPackage(packageID clitypes.PackageID, userspaceID clitypes.UserSpaceID, rootPath string) error {
  161. destStg := svc.UserSpaceMeta.Get(userspaceID)
  162. if destStg == nil {
  163. return fmt.Errorf("userspace not found: %d", userspaceID)
  164. }
  165. details, err := db.DoTx11(svc.DB, svc.DB.Object().GetPackageObjectDetails, packageID)
  166. if err != nil {
  167. return err
  168. }
  169. mutex, err := reqbuilder.NewBuilder().
  170. UserSpace().Buzy(userspaceID).
  171. MutexLock(svc.PubLock)
  172. if err != nil {
  173. return fmt.Errorf("acquire locks failed, err: %w", err)
  174. }
  175. defer mutex.Unlock()
  176. rootJPath := clitypes.PathFromJcsPathString(rootPath)
  177. dIndex := 0
  178. var pinned []clitypes.PinnedObject
  179. for dIndex < len(details) {
  180. plans := exec.NewPlanBuilder()
  181. for i := 0; i < 10 && dIndex < len(details); i++ {
  182. strg, err := svc.StrategySelector.Select(strategy.Request{
  183. Detail: details[dIndex],
  184. DestLocation: destStg.UserSpace.Storage.GetLocation(),
  185. })
  186. if err != nil {
  187. return fmt.Errorf("select download strategy: %w", err)
  188. }
  189. ft := ioswitch2.NewFromTo()
  190. switch strg := strg.(type) {
  191. case *strategy.DirectStrategy:
  192. ft.AddFrom(ioswitch2.NewFromShardstore(strg.Detail.Object.FileHash, strg.UserSpace, ioswitch2.RawStream()))
  193. case *strategy.ECReconstructStrategy:
  194. for i, b := range strg.Blocks {
  195. ft.AddFrom(ioswitch2.NewFromShardstore(b.FileHash, strg.UserSpaces[i], ioswitch2.ECStream(b.Index)))
  196. ft.ECParam = &strg.Redundancy
  197. }
  198. default:
  199. return fmt.Errorf("unsupported download strategy: %T", strg)
  200. }
  201. objPath := clitypes.PathFromJcsPathString(details[dIndex].Object.Path)
  202. dstPath := rootJPath.ConcatNew(objPath)
  203. ft.AddTo(ioswitch2.NewToBaseStore(*destStg, dstPath))
  204. // 顺便保存到同存储服务的分片存储中
  205. if destStg.UserSpace.ShardStore != nil {
  206. ft.AddTo(ioswitch2.NewToShardStore(*destStg, ioswitch2.RawStream(), ""))
  207. pinned = append(pinned, clitypes.PinnedObject{
  208. ObjectID: details[dIndex].Object.ObjectID,
  209. UserSpaceID: destStg.UserSpace.UserSpaceID,
  210. CreateTime: time.Now(),
  211. })
  212. }
  213. err = parser.Parse(ft, plans)
  214. if err != nil {
  215. return fmt.Errorf("parse plan: %w", err)
  216. }
  217. dIndex++
  218. }
  219. // 记录访问统计
  220. for _, obj := range details {
  221. svc.AccessStat.AddAccessCounter(obj.Object.ObjectID, packageID, userspaceID, 1)
  222. }
  223. exeCtx := exec.NewExecContext()
  224. exec.SetValueByType(exeCtx, svc.StgPool)
  225. drv := plans.Execute(exeCtx)
  226. _, err = drv.Wait(context.Background())
  227. if err != nil {
  228. return err
  229. }
  230. err = svc.DB.DoTx(func(tx db.SQLContext) error {
  231. objIDs := make([]clitypes.ObjectID, len(pinned))
  232. for i, obj := range pinned {
  233. objIDs[i] = obj.ObjectID
  234. }
  235. avaiIDs, err := svc.DB.Object().BatchTestObjectID(tx, objIDs)
  236. if err != nil {
  237. return err
  238. }
  239. pinned = lo.Filter(pinned, func(p clitypes.PinnedObject, idx int) bool { return avaiIDs[p.ObjectID] })
  240. return svc.DB.PinnedObject().BatchTryCreate(svc.DB.DefCtx(), pinned)
  241. })
  242. if err != nil {
  243. logger.Warnf("create pinned objects: %v", err)
  244. }
  245. }
  246. return nil
  247. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。