Browse Source

update service

pull/524/head
tzwang 4 months ago
parent
commit
17936bfc4d
4 changed files with 96 additions and 16 deletions
  1. +4
    -1
      etc/pcm.yaml
  2. +6
    -0
      internal/config/config.go
  3. +85
    -14
      internal/participant/ai.go
  4. +1
    -1
      internal/svc/servicecontext.go

+ 4
- 1
etc/pcm.yaml View File

@@ -90,4 +90,7 @@ BlockChain:
Type: "2" Type: "2"


JcsMiddleware: JcsMiddleware:
JobStatusReportUrl: http://101.201.215.196:7891/jobSet/jobStatusReport
JobStatusReportUrl: http://101.201.215.196:7891/jobSet/jobStatusReport

Participant:
AdapterId: "1777144940456666666"

+ 6
- 0
internal/config/config.go View File

@@ -53,6 +53,8 @@ type Config struct {
Monitoring Monitoring Monitoring Monitoring


JcsMiddleware JcsMiddleware JcsMiddleware JcsMiddleware

Participant Participant
} }
type Monitoring struct { type Monitoring struct {
PromUrl string PromUrl string
@@ -67,3 +69,7 @@ type SnowflakeConf struct {
type JcsMiddleware struct { type JcsMiddleware struct {
JobStatusReportUrl string JobStatusReportUrl string
} }

type Participant struct {
AdapterId string
}

+ 85
- 14
internal/participant/ai.go View File

@@ -1,9 +1,12 @@
package participant package participant


import ( import (
"errors"
"fmt"
"github.com/go-resty/resty/v2" "github.com/go-resty/resty/v2"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database"
"net/http" "net/http"
"sync"
) )


const ( const (
@@ -29,22 +32,58 @@ const (
TaskLog = "/ai/task/log" TaskLog = "/ai/task/log"
TaskTrainingDetail = "/ai/task/train/detail" TaskTrainingDetail = "/ai/task/train/detail"
TaskInferenceDetail = "/ai/task/infer/detail" TaskInferenceDetail = "/ai/task/infer/detail"

Localhost = "http://localhost:8080"
) )


type Ai struct { type Ai struct {
store *database.AiStorage
store *database.AiStorage
idAddr sync.Map
} }


func NewAi() (*Ai, error) {
func New(store *database.AiStorage, adapterId string) (*Ai, error) {
if store == nil {
return nil, errors.New("store cannot be nil")
}

a := &Ai{
store: store,
}

css, err := store.GetClustersByAdapterId(adapterId)
if err != nil {
return nil, fmt.Errorf("failed to get clusters: %w", err)
}

for _, info := range css.List {
a.idAddr.Store(info.Id, info.Server)
}

InitClient() InitClient()
return &Ai{}, nil
return a, nil
}

func (a *Ai) UpdateAddr(id string, addr string) {
a.idAddr.Store(id, addr)
}

func (a *Ai) GetServerAddrById(id string) (string, bool) {
val, ok := a.idAddr.Load(id)
if !ok {
return "", false
}
addr, ok := val.(string)
if !ok {
return "", false
}
return addr, true
} }


func (a *Ai) AlgorithmById(platformId string) (resp *Resp, err error) { func (a *Ai) AlgorithmById(platformId string) (resp *Resp, err error) {
addr, ok := a.GetServerAddrById(platformId)
if !ok {
return nil, fmt.Errorf("clusterId not found: %s", platformId)
}
respErr := &RespErr{} respErr := &RespErr{}
_, err = Request(Localhost+AlgorithmById, http.MethodGet, func(req *resty.Request) {
_, err = Request(addr+AlgorithmById, http.MethodGet, func(req *resty.Request) {
req.SetQueryParams(map[string]string{ req.SetQueryParams(map[string]string{
"pfId": platformId, "pfId": platformId,
}).SetError(&respErr).SetResult(&resp) }).SetError(&respErr).SetResult(&resp)
@@ -56,8 +95,12 @@ func (a *Ai) AlgorithmById(platformId string) (resp *Resp, err error) {
} }


func (a *Ai) AlgorithmCreateById(platformId string, param *CreateParam) (resp *Resp, err error) { func (a *Ai) AlgorithmCreateById(platformId string, param *CreateParam) (resp *Resp, err error) {
addr, ok := a.GetServerAddrById(platformId)
if !ok {
return nil, fmt.Errorf("clusterId not found: %s", platformId)
}
respErr := &RespErr{} respErr := &RespErr{}
_, err = Request(Localhost+AlgorithmCreateById, http.MethodPost, func(req *resty.Request) {
_, err = Request(addr+AlgorithmCreateById, http.MethodPost, func(req *resty.Request) {
req.SetQueryParams(map[string]string{ req.SetQueryParams(map[string]string{
"pfId": platformId, "pfId": platformId,
}).SetBody(param).SetError(&respErr).SetResult(&resp) }).SetBody(param).SetError(&respErr).SetResult(&resp)
@@ -69,8 +112,12 @@ func (a *Ai) AlgorithmCreateById(platformId string, param *CreateParam) (resp *R
} }


func (a *Ai) DatasetCreateById(platformId string, param *CreateParam) (resp *Resp, err error) { func (a *Ai) DatasetCreateById(platformId string, param *CreateParam) (resp *Resp, err error) {
addr, ok := a.GetServerAddrById(platformId)
if !ok {
return nil, fmt.Errorf("clusterId not found: %s", platformId)
}
respErr := &RespErr{} respErr := &RespErr{}
_, err = Request(Localhost+DatasetCreateById, http.MethodPost, func(req *resty.Request) {
_, err = Request(addr+DatasetCreateById, http.MethodPost, func(req *resty.Request) {
req.SetQueryParams(map[string]string{ req.SetQueryParams(map[string]string{
"pfId": platformId, "pfId": platformId,
}).SetBody(param).SetError(&respErr).SetResult(&resp) }).SetBody(param).SetError(&respErr).SetResult(&resp)
@@ -82,8 +129,12 @@ func (a *Ai) DatasetCreateById(platformId string, param *CreateParam) (resp *Res
} }


func (a *Ai) ModelCreateById(platformId string, param *CreateParam) (resp *Resp, err error) { func (a *Ai) ModelCreateById(platformId string, param *CreateParam) (resp *Resp, err error) {
addr, ok := a.GetServerAddrById(platformId)
if !ok {
return nil, fmt.Errorf("clusterId not found: %s", platformId)
}
respErr := &RespErr{} respErr := &RespErr{}
_, err = Request(Localhost+ModelCreateById, http.MethodPost, func(req *resty.Request) {
_, err = Request(addr+ModelCreateById, http.MethodPost, func(req *resty.Request) {
req.SetQueryParams(map[string]string{ req.SetQueryParams(map[string]string{
"pfId": platformId, "pfId": platformId,
}).SetBody(param).SetError(&respErr).SetResult(&resp) }).SetBody(param).SetError(&respErr).SetResult(&resp)
@@ -95,8 +146,12 @@ func (a *Ai) ModelCreateById(platformId string, param *CreateParam) (resp *Resp,
} }


func (a *Ai) TaskCreateTrain(platformId string, param *TaskCreateParam) (resp *Resp, err error) { func (a *Ai) TaskCreateTrain(platformId string, param *TaskCreateParam) (resp *Resp, err error) {
addr, ok := a.GetServerAddrById(platformId)
if !ok {
return nil, fmt.Errorf("clusterId not found: %s", platformId)
}
respErr := &RespErr{} respErr := &RespErr{}
_, err = Request(Localhost+TaskCreateTrain, http.MethodPost, func(req *resty.Request) {
_, err = Request(addr+TaskCreateTrain, http.MethodPost, func(req *resty.Request) {
req.SetQueryParams(map[string]string{ req.SetQueryParams(map[string]string{
"pfId": platformId, "pfId": platformId,
}).SetBody(param).SetError(&respErr).SetResult(&resp) }).SetBody(param).SetError(&respErr).SetResult(&resp)
@@ -108,8 +163,12 @@ func (a *Ai) TaskCreateTrain(platformId string, param *TaskCreateParam) (resp *R
} }


func (a *Ai) TaskResultSync(platformId string, param *TaskResultSyncParam) (resp *Resp, err error) { func (a *Ai) TaskResultSync(platformId string, param *TaskResultSyncParam) (resp *Resp, err error) {
addr, ok := a.GetServerAddrById(platformId)
if !ok {
return nil, fmt.Errorf("clusterId not found: %s", platformId)
}
respErr := &RespErr{} respErr := &RespErr{}
_, err = Request(Localhost+TaskResultSync, http.MethodPost, func(req *resty.Request) {
_, err = Request(addr+TaskResultSync, http.MethodPost, func(req *resty.Request) {
req.SetQueryParams(map[string]string{ req.SetQueryParams(map[string]string{
"pfId": platformId, "pfId": platformId,
}).SetBody(param).SetError(&respErr).SetResult(&resp) }).SetBody(param).SetError(&respErr).SetResult(&resp)
@@ -121,8 +180,12 @@ func (a *Ai) TaskResultSync(platformId string, param *TaskResultSyncParam) (resp
} }


func (a *Ai) TaskLog(platformId string, taskId string) (resp *Resp, err error) { func (a *Ai) TaskLog(platformId string, taskId string) (resp *Resp, err error) {
addr, ok := a.GetServerAddrById(platformId)
if !ok {
return nil, fmt.Errorf("clusterId not found: %s", platformId)
}
respErr := &RespErr{} respErr := &RespErr{}
_, err = Request(Localhost+TaskLog, http.MethodGet, func(req *resty.Request) {
_, err = Request(addr+TaskLog, http.MethodGet, func(req *resty.Request) {
req.SetQueryParams(map[string]string{ req.SetQueryParams(map[string]string{
"pfId": platformId, "pfId": platformId,
"taskId": taskId, "taskId": taskId,
@@ -135,8 +198,12 @@ func (a *Ai) TaskLog(platformId string, taskId string) (resp *Resp, err error) {
} }


func (a *Ai) TaskTrainingDetail(platformId string, taskId string) (resp *Resp, err error) { func (a *Ai) TaskTrainingDetail(platformId string, taskId string) (resp *Resp, err error) {
addr, ok := a.GetServerAddrById(platformId)
if !ok {
return nil, fmt.Errorf("clusterId not found: %s", platformId)
}
respErr := &RespErr{} respErr := &RespErr{}
_, err = Request(Localhost+TaskTrainingDetail, http.MethodGet, func(req *resty.Request) {
_, err = Request(addr+TaskTrainingDetail, http.MethodGet, func(req *resty.Request) {
req.SetQueryParams(map[string]string{ req.SetQueryParams(map[string]string{
"pfId": platformId, "pfId": platformId,
"taskId": taskId, "taskId": taskId,
@@ -149,8 +216,12 @@ func (a *Ai) TaskTrainingDetail(platformId string, taskId string) (resp *Resp, e
} }


func (a *Ai) TaskInferenceDetail(platformId string, taskId string) (resp *Resp, err error) { func (a *Ai) TaskInferenceDetail(platformId string, taskId string) (resp *Resp, err error) {
addr, ok := a.GetServerAddrById(platformId)
if !ok {
return nil, fmt.Errorf("clusterId not found: %s", platformId)
}
respErr := &RespErr{} respErr := &RespErr{}
_, err = Request(Localhost+TaskInferenceDetail, http.MethodGet, func(req *resty.Request) {
_, err = Request(addr+TaskInferenceDetail, http.MethodGet, func(req *resty.Request) {
req.SetQueryParams(map[string]string{ req.SetQueryParams(map[string]string{
"pfId": platformId, "pfId": platformId,
"taskId": taskId, "taskId": taskId,


+ 1
- 1
internal/svc/servicecontext.go View File

@@ -123,7 +123,7 @@ func NewServiceContext(c config.Config) *ServiceContext {
panic(err) panic(err)
} }
scheduler := scheduler.NewSchdlr(aiService, storage, hpcStorage, hpcService) scheduler := scheduler.NewSchdlr(aiService, storage, hpcStorage, hpcService)
ai, err := participant.NewAi()
ai, err := participant.New(storage, c.Participant.AdapterId)
if err != nil { if err != nil {
logx.Error(err.Error()) logx.Error(err.Error())
panic(err) panic(err)


Loading…
Cancel
Save