diff --git a/internal/participant/ai.go b/internal/participant/ai.go new file mode 100644 index 00000000..14a3f9cf --- /dev/null +++ b/internal/participant/ai.go @@ -0,0 +1,163 @@ +package participant + +import ( + "github.com/go-resty/resty/v2" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" + "net/http" +) + +const ( + // 算法路由 + AlgorithmById = "/ai/algorithm/get" //根据Id查询算法列表 + AlgorithmsList = "/ai/algorithm/list" //所有算法列表 + AlgorithmCreateById = "/ai/algorithm/create" //根据Id创建算法 + + // 数据集路由 + DatasetCreateById = "/ai/dataset/create" //根据Id创建数据集 + + // 模型相关路由 + ModelCreateById = "/ai/model/create" //根据Id创建模型 + + // 资源相关路由 + ResourceSpecList = "/ai/resource/specs" //所有资源列表,根据参数 train or infer 查询资源 + ResourceTrainingById = "/ai/resource/train/get" //根据Id查询资源列表 + ResourceTrainingList = "/ai/resource/train/list" //所有训练资源列表 + + // 任务相关路由 + TaskCreateTrain = "/ai/task/train" + TaskResultSync = "/ai/task/sync" + TaskLog = "/ai/task/log" + TaskTrainingDetail = "/ai/task/train/detail" + TaskInferenceDetail = "/ai/task/infer/detail" + + Localhost = "http://localhost:8080" +) + +type Ai struct { + store *database.AiStorage +} + +func NewAi() (*Ai, error) { + InitClient() + return &Ai{}, nil +} + +func (a *Ai) AlgorithmById(platformId string) (resp *Resp, err error) { + respErr := &RespErr{} + _, err = Request(Localhost+AlgorithmById, http.MethodGet, func(req *resty.Request) { + req.SetQueryParams(map[string]string{ + "pfId": platformId, + }).SetError(&respErr).SetResult(&resp) + }) + if err != nil { + return nil, err + } + return +} + +func (a *Ai) AlgorithmCreateById(platformId string, param *CreateParam) (resp *Resp, err error) { + respErr := &RespErr{} + _, err = Request(Localhost+AlgorithmCreateById, http.MethodPost, func(req *resty.Request) { + req.SetQueryParams(map[string]string{ + "pfId": platformId, + }).SetBody(param).SetError(&respErr).SetResult(&resp) + }) + if err != nil { + return nil, err + } + return +} + +func (a *Ai) DatasetCreateById(platformId string, param *CreateParam) (resp *Resp, err error) { + respErr := &RespErr{} + _, err = Request(Localhost+DatasetCreateById, http.MethodPost, func(req *resty.Request) { + req.SetQueryParams(map[string]string{ + "pfId": platformId, + }).SetBody(param).SetError(&respErr).SetResult(&resp) + }) + if err != nil { + return nil, err + } + return +} + +func (a *Ai) ModelCreateById(platformId string, param *CreateParam) (resp *Resp, err error) { + respErr := &RespErr{} + _, err = Request(Localhost+ModelCreateById, http.MethodPost, func(req *resty.Request) { + req.SetQueryParams(map[string]string{ + "pfId": platformId, + }).SetBody(param).SetError(&respErr).SetResult(&resp) + }) + if err != nil { + return nil, err + } + return +} + +func (a *Ai) TaskCreateTrain(platformId string, param *TaskCreateParam) (resp *Resp, err error) { + respErr := &RespErr{} + _, err = Request(Localhost+TaskCreateTrain, http.MethodPost, func(req *resty.Request) { + req.SetQueryParams(map[string]string{ + "pfId": platformId, + }).SetBody(param).SetError(&respErr).SetResult(&resp) + }) + if err != nil { + return nil, err + } + return +} + +func (a *Ai) TaskResultSync(platformId string, param *TaskResultSyncParam) (resp *Resp, err error) { + respErr := &RespErr{} + _, err = Request(Localhost+TaskResultSync, http.MethodPost, func(req *resty.Request) { + req.SetQueryParams(map[string]string{ + "pfId": platformId, + }).SetBody(param).SetError(&respErr).SetResult(&resp) + }) + if err != nil { + return nil, err + } + return +} + +func (a *Ai) TaskLog(platformId string, taskId string) (resp *Resp, err error) { + respErr := &RespErr{} + _, err = Request(Localhost+TaskLog, http.MethodGet, func(req *resty.Request) { + req.SetQueryParams(map[string]string{ + "pfId": platformId, + "taskId": taskId, + }).SetError(&respErr).SetResult(&resp) + }) + if err != nil { + return nil, err + } + return +} + +func (a *Ai) TaskTrainingDetail(platformId string, taskId string) (resp *Resp, err error) { + respErr := &RespErr{} + _, err = Request(Localhost+TaskTrainingDetail, http.MethodGet, func(req *resty.Request) { + req.SetQueryParams(map[string]string{ + "pfId": platformId, + "taskId": taskId, + }).SetError(&respErr).SetResult(&resp) + }) + if err != nil { + return nil, err + } + return +} + +func (a *Ai) TaskInferenceDetail(platformId string, taskId string) (resp *Resp, err error) { + respErr := &RespErr{} + _, err = Request(Localhost+TaskInferenceDetail, http.MethodGet, func(req *resty.Request) { + req.SetQueryParams(map[string]string{ + "pfId": platformId, + "taskId": taskId, + }).SetError(&respErr).SetResult(&resp) + }) + if err != nil { + return nil, err + } + return +} diff --git a/internal/participant/client.go b/internal/participant/client.go new file mode 100644 index 00000000..a2e15871 --- /dev/null +++ b/internal/participant/client.go @@ -0,0 +1,81 @@ +package participant + +import ( + "crypto/tls" + "errors" + "fmt" + "net/http" + "time" + + "github.com/go-resty/resty/v2" +) + +type ReqCallback func(req *resty.Request) + +var ( + NoRedirectClient *resty.Client + RestyClient *resty.Client + HttpClient *http.Client +) +var UserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36" +var DefaultTimeout = time.Second * 300 + +func InitClient() { + NoRedirectClient = resty.New().SetRedirectPolicy( + resty.RedirectPolicyFunc(func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }), + ).SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) + NoRedirectClient.SetHeader("user-agent", UserAgent) + + RestyClient = NewRestyClient() + HttpClient = NewHttpClient() +} + +func NewRestyClient() *resty.Client { + client := resty.New(). + SetHeader("user-agent", UserAgent). + SetRetryCount(3). + SetRetryResetReaders(true). + SetTimeout(DefaultTimeout). + SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) + return client +} + +func NewHttpClient() *http.Client { + return &http.Client{ + Timeout: time.Hour * 48, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + } +} + +func Request(url string, method string, callback ReqCallback) ([]byte, error) { + respErr := &RespErr{} + req := RestyClient.R(). + SetHeaders(map[string]string{ + "Content-Type": "application/json", + }). + SetError(&respErr) + + if callback != nil { + callback(req) + } + + res, err := req.Execute(method, url) + + if err != nil { + return nil, err + } + if respErr.Message != "" { + return nil, errors.New(respErr.Message) + } + + if res.StatusCode() != http.StatusOK && res.StatusCode() != http.StatusCreated { + return nil, errors.New(fmt.Sprintf("msg: %s, status: %d", res.String(), res.StatusCode())) + } + + return res.Body(), nil +} diff --git a/internal/participant/model.go b/internal/participant/model.go new file mode 100644 index 00000000..43f87bba --- /dev/null +++ b/internal/participant/model.go @@ -0,0 +1,27 @@ +package participant + +type RespErr struct { + Code int32 `json:"code"` + Message string `json:"message"` +} + +type Resp struct { + Code int32 `json:"code"` + Message string `json:"message"` + Data interface{} `json:"data"` +} + +type CreateParam struct { + Name string `json:"name" binding:"required"` + Desc string `json:"desc"` + Src interface{} `json:"src,omitempty"` + Param interface{} `json:"param,omitempty"` +} + +type TaskCreateParam struct { +} + +type TaskResultSyncParam struct { + Src interface{} `json:"src,omitempty"` + Param interface{} `json:"param,omitempty"` +} diff --git a/internal/svc/servicecontext.go b/internal/svc/servicecontext.go index ca22071c..4765e21b 100644 --- a/internal/svc/servicecontext.go +++ b/internal/svc/servicecontext.go @@ -23,6 +23,7 @@ import ( "github.com/zeromicro/go-zero/zrpc" "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/config" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/participant" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service" @@ -56,6 +57,7 @@ type ServiceContext struct { AlertClient *alert.AlertmanagerAPI HttpClient *resty.Client Scheduler *scheduler.Scheduler + Ai *participant.Ai } func NewServiceContext(c config.Config) *ServiceContext { @@ -121,6 +123,11 @@ func NewServiceContext(c config.Config) *ServiceContext { panic(err) } scheduler := scheduler.NewSchdlr(aiService, storage, hpcStorage, hpcService) + ai, err := participant.NewAi() + if err != nil { + logx.Error(err.Error()) + panic(err) + } return &ServiceContext{ DbEngin: dbEngin, Cron: cron.New(cron.WithSeconds()), @@ -136,5 +143,6 @@ func NewServiceContext(c config.Config) *ServiceContext { AlertClient: alertClient, HttpClient: httpClient, Scheduler: scheduler, + Ai: ai, } }