Browse Source

Merge remote-tracking branch 'origin/master'

pull/439/head
tzwang 8 months ago
parent
commit
aefb1513f1
16 changed files with 134 additions and 22 deletions
  1. +1
    -1
      desc/core/pcm-core.api
  2. +1
    -1
      etc/pcm.yaml
  3. +1
    -1
      internal/config/config.go
  4. +4
    -1
      internal/logic/adapters/createclusterlogic.go
  5. +22
    -0
      internal/logic/adapters/updateclusterlogic.go
  6. +10
    -0
      internal/logic/cloud/commitgeneraltasklogic.go
  7. +6
    -0
      internal/logic/hpc/commithpctasklogic.go
  8. +8
    -8
      internal/scheduler/schedulers/aiScheduler.go
  9. +1
    -1
      internal/scheduler/service/aiService.go
  10. +3
    -5
      internal/scheduler/service/utils/jcs/middleware.go
  11. +29
    -2
      internal/scheduler/service/utils/status/taskStatusSync.go
  12. +34
    -1
      internal/storeLink/openi.go
  13. +1
    -1
      internal/types/types.go
  14. +1
    -0
      pkg/models/taskmodel_gen.go
  15. +1
    -0
      pkg/utils/remoteUtil/BlockChain.go
  16. +11
    -0
      pkg/utils/string.go

+ 1
- 1
desc/core/pcm-core.api View File

@@ -1372,7 +1372,7 @@ type (
}
)

type ResourcePrice {
type ResourceCost {
ID int64 `json:"id" gorm:"column:id;primaryKey;autoIncrement"`
ResourceID int64 `json:"resourceId" gorm:"column:resource_id"`
Price int `json:"price" gorm:"column:price"`


+ 1
- 1
etc/pcm.yaml View File

@@ -84,4 +84,4 @@ BlockChain:
Type: "2"

JcsMiddleware:
Url: 101.201.215.196:7891
JobStatusReportUrl: 101.201.215.196:7891/jobSet/jobStatusReport

+ 1
- 1
internal/config/config.go View File

@@ -65,5 +65,5 @@ type SnowflakeConf struct {
}

type JcsMiddleware struct {
Url string
JobStatusReportUrl string
}

+ 4
- 1
internal/logic/adapters/createclusterlogic.go View File

@@ -13,6 +13,7 @@ import (
"k8s.io/apimachinery/pkg/util/json"
"net/http"
"net/url"
"strconv"
"time"

"github.com/zeromicro/go-zero/core/logx"
@@ -65,7 +66,9 @@ func (l *CreateClusterLogic) CreateCluster(req *types.ClusterCreateReq) (resp *t
return nil, errors.New("cluster create failed")
}
// 创建资源价格信息
resourcePrice := &types.ResourcePrice{
clusterId, _ := strconv.ParseInt(cluster.Id, 10, 64)
resourcePrice := &types.ResourceCost{
ResourceID: clusterId,
Price: req.Price,
ResourceType: constants.CLUSTER,
CostType: req.CostType,


+ 22
- 0
internal/logic/adapters/updateclusterlogic.go View File

@@ -3,8 +3,11 @@ package adapters
import (
"context"
"github.com/pkg/errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"strconv"

"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
@@ -40,5 +43,24 @@ func (l *UpdateClusterLogic) UpdateCluster(req *types.ClusterCreateReq) (resp *t
}
cluster.Location = location
l.svcCtx.DbEngin.Table("t_cluster").Model(&cluster).Updates(&cluster)
// 更新资源价格表
clusterId, err := strconv.ParseInt(req.Id, 10, 64)
if err != nil {
return nil, err
}
resourceCost := &types.ResourceCost{
ResourceID: clusterId,
Price: req.Price,
ResourceType: constants.CLUSTER,
CostType: req.CostType,
}
dbResult := l.svcCtx.DbEngin.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "resource_id"}},
DoUpdates: clause.AssignmentColumns([]string{"price", "cost_type"}),
}).Create(&resourceCost)

if dbResult.Error != nil {
panic(dbResult.Error)
}
return
}

+ 10
- 0
internal/logic/cloud/commitgeneraltasklogic.go View File

@@ -99,6 +99,7 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) er
AdapterTypeDict: "0",
SynergyStatus: synergyStatus,
Strategy: strategy,
UserId: req.UserId,
}
var taskClouds []cloud.TaskCloudModel
adapterName := ""
@@ -144,6 +145,14 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) er
if err != nil {
return err
}
// 查询资源价格
var price int64
for _, clusterId := range req.ClusterIds {
var clusterPrice int64
l.svcCtx.DbEngin.Raw("select price from resource_cost where resource_id = ?", clusterId).Scan(&clusterPrice)
price = price + clusterPrice
}

remoteUtil.Evidence(remoteUtil.EvidenceParam{
UserIp: req.UserIp,
Url: l.svcCtx.Config.BlockChain.Url,
@@ -151,6 +160,7 @@ func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) er
FunctionName: l.svcCtx.Config.BlockChain.FunctionName,
Type: l.svcCtx.Config.BlockChain.Type,
Token: req.Token,
Amount: price,
Args: []string{strconv.FormatInt(taskModel.Id, 10), string(bytes)},
})
return nil


