From 17936bfc4d56714f8279ba64ef665cc55fa0a2ca Mon Sep 17 00:00:00 2001 From: tzwang Date: Thu, 17 Jul 2025 11:23:10 +0800 Subject: [PATCH] update service --- etc/pcm.yaml | 5 +- internal/config/config.go | 6 +++ internal/participant/ai.go | 99 +++++++++++++++++++++++++++++----- internal/svc/servicecontext.go | 2 +- 4 files changed, 96 insertions(+), 16 deletions(-) diff --git a/etc/pcm.yaml b/etc/pcm.yaml index 41f11b2e..708f6e0f 100644 --- a/etc/pcm.yaml +++ b/etc/pcm.yaml @@ -90,4 +90,7 @@ BlockChain: Type: "2" JcsMiddleware: - JobStatusReportUrl: http://101.201.215.196:7891/jobSet/jobStatusReport \ No newline at end of file + JobStatusReportUrl: http://101.201.215.196:7891/jobSet/jobStatusReport + +Participant: + AdapterId: "1777144940456666666" \ No newline at end of file diff --git a/internal/config/config.go b/internal/config/config.go index 64d525ea..8c15f55e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -53,6 +53,8 @@ type Config struct { Monitoring Monitoring JcsMiddleware JcsMiddleware + + Participant Participant } type Monitoring struct { PromUrl string @@ -67,3 +69,7 @@ type SnowflakeConf struct { type JcsMiddleware struct { JobStatusReportUrl string } + +type Participant struct { + AdapterId string +} diff --git a/internal/participant/ai.go b/internal/participant/ai.go index 14a3f9cf..d80aa883 100644 --- a/internal/participant/ai.go +++ b/internal/participant/ai.go @@ -1,9 +1,12 @@ package participant import ( + "errors" + "fmt" "github.com/go-resty/resty/v2" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" "net/http" + "sync" ) const ( @@ -29,22 +32,58 @@ const ( TaskLog = "/ai/task/log" TaskTrainingDetail = "/ai/task/train/detail" TaskInferenceDetail = "/ai/task/infer/detail" - - Localhost = "http://localhost:8080" ) type Ai struct { - store *database.AiStorage + store *database.AiStorage + idAddr sync.Map } -func NewAi() (*Ai, error) { +func New(store *database.AiStorage, adapterId string) (*Ai, error) { + if store == nil { + return nil, errors.New("store cannot be nil") + } + + a := &Ai{ + store: store, + } + + css, err := store.GetClustersByAdapterId(adapterId) + if err != nil { + return nil, fmt.Errorf("failed to get clusters: %w", err) + } + + for _, info := range css.List { + a.idAddr.Store(info.Id, info.Server) + } + InitClient() - return &Ai{}, nil + return a, nil +} + +func (a *Ai) UpdateAddr(id string, addr string) { + a.idAddr.Store(id, addr) +} + +func (a *Ai) GetServerAddrById(id string) (string, bool) { + val, ok := a.idAddr.Load(id) + if !ok { + return "", false + } + addr, ok := val.(string) + if !ok { + return "", false + } + return addr, true } func (a *Ai) AlgorithmById(platformId string) (resp *Resp, err error) { + addr, ok := a.GetServerAddrById(platformId) + if !ok { + return nil, fmt.Errorf("clusterId not found: %s", platformId) + } respErr := &RespErr{} - _, err = Request(Localhost+AlgorithmById, http.MethodGet, func(req *resty.Request) { + _, err = Request(addr+AlgorithmById, http.MethodGet, func(req *resty.Request) { req.SetQueryParams(map[string]string{ "pfId": platformId, }).SetError(&respErr).SetResult(&resp) @@ -56,8 +95,12 @@ func (a *Ai) AlgorithmById(platformId string) (resp *Resp, err error) { } func (a *Ai) AlgorithmCreateById(platformId string, param *CreateParam) (resp *Resp, err error) { + addr, ok := a.GetServerAddrById(platformId) + if !ok { + return nil, fmt.Errorf("clusterId not found: %s", platformId) + } respErr := &RespErr{} - _, err = Request(Localhost+AlgorithmCreateById, http.MethodPost, func(req *resty.Request) { + _, err = Request(addr+AlgorithmCreateById, http.MethodPost, func(req *resty.Request) { req.SetQueryParams(map[string]string{ "pfId": platformId, }).SetBody(param).SetError(&respErr).SetResult(&resp) @@ -69,8 +112,12 @@ func (a *Ai) AlgorithmCreateById(platformId string, param *CreateParam) (resp *R } func (a *Ai) DatasetCreateById(platformId string, param *CreateParam) (resp *Resp, err error) { + addr, ok := a.GetServerAddrById(platformId) + if !ok { + return nil, fmt.Errorf("clusterId not found: %s", platformId) + } respErr := &RespErr{} - _, err = Request(Localhost+DatasetCreateById, http.MethodPost, func(req *resty.Request) { + _, err = Request(addr+DatasetCreateById, http.MethodPost, func(req *resty.Request) { req.SetQueryParams(map[string]string{ "pfId": platformId, }).SetBody(param).SetError(&respErr).SetResult(&resp) @@ -82,8 +129,12 @@ func (a *Ai) DatasetCreateById(platformId string, param *CreateParam) (resp *Res } func (a *Ai) ModelCreateById(platformId string, param *CreateParam) (resp *Resp, err error) { + addr, ok := a.GetServerAddrById(platformId) + if !ok { + return nil, fmt.Errorf("clusterId not found: %s", platformId) + } respErr := &RespErr{} - _, err = Request(Localhost+ModelCreateById, http.MethodPost, func(req *resty.Request) { + _, err = Request(addr+ModelCreateById, http.MethodPost, func(req *resty.Request) { req.SetQueryParams(map[string]string{ "pfId": platformId, }).SetBody(param).SetError(&respErr).SetResult(&resp) @@ -95,8 +146,12 @@ func (a *Ai) ModelCreateById(platformId string, param *CreateParam) (resp *Resp, } func (a *Ai) TaskCreateTrain(platformId string, param *TaskCreateParam) (resp *Resp, err error) { + addr, ok := a.GetServerAddrById(platformId) + if !ok { + return nil, fmt.Errorf("clusterId not found: %s", platformId) + } respErr := &RespErr{} - _, err = Request(Localhost+TaskCreateTrain, http.MethodPost, func(req *resty.Request) { + _, err = Request(addr+TaskCreateTrain, http.MethodPost, func(req *resty.Request) { req.SetQueryParams(map[string]string{ "pfId": platformId, }).SetBody(param).SetError(&respErr).SetResult(&resp) @@ -108,8 +163,12 @@ func (a *Ai) TaskCreateTrain(platformId string, param *TaskCreateParam) (resp *R } func (a *Ai) TaskResultSync(platformId string, param *TaskResultSyncParam) (resp *Resp, err error) { + addr, ok := a.GetServerAddrById(platformId) + if !ok { + return nil, fmt.Errorf("clusterId not found: %s", platformId) + } respErr := &RespErr{} - _, err = Request(Localhost+TaskResultSync, http.MethodPost, func(req *resty.Request) { + _, err = Request(addr+TaskResultSync, http.MethodPost, func(req *resty.Request) { req.SetQueryParams(map[string]string{ "pfId": platformId, }).SetBody(param).SetError(&respErr).SetResult(&resp) @@ -121,8 +180,12 @@ func (a *Ai) TaskResultSync(platformId string, param *TaskResultSyncParam) (resp } func (a *Ai) TaskLog(platformId string, taskId string) (resp *Resp, err error) { + addr, ok := a.GetServerAddrById(platformId) + if !ok { + return nil, fmt.Errorf("clusterId not found: %s", platformId) + } respErr := &RespErr{} - _, err = Request(Localhost+TaskLog, http.MethodGet, func(req *resty.Request) { + _, err = Request(addr+TaskLog, http.MethodGet, func(req *resty.Request) { req.SetQueryParams(map[string]string{ "pfId": platformId, "taskId": taskId, @@ -135,8 +198,12 @@ func (a *Ai) TaskLog(platformId string, taskId string) (resp *Resp, err error) { } func (a *Ai) TaskTrainingDetail(platformId string, taskId string) (resp *Resp, err error) { + addr, ok := a.GetServerAddrById(platformId) + if !ok { + return nil, fmt.Errorf("clusterId not found: %s", platformId) + } respErr := &RespErr{} - _, err = Request(Localhost+TaskTrainingDetail, http.MethodGet, func(req *resty.Request) { + _, err = Request(addr+TaskTrainingDetail, http.MethodGet, func(req *resty.Request) { req.SetQueryParams(map[string]string{ "pfId": platformId, "taskId": taskId, @@ -149,8 +216,12 @@ func (a *Ai) TaskTrainingDetail(platformId string, taskId string) (resp *Resp, e } func (a *Ai) TaskInferenceDetail(platformId string, taskId string) (resp *Resp, err error) { + addr, ok := a.GetServerAddrById(platformId) + if !ok { + return nil, fmt.Errorf("clusterId not found: %s", platformId) + } respErr := &RespErr{} - _, err = Request(Localhost+TaskInferenceDetail, http.MethodGet, func(req *resty.Request) { + _, err = Request(addr+TaskInferenceDetail, http.MethodGet, func(req *resty.Request) { req.SetQueryParams(map[string]string{ "pfId": platformId, "taskId": taskId, diff --git a/internal/svc/servicecontext.go b/internal/svc/servicecontext.go index 4765e21b..771051da 100644 --- a/internal/svc/servicecontext.go +++ b/internal/svc/servicecontext.go @@ -123,7 +123,7 @@ func NewServiceContext(c config.Config) *ServiceContext { panic(err) } scheduler := scheduler.NewSchdlr(aiService, storage, hpcStorage, hpcService) - ai, err := participant.NewAi() + ai, err := participant.New(storage, c.Participant.AdapterId) if err != nil { logx.Error(err.Error()) panic(err)