|
- /*
-
- Copyright (c) [2023] [pcm]
- [pcm-coordinator] is licensed under Mulan PSL v2.
- You can use this software according to the terms and conditions of the Mulan PSL v2.
- You may obtain a copy of Mulan PSL v2 at:
- http://license.coscl.org.cn/MulanPSL2
- THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
- EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
- MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
- See the Mulan PSL v2 for more details.
-
- */
-
- package storeLink
-
- import (
- "context"
- "errors"
- "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
- "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
- "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopus"
- "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient"
- "math"
- "strconv"
- "strings"
- )
-
- type OctopusLink struct {
- octopusRpc octopusclient.Octopus
- pageIndex int32
- pageSize int32
- platform string
- participantId int64
- }
-
// Constants used when talking to Octopus.
const (
	// Prefixes for generated image names, image versions and training jobs.
	IMG_NAME_PREFIX    = "oct_"
	IMG_VERSION_PREFIX = "version_"
	TASK_NAME_PREFIX   = "trainJob"

	// RESOURCE_POOL is the pool named in spec queries and job submissions.
	RESOURCE_POOL = "common-pool"

	// Vendor identifiers (presumably pinyin vendor names; not referenced in
	// the visible part of this file).
	HANWUJI   = "hanwuji"
	SUIYUAN   = "suiyuan"
	SAILINGSI = "sailingsi"

	// Card type names as they appear in resource spec names, together with
	// the alias looked up in preset image names.
	MLU       = "MLU"
	CAMBRICON = "cambricon"
	GCU       = "GCU"
	ENFLAME   = "enflame"

	// Per-card TOPS figures reported as TOpsAtFp16, and the TOPS unit used to
	// turn a requested TOPS value into a card count.
	CAMBRICONMLU290 = 256
	EnflameT20      = 128
	BASE_TOPS       = 128

	// TRAIN_CMD is the default training command.
	TRAIN_CMD = "cd /code; python train.py"
	// VERSION is sent as both the dataset and the algorithm version.
	VERSION = "V1"
)
-
- var (
- cardAliasMap = map[string]string{
- MLU: CAMBRICON,
- GCU: ENFLAME,
- }
- cardTopsMap = map[string]float64{
- MLU: CAMBRICONMLU290,
- GCU: EnflameT20,
- }
- )
-
- func NewOctopusLink(octopusRpc octopusclient.Octopus, name string, id int64) *OctopusLink {
- return &OctopusLink{octopusRpc: octopusRpc, platform: name, participantId: id, pageIndex: 1, pageSize: 100}
- }
-
- func (o *OctopusLink) UploadImage(ctx context.Context, path string) (interface{}, error) {
- // octopus创建镜像
- createReq := &octopus.CreateImageReq{
- Platform: o.platform,
- CreateImage: &octopus.CreateImage{
- SourceType: 1,
- ImageName: IMG_NAME_PREFIX + utils.RandomString(7),
- ImageVersion: IMG_VERSION_PREFIX + utils.RandomString(7),
- },
- }
- createResp, err := o.octopusRpc.CreateImage(ctx, createReq)
- if err != nil {
- return nil, err
- }
-
- // octopus上传镜像
- uploadReq := &octopus.UploadImageReq{
- Platform: o.platform,
- ImageId: createResp.Payload.ImageId,
- Params: &octopus.UploadImageParam{
- Domain: "",
- FileName: "",
- },
- }
- uploadResp, err := o.octopusRpc.UploadImage(ctx, uploadReq)
- if err != nil {
- return nil, err
- }
-
- // Todo 实际上传
-
- return uploadResp, nil
- }
-
- func (o *OctopusLink) DeleteImage(ctx context.Context, imageId string) (interface{}, error) {
- // octopus删除镜像
- req := &octopus.DeleteImageReq{
- Platform: o.platform,
- ImageId: imageId,
- }
- resp, err := o.octopusRpc.DeleteImage(ctx, req)
- if err != nil {
- return nil, err
- }
-
- return resp, nil
- }
-
- func (o *OctopusLink) QueryImageList(ctx context.Context) (interface{}, error) {
- // octopus获取镜像列表
- req := &octopus.GetUserImageListReq{
- Platform: o.platform,
- PageIndex: o.pageIndex,
- PageSize: o.pageSize,
- }
- resp, err := o.octopusRpc.GetUserImageList(ctx, req)
- if err != nil {
- return nil, err
- }
-
- return resp, nil
- }
-
- func (o *OctopusLink) SubmitTask(ctx context.Context, imageId string, cmd string, envs []string, params []string, resourceId string, datasetsId string, algorithmId string, aiType string) (interface{}, error) {
- // octopus提交任务
-
- // python参数
- var prms []*octopus.Parameters
- for _, param := range params {
- var p octopus.Parameters
- s := strings.Split(param, COMMA)
- p.Key = s[0]
- p.Value = s[1]
- prms = append(prms, &p)
- }
-
- //环境变量
- envMap := make(map[string]string)
- for _, env := range envs {
- s := strings.Split(env, COMMA)
- envMap[s[0]] = s[1]
- }
-
- req := &octopus.CreateTrainJobReq{
- Platform: o.platform,
- Params: &octopus.CreateTrainJobParam{
- ImageId: imageId,
- Name: TASK_NAME_PREFIX + UNDERSCORE + utils.RandomString(10),
- ResourcePool: RESOURCE_POOL,
- Config: []*octopus.Config{
- {
- Command: cmd,
- ResourceSpecId: resourceId,
- MinFailedTaskCount: 1,
- MinSucceededTaskCount: 1,
- TaskNumber: 1,
- Parameters: prms,
- Envs: envMap,
- },
- },
- DataSetId: datasetsId,
- DataSetVersion: VERSION,
- AlgorithmId: algorithmId,
- AlgorithmVersion: VERSION,
- },
- }
- resp, err := o.octopusRpc.CreateTrainJob(ctx, req)
- if err != nil {
- return nil, err
- }
-
- return resp, nil
- }
-
- func (o *OctopusLink) QueryTask(ctx context.Context, taskId string) (interface{}, error) {
- // octopus获取任务
- req := &octopus.GetTrainJobReq{
- Platform: o.platform,
- Id: taskId,
- }
- resp, err := o.octopusRpc.GetTrainJob(ctx, req)
- if err != nil {
- return nil, err
- }
-
- return resp, nil
- }
-
- func (o *OctopusLink) DeleteTask(ctx context.Context, taskId string) (interface{}, error) {
- // octopus删除任务
- req := &octopus.DeleteTrainJobReq{
- Platform: o.platform,
- JobIds: []string{taskId},
- }
- resp, err := o.octopusRpc.DeleteTrainJob(ctx, req)
- if err != nil {
- return nil, err
- }
-
- return resp, nil
- }
-
- func (o *OctopusLink) QuerySpecs(ctx context.Context) (interface{}, error) {
- // octopus查询资源规格
- req := &octopus.GetResourceSpecsReq{
- Platform: o.platform,
- ResourcePool: RESOURCE_POOL,
- }
- resp, err := o.octopusRpc.GetResourceSpecs(ctx, req)
- if err != nil {
- return nil, err
- }
-
- return resp, nil
- }
-
- func (o *OctopusLink) GetResourceStats(ctx context.Context) (*collector.ResourceStats, error) {
- req := &octopus.GetResourceSpecsReq{
- Platform: o.platform,
- ResourcePool: RESOURCE_POOL,
- }
- specResp, err := o.octopusRpc.GetResourceSpecs(ctx, req)
- if err != nil {
- return nil, err
- }
- if !specResp.Success {
- return nil, errors.New(specResp.Error.Message)
- }
- balanceReq := &octopus.GetUserBalanceReq{
- Platform: o.platform,
- }
- balanceResp, err := o.octopusRpc.GetUserBalance(ctx, balanceReq)
- if err != nil {
- return nil, err
- }
- if !balanceResp.Success {
- return nil, errors.New(balanceResp.Error.Message)
- }
-
- var cards []*collector.Card
- balance := float64(balanceResp.Payload.BillingUser.Amount)
- var cpuHours float64
- for _, spec := range specResp.TrainResourceSpecs {
- if spec.Price == 0 {
- ns := strings.Split(spec.Name, COMMA)
- if len(ns) == 2 {
- nss := strings.Split(ns[0], COLON)
- if nss[0] == CPU {
- cpuHours = -1
- }
- }
- }
-
- if spec.Price == 1 {
- ns := strings.Split(spec.Name, COMMA)
- cardSpecs := strings.Split(ns[0], STAR)
-
- cardTops, isMapContainsKey := cardTopsMap[cardSpecs[1]]
- if !isMapContainsKey {
- continue
- }
-
- card := &collector.Card{
- Platform: OCTOPUS,
- Type: CARD,
- Name: cardSpecs[1],
- TOpsAtFp16: cardTops,
- CardHours: balance / spec.Price,
- }
- cards = append(cards, card)
- }
- }
-
- resourceStats := &collector.ResourceStats{
- ParticipantId: o.participantId,
- Name: o.platform,
- Balance: balance,
- CardsAvail: cards,
- CpuCoreHours: cpuHours,
- }
-
- return resourceStats, nil
- }
-
- func (o *OctopusLink) GetDatasetsSpecs(ctx context.Context) ([]*collector.DatasetsSpecs, error) {
- req := &octopus.GetMyDatasetListReq{
- Platform: o.platform,
- PageIndex: o.pageIndex,
- PageSize: o.pageSize,
- }
- resp, err := o.octopusRpc.GetMyDatasetList(ctx, req)
- if err != nil {
- return nil, err
- }
- if !resp.Success {
- return nil, errors.New(resp.Error.Message)
- }
- specs := []*collector.DatasetsSpecs{}
- for _, dataset := range resp.Payload.Datasets {
- spec := &collector.DatasetsSpecs{Name: dataset.Name}
- specs = append(specs, spec)
- }
- return specs, nil
- }
-
- func (o *OctopusLink) GetAlgorithms(ctx context.Context) ([]*collector.Algorithm, error) {
- var algorithms []*collector.Algorithm
-
- req := &octopus.GetMyAlgorithmListReq{
- Platform: o.platform,
- PageIndex: o.pageIndex,
- PageSize: o.pageSize,
- }
- resp, err := o.octopusRpc.GetMyAlgorithmList(ctx, req)
- if err != nil {
- return nil, err
- }
- if !resp.Success {
- return nil, errors.New("failed to get algorithms")
- }
-
- for _, a := range resp.Payload.Algorithms {
- algorithm := &collector.Algorithm{Name: a.AlgorithmName, Platform: OCTOPUS, TaskType: strings.ToLower(a.FrameworkName)}
- algorithms = append(algorithms, algorithm)
- }
- return algorithms, nil
- }
-
- func (o *OctopusLink) Execute(ctx context.Context, option *option.AiOption) (interface{}, error) {
- err := o.GenerateSubmitParams(ctx, option)
- if err != nil {
- return nil, err
- }
- task, err := o.SubmitTask(ctx, option.ImageId, option.Cmd, option.Envs, option.Params, option.ResourceId, option.DatasetsId, option.AlgorithmId, option.TaskType)
- if err != nil {
- return nil, err
- }
- return task, nil
- }
-
- func (o *OctopusLink) GenerateSubmitParams(ctx context.Context, option *option.AiOption) error {
- err := o.generateResourceId(ctx, option)
- if err != nil {
- return err
- }
- err = o.generateDatasetsId(ctx, option)
- if err != nil {
- return err
- }
- err = o.generateImageId(ctx, option)
- if err != nil {
- return err
- }
- err = o.generateAlgorithmId(ctx, option)
- if err != nil {
- return err
- }
- err = o.generateCmd(option)
- if err != nil {
- return err
- }
- err = o.generateEnv(option)
- if err != nil {
- return err
- }
- err = o.generateParams(option)
- if err != nil {
- return err
- }
- return nil
- }
-
- func (o *OctopusLink) generateResourceId(ctx context.Context, option *option.AiOption) error {
- if option.ResourceType == "" {
- return errors.New("ResourceType not set")
- }
- req := &octopus.GetResourceSpecsReq{
- Platform: o.platform,
- ResourcePool: RESOURCE_POOL,
- }
- specResp, err := o.octopusRpc.GetResourceSpecs(ctx, req)
- if err != nil {
- return err
- }
- if !specResp.Success {
- return errors.New(specResp.Error.Message)
- }
-
- if option.ResourceType == CPU {
- for _, spec := range specResp.TrainResourceSpecs {
- if spec.Price == 0 {
- option.ResourceId = spec.Id
- return nil
- }
- }
- }
-
- if option.ResourceType == CARD {
- err = setResourceIdByCard(option, specResp, GCU)
- if err != nil {
- return err
- }
- return nil
- }
-
- return errors.New("failed to get ResourceId")
- }
-
- func (o *OctopusLink) generateDatasetsId(ctx context.Context, option *option.AiOption) error {
- if option.DatasetsName == "" {
- return errors.New("DatasetsName not set")
- }
- req := &octopus.GetMyDatasetListReq{
- Platform: o.platform,
- PageIndex: o.pageIndex,
- PageSize: o.pageSize,
- }
- resp, err := o.octopusRpc.GetMyDatasetList(ctx, req)
- if err != nil {
- return err
- }
- if !resp.Success {
- return errors.New("failed to get DatasetsId")
- }
- for _, dataset := range resp.Payload.Datasets {
- if dataset.Name == option.DatasetsName {
- option.DatasetsId = dataset.Id
- return nil
- }
- }
- return errors.New("failed to get DatasetsId")
- }
-
- func (o *OctopusLink) generateImageId(ctx context.Context, option *option.AiOption) error {
- if option.TaskType == "" {
- return errors.New("TaskType not set")
- }
-
- req := &octopus.GetUserImageListReq{
- Platform: o.platform,
- PageIndex: o.pageIndex,
- PageSize: o.pageSize,
- }
- resp, err := o.octopusRpc.GetUserImageList(ctx, req)
- if err != nil {
- return err
- }
- if !resp.Success {
- return errors.New("failed to get imageId")
- }
-
- if option.ResourceType == CPU {
- for _, img := range resp.Payload.Images {
- if img.Image.ImageName == "test-image" {
- option.ImageId = img.Image.Id
- return nil
- }
- }
- }
-
- preImgReq := &octopus.GetPresetImageListReq{
- Platform: o.platform,
- PageIndex: o.pageIndex,
- PageSize: o.pageSize,
- }
- preImgResp, err := o.octopusRpc.GetPresetImageList(ctx, preImgReq)
- if err != nil {
- return err
- }
- if !preImgResp.Success {
- return errors.New("failed to get PresetImages")
- }
-
- if option.ResourceType == CARD {
- for _, image := range preImgResp.Payload.Images {
- if strings.Contains(image.ImageName, cardAliasMap[option.ComputeCard]) {
- option.ImageId = image.Id
- return nil
- }
- }
- }
-
- return errors.New("failed to get ImageId")
- }
-
- func (o *OctopusLink) generateAlgorithmId(ctx context.Context, option *option.AiOption) error {
- // temporarily set algorithm to cnn
- if option.AlgorithmName == "" {
- switch option.DatasetsName {
- case "cifar10":
- option.AlgorithmName = "cnn"
- case "mnist":
- option.AlgorithmName = "fcn"
- }
- }
-
- req := &octopus.GetMyAlgorithmListReq{
- Platform: o.platform,
- PageIndex: o.pageIndex,
- PageSize: o.pageSize,
- }
- resp, err := o.octopusRpc.GetMyAlgorithmList(ctx, req)
- if err != nil {
- return err
- }
- if !resp.Success {
- return errors.New("failed to get algorithmId")
- }
-
- for _, algorithm := range resp.Payload.Algorithms {
- if algorithm.FrameworkName == strings.Title(option.TaskType) {
- ns := strings.Split(algorithm.AlgorithmName, UNDERSCORE)
- if ns[0] != option.DatasetsName {
- continue
- }
- if ns[1] != option.AlgorithmName {
- continue
- }
- switch option.ResourceType {
- case CPU:
- if ns[2] != CPU {
- continue
- }
- case CARD:
- if ns[2] != strings.ToLower(option.ComputeCard) {
- continue
- }
- }
-
- option.AlgorithmId = algorithm.AlgorithmId
- return nil
- }
- }
-
- if option.AlgorithmId == "" {
- return errors.New("Algorithm does not exist")
- }
-
- return errors.New("failed to get AlgorithmId")
- }
-
- func (o *OctopusLink) generateCmd(option *option.AiOption) error {
- if option.Cmd == "" {
- switch option.ComputeCard {
- case GCU:
- option.Cmd = "cd /code; python3 train.py"
- default:
- option.Cmd = TRAIN_CMD
- }
- }
-
- return nil
- }
-
- func (o *OctopusLink) generateEnv(option *option.AiOption) error {
-
- return nil
- }
-
- func (o *OctopusLink) generateParams(option *option.AiOption) error {
- if len(option.Params) == 0 {
- epoch := "epoch" + COMMA + "1"
- option.Params = append(option.Params, epoch)
- }
- return nil
- }
-
- func setResourceIdByCard(option *option.AiOption, specs *octopus.GetResourceSpecsResp, computeCard string) error {
- if option.Tops == 0 {
- for _, spec := range specs.TrainResourceSpecs {
- if spec.Price == 1 {
- ns := strings.Split(spec.Name, COMMA)
- cardSpecs := strings.Split(ns[0], STAR)
- if cardSpecs[1] == computeCard {
- option.ResourceId = spec.Id
- option.ComputeCard = computeCard
- return nil
- }
- } else {
- continue
- }
- }
- } else {
- cardNum := math.Ceil(option.Tops / float64(BASE_TOPS))
- for _, spec := range specs.TrainResourceSpecs {
- if option.Tops < BASE_TOPS {
- if spec.Price == 1 {
- ns := strings.Split(spec.Name, COMMA)
- cardSpecs := strings.Split(ns[0], STAR)
- if cardSpecs[1] == computeCard {
- option.ResourceId = spec.Id
- option.ComputeCard = computeCard
- return nil
- }
- } else {
- continue
- }
- } else {
- ns := strings.Split(spec.Name, COMMA)
- if len(ns) != 4 {
- continue
- }
- cardSpecs := strings.Split(ns[0], STAR)
- if cardSpecs[1] != computeCard {
- continue
- }
- s, err := strconv.ParseFloat(cardSpecs[0], 64)
- if err != nil {
- return err
- }
- switch computeCard {
- case GCU:
- if cardNum == s { // 1, 4, 8
- option.ResourceId = spec.Id
- option.ComputeCard = computeCard
- return nil
- }
- if 1 < cardNum && cardNum <= 4 && s == 4 {
- option.ResourceId = spec.Id
- option.ComputeCard = computeCard
- return nil
- }
- if 4 < cardNum && s == 8 {
- option.ResourceId = spec.Id
- option.ComputeCard = computeCard
- return nil
- }
-
- case MLU: // 1, 2, 4
- if cardNum/2 == s {
- option.ResourceId = spec.Id
- option.ComputeCard = computeCard
- return nil
- }
- if 1 < cardNum/2 && cardNum/2 <= 2 && s == 2 {
- option.ResourceId = spec.Id
- option.ComputeCard = computeCard
- return nil
- }
- if 2 < cardNum/2 && s == 4 {
- option.ResourceId = spec.Id
- option.ComputeCard = computeCard
- return nil
- }
- }
- }
- }
- }
- return errors.New("set ResourceId error")
- }
|