|
|
|
@@ -11,6 +11,7 @@ import ( |
|
|
|
"github.com/aws/aws-sdk-go/aws" |
|
|
|
"github.com/aws/aws-sdk-go/service/s3/s3manager" |
|
|
|
types2 "github.com/docker/docker/api/types" |
|
|
|
"github.com/zeromicro/go-zero/core/logx" |
|
|
|
"io/ioutil" |
|
|
|
"k8s.io/apimachinery/pkg/util/json" |
|
|
|
"net/http" |
|
|
|
@@ -19,6 +20,7 @@ import ( |
|
|
|
"strconv" |
|
|
|
"strings" |
|
|
|
"sync" |
|
|
|
"time" |
|
|
|
|
|
|
|
"PCM/adaptor/PCM-CORE/api/internal/svc" |
|
|
|
) |
|
|
|
@@ -32,6 +34,7 @@ func ChunkHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { |
|
|
|
hash := r.PostFormValue("hash") |
|
|
|
name := r.PostFormValue("name") |
|
|
|
dataType := r.PostFormValue("dataType") |
|
|
|
kind := r.PostFormValue("kind") |
|
|
|
// 对比合并请求的文件大小和已上传文件夹大小 |
|
|
|
toSize, _ := tool.GetDirSize(filepath.Join(uploadTempPath, hash)) |
|
|
|
if size != toSize { |
|
|
|
@@ -50,7 +53,7 @@ func ChunkHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { |
|
|
|
os.Remove(saveFile) |
|
|
|
} |
|
|
|
fs, _ := os.OpenFile(saveFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, os.ModeAppend|os.ModePerm) |
|
|
|
defer fs.Close() |
|
|
|
|
|
|
|
var wg sync.WaitGroup |
|
|
|
filesCount := len(files) |
|
|
|
if filesCount != len(filesSort) { |
|
|
|
@@ -70,73 +73,89 @@ func ChunkHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { |
|
|
|
|
|
|
|
// 保存到数据库表里 |
|
|
|
svcCtx.DbEngin.Create(&model.File{ |
|
|
|
Name: name, |
|
|
|
Hash: hash, |
|
|
|
Type: dataType, |
|
|
|
Status: "0", |
|
|
|
Bucket: "pcm"}) |
|
|
|
Name: name, |
|
|
|
Hash: hash, |
|
|
|
DataType: dataType, |
|
|
|
Status: "local", |
|
|
|
Kind: kind, |
|
|
|
Bucket: "pcm"}) |
|
|
|
|
|
|
|
// 根据数据类型按需上传(镜像推送到nexus 数据集和算法推送到云际存储) |
|
|
|
switch dataType { |
|
|
|
var err error |
|
|
|
switch kind { |
|
|
|
case "image": |
|
|
|
pushImage(svcCtx, name) |
|
|
|
err = pushImage(svcCtx, hash, name) |
|
|
|
case "dataSet", "algorithm": |
|
|
|
uploadStorage(svcCtx, name) |
|
|
|
} |
|
|
|
|
|
|
|
err := pushImage(svcCtx, name) |
|
|
|
if err != nil { |
|
|
|
result2.HttpResult(r, w, nil, err) |
|
|
|
return |
|
|
|
err = uploadStorage(svcCtx, hash, name) |
|
|
|
} |
|
|
|
// 删除本地文件 避免占用本地存储资源 |
|
|
|
go os.Remove(filepath.Join(uploadPath, name)) |
|
|
|
defer os.Remove(filepath.Join(uploadPath, name)) |
|
|
|
defer fs.Close() |
|
|
|
result2.HttpResult(r, w, nil, err) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func uploadStorage(svcCtx *svc.ServiceContext, name string) error { |
|
|
|
// 同步数据集到modelArts |
|
|
|
func syncDataSet() { |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
// 上传文件到云集存储 |
|
|
|
func uploadStorage(svcCtx *svc.ServiceContext, hash string, name string) error { |
|
|
|
fileInfo, err := os.Open(filepath.Join(uploadPath, name)) |
|
|
|
upload, err := svcCtx.Uploader.Upload(&s3manager.UploadInput{ |
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
_, err = svcCtx.Uploader.Upload(&s3manager.UploadInput{ |
|
|
|
Bucket: aws.String("pcm"), |
|
|
|
Key: aws.String(name), |
|
|
|
Body: fileInfo, |
|
|
|
}) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
println(upload.UploadID) |
|
|
|
// 更新数据状态 |
|
|
|
svcCtx.DbEngin.Model(&model.File{}).Where("hash = ?", hash).Update("status", "cloud") |
|
|
|
return nil |
|
|
|
} |
|
|
|
|
|
|
|
func pushImage(svcCtx *svc.ServiceContext, name string) error { |
|
|
|
// 推送镜像到nexus仓库 |
|
|
|
func pushImage(svcCtx *svc.ServiceContext, hash string, name string) error { |
|
|
|
// 加载镜像文件到docker |
|
|
|
fileInfo, err := os.Open(filepath.Join(uploadPath, name)) |
|
|
|
defer fileInfo.Close() |
|
|
|
reader := bufio.NewReader(fileInfo) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
defer fileInfo.Close() |
|
|
|
reader := bufio.NewReader(fileInfo) |
|
|
|
|
|
|
|
body, err := svcCtx.DockerClient.ImageLoad(context.Background(), reader, false) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
bytes, err := ioutil.ReadAll(body.Body) |
|
|
|
|
|
|
|
loadBody := LoadBody{} |
|
|
|
err = json.Unmarshal(bytes, &loadBody) |
|
|
|
println(string(bytes)) |
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
|
|
|
|
imageName := strings.TrimSpace(loadBody.Stream[13:]) |
|
|
|
privateImageName := "hub.jcce.dev:18445/" + imageName |
|
|
|
//time.Sleep(12 * 100 * time.Millisecond) |
|
|
|
privateImageName := "hub.jcce.dev:18445/repository/docker-hub/" + name |
|
|
|
// 给镜像打上私有仓库的tag |
|
|
|
err = svcCtx.DockerClient.ImageTag(context.Background(), imageName, privateImageName) |
|
|
|
err = svcCtx.DockerClient.ImageTag(context.Background(), name, privateImageName) |
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
// 删除原镜像 |
|
|
|
_, err = svcCtx.DockerClient.ImageRemove(context.Background(), name, types2.ImageRemoveOptions{}) |
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
// 推送镜像到registry |
|
|
|
@@ -145,16 +164,27 @@ func pushImage(svcCtx *svc.ServiceContext, name string) error { |
|
|
|
Password: "Nudt@123", |
|
|
|
} |
|
|
|
authConfigBytes, err := json.Marshal(authConfig) |
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
println(fmt.Sprintln("传输开始", time.Now())) |
|
|
|
authStr := base64.URLEncoding.EncodeToString(authConfigBytes) |
|
|
|
_, err = svcCtx.DockerClient.ImagePush(context.Background(), privateImageName, types2.ImagePushOptions{RegistryAuth: authStr}) |
|
|
|
pushBody, err := svcCtx.DockerClient.ImagePush(context.Background(), privateImageName, types2.ImagePushOptions{RegistryAuth: authStr}) |
|
|
|
pushBytes, _ := ioutil.ReadAll(pushBody) |
|
|
|
println(string(pushBytes)) |
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
println("传输完成!") |
|
|
|
logx.Infof(fmt.Sprintln("传输完成", time.Now())) |
|
|
|
// 删除本地镜像 避免存储资源浪费 |
|
|
|
_, err = svcCtx.DockerClient.ImageRemove(context.Background(), privateImageName, types2.ImageRemoveOptions{}) |
|
|
|
if err != nil { |
|
|
|
logx.Error(err) |
|
|
|
return err |
|
|
|
} |
|
|
|
// 更新数据状态 |
|
|
|
svcCtx.DbEngin.Model(&model.File{}).Where("hash = ?", hash).Update("status", "cloud") |
|
|
|
return nil |
|
|
|
} |