+ 6
- 0
internal/logic/hpc/commithpctasklogic.go View File

@@ -48,6 +48,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
CommitTime: time.Now(),
Status: "Running",
AdapterTypeDict: "2",
UserId: req.UserId,
}

// 保存任务数据到数据库
@@ -112,6 +113,10 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
JobId: string(""),
}
// 数据上链
// 查询资源价格
var price int64
l.svcCtx.DbEngin.Raw("select price from resource_cost where resource_id = ?", clusterId).Scan(&price)

bytes, _ := json.Marshal(taskModel)
remoteUtil.Evidence(remoteUtil.EvidenceParam{
UserIp: req.UserIp,
@@ -120,6 +125,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
FunctionName: l.svcCtx.Config.BlockChain.FunctionName,
Type: l.svcCtx.Config.BlockChain.Type,
Token: req.Token,
Amount: price,
Args: []string{strconv.FormatInt(taskModel.Id, 10), string(bytes)},
})
// 提交job到指定集群


+ 8
- 8
internal/scheduler/schedulers/aiScheduler.go View File

@@ -259,9 +259,9 @@ func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.Ass

//report msg
report := &jcs.JobStatusReportReq{
JobSetID: "",
LocalJobID: "",
Messages: make([]*jcs.ReportMessage, 0),
TaskName: "",
TaskID: strconv.FormatInt(taskId, 10),
Messages: make([]*jcs.ReportMessage, 0),
}

var errmsg string
@@ -282,11 +282,10 @@ func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.Ass

//add report msg
jobMsg := &jcs.ReportMessage{
TaskName: as.option.TaskName,
TaskID: strconv.FormatInt(as.option.TaskId, 10),
Status: false,
Message: msg,
ClusterID: e.clusterId,
Output: "",
}
report.Messages = append(report.Messages, jobMsg)
}
@@ -311,17 +310,18 @@ func (as *AiScheduler) handleErrors(errs []interface{}, clusters []*strategy.Ass
}
//add report msg
jobMsg := &jcs.ReportMessage{
TaskName: as.option.TaskName,
TaskID: strconv.FormatInt(as.option.TaskId, 10),
Status: false,
Message: s.Msg,
ClusterID: s.ClusterId,
Output: "",
}
report.Messages = append(report.Messages, jobMsg)
}

//report status
_ = jcs.StatusReport(as.AiService.Conf.JcsMiddleware.Url, report)
if mode == executor.SUBMIT_MODE_STORAGE_SCHEDULE {
_ = jcs.StatusReport(as.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report)
}

logx.Errorf(errors.New(errmsg).Error())
return errors.New(errmsg)


+ 1
- 1
internal/scheduler/service/aiService.go View File

@@ -93,7 +93,7 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st
inferenceMap[c.Id] = sgai
case OPENI:
id, _ := strconv.ParseInt(c.Id, 10, 64)
openi := storeLink.NewOpenI("http://localhost:2024", id, c.Username, c.Token)
openi := storeLink.NewOpenI(c.Server, id, c.Username, c.Token)
collectorMap[c.Id] = openi
executorMap[c.Id] = openi
inferenceMap[c.Id] = openi


+ 3
- 5
internal/scheduler/service/utils/jcs/middleware.go View File

@@ -5,13 +5,11 @@ import (
)

type JobStatusReportReq struct {
JobSetID string `json:"jobSetID"`
LocalJobID string `json:"localJobID"`
Messages []*ReportMessage `json:"messages"`
TaskName string `json:"taskName"`
TaskID string `json:"taskID"`
Messages []*ReportMessage `json:"messages"`
}
type ReportMessage struct {
TaskName string `json:"taskName"`
TaskID string `json:"taskID"`
Status bool `json:"status"`
Message string `json:"message"`
ClusterID string `json:"clusterID"`


+ 29
- 2
internal/scheduler/service/utils/status/taskStatusSync.go View File

@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/jcs"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
@@ -59,11 +60,17 @@ func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
}

