You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

uploader.go 5.7 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. package uploadersdk
  2. import (
  3. "fmt"
  4. "gitlink.org.cn/cloudream/common/pkgs/types"
  5. sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler"
  6. schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
  7. cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
  8. "gitlink.org.cn/cloudream/common/utils/http2"
  9. "gitlink.org.cn/cloudream/common/utils/serder"
  10. "net/url"
  11. "strings"
  12. )
  13. type DataID int64
  14. type FolderID int64
  15. type Cluster struct {
  16. PackageID cdssdk.PackageID `gorm:"column:package_id" json:"dataID"`
  17. ClusterID schsdk.ClusterID `gorm:"column:cluster_id" json:"clusterID"`
  18. StorageID cdssdk.StorageID `gorm:"column:storage_id" json:"storageID"`
  19. }
  20. func (Cluster) TableName() string {
  21. return "uploadedCluster" // 确保和数据库中的表名一致
  22. }
  23. type Package struct {
  24. UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"`
  25. PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"`
  26. PackageName string `gorm:"column:package_name" json:"packageName"`
  27. DataType string `gorm:"column:data_type" json:"dataType"`
  28. JsonData string `gorm:"column:json_data" json:"jsonData"` // JSON 数据字段
  29. BindingID DataID `gorm:"column:binding_id" json:"bindingID"`
  30. Objects []cdssdk.Object `gorm:"column:objects" json:"objects"`
  31. UploadedCluster []Cluster `gorm:"column:uploadedCluster" json:"uploadedCluster"`
  32. //UploadedCluster []Cluster `gorm:"foreignKey:package_id;references:package_id" json:"clusters"` // 关联 Cluster 数据
  33. //BlockChain []BlockChain `gorm:"foreignKey:package_id;references:package_id" json:"blockChains"` // 关联 BlockChain 数据
  34. }
  35. type PackageDAO struct {
  36. UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"`
  37. PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"`
  38. PackageName string `gorm:"column:package_name" json:"packageName"`
  39. DataType string `gorm:"column:data_type" json:"dataType"`
  40. JsonData string `gorm:"column:json_data" json:"jsonData"` // JSON 数据字段
  41. BindingID DataID `gorm:"column:binding_id" json:"bindingID"`
  42. UploadedCluster []Cluster `gorm:"foreignKey:package_id;references:package_id" json:"clusters"` // 关联 Cluster 数据
  43. }
  44. type DataScheduleReq struct {
  45. PackageID cdssdk.PackageID `json:"packageID"`
  46. DataType string `json:"dataType"`
  47. Clusters []Cluster `json:"clusters"`
  48. }
  49. type codeRepository struct {
  50. RepositoryName string
  51. ClusterID ClusterID
  52. }
  53. type DataScheduleResp struct {
  54. Results []sch.DataScheduleResult `json:"results"`
  55. }
  56. func (c *Client) DataSchedule(req DataScheduleReq) (*DataScheduleResp, error) {
  57. targetUrl, err := url.JoinPath(c.baseURL, "/jobSet/schedule")
  58. if err != nil {
  59. return nil, err
  60. }
  61. resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
  62. Body: req,
  63. })
  64. if err != nil {
  65. return nil, err
  66. }
  67. contType := resp.Header.Get("Content-Type")
  68. if strings.Contains(contType, http2.ContentTypeJSON) {
  69. var codeResp response[DataScheduleResp]
  70. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  71. return nil, fmt.Errorf("parsing response: %w", err)
  72. }
  73. if codeResp.Code == ResponseCodeOK {
  74. return &codeResp.Data, nil
  75. }
  76. return nil, codeResp.ToError()
  77. }
  78. return nil, fmt.Errorf("unknow response content type: %s", contType)
  79. }
  80. type UploadReq struct {
  81. Type string `json:"type"`
  82. Source UploadSource `json:"source"`
  83. Target UploadTarget `json:"target"`
  84. StorageIDs []cdssdk.StorageID `json:"storageIDs"`
  85. }
  86. type UploadSource interface {
  87. Noop()
  88. }
  89. var UploadSourceTypeUnion = types.NewTypeUnion[UploadSource](
  90. (*PackageSource)(nil),
  91. (*UrlSource)(nil),
  92. )
  93. var _ = serder.UseTypeUnionInternallyTagged(&UploadSourceTypeUnion, "type")
  94. type PackageSource struct {
  95. serder.Metadata `union:"packageSource"`
  96. UploadSourceBase
  97. Type string `json:"type"`
  98. PackageID cdssdk.PackageID `json:"packageID"`
  99. }
  100. type UrlSource struct {
  101. serder.Metadata `union:"urlSource"`
  102. UploadSourceBase
  103. Type string `json:"type"`
  104. Url string `json:"url"`
  105. }
  106. type UploadSourceBase struct{}
  107. func (d *UploadSourceBase) Noop() {}
  108. type UploadTarget interface {
  109. Noop()
  110. }
  111. var UploadTargetTypeUnion = types.NewTypeUnion[UploadTarget](
  112. (*UrlTarget)(nil),
  113. (*ApiTarget)(nil),
  114. )
  115. var _ = serder.UseTypeUnionInternallyTagged(&UploadTargetTypeUnion, "type")
  116. type UrlTarget struct {
  117. serder.Metadata `union:"url"`
  118. UploadTargetBase
  119. Clusters []ClusterID `json:"clusters"`
  120. }
  121. type ApiTarget struct {
  122. serder.Metadata `union:"api"`
  123. UploadTargetBase
  124. Clusters []ClusterID `json:"clusters"`
  125. }
  126. type UploadTargetBase struct{}
  127. func (d *UploadTargetBase) Noop() {}
  128. type UploadResp struct {
  129. PackageID cdssdk.PackageID `json:"packageID"`
  130. ObjectIDs []cdssdk.ObjectID `json:"objectIDs"`
  131. JsonData string `json:"jsonData"`
  132. }
  133. func (c *Client) Upload(req UploadReq) (*UploadResp, error) {
  134. targetUrl, err := url.JoinPath(c.baseURL, "/data/upload")
  135. if err != nil {
  136. return nil, err
  137. }
  138. resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
  139. Body: req,
  140. })
  141. if err != nil {
  142. return nil, err
  143. }
  144. contType := resp.Header.Get("Content-Type")
  145. if strings.Contains(contType, http2.ContentTypeJSON) {
  146. var codeResp response[UploadResp]
  147. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  148. return nil, fmt.Errorf("parsing response: %w", err)
  149. }
  150. if codeResp.Code == ResponseCodeOK {
  151. return &codeResp.Data, nil
  152. }
  153. return nil, codeResp.ToError()
  154. }
  155. return nil, fmt.Errorf("unknow response content type: %s", contType)
  156. }