You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

chunkhandler.go 6.0 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. /*
  2. Copyright (c) [2023] [pcm]
  3. [pcm-coordinator] is licensed under Mulan PSL v2.
  4. You can use this software according to the terms and conditions of the Mulan PSL v2.
  5. You may obtain a copy of Mulan PSL v2 at:
  6. http://license.coscl.org.cn/MulanPSL2
  7. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
  8. EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
  9. MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
  10. See the Mulan PSL v2 for more details.
  11. */
  12. package image
  13. import (
  14. "bufio"
  15. "context"
  16. "encoding/base64"
  17. "fmt"
  18. "github.com/aws/aws-sdk-go/aws"
  19. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  20. types2 "github.com/docker/docker/api/types"
  21. "github.com/zeromicro/go-zero/core/logx"
  22. "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
  23. "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/repository/result"
  24. "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils/fileutils"
  25. "io/ioutil"
  26. "k8s.io/apimachinery/pkg/util/json"
  27. "net/http"
  28. "os"
  29. "path/filepath"
  30. "strconv"
  31. "strings"
  32. "sync"
  33. "time"
  34. "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
  35. )
  36. var dir, _ = os.Getwd()
  37. var uploadPath = filepath.Join(dir, "uploads")
  38. func ChunkHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
  39. return func(w http.ResponseWriter, r *http.Request) {
  40. size, _ := strconv.ParseInt(r.PostFormValue("size"), 10, 64)
  41. hash := r.PostFormValue("hash")
  42. name := r.PostFormValue("name")
  43. dataType := r.PostFormValue("dataType")
  44. kind := r.PostFormValue("kind")
  45. // 对比合并请求的文件大小和已上传文件夹大小
  46. toSize, err := fileutils.GetDirSize(filepath.Join(uploadTempPath, hash))
  47. if err != nil {
  48. logx.Error(err)
  49. result.HttpResult(r, w, nil, err)
  50. return
  51. }
  52. if size != toSize {
  53. fmt.Fprintf(w, "文件上传错误")
  54. }
  55. chunksPath := filepath.Join(uploadTempPath, hash)
  56. files, _ := ioutil.ReadDir(chunksPath)
  57. // 将文件根据索引序号排序
  58. filesSort := make(map[string]string)
  59. for _, f := range files {
  60. nameArr := strings.Split(f.Name(), "-")
  61. filesSort[nameArr[1]] = f.Name()
  62. }
  63. saveFile := filepath.Join(uploadPath, name)
  64. if exists, _ := fileutils.PathExists(saveFile); exists {
  65. os.Remove(saveFile)
  66. }
  67. fs, _ := os.OpenFile(saveFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, os.ModeAppend|os.ModePerm)
  68. var wg sync.WaitGroup
  69. filesCount := len(files)
  70. if filesCount != len(filesSort) {
  71. fmt.Fprintf(w, "文件上传错误2")
  72. }
  73. wg.Add(filesCount)
  74. for i := 0; i < filesCount; i++ {
  75. // 这里一定要注意按顺序读取不然文件就会损坏
  76. fileName := filepath.Join(chunksPath, filesSort[strconv.Itoa(i)])
  77. data, err := ioutil.ReadFile(fileName)
  78. fmt.Println(err)
  79. fs.Write(data)
  80. wg.Done()
  81. }
  82. wg.Wait()
  83. os.RemoveAll(chunksPath)
  84. // 保存到数据库表里
  85. svcCtx.DbEngin.Create(&models.File{
  86. Name: name,
  87. Hash: hash,
  88. DataType: dataType,
  89. Status: "local",
  90. Kind: kind,
  91. Bucket: "pcm"})
  92. // 根据数据类型按需上传(镜像推送到nexus 数据集和算法推送到云际存储)
  93. switch kind {
  94. case "image":
  95. err = pushImage(svcCtx, hash, name)
  96. case "dataSet", "algorithm":
  97. err = uploadStorage(svcCtx, hash, name)
  98. }
  99. // 删除本地文件 避免占用本地存储资源
  100. defer os.Remove(filepath.Join(uploadPath, name))
  101. defer fs.Close()
  102. result.HttpResult(r, w, nil, err)
  103. }
  104. }
  105. // 同步数据集到modelArts
  106. func syncDataSet() {
  107. }
  108. // 上传文件到云集存储
  109. func uploadStorage(svcCtx *svc.ServiceContext, hash string, name string) error {
  110. fileInfo, err := os.Open(filepath.Join(uploadPath, name))
  111. if err != nil {
  112. logx.Error(err)
  113. return err
  114. }
  115. _, err = svcCtx.Uploader.Upload(&s3manager.UploadInput{
  116. Bucket: aws.String("pcm"),
  117. Key: aws.String(name),
  118. Body: fileInfo,
  119. })
  120. if err != nil {
  121. logx.Error(err)
  122. return err
  123. }
  124. // 更新数据状态
  125. svcCtx.DbEngin.Model(&models.File{}).Where("hash = ?", hash).Update("status", "cloud")
  126. return nil
  127. }
  128. // 推送镜像到nexus仓库
  129. func pushImage(svcCtx *svc.ServiceContext, hash string, name string) error {
  130. // 加载镜像文件到docker
  131. fileInfo, err := os.Open(filepath.Join(uploadPath, name))
  132. if err != nil {
  133. logx.Error(err)
  134. return err
  135. }
  136. defer fileInfo.Close()
  137. reader := bufio.NewReader(fileInfo)
  138. body, err := svcCtx.DockerClient.ImageLoad(context.Background(), reader, false)
  139. if err != nil {
  140. logx.Error(err)
  141. return err
  142. }
  143. bytes, err := ioutil.ReadAll(body.Body)
  144. println(string(bytes))
  145. if err != nil {
  146. logx.Error(err)
  147. return err
  148. }
  149. //time.Sleep(12 * 100 * time.Millisecond)
  150. privateImageName := "registry.cn-hangzhou.aliyuncs.com/jointcloud/pcm:" + name
  151. // 给镜像打上私有仓库的tag
  152. err = svcCtx.DockerClient.ImageTag(context.Background(), name, privateImageName)
  153. if err != nil {
  154. logx.Error(err)
  155. return err
  156. }
  157. // 删除原镜像
  158. _, err = svcCtx.DockerClient.ImageRemove(context.Background(), name, types2.ImageRemoveOptions{})
  159. if err != nil {
  160. logx.Error(err)
  161. return err
  162. }
  163. // 推送镜像到registry
  164. authConfig := types2.AuthConfig{
  165. Username: svcCtx.Config.RegistryConf.Username,
  166. Password: svcCtx.Config.RegistryConf.Password,
  167. }
  168. authConfigBytes, err := json.Marshal(authConfig)
  169. if err != nil {
  170. logx.Error(err)
  171. return err
  172. }
  173. logx.Infof(fmt.Sprintln("传输开始", time.Now()))
  174. authStr := base64.URLEncoding.EncodeToString(authConfigBytes)
  175. pushBody, err := svcCtx.DockerClient.ImagePush(context.Background(), privateImageName, types2.ImagePushOptions{RegistryAuth: authStr})
  176. pushBytes, _ := ioutil.ReadAll(pushBody)
  177. println(string(pushBytes))
  178. if err != nil {
  179. logx.Error(err)
  180. return err
  181. }
  182. logx.Infof(fmt.Sprintln("传输完成", time.Now()))
  183. // 删除本地镜像 避免存储资源浪费
  184. _, err = svcCtx.DockerClient.ImageRemove(context.Background(), privateImageName, types2.ImageRemoveOptions{})
  185. if err != nil {
  186. logx.Error(err)
  187. return err
  188. }
  189. // 更新数据状态
  190. svcCtx.DbEngin.Model(&models.File{}).Where("hash = ?", hash).Update("status", "cloud")
  191. return nil
  192. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive and autonomous peer-to-peer manner.