if len(aiTask) == 1 {
if aiTask[0].Status == constants.Completed {
switch aiTask[0].Status {
case constants.Completed:
task.Status = constants.Succeeded
} else {
_ = reportStatusMessages(svc, task, aiTask[0])
case constants.Failed:
task.Status = constants.Failed
_ = reportStatusMessages(svc, task, aiTask[0])
default:
task.Status = aiTask[0].Status
}

task.StartTime = aiTask[0].StartTime
task.EndTime = aiTask[0].EndTime
err := svc.Scheduler.AiStorages.UpdateTask(task)
@@ -142,6 +149,26 @@ func UpdateTaskStatus(svc *svc.ServiceContext, tasklist []*types.TaskModel) {
}
}

func reportStatusMessages(svc *svc.ServiceContext, task *types.TaskModel, aiTask *models.TaskAi) error {
report := &jcs.JobStatusReportReq{
TaskName: task.Name,
TaskID: strconv.FormatInt(task.Id, 10),
Messages: make([]*jcs.ReportMessage, 0),
}
//add report msg
jobMsg := &jcs.ReportMessage{
Status: true,
Message: "",
ClusterID: strconv.FormatInt(aiTask.ClusterId, 10),
Output: aiTask.JobId,
}
report.Messages = append(report.Messages, jobMsg)

_ = jcs.StatusReport(svc.Scheduler.AiService.Conf.JcsMiddleware.JobStatusReportUrl, report)

return nil
}

func updateInferTaskStatus(svc *svc.ServiceContext, task types.TaskModel) {
aiTask, err := svc.Scheduler.AiStorages.GetAiTaskListById(task.Id)
if err != nil {


+ 34
- 1
internal/storeLink/openi.go View File

@@ -16,6 +16,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-openi/model"
"mime/multipart"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
@@ -202,6 +203,12 @@ func (o *OpenI) SubmitTask(ctx context.Context, imageId string, cmd string, envs

//params := "{\"parameter\":[{\"label\":\"a\",\"value\":\"1\"},{\"label\":\"b\",\"value\":\"2\"}]}"

// choose imageId and imageUrl
imgId, imgUrl, err := swapImageIdAndImageUrl(imageId)
if err != nil {
return nil, err
}

taskParam := &model.CreateTaskParam{
Description: algorithmId, // temporarily set reponame contained in the algorithmId to desc for missing taskdetail's reponame
JobType: TRAIN,
@@ -210,7 +217,8 @@ func (o *OpenI) SubmitTask(ctx context.Context, imageId string, cmd string, envs
ComputeSource: computeSource,
SpecId: int(specId),
BranchName: branchName,
ImageId: imageId,
ImageId: imgId,
ImageUrl: imgUrl,
DatasetUuidStr: datasetsId,
BootFile: bootFile,
HasInternet: 2, // 0 不限制;1 不需要互联网;2 需要互联网
@@ -251,6 +259,29 @@ func (o *OpenI) SubmitTask(ctx context.Context, imageId string, cmd string, envs
return resp.Data, nil
}

func swapImageIdAndImageUrl(imageId string) (string, string, error) {
if imageId == "" {
return "", "", errors.New("imageId is empty")
}
var imgId string
var imgUrl string

parsedURL, err := url.Parse("http://" + imageId)
if err != nil {
return "", "", err
}

if utils.IsValidHostAddress(parsedURL.Host) {
imgId = ""
imgUrl = imageId
} else {
imgId = imageId
imgUrl = ""
}

return imgId, imgUrl, nil
}

func (o OpenI) Stop(ctx context.Context, id string) error {
task, err := o.getTrainingTask(ctx, id)
if err != nil {
@@ -454,6 +485,8 @@ func (o OpenI) GetTrainingTask(ctx context.Context, taskId string) (*collector.T
resp.Status = constants.Completed
case "FAILED":
resp.Status = constants.Failed
case "CREATED_FAILED":
resp.Status = constants.Failed
case "RUNNING":
resp.Status = constants.Running
case "STOPPED":


+ 1
- 1
internal/types/types.go View File

@@ -1287,7 +1287,7 @@ type CommonResp struct {
Data interface{} `json:"data,omitempty"`
}

type ResourcePrice struct {
type ResourceCost struct {
ID int64 `json:"id" gorm:"column:id;primaryKey;autoIncrement"`
ResourceID int64 `json:"resourceId" gorm:"column:resource_id"`
Price int `json:"price" gorm:"column:price"`


+ 1
- 0
pkg/models/taskmodel_gen.go View File

@@ -51,6 +51,7 @@ type (
NsID string `db:"ns_id" json:"nsID"`
AdapterTypeDict string `db:"adapter_type_dict" json:"adapterTypeDict"` //任务类型(对应字典表的值)
TaskTypeDict string `db:"task_type_dict" json:"taskTypeDict"`
UserId int64 `db:"user_id" json:"userId"`
}
)



+ 1
- 0
pkg/utils/remoteUtil/BlockChain.go View File

@@ -14,6 +14,7 @@ type EvidenceParam struct {
Args []string `json:"args"`
Token string `json:"token"`
UserIp string `json:"userIp"`
Amount int64 `json:"amount"`
}

func Evidence(EvidenceParam EvidenceParam) error {


+ 11
- 0
pkg/utils/string.go View File

@@ -16,6 +16,7 @@ package utils

import (
"math/rand"
"regexp"
"strings"
"time"
)
@@ -37,3 +38,13 @@ func RandomString(n int) string {
func TimeString() string {
return time.Now().Format(TIMEFORMAT)
}

func IsValidHostAddress(input string) bool {
//pattern := `^(([a-zA-Z0-9]+\.)*[a-zA-Z0-9]+\.[a-zA-Z]{2,}|(\d{1,3}\.){3}\d{1,3}):\d{1,5}$`
pattern := `^((([a-zA-Z0-9]+\.)*[a-zA-Z0-9]+\.[a-zA-Z]{2,}|(\d{1,3}\.){3}\d{1,3}):\d{1,5})$|^.*\.com$`
re, err := regexp.Compile(pattern)
if err != nil {
return false
}
return re.MatchString(input)
}

Loading…
Cancel
Save