You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

chunkhandler.go 5.5 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package image
  2. import (
  3. "bufio"
  4. "context"
  5. "encoding/base64"
  6. "fmt"
  7. "github.com/aws/aws-sdk-go/aws"
  8. "github.com/aws/aws-sdk-go/service/s3/s3manager"
  9. types2 "github.com/docker/docker/api/types"
  10. "github.com/zeromicro/go-zero/core/logx"
  11. "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
  12. "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/repository/result"
  13. "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils/fileutils"
  14. "io/ioutil"
  15. "k8s.io/apimachinery/pkg/util/json"
  16. "net/http"
  17. "os"
  18. "path/filepath"
  19. "strconv"
  20. "strings"
  21. "sync"
  22. "time"
  23. "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
  24. )
  // Upload locations, resolved once at package initialization.
  var (
      // dir is the process working directory. The error from os.Getwd is
      // discarded, so dir is whatever os.Getwd returned alongside it.
      dir, _ = os.Getwd()

      // uploadPath is the "uploads" directory directly under dir; merged
      // upload files are written to and read from this directory.
      uploadPath = filepath.Join(dir, "uploads")
  )
  27. func ChunkHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
  28. return func(w http.ResponseWriter, r *http.Request) {
  29. size, _ := strconv.ParseInt(r.PostFormValue("size"), 10, 64)
  30. hash := r.PostFormValue("hash")
  31. name := r.PostFormValue("name")
  32. dataType := r.PostFormValue("dataType")
  33. kind := r.PostFormValue("kind")
  34. // 对比合并请求的文件大小和已上传文件夹大小
  35. toSize, err := fileutils.GetDirSize(filepath.Join(uploadTempPath, hash))
  36. if err != nil {
  37. logx.Error(err)
  38. result.HttpResult(r, w, nil, err)
  39. return
  40. }
  41. if size != toSize {
  42. fmt.Fprintf(w, "文件上传错误")
  43. }
  44. chunksPath := filepath.Join(uploadTempPath, hash)
  45. files, _ := ioutil.ReadDir(chunksPath)
  46. // 将文件根据索引序号排序
  47. filesSort := make(map[string]string)
  48. for _, f := range files {
  49. nameArr := strings.Split(f.Name(), "-")
  50. filesSort[nameArr[1]] = f.Name()
  51. }
  52. saveFile := filepath.Join(uploadPath, name)
  53. if exists, _ := fileutils.PathExists(saveFile); exists {
  54. os.Remove(saveFile)
  55. }
  56. fs, _ := os.OpenFile(saveFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, os.ModeAppend|os.ModePerm)
  57. var wg sync.WaitGroup
  58. filesCount := len(files)
  59. if filesCount != len(filesSort) {
  60. fmt.Fprintf(w, "文件上传错误2")
  61. }
  62. wg.Add(filesCount)
  63. for i := 0; i < filesCount; i++ {
  64. // 这里一定要注意按顺序读取不然文件就会损坏
  65. fileName := filepath.Join(chunksPath, filesSort[strconv.Itoa(i)])
  66. data, err := ioutil.ReadFile(fileName)
  67. fmt.Println(err)
  68. fs.Write(data)
  69. wg.Done()
  70. }
  71. wg.Wait()
  72. os.RemoveAll(chunksPath)
  73. // 保存到数据库表里
  74. svcCtx.DbEngin.Create(&models.File{
  75. Name: name,
  76. Hash: hash,
  77. DataType: dataType,
  78. Status: "local",
  79. Kind: kind,
  80. Bucket: "pcm"})
  81. // 根据数据类型按需上传(镜像推送到nexus 数据集和算法推送到云际存储)
  82. switch kind {
  83. case "image":
  84. err = pushImage(svcCtx, hash, name)
  85. case "dataSet", "algorithm":
  86. err = uploadStorage(svcCtx, hash, name)
  87. }
  88. // 删除本地文件 避免占用本地存储资源
  89. defer os.Remove(filepath.Join(uploadPath, name))
  90. defer fs.Close()
  91. result.HttpResult(r, w, nil, err)
  92. }
  93. }
  // syncDataSet is meant to synchronize data sets to ModelArts.
  // It is currently an empty placeholder and performs no work.
  func syncDataSet() {}
  97. // 上传文件到云集存储
  98. func uploadStorage(svcCtx *svc.ServiceContext, hash string, name string) error {
  99. fileInfo, err := os.Open(filepath.Join(uploadPath, name))
  100. if err != nil {
  101. logx.Error(err)
  102. return err
  103. }
  104. _, err = svcCtx.Uploader.Upload(&s3manager.UploadInput{
  105. Bucket: aws.String("pcm"),
  106. Key: aws.String(name),
  107. Body: fileInfo,
  108. })
  109. if err != nil {
  110. logx.Error(err)
  111. return err
  112. }
  113. // 更新数据状态
  114. svcCtx.DbEngin.Model(&models.File{}).Where("hash = ?", hash).Update("status", "cloud")
  115. return nil
  116. }
  117. // 推送镜像到nexus仓库
  118. func pushImage(svcCtx *svc.ServiceContext, hash string, name string) error {
  119. // 加载镜像文件到docker
  120. fileInfo, err := os.Open(filepath.Join(uploadPath, name))
  121. if err != nil {
  122. logx.Error(err)
  123. return err
  124. }
  125. defer fileInfo.Close()
  126. reader := bufio.NewReader(fileInfo)
  127. body, err := svcCtx.DockerClient.ImageLoad(context.Background(), reader, false)
  128. if err != nil {
  129. logx.Error(err)
  130. return err
  131. }
  132. bytes, err := ioutil.ReadAll(body.Body)
  133. println(string(bytes))
  134. if err != nil {
  135. logx.Error(err)
  136. return err
  137. }
  138. //time.Sleep(12 * 100 * time.Millisecond)
  139. privateImageName := "registry.cn-hangzhou.aliyuncs.com/jointcloud/pcm:" + name
  140. // 给镜像打上私有仓库的tag
  141. err = svcCtx.DockerClient.ImageTag(context.Background(), name, privateImageName)
  142. if err != nil {
  143. logx.Error(err)
  144. return err
  145. }
  146. // 删除原镜像
  147. _, err = svcCtx.DockerClient.ImageRemove(context.Background(), name, types2.ImageRemoveOptions{})
  148. if err != nil {
  149. logx.Error(err)
  150. return err
  151. }
  152. // 推送镜像到registry
  153. authConfig := types2.AuthConfig{
  154. Username: svcCtx.Config.RegistryConf.Username,
  155. Password: svcCtx.Config.RegistryConf.Password,
  156. }
  157. authConfigBytes, err := json.Marshal(authConfig)
  158. if err != nil {
  159. logx.Error(err)
  160. return err
  161. }
  162. logx.Infof(fmt.Sprintln("传输开始", time.Now()))
  163. authStr := base64.URLEncoding.EncodeToString(authConfigBytes)
  164. pushBody, err := svcCtx.DockerClient.ImagePush(context.Background(), privateImageName, types2.ImagePushOptions{RegistryAuth: authStr})
  165. pushBytes, _ := ioutil.ReadAll(pushBody)
  166. println(string(pushBytes))
  167. if err != nil {
  168. logx.Error(err)
  169. return err
  170. }
  171. logx.Infof(fmt.Sprintln("传输完成", time.Now()))
  172. // 删除本地镜像 避免存储资源浪费
  173. _, err = svcCtx.DockerClient.ImageRemove(context.Background(), privateImageName, types2.ImageRemoveOptions{})
  174. if err != nil {
  175. logx.Error(err)
  176. return err
  177. }
  178. // 更新数据状态
  179. svcCtx.DbEngin.Model(&models.File{}).Where("hash = ?", hash).Update("status", "cloud")
  180. return nil
  181. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive and autonomous peer-to-peer manner.