You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

user_space.go 12 kB

5 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
7 months ago
6 months ago
6 months ago
7 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. package services
  2. import (
  3. "context"
  4. "fmt"
  5. "path"
  6. "strings"
  7. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  8. "gitlink.org.cn/cloudream/common/pkgs/logger"
  9. "gitlink.org.cn/cloudream/common/pkgs/trie"
  10. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  11. clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types"
  12. "gorm.io/gorm"
  13. "gitlink.org.cn/cloudream/jcs-pub/client/internal/db"
  14. "gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy"
  15. cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1"
  16. "gitlink.org.cn/cloudream/jcs-pub/common/ecode"
  17. stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
  18. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2"
  19. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch2/parser"
  20. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/publock/reqbuilder"
  21. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/factory"
  22. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/types"
  23. )
  24. type UserSpaceService struct {
  25. *Service
  26. }
  27. func (svc *Service) UserSpaceSvc() *UserSpaceService {
  28. return &UserSpaceService{Service: svc}
  29. }
  30. func (svc *UserSpaceService) Get(userspaceID clitypes.UserSpaceID) (clitypes.UserSpace, error) {
  31. return svc.DB.UserSpace().GetByID(svc.DB.DefCtx(), userspaceID)
  32. }
  33. func (svc *UserSpaceService) GetByName(name string) (clitypes.UserSpace, error) {
  34. return svc.DB.UserSpace().GetByName(svc.DB.DefCtx(), name)
  35. }
  36. func (svc *UserSpaceService) GetAll() ([]clitypes.UserSpace, error) {
  37. return svc.DB.UserSpace().GetAll(svc.DB.DefCtx())
  38. }
  39. func (svc *UserSpaceService) Create(req cliapi.UserSpaceCreate) (*cliapi.UserSpaceCreateResp, *ecode.CodeError) {
  40. db2 := svc.DB
  41. space, err := db.DoTx01(db2, func(tx db.SQLContext) (clitypes.UserSpace, error) {
  42. space, err := db2.UserSpace().GetByName(tx, req.Name)
  43. if err == nil {
  44. return clitypes.UserSpace{}, gorm.ErrDuplicatedKey
  45. }
  46. if err != gorm.ErrRecordNotFound {
  47. return clitypes.UserSpace{}, err
  48. }
  49. space = clitypes.UserSpace{
  50. Name: req.Name,
  51. Storage: req.Storage,
  52. Credential: req.Credential,
  53. ShardStore: req.ShardStore,
  54. Features: req.Features,
  55. WorkingDir: req.WorkingDir,
  56. Revision: 0,
  57. }
  58. err = db2.UserSpace().Create(tx, &space)
  59. if err != nil {
  60. return clitypes.UserSpace{}, err
  61. }
  62. return space, nil
  63. })
  64. if err == gorm.ErrDuplicatedKey {
  65. return nil, ecode.New(ecode.DataExists, "user space name already exists")
  66. }
  67. if err != nil {
  68. return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
  69. }
  70. return &cliapi.UserSpaceCreateResp{UserSpace: space}, nil
  71. }
  72. func (svc *UserSpaceService) Update(req cliapi.UserSpaceUpdate) (*cliapi.UserSpaceUpdateResp, *ecode.CodeError) {
  73. db2 := svc.DB
  74. space, err := db.DoTx01(db2, func(tx db.SQLContext) (clitypes.UserSpace, error) {
  75. space, err := db2.UserSpace().GetByID(tx, req.UserSpaceID)
  76. if err != nil {
  77. return clitypes.UserSpace{}, err
  78. }
  79. if space.Name != req.Name {
  80. _, err = db2.UserSpace().GetByName(tx, req.Name)
  81. if err == nil {
  82. return clitypes.UserSpace{}, gorm.ErrDuplicatedKey
  83. }
  84. if err != gorm.ErrRecordNotFound {
  85. return clitypes.UserSpace{}, err
  86. }
  87. }
  88. space.Name = req.Name
  89. space.Credential = req.Credential
  90. space.Features = req.Features
  91. space.Revision += 1
  92. return space, db2.UserSpace().UpdateColumns(tx, space, "Name", "Credential", "Features", "Revision")
  93. })
  94. if err == gorm.ErrDuplicatedKey {
  95. return nil, ecode.New(ecode.DataExists, "user space name already exists")
  96. }
  97. if err != nil {
  98. return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
  99. }
  100. // 通知元数据缓存无效
  101. svc.UserSpaceMeta.Drop([]clitypes.UserSpaceID{req.UserSpaceID})
  102. // 通知存储服务组件池停止组件。TODO 对于在Hub上运行的组件,需要一个机制去定时清理
  103. svc.StgPool.Drop(stgglb.UserID, space.UserSpaceID)
  104. // TODO 考虑加锁再进行操作
  105. return &cliapi.UserSpaceUpdateResp{UserSpace: space}, nil
  106. }
  107. func (svc *UserSpaceService) Delete(req cliapi.UserSpaceDelete) (*cliapi.UserSpaceDeleteResp, *ecode.CodeError) {
  108. db2 := svc.DB
  109. err := db2.DoTx(func(tx db.SQLContext) error {
  110. err := db2.UserSpace().DeleteByID(tx, req.UserSpaceID)
  111. if err != nil {
  112. return err
  113. }
  114. err = db2.ObjectBlock().DeleteByUserSpaceID(tx, req.UserSpaceID)
  115. if err != nil {
  116. return err
  117. }
  118. err = db2.PinnedObject().DeleteByUserSpaceID(tx, req.UserSpaceID)
  119. if err != nil {
  120. return err
  121. }
  122. err = db2.ObjectAccessStat().DeleteByUserSpaceID(tx, req.UserSpaceID)
  123. if err != nil {
  124. return err
  125. }
  126. err = db2.PackageAccessStat().DeleteByUserSpaceID(tx, req.UserSpaceID)
  127. if err != nil {
  128. return err
  129. }
  130. return nil
  131. })
  132. if err != nil {
  133. return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
  134. }
  135. // 通知元数据缓存无效
  136. svc.UserSpaceMeta.Drop([]clitypes.UserSpaceID{req.UserSpaceID})
  137. // 通知存储服务组件池停止组件。TODO 对于在Hub上运行的组件,需要一个机制去定时清理
  138. svc.StgPool.Drop(stgglb.UserID, req.UserSpaceID)
  139. // TODO 考虑加锁再进行操作,并且增加机制打断已经在进行的操作。
  140. return &cliapi.UserSpaceDeleteResp{}, nil
  141. }
  142. func (svc *UserSpaceService) Test(req cliapi.UserSpaceTest) (*cliapi.UserSpaceTestResp, *ecode.CodeError) {
  143. detail := clitypes.UserSpaceDetail{
  144. UserID: stgglb.UserID,
  145. UserSpace: clitypes.UserSpace{
  146. Name: "test",
  147. Storage: req.Storage,
  148. Credential: req.Credential,
  149. WorkingDir: req.WorikingDir,
  150. },
  151. }
  152. blder := factory.GetBuilder(&detail)
  153. baseStore, err := blder.CreateBaseStore(false)
  154. if err != nil {
  155. return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
  156. }
  157. // TODO 可以考虑增加一个专门用于检查配置的接口F
  158. _, err = baseStore.ListAll("")
  159. if err != nil {
  160. return nil, ecode.Newf(ecode.OperationFailed, "%v", err)
  161. }
  162. return &cliapi.UserSpaceTestResp{}, nil
  163. }
  164. func (svc *UserSpaceService) DownloadPackage(packageID clitypes.PackageID, userspaceID clitypes.UserSpaceID, rootPath string) error {
  165. coorCli := stgglb.CoordinatorRPCPool.Get()
  166. defer coorCli.Release()
  167. destStg := svc.UserSpaceMeta.Get(userspaceID)
  168. if destStg == nil {
  169. return fmt.Errorf("userspace not found: %d", userspaceID)
  170. }
  171. details, err := db.DoTx11(svc.DB, svc.DB.Object().GetPackageObjectDetails, packageID)
  172. if err != nil {
  173. return err
  174. }
  175. var pinned []clitypes.ObjectID
  176. plans := exec.NewPlanBuilder()
  177. for _, obj := range details {
  178. strg, err := svc.StrategySelector.Select(strategy.Request{
  179. Detail: obj,
  180. DestLocation: destStg.UserSpace.Storage.GetLocation(),
  181. })
  182. if err != nil {
  183. return fmt.Errorf("select download strategy: %w", err)
  184. }
  185. ft := ioswitch2.NewFromTo()
  186. switch strg := strg.(type) {
  187. case *strategy.DirectStrategy:
  188. ft.AddFrom(ioswitch2.NewFromShardstore(strg.Detail.Object.FileHash, strg.UserSpace, ioswitch2.RawStream()))
  189. case *strategy.ECReconstructStrategy:
  190. for i, b := range strg.Blocks {
  191. ft.AddFrom(ioswitch2.NewFromShardstore(b.FileHash, strg.UserSpaces[i], ioswitch2.ECStream(b.Index)))
  192. ft.ECParam = &strg.Redundancy
  193. }
  194. default:
  195. return fmt.Errorf("unsupported download strategy: %T", strg)
  196. }
  197. ft.AddTo(ioswitch2.NewToBaseStore(*destStg, path.Join(rootPath, obj.Object.Path)))
  198. // 顺便保存到同存储服务的分片存储中
  199. if destStg.UserSpace.ShardStore != nil {
  200. ft.AddTo(ioswitch2.NewToShardStore(*destStg, ioswitch2.RawStream(), ""))
  201. pinned = append(pinned, obj.Object.ObjectID)
  202. }
  203. err = parser.Parse(ft, plans)
  204. if err != nil {
  205. return fmt.Errorf("parse plan: %w", err)
  206. }
  207. }
  208. mutex, err := reqbuilder.NewBuilder().
  209. UserSpace().Buzy(userspaceID).
  210. MutexLock(svc.PubLock)
  211. if err != nil {
  212. return fmt.Errorf("acquire locks failed, err: %w", err)
  213. }
  214. defer mutex.Unlock()
  215. // 记录访问统计
  216. for _, obj := range details {
  217. svc.AccessStat.AddAccessCounter(obj.Object.ObjectID, packageID, userspaceID, 1)
  218. }
  219. exeCtx := exec.NewExecContext()
  220. exec.SetValueByType(exeCtx, svc.StgPool)
  221. drv := plans.Execute(exeCtx)
  222. _, err = drv.Wait(context.Background())
  223. if err != nil {
  224. return err
  225. }
  226. return nil
  227. }
  228. func (svc *UserSpaceService) SpaceToSpace(srcSpaceID clitypes.UserSpaceID, srcPath string, dstSpaceID clitypes.UserSpaceID, dstPath string) (clitypes.SpaceToSpaceResult, error) {
  229. srcSpace := svc.UserSpaceMeta.Get(srcSpaceID)
  230. if srcSpace == nil {
  231. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("source userspace not found: %d", srcSpaceID)
  232. }
  233. srcStore, err := svc.StgPool.GetBaseStore(srcSpace)
  234. if err != nil {
  235. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("get source userspace store: %w", err)
  236. }
  237. dstSpace := svc.UserSpaceMeta.Get(dstSpaceID)
  238. if dstSpace == nil {
  239. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination userspace not found: %d", dstSpaceID)
  240. }
  241. dstStore, err := svc.StgPool.GetBaseStore(dstSpace)
  242. if err != nil {
  243. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("get destination userspace store: %w", err)
  244. }
  245. srcPath = strings.Trim(srcPath, cdssdk.ObjectPathSeparator)
  246. dstPath = strings.Trim(dstPath, cdssdk.ObjectPathSeparator)
  247. if srcPath == "" {
  248. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("source path is empty")
  249. }
  250. if dstPath == "" {
  251. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("destination path is empty")
  252. }
  253. entries, cerr := srcStore.ListAll(srcPath)
  254. if cerr != nil {
  255. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("list all from source userspace: %w", cerr)
  256. }
  257. srcPathComps := clitypes.SplitObjectPath(srcPath)
  258. srcDirCompLen := len(srcPathComps) - 1
  259. entryTree := trie.NewTrie[*types.ListEntry]()
  260. for _, e := range entries {
  261. pa, ok := strings.CutSuffix(e.Path, clitypes.ObjectPathSeparator)
  262. comps := clitypes.SplitObjectPath(pa)
  263. e.Path = pa
  264. e2 := e
  265. entryTree.CreateWords(comps[srcDirCompLen:]).Value = &e2
  266. e2.IsDir = e2.IsDir || ok
  267. }
  268. entryTree.Iterate(func(path []string, node *trie.Node[*types.ListEntry], isWordNode bool) trie.VisitCtrl {
  269. if node.Value == nil {
  270. return trie.VisitContinue
  271. }
  272. if node.Value.IsDir && len(node.WordNexts) > 0 {
  273. node.Value = nil
  274. return trie.VisitContinue
  275. }
  276. if !node.Value.IsDir && len(node.WordNexts) == 0 {
  277. node.WordNexts = nil
  278. }
  279. return trie.VisitContinue
  280. })
  281. var filePathes []string
  282. var dirPathes []string
  283. entryTree.Iterate(func(path []string, node *trie.Node[*types.ListEntry], isWordNode bool) trie.VisitCtrl {
  284. if node.Value == nil {
  285. return trie.VisitContinue
  286. }
  287. if node.Value.IsDir {
  288. dirPathes = append(dirPathes, node.Value.Path)
  289. } else {
  290. filePathes = append(filePathes, node.Value.Path)
  291. }
  292. return trie.VisitContinue
  293. })
  294. mutex, err := reqbuilder.NewBuilder().UserSpace().Buzy(srcSpaceID).Buzy(dstSpaceID).MutexLock(svc.PubLock)
  295. if err != nil {
  296. return clitypes.SpaceToSpaceResult{}, fmt.Errorf("acquire lock: %w", err)
  297. }
  298. defer mutex.Unlock()
  299. var success []string
  300. var failed []string
  301. for _, f := range filePathes {
  302. newPath := strings.Replace(f, srcPath, dstPath, 1)
  303. ft := ioswitch2.NewFromTo()
  304. ft.AddFrom(ioswitch2.NewFromBaseStore(*srcSpace, f))
  305. ft.AddTo(ioswitch2.NewToBaseStore(*dstSpace, newPath))
  306. plans := exec.NewPlanBuilder()
  307. err := parser.Parse(ft, plans)
  308. if err != nil {
  309. failed = append(failed, f)
  310. logger.Warnf("s2s: parse plan of file %v: %v", f, err)
  311. continue
  312. }
  313. exeCtx := exec.NewExecContext()
  314. exec.SetValueByType(exeCtx, svc.StgPool)
  315. _, cerr := plans.Execute(exeCtx).Wait(context.Background())
  316. if cerr != nil {
  317. failed = append(failed, f)
  318. logger.Warnf("s2s: execute plan of file %v: %v", f, cerr)
  319. continue
  320. }
  321. success = append(success, f)
  322. }
  323. newDirPathes := make([]string, 0, len(dirPathes))
  324. for i := range dirPathes {
  325. newDirPathes = append(newDirPathes, strings.Replace(dirPathes[i], srcPath, dstPath, 1))
  326. }
  327. for _, d := range newDirPathes {
  328. err := dstStore.Mkdir(d)
  329. if err != nil {
  330. failed = append(failed, d)
  331. } else {
  332. success = append(success, d)
  333. }
  334. }
  335. return clitypes.SpaceToSpaceResult{
  336. Success: success,
  337. Failed: failed,
  338. }, nil
  339. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。