|
|
|
@@ -3,6 +3,7 @@ package image |
|
|
|
import ( |
|
|
|
"PCM/adaptor/PCM-CORE/model" |
|
|
|
result2 "PCM/common/result" |
|
|
|
"PCM/common/tool" |
|
|
|
"bufio" |
|
|
|
"context" |
|
|
|
"encoding/base64" |
|
|
|
@@ -10,6 +11,7 @@ import ( |
|
|
|
"github.com/aws/aws-sdk-go/aws" |
|
|
|
"github.com/aws/aws-sdk-go/service/s3/s3manager" |
|
|
|
types2 "github.com/docker/docker/api/types" |
|
|
|
"github.com/zeromicro/go-zero/core/logx" |
|
|
|
"io/ioutil" |
|
|
|
"k8s.io/apimachinery/pkg/util/json" |
|
|
|
"net/http" |
|
|
|
@@ -18,6 +20,7 @@ import ( |
|
|
|
"strconv" |
|
|
|
"strings" |
|
|
|
"sync" |
|
|
|
"time" |
|
|
|
|
|
|
|
"PCM/adaptor/PCM-CORE/api/internal/svc" |
|
|
|
) |
|
|
|
@@ -31,8 +34,9 @@ func ChunkHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { |
|
|
|
hash := r.PostFormValue("hash") |
|
|
|
name := r.PostFormValue("name") |
|
|
|
dataType := r.PostFormValue("dataType") |
|
|
|
kind := r.PostFormValue("kind") |
|
|
|
// 对比合并请求的文件大小和已上传文件夹大小 |
|
|
|
toSize, _ := getDirSize(filepath.Join(uploadTempPath, hash)) |
|
|
|
toSize, _ := tool.GetDirSize(filepath.Join(uploadTempPath, hash)) |
|
|
|
if size != toSize { |
|
|
|
fmt.Fprintf(w, "文件上传错误") |
|
|
|
} |
|
|
|
@@ -45,11 +49,11 @@ func ChunkHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { |
|
|
|
filesSort[nameArr[1]] = f.Name() |
|
|
|
} |
|
|
|
saveFile := filepath.Join(uploadPath, name) |
|
|
|
if exists, _ := PathExists(saveFile); exists { |
|
|
|
if exists, _ := tool.PathExists(saveFile); exists { |
|
|
|
os.Remove(saveFile) |
|
|
|
} |
|
|
|
fs, _ := os.OpenFile(saveFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, os.ModeAppend|os.ModePerm) |
|
|
|
defer fs.Close() |
|
|
|
|
|
|
|
var wg sync.WaitGroup |
|
|
|
filesCount := len(files) |
|
|
|
if filesCount != len(filesSort) { |
|
|
|
@@ -62,7 +66,6 @@ func ChunkHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { |
|
|
|
data, err := ioutil.ReadFile(fileName) |
|
|
|
fmt.Println(err) |
|
|
|
fs.Write(data) |
|
|
|
|
|
|
|
wg.Done() |
|
|
|
} |
|
|
|
wg.Wait() |
|
|
|
@@ -70,65 +73,89 @@ func ChunkHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { |
|
|
|
|
|
|
|
// 保存到数据库表里 |
|
|
|
svcCtx.DbEngin.Create(&model.File{ |
|
|
|
Name: name, |
|
|
|
Hash: hash, |
|
|
|
Type: dataType, |
|
|
|
Status: "0", |
|
|
|
Bucket: "pcm"}) |
|
|
|
Name: name, |
|
|
|
Hash: hash, |
|
|
|
DataType: dataType, |
|
|
|
Status: "local", |
|
|
|
Kind: kind, |
|
|
|
Bucket: "pcm"}) |
|
|
|
|
|
|
|
// 根据数据类型按需上传(镜像推送到nexus 数据集和算法推送到云际存储) |
|
|
|
switch dataType { |
|
|
|
var err error |
|
|
|
switch kind { |
|
|
|
case "image": |
|
|
|
pushImage(svcCtx, name) |
|
|
|
err = pushImage(svcCtx, hash, name) |
|
|
|
case "dataSet", "algorithm": |
|
|
|
uploadStorage(svcCtx, name) |
|
|
|
} |
|
|
|
|
|
|
|
err := pushImage(svcCtx, name) |
|
|
|
if err != nil { |
|
|
|
result2.HttpResult(r, w, nil, err) |
|
|
|
return |
|
|
|
err = uploadStorage(svcCtx, hash, name) |
|
|
|
} |
|
|
|
// 删除本地文件 避免占用本地存储资源 |
|
|
|
err = os.Remove(filepath.Join(uploadPath, name)) |
|
|
|
defer os.Remove(filepath.Join(uploadPath, name)) |
|
|
|
defer fs.Close() |
|
|
|
result2.HttpResult(r, w, nil, err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func uploadStorage(svcCtx *svc.ServiceContext, name string) error { |
|
|
|
svcCtx.Uploader.Upload(&s3manager.UploadInput{ |
|
|
|
// 同步数据集到modelArts |
|
|
|
func syncDataSet() { |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// 上传文件到云集存储 |
|
|
|
func uploadStorage(svcCtx *svc.ServiceContext, hash string, name string) error { |
|
|
|
fileInfo, err := os.Open(filepath.Join(uploadPath, name)) |
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
_, err = svcCtx.Uploader.Upload(&s3manager.UploadInput{ |
|
|
|
Bucket: aws.String("pcm"), |
|
|
|
Key: aws.String(name), |
|
|
|
Body: fileInfo, |
|
|
|
}) |
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
// 更新数据状态 |
|
|
|
svcCtx.DbEngin.Model(&model.File{}).Where("hash = ?", hash).Update("status", "cloud") |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func pushImage(svcCtx *svc.ServiceContext, name string) error { |
|
|
|
// 推送镜像到nexus仓库 |
|
|
|
func pushImage(svcCtx *svc.ServiceContext, hash string, name string) error { |
|
|
|
// 加载镜像文件到docker |
|
|
|
fileInfo, err := os.Open(filepath.Join(uploadPath, name)) |
|
|
|
defer fileInfo.Close() |
|
|
|
reader := bufio.NewReader(fileInfo) |
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
defer fileInfo.Close() |
|
|
|
reader := bufio.NewReader(fileInfo) |
|
|
|
|
|
|
|
body, err := svcCtx.DockerClient.ImageLoad(context.Background(), reader, false) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
bytes, err := ioutil.ReadAll(body.Body) |
|
|
|
|
|
|
|
loadBody := LoadBody{} |
|
|
|
err = json.Unmarshal(bytes, &loadBody) |
|
|
|
println(string(bytes)) |
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
imageName := strings.TrimSpace(loadBody.Stream[13:]) |
|
|
|
privateImageName := "hub.jcce.dev:18445/" + imageName |
|
|
|
//time.Sleep(12 * 100 * time.Millisecond) |
|
|
|
privateImageName := "hub.jcce.dev:18445/repository/docker-hub/" + name |
|
|
|
// 给镜像打上私有仓库的tag |
|
|
|
err = svcCtx.DockerClient.ImageTag(context.Background(), imageName, privateImageName) |
|
|
|
err = svcCtx.DockerClient.ImageTag(context.Background(), name, privateImageName) |
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
// 删除原镜像 |
|
|
|
_, err = svcCtx.DockerClient.ImageRemove(context.Background(), name, types2.ImageRemoveOptions{}) |
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
// 推送镜像到registry |
|
|
|
@@ -137,28 +164,27 @@ func pushImage(svcCtx *svc.ServiceContext, name string) error { |
|
|
|
Password: "Nudt@123", |
|
|
|
} |
|
|
|
authConfigBytes, err := json.Marshal(authConfig) |
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
println(fmt.Sprintln("传输开始", time.Now())) |
|
|
|
authStr := base64.URLEncoding.EncodeToString(authConfigBytes) |
|
|
|
_, err = svcCtx.DockerClient.ImagePush(context.Background(), privateImageName, types2.ImagePushOptions{RegistryAuth: authStr}) |
|
|
|
pushBody, err := svcCtx.DockerClient.ImagePush(context.Background(), privateImageName, types2.ImagePushOptions{RegistryAuth: authStr}) |
|
|
|
pushBytes, _ := ioutil.ReadAll(pushBody) |
|
|
|
println(string(pushBytes)) |
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
println("传输完成!") |
|
|
|
logx.Infof(fmt.Sprintln("传输完成", time.Now())) |
|
|
|
// 删除本地镜像 避免存储资源浪费 |
|
|
|
_, err = svcCtx.DockerClient.ImageRemove(context.Background(), privateImageName, types2.ImageRemoveOptions{}) |
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
// 更新数据状态 |
|
|
|
svcCtx.DbEngin.Model(&model.File{}).Where("hash = ?", hash).Update("status", "cloud") |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
// DirSize 获取整体文件夹大小 |
|
|
|
func getDirSize(path string) (int64, error) { |
|
|
|
var size int64 |
|
|
|
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error { |
|
|
|
if !info.IsDir() { |
|
|
|
size += info.Size() |
|
|
|
} |
|
|
|
return err |
|
|
|
}) |
|
|
|
return size, err |
|
|
|
} |