|
- package inference
-
- import (
- "context"
- "errors"
- "github.com/zeromicro/go-zero/core/logx"
- "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
- "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/inference"
- "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
- "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
- "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
- "net/http"
- )
-
- // ImageInferenceLogic bundles the state the image-inference logic works with:
- // a logger, the context it was constructed with, and the service context.
- type ImageInferenceLogic struct {
- 	// Logger is embedded so its logging methods are promoted onto the logic value.
- 	logx.Logger
- 	// ctx is the context handed to NewImageInferenceLogic.
- 	ctx context.Context
- 	// svcCtx gives access to shared services such as the scheduler.
- 	svcCtx *svc.ServiceContext
- }
-
- // NewImageInferenceLogic builds an ImageInferenceLogic whose logger is derived
- // from ctx and which keeps both ctx and svcCtx for later use.
- func NewImageInferenceLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ImageInferenceLogic {
- 	logic := new(ImageInferenceLogic)
- 	logic.Logger = logx.WithContext(ctx)
- 	logic.ctx = ctx
- 	logic.svcCtx = svcCtx
- 	return logic
- }
-
- //
- //func (l *ImageInferenceLogic) ImageInference(req *types.ImageInferenceReq) (resp *types.ImageInferenceResp, err error) {
- // return nil, nil
- //}
-
- // ImageInfer validates an image-inference request, schedules it over clusters,
- // persists the task records and finally starts the inference in the background.
- //
- // r must carry a parsed multipart form; the images are taken from its "images"
- // file field. req supplies the task, adapter, model and strategy settings.
- //
- // It returns an empty response once the background inference has been started,
- // or a nil response and an error when the request has no images, the adapter id
- // is unknown, the strategy is unsupported, scheduling yields no clusters, or a
- // storage call fails.
- func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInferenceReq) (resp *types.ImageInferenceResp, err error) {
- 	resp = &types.ImageInferenceResp{}
- 	opt := &option.InferOption{
- 		TaskName:        req.TaskName,
- 		TaskDesc:        req.TaskDesc,
- 		AdapterId:       req.AdapterId,
- 		AiClusterIds:    req.AiClusterIds,
- 		ModelName:       req.ModelName,
- 		ModelType:       req.ModelType,
- 		Strategy:        req.Strategy,
- 		StaticWeightMap: req.StaticWeightMap,
- 	}
-
- 	// MultipartForm is nil when the request was never parsed as multipart;
- 	// treat that like a request without images instead of panicking.
- 	if r.MultipartForm == nil {
- 		return nil, errors.New("Images does not exist")
- 	}
- 	headers := r.MultipartForm.File["images"]
- 	if len(headers) == 0 {
- 		return nil, errors.New("Images does not exist")
- 	}
-
- 	// The opened images are consumed by the background inference, so they must
- 	// not be closed when this method returns successfully. Track their Close
- 	// methods and release them here only if we bail out before the hand-off.
- 	var closers []func() error
- 	closeImages := func() {
- 		for _, closeFn := range closers {
- 			_ = closeFn() // best-effort cleanup; nothing useful to do on failure
- 		}
- 	}
- 	handedOff := false
- 	defer func() {
- 		if !handedOff {
- 			closeImages()
- 		}
- 	}()
-
- 	ts := make([]*inference.ImageFile, 0, len(headers))
- 	for _, header := range headers {
- 		file, err := header.Open()
- 		if err != nil {
- 			return nil, err
- 		}
- 		closers = append(closers, file.Close)
- 		ts = append(ts, &inference.ImageFile{
- 			ImageResult: &types.ImageResult{ImageName: header.Filename},
- 			File:        file,
- 		})
- 	}
-
- 	if _, ok := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[opt.AdapterId]; !ok {
- 		return nil, errors.New("AdapterId does not exist")
- 	}
-
- 	var strat strategy.Strategy
- 	switch opt.Strategy {
- 	case strategy.STATIC_WEIGHT:
- 		strat = strategy.NewStaticWeightStrategy(opt.StaticWeightMap, int32(len(ts)))
- 	default:
- 		return nil, errors.New("no strategy has been chosen")
- 	}
- 	clusters, err := strat.Schedule()
- 	if err != nil {
- 		return nil, err
- 	}
-
- 	if len(clusters) == 0 {
- 		return nil, errors.New("clusters is nil")
- 	}
-
- 	// save task
- 	// NOTE(review): the synergy status is derived from the scheduled clusters
- 	// before zero-replica entries are dropped below — confirm this is intended.
- 	var synergystatus int64
- 	if len(clusters) > 1 {
- 		synergystatus = 1
- 	}
-
- 	strategyCode, err := l.svcCtx.Scheduler.AiStorages.GetStrategyCode(opt.Strategy)
- 	if err != nil {
- 		return nil, err
- 	}
- 	adapterName, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(opt.AdapterId)
- 	if err != nil {
- 		return nil, err
- 	}
- 	id, err := l.svcCtx.Scheduler.AiStorages.SaveTask(opt.TaskName, strategyCode, synergystatus, "11")
- 	if err != nil {
- 		return nil, err
- 	}
-
- 	// The notice text means "task is being created".
- 	l.svcCtx.Scheduler.AiStorages.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "create", "任务创建中")
-
- 	// Drop clusters that were assigned no replicas, keeping the original order.
- 	active := clusters[:0]
- 	for _, c := range clusters {
- 		if c.Replicas != 0 {
- 			active = append(active, c)
- 		}
- 	}
- 	clusters = active
-
- 	// save taskai
- 	for _, c := range clusters {
- 		clusterName, nameErr := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(c.ClusterId)
- 		if nameErr != nil {
- 			// A missing name is tolerated (the record is saved with whatever
- 			// name came back), but the failure should not vanish silently.
- 			l.Errorf("get cluster name by id %v: %v", c.ClusterId, nameErr)
- 		}
- 		opt.Replica = c.Replicas
- 		if err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(id, opt, adapterName, c.ClusterId, clusterName, "", constants.Saved, ""); err != nil {
- 			return nil, err
- 		}
- 	}
-
- 	// Ownership of the opened images moves to the background goroutine, which
- 	// closes them once the inference call has returned.
- 	handedOff = true
- 	go func() {
- 		defer closeImages()
- 		l.svcCtx.Scheduler.AiService.ImageInfer(opt, id, adapterName, clusters, ts)
- 	}()
-
- 	return resp, nil
- }
|