You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

s2s.go 4.0 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package obs
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "time"
  7. "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
  8. oms "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/oms/v2"
  9. "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/oms/v2/model"
  10. omsregion "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/oms/v2/region"
  11. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  12. "gitlink.org.cn/cloudream/common/utils/os2"
  13. stgmod "gitlink.org.cn/cloudream/storage/common/models"
  14. "gitlink.org.cn/cloudream/storage/common/pkgs/storage/s3/utils"
  15. )
  16. type S2STransfer struct {
  17. dstStg *cdssdk.OBSType
  18. feat *cdssdk.S2STransferFeature
  19. taskID *int64
  20. omsCli *oms.OmsClient
  21. }
  22. func NewS2STransfer(dstStg *cdssdk.OBSType, feat *cdssdk.S2STransferFeature) *S2STransfer {
  23. return &S2STransfer{
  24. dstStg: dstStg,
  25. feat: feat,
  26. }
  27. }
  28. // 判断是否能从指定的源存储中直传到当前存储的目的路径
  29. func (s *S2STransfer) CanTransfer(src stgmod.StorageDetail) bool {
  30. req := s.makeRequest(src.Storage.Type, "")
  31. return req != nil
  32. }
  33. // 执行数据直传。返回传输后的文件路径
  34. func (s *S2STransfer) Transfer(ctx context.Context, src stgmod.StorageDetail, srcPath string) (string, error) {
  35. req := s.makeRequest(src.Storage.Type, srcPath)
  36. if req == nil {
  37. return "", fmt.Errorf("unsupported source storage type: %T", src.Storage.Type)
  38. }
  39. auth, err := basic.NewCredentialsBuilder().
  40. WithAk(s.dstStg.AK).
  41. WithSk(s.dstStg.SK).
  42. WithProjectId(s.dstStg.ProjectID).
  43. SafeBuild()
  44. if err != nil {
  45. return "", err
  46. }
  47. region, err := omsregion.SafeValueOf(s.dstStg.Region)
  48. if err != nil {
  49. return "", err
  50. }
  51. cli, err := oms.OmsClientBuilder().
  52. WithRegion(region).
  53. WithCredential(auth).
  54. SafeBuild()
  55. if err != nil {
  56. return "", err
  57. }
  58. tempPrefix := utils.JoinKey(s.feat.TempDir, os2.GenerateRandomFileName(10)) + "/"
  59. taskType := model.GetCreateTaskReqTaskTypeEnum().OBJECT
  60. s.omsCli = oms.NewOmsClient(cli)
  61. resp, err := s.omsCli.CreateTask(&model.CreateTaskRequest{
  62. Body: &model.CreateTaskReq{
  63. TaskType: &taskType,
  64. SrcNode: req,
  65. DstNode: &model.DstNodeReq{
  66. Region: s.dstStg.Region,
  67. Ak: s.dstStg.AK,
  68. Sk: s.dstStg.SK,
  69. Bucket: s.dstStg.Bucket,
  70. SavePrefix: &tempPrefix,
  71. },
  72. },
  73. })
  74. if err != nil {
  75. return "", fmt.Errorf("create task: %w", err)
  76. }
  77. s.taskID = resp.Id
  78. err = s.waitTask(ctx, *resp.Id)
  79. if err != nil {
  80. return "", fmt.Errorf("wait task: %w", err)
  81. }
  82. return utils.JoinKey(tempPrefix, srcPath), nil
  83. }
  84. func (s *S2STransfer) makeRequest(srcStg cdssdk.StorageType, srcPath string) *model.SrcNodeReq {
  85. switch srcStg := srcStg.(type) {
  86. case *cdssdk.OBSType:
  87. cloudType := "HuaweiCloud"
  88. return &model.SrcNodeReq{
  89. CloudType: &cloudType,
  90. Region: &srcStg.Region,
  91. Ak: &srcStg.AK,
  92. Sk: &srcStg.SK,
  93. Bucket: &srcStg.Bucket,
  94. ObjectKey: &[]string{srcPath},
  95. }
  96. default:
  97. return nil
  98. }
  99. }
  100. func (s *S2STransfer) waitTask(ctx context.Context, taskId int64) error {
  101. ticker := time.NewTicker(time.Second * 5)
  102. defer ticker.Stop()
  103. failures := 0
  104. for {
  105. resp, err := s.omsCli.ShowTask(&model.ShowTaskRequest{
  106. TaskId: fmt.Sprintf("%v", taskId),
  107. })
  108. if err != nil {
  109. if failures < 3 {
  110. failures++
  111. continue
  112. }
  113. return fmt.Errorf("show task failed too many times: %w", err)
  114. }
  115. failures = 0
  116. if *resp.Status == 3 {
  117. return fmt.Errorf("task stopped")
  118. }
  119. if *resp.Status == 4 {
  120. return errors.New(resp.ErrorReason.String())
  121. }
  122. if *resp.Status == 5 {
  123. return nil
  124. }
  125. select {
  126. case <-ticker.C:
  127. continue
  128. case <-ctx.Done():
  129. return ctx.Err()
  130. }
  131. }
  132. }
  133. // 完成传输
  134. func (s *S2STransfer) Complete() {
  135. }
  136. // 取消传输。如果已经调用了Complete,则这个方法应该无效果
  137. func (s *S2STransfer) Abort() {
  138. if s.taskID != nil {
  139. s.omsCli.StopTask(&model.StopTaskRequest{
  140. TaskId: fmt.Sprintf("%v", *s.taskID),
  141. })
  142. s.omsCli.DeleteTask(&model.DeleteTaskRequest{
  143. TaskId: fmt.Sprintf("%v", *s.taskID),
  144. })
  145. }
  146. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。