You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

package.go 7.6 kB

2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package services
  2. import (
  3. "database/sql"
  4. "fmt"
  5. "sort"
  6. "github.com/jmoiron/sqlx"
  7. "gitlink.org.cn/cloudream/common/consts/errorcode"
  8. "gitlink.org.cn/cloudream/common/pkgs/logger"
  9. "gitlink.org.cn/cloudream/common/pkgs/mq"
  10. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  11. "gitlink.org.cn/cloudream/storage/common/pkgs/db/model"
  12. coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
  13. )
  14. func (svc *Service) GetPackage(msg *coormq.GetPackage) (*coormq.GetPackageResp, *mq.CodeMessage) {
  15. pkg, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID)
  16. if err != nil {
  17. logger.WithField("PackageID", msg.PackageID).
  18. Warnf("get package: %s", err.Error())
  19. return nil, mq.Failed(errorcode.OperationFailed, "get package failed")
  20. }
  21. return mq.ReplyOK(coormq.NewGetPackageResp(pkg))
  22. }
  23. func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePackageResp, *mq.CodeMessage) {
  24. var pkgID cdssdk.PackageID
  25. err := svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  26. var err error
  27. pkgID, err = svc.db.Package().Create(svc.db.SQLCtx(), msg.BucketID, msg.Name)
  28. return err
  29. })
  30. if err != nil {
  31. logger.WithField("BucketID", msg.BucketID).
  32. WithField("Name", msg.Name).
  33. Warnf("creating package: %s", err.Error())
  34. return nil, mq.Failed(errorcode.OperationFailed, "creating package failed")
  35. }
  36. return mq.ReplyOK(coormq.NewCreatePackageResp(pkgID))
  37. }
  38. func (svc *Service) UpdatePackage(msg *coormq.UpdatePackage) (*coormq.UpdatePackageResp, *mq.CodeMessage) {
  39. _, err := svc.db.Package().GetByID(svc.db.SQLCtx(), msg.PackageID)
  40. if err != nil {
  41. logger.WithField("PackageID", msg.PackageID).
  42. Warnf("get package: %s", err.Error())
  43. return nil, mq.Failed(errorcode.OperationFailed, "get package failed")
  44. }
  45. err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  46. // 先执行删除操作
  47. if len(msg.Deletes) > 0 {
  48. if err := svc.db.Object().BatchDelete(tx, msg.Deletes); err != nil {
  49. return fmt.Errorf("deleting objects: %w", err)
  50. }
  51. }
  52. // 再执行添加操作
  53. if len(msg.Adds) > 0 {
  54. if _, err := svc.db.Object().BatchAdd(tx, msg.PackageID, msg.Adds); err != nil {
  55. return fmt.Errorf("adding objects: %w", err)
  56. }
  57. }
  58. return nil
  59. })
  60. if err != nil {
  61. logger.Warn(err.Error())
  62. return nil, mq.Failed(errorcode.OperationFailed, "update package failed")
  63. }
  64. return mq.ReplyOK(coormq.NewUpdatePackageResp())
  65. }
  66. func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePackageResp, *mq.CodeMessage) {
  67. isAva, err := svc.db.Package().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.PackageID)
  68. if err != nil {
  69. logger.WithField("UserID", msg.UserID).
  70. WithField("PackageID", msg.PackageID).
  71. Warnf("check package available failed, err: %s", err.Error())
  72. return nil, mq.Failed(errorcode.OperationFailed, "check package available failed")
  73. }
  74. if !isAva {
  75. logger.WithField("UserID", msg.UserID).
  76. WithField("PackageID", msg.PackageID).
  77. Warnf("package is not available to the user")
  78. return nil, mq.Failed(errorcode.OperationFailed, "package is not available to the user")
  79. }
  80. err = svc.db.DoTx(sql.LevelDefault, func(tx *sqlx.Tx) error {
  81. err := svc.db.Package().SoftDelete(tx, msg.PackageID)
  82. if err != nil {
  83. return fmt.Errorf("soft delete package: %w", err)
  84. }
  85. err = svc.db.Package().DeleteUnused(tx, msg.PackageID)
  86. if err != nil {
  87. logger.WithField("UserID", msg.UserID).
  88. WithField("PackageID", msg.PackageID).
  89. Warnf("deleting unused package: %w", err.Error())
  90. }
  91. return nil
  92. })
  93. if err != nil {
  94. logger.WithField("UserID", msg.UserID).
  95. WithField("PackageID", msg.PackageID).
  96. Warnf("set package deleted failed, err: %s", err.Error())
  97. return nil, mq.Failed(errorcode.OperationFailed, "set package deleted failed")
  98. }
  99. return mq.ReplyOK(coormq.NewDeletePackageResp())
  100. }
  101. func (svc *Service) GetPackageCachedNodes(msg *coormq.GetPackageCachedNodes) (*coormq.GetPackageCachedNodesResp, *mq.CodeMessage) {
  102. isAva, err := svc.db.Package().IsAvailable(svc.db.SQLCtx(), msg.UserID, msg.PackageID)
  103. if err != nil {
  104. logger.WithField("UserID", msg.UserID).
  105. WithField("PackageID", msg.PackageID).
  106. Warnf("check package available failed, err: %s", err.Error())
  107. return nil, mq.Failed(errorcode.OperationFailed, "check package available failed")
  108. }
  109. if !isAva {
  110. logger.WithField("UserID", msg.UserID).
  111. WithField("PackageID", msg.PackageID).
  112. Warnf("package is not available to the user")
  113. return nil, mq.Failed(errorcode.OperationFailed, "package is not available to the user")
  114. }
  115. objDetails, err := svc.db.ObjectBlock().GetPackageBlockDetails(svc.db.SQLCtx(), msg.PackageID)
  116. if err != nil {
  117. logger.WithField("PackageID", msg.PackageID).
  118. Warnf("get package block details: %s", err.Error())
  119. return nil, mq.Failed(errorcode.OperationFailed, "get package block details failed")
  120. }
  121. var packageSize int64
  122. nodeInfoMap := make(map[cdssdk.NodeID]*cdssdk.NodePackageCachingInfo)
  123. for _, obj := range objDetails {
  124. // 只要存了文件的一个块,就认为此节点存了整个文件
  125. for _, block := range obj.Blocks {
  126. for _, nodeID := range block.CachedNodeIDs {
  127. info, ok := nodeInfoMap[nodeID]
  128. if !ok {
  129. info = &cdssdk.NodePackageCachingInfo{
  130. NodeID: nodeID,
  131. }
  132. nodeInfoMap[nodeID] = info
  133. }
  134. info.FileSize += obj.Object.Size
  135. info.ObjectCount++
  136. }
  137. }
  138. }
  139. var nodeInfos []cdssdk.NodePackageCachingInfo
  140. for _, nodeInfo := range nodeInfoMap {
  141. nodeInfos = append(nodeInfos, *nodeInfo)
  142. }
  143. sort.Slice(nodeInfos, func(i, j int) bool {
  144. return nodeInfos[i].NodeID < nodeInfos[j].NodeID
  145. })
  146. return mq.ReplyOK(coormq.NewGetPackageCachedNodesResp(nodeInfos, packageSize))
  147. }
  148. func (svc *Service) GetPackageLoadedNodes(msg *coormq.GetPackageLoadedNodes) (*coormq.GetPackageLoadedNodesResp, *mq.CodeMessage) {
  149. storages, err := svc.db.StoragePackage().FindPackageStorages(svc.db.SQLCtx(), msg.PackageID)
  150. if err != nil {
  151. logger.WithField("PackageID", msg.PackageID).
  152. Warnf("get storages by packageID failed, err: %s", err.Error())
  153. return nil, mq.Failed(errorcode.OperationFailed, "get storages by packageID failed")
  154. }
  155. uniqueNodeIDs := make(map[cdssdk.NodeID]bool)
  156. var nodeIDs []cdssdk.NodeID
  157. for _, stg := range storages {
  158. if !uniqueNodeIDs[stg.NodeID] {
  159. uniqueNodeIDs[stg.NodeID] = true
  160. nodeIDs = append(nodeIDs, stg.NodeID)
  161. }
  162. }
  163. return mq.ReplyOK(coormq.NewGetPackageLoadedNodesResp(nodeIDs))
  164. }
  165. func (svc *Service) GetPackageLoadLogDetails(msg *coormq.GetPackageLoadLogDetails) (*coormq.GetPackageLoadLogDetailsResp, *mq.CodeMessage) {
  166. var logs []coormq.PackageLoadLogDetail
  167. rawLogs, err := svc.db.StoragePackageLog().GetByPackageID(svc.db.SQLCtx(), msg.PackageID)
  168. if err != nil {
  169. logger.WithField("PackageID", msg.PackageID).
  170. Warnf("getting storage package log: %s", err.Error())
  171. return nil, mq.Failed(errorcode.OperationFailed, "get storage package log failed")
  172. }
  173. stgs := make(map[cdssdk.StorageID]model.Storage)
  174. for _, raw := range rawLogs {
  175. stg, ok := stgs[raw.StorageID]
  176. if !ok {
  177. stg, err = svc.db.Storage().GetByID(svc.db.SQLCtx(), raw.StorageID)
  178. if err != nil {
  179. logger.WithField("PackageID", msg.PackageID).
  180. Warnf("getting storage: %s", err.Error())
  181. return nil, mq.Failed(errorcode.OperationFailed, "get storage failed")
  182. }
  183. stgs[raw.StorageID] = stg
  184. }
  185. logs = append(logs, coormq.PackageLoadLogDetail{
  186. Storage: stg,
  187. UserID: raw.UserID,
  188. CreateTime: raw.CreateTime,
  189. })
  190. }
  191. return mq.ReplyOK(coormq.RespGetPackageLoadLogDetails(logs))
  192. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。