You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

uploader.go 4.5 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package uploadersdk
  2. import (
  3. "fmt"
  4. "gitlink.org.cn/cloudream/common/pkgs/types"
  5. sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler"
  6. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  7. "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
  8. "gitlink.org.cn/cloudream/common/utils/http2"
  9. "gitlink.org.cn/cloudream/common/utils/serder"
  10. "net/url"
  11. "strings"
  12. )
  13. type DataScheduleReq struct {
  14. PackageID cdssdk.PackageID `json:"packageID"`
  15. DataType string `json:"dataType"`
  16. ScheduleTarget ScheduleTarget `json:"scheduleTarget"`
  17. }
  18. //type DataScheduleResp struct {
  19. // Results []sch.DataScheduleResult `json:"data"`
  20. //}
  21. type TmpDataScheduleResult struct {
  22. Cluster sch.DataDetail `json:"cluster"`
  23. PackageID cdssdk.PackageID `json:"packageID"`
  24. Status bool `json:"status"`
  25. Msg string `json:"msg"`
  26. }
  27. func (c *Client) DataSchedule(req DataScheduleReq) ([]sch.DataScheduleResult, error) {
  28. targetUrl, err := url.JoinPath(c.baseURL, "/dataSchedule")
  29. if err != nil {
  30. return nil, err
  31. }
  32. resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
  33. Body: req,
  34. })
  35. if err != nil {
  36. return nil, err
  37. }
  38. println(resp.Body)
  39. contType := resp.Header.Get("Content-Type")
  40. if strings.Contains(contType, http2.ContentTypeJSON) {
  41. var codeResp response[[]TmpDataScheduleResult]
  42. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  43. return nil, fmt.Errorf("parsing response: %w", err)
  44. }
  45. if codeResp.Code == ResponseCodeOK {
  46. var results []sch.DataScheduleResult
  47. for _, tmpResult := range codeResp.Data {
  48. result := sch.DataScheduleResult{
  49. PackageID: tmpResult.PackageID,
  50. Status: tmpResult.Status,
  51. Msg: tmpResult.Msg,
  52. Clusters: []sch.DataDetail{
  53. tmpResult.Cluster,
  54. },
  55. }
  56. results = append(results, result)
  57. }
  58. return results, nil
  59. }
  60. return nil, codeResp.ToError()
  61. }
  62. return nil, fmt.Errorf("unknow response content type: %s", contType)
  63. }
  64. type UploadReq struct {
  65. DataType string `json:"dataType"`
  66. Source UploadSource `json:"source"`
  67. Target UploadTarget `json:"target"`
  68. //StorageIDs []cdssdk.StorageID `json:"storageIDs"`
  69. }
  // UploadSource is the interface satisfied by every upload source variant.
  // Its only method, Noop, takes no arguments and returns nothing; in this file
  // it is provided with an empty body by UploadSourceBase.
  type UploadSource interface {
  	Noop()
  }
  73. var UploadSourceTypeUnion = types.NewTypeUnion[UploadSource](
  74. (*PackageSource)(nil),
  75. (*UrlSource)(nil),
  76. )
  77. var _ = serder.UseTypeUnionInternallyTagged(&UploadSourceTypeUnion, "type")
  78. type PackageSource struct {
  79. serder.Metadata `union:"jcs"`
  80. UploadSourceBase
  81. Type string `json:"type"`
  82. PackageID cdssdk.PackageID `json:"packageID"`
  83. }
  84. type UrlSource struct {
  85. serder.Metadata `union:"url"`
  86. UploadSourceBase
  87. Type string `json:"type"`
  88. Url string `json:"url"`
  89. }
  // UploadSourceBase is an empty struct meant for embedding: through its
  // pointer-receiver Noop method, a pointer to any struct embedding it satisfies
  // UploadSource.
  type UploadSourceBase struct{}

  // Noop does nothing; it exists only to satisfy the UploadSource interface.
  func (*UploadSourceBase) Noop() {}
  // UploadTarget is the interface satisfied by every upload target variant.
  // Its only method, Noop, takes no arguments and returns nothing; in this file
  // it is provided with an empty body by UploadTargetBase.
  type UploadTarget interface {
  	Noop()
  }
  95. var UploadTargetTypeUnion = types.NewTypeUnion[UploadTarget](
  96. (*UrlTarget)(nil),
  97. (*ApiTarget)(nil),
  98. )
  99. var _ = serder.UseTypeUnionInternallyTagged(&UploadTargetTypeUnion, "type")
  100. type UrlTarget struct {
  101. serder.Metadata `union:"url"`
  102. UploadTargetBase
  103. Type string `json:"type"`
  104. ClusterID ClusterID `json:"clusterId"`
  105. JCSUploadInfo cdsapi.ObjectUploadInfo `form:"JCSUploadInfo"`
  106. }
  107. type ApiTarget struct {
  108. serder.Metadata `union:"api"`
  109. UploadTargetBase
  110. Type string `json:"type"`
  111. Clusters []ClusterID `json:"clusters"`
  112. }
  // UploadTargetBase is an empty struct meant for embedding: through its
  // pointer-receiver Noop method, a pointer to any struct embedding it satisfies
  // UploadTarget.
  type UploadTargetBase struct{}

  // Noop does nothing; it exists only to satisfy the UploadTarget interface.
  func (*UploadTargetBase) Noop() {}
  115. type UploadResp struct {
  116. PackageID cdssdk.PackageID `json:"packageID"`
  117. ObjectIDs []cdssdk.ObjectID `json:"objectIDs"`
  118. ClusterID ClusterID `json:"clusterID"`
  119. JsonData string `json:"jsonData"`
  120. Status bool `json:"status"`
  121. Message string `json:"message"`
  122. }
  123. func (c *Client) Upload(req UploadReq) (*UploadResp, error) {
  124. targetUrl, err := url.JoinPath(c.baseURL, "/dataUpload")
  125. if err != nil {
  126. return nil, err
  127. }
  128. resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
  129. Body: req,
  130. })
  131. if err != nil {
  132. return nil, err
  133. }
  134. contType := resp.Header.Get("Content-Type")
  135. if strings.Contains(contType, http2.ContentTypeJSON) {
  136. var codeResp response[UploadResp]
  137. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  138. return nil, fmt.Errorf("parsing response: %w", err)
  139. }
  140. if codeResp.Code == ResponseCodeOK {
  141. return &codeResp.Data, nil
  142. }
  143. return nil, codeResp.ToError()
  144. }
  145. return nil, fmt.Errorf("unknow response content type: %s", contType)
  146. }