| @@ -1,15 +1,16 @@ | |||
| NacosConfig: | |||
| DataId: pcm-core-rpc.yaml | |||
| Group: DEFAULT_GROUP | |||
| ServerConfigs: | |||
| # - IpAddr: 127.0.0.1 | |||
| # Port: 8848 | |||
| - IpAddr: nacos.jcce.dev | |||
| Port: 8848 | |||
| ClientConfig: | |||
| NamespaceId: test | |||
| TimeoutMs: 5000 | |||
| NotLoadCacheAtStart: true | |||
| LogDir: | |||
| CacheDir: | |||
| LogLevel: info | |||
| Name: pcm.core.rpc | |||
| ListenOn: 0.0.0.0:2004 | |||
| Timeout: 15000 # 15s,设置rpc服务的响应的超时时间,若超过15s还未返回则结束请求 | |||
| DB: | |||
| DataSource: root:uJpLd6u-J?HC1@(10.206.0.12:3306)/pcm?parseTime=true | |||
| SnowflakeConf: | |||
| MachineId: 1 | |||
| RedisConf: | |||
| Host: 10.206.0.7:6379 | |||
| Pass: redisPW123 | |||
| Type: node | |||
| Tls: false | |||
| @@ -1,79 +0,0 @@ | |||
| /* | |||
| Copyright (c) [2023] [pcm] | |||
| [pcm-coordinator] is licensed under Mulan PSL v2. | |||
| You can use this software according to the terms and conditions of the Mulan PSL v2. | |||
| You may obtain a copy of Mulan PSL v2 at: | |||
| http://license.coscl.org.cn/MulanPSL2 | |||
| THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, | |||
| EITHER EXPaRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, | |||
| MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. | |||
| See the Mulan PSL v2 for more details. | |||
| */ | |||
| package cron | |||
| import ( | |||
| "github.com/zeromicro/go-zero/core/logx" | |||
| "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants" | |||
| "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" | |||
| "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/svc" | |||
| "gorm.io/gorm" | |||
| "strings" | |||
| ) | |||
| func InitCron(svc *svc.ServiceContext) { | |||
| svc.Cron.Start() | |||
| svc.Cron.AddFunc("*/5 * * * * ?", func() { | |||
| var tasks []models.Task | |||
| svc.DbEngin.Where("status not in ?", []string{constants.Deleted, constants.Succeeded, constants.Completed, constants.Failed}).Find(&tasks) | |||
| for _, task := range tasks { | |||
| var allStatus string | |||
| tx := svc.DbEngin.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", task.Id).Scan(&allStatus) | |||
| if tx.Error != nil { | |||
| logx.Error(tx.Error) | |||
| } | |||
| // 子状态统一则修改主任务状态 | |||
| statusArray := strings.Split(allStatus, ",") | |||
| if len(removeRepeatedElement(statusArray)) == 1 { | |||
| updateTask(svc.DbEngin, &task, statusArray[0]) | |||
| continue | |||
| } | |||
| // 子任务包含失败状态 主任务则失败 | |||
| if strings.Contains(allStatus, constants.Failed) { | |||
| updateTask(svc.DbEngin, &task, constants.Failed) | |||
| continue | |||
| } | |||
| if strings.Contains(allStatus, constants.Running) { | |||
| updateTask(svc.DbEngin, &task, constants.Running) | |||
| } | |||
| } | |||
| }) | |||
| } | |||
| func updateTask(dbEngin *gorm.DB, task *models.Task, status string) { | |||
| if task.Status != status { | |||
| task.Status = status | |||
| dbEngin.Updates(&task) | |||
| } | |||
| } | |||
| func removeRepeatedElement(arr []string) (newArr []string) { | |||
| newArr = make([]string, 0) | |||
| for i := 0; i < len(arr); i++ { | |||
| repeat := false | |||
| for j := i + 1; j < len(arr); j++ { | |||
| if arr[i] == arr[j] { | |||
| repeat = true | |||
| break | |||
| } | |||
| } | |||
| if !repeat { | |||
| newArr = append(newArr, arr[i]) | |||
| } | |||
| } | |||
| return | |||
| } | |||
| @@ -17,8 +17,11 @@ package pcmcorelogic | |||
| import ( | |||
| "context" | |||
| "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants" | |||
| "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models" | |||
| "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/svc" | |||
| "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/pcmCore" | |||
| "gorm.io/gorm" | |||
| "strings" | |||
| "time" | |||
| "github.com/zeromicro/go-zero/core/logx" | |||
| @@ -54,20 +57,73 @@ func (l *SyncInfoLogic) SyncInfo(in *pcmCore.SyncInfoReq) (*pcmCore.SyncInfoResp | |||
| switch kind { | |||
| case constants.CLOUD: | |||
| for _, cloudInfo := range in.CloudInfoList { | |||
| l.svcCtx.DbEngin.Exec("update cloud set status = ?,start_time = ?,running_time = ?,result = ? where participant_id = ? and task_id = ? and namespace = ? and name = ?", | |||
| cloudInfo.Status, cloudInfo.StartTime, cloudInfo.RunningTime, cloudInfo.Result, in.ParticipantId, cloudInfo.TaskId, cloudInfo.Namespace, cloudInfo.Name) | |||
| l.svcCtx.DbEngin.Exec("update cloud set status = ?,start_time = ?,result = ? where participant_id = ? and id = ?", | |||
| cloudInfo.Status, cloudInfo.StartTime, cloudInfo.Result, in.ParticipantId, cloudInfo.Id) | |||
| syncTask(l.svcCtx.DbEngin, cloudInfo.TaskId) | |||
| } | |||
| case constants.HPC: | |||
| for _, hpcInfo := range in.HpcInfoList { | |||
| l.svcCtx.DbEngin.Exec("update hpc set status = ?,start_time = ?,running_time = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?", | |||
| l.svcCtx.DbEngin.Exec("update hpc set status = ?,start_time = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?", | |||
| hpcInfo.Status, hpcInfo.StartTime, hpcInfo.RunningTime, hpcInfo.JobId, in.ParticipantId, hpcInfo.TaskId, hpcInfo.Name) | |||
| syncTask(l.svcCtx.DbEngin, hpcInfo.TaskId) | |||
| } | |||
| case constants.AI: | |||
| for _, aiInfo := range in.AiInfoList { | |||
| l.svcCtx.DbEngin.Exec("update ai set status = ?,start_time = ?,running_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?", | |||
| aiInfo.Status, aiInfo.StartTime, aiInfo.RunningTime, aiInfo.ProjectId, aiInfo.JobId, in.ParticipantId, aiInfo.TaskId, aiInfo.Name) | |||
| l.svcCtx.DbEngin.Exec("update ai set status = ?,start_time = ?,project_id = ?,job_id = ? where participant_id = ? and task_id = ? and name = ?", | |||
| aiInfo.Status, aiInfo.StartTime, aiInfo.ProjectId, aiInfo.JobId, in.ParticipantId, aiInfo.TaskId, aiInfo.Name) | |||
| syncTask(l.svcCtx.DbEngin, aiInfo.TaskId) | |||
| } | |||
| } | |||
| return &pcmCore.SyncInfoResp{}, nil | |||
| } | |||
| func syncTask(gorm *gorm.DB, taskId int64) { | |||
| var allStatus string | |||
| tx := gorm.Raw("SELECT CONCAT_WS(',',GROUP_CONCAT(DISTINCT h.status) ,GROUP_CONCAT(DISTINCT a.status) ,GROUP_CONCAT(DISTINCT c.status))as status from task t left join hpc h on t.id = h.task_id left join cloud c on t.id = c.task_id left join ai a on t.id = a.task_id where t.id = ?", taskId).Scan(&allStatus) | |||
| if tx.Error != nil { | |||
| logx.Error(tx.Error) | |||
| } | |||
| // 子状态统一则修改主任务状态 | |||
| statusArray := strings.Split(allStatus, ",") | |||
| if len(removeRepeatedElement(statusArray)) == 1 { | |||
| updateTask(gorm, taskId, statusArray[0]) | |||
| } | |||
| // 子任务包含失败状态 主任务则失败 | |||
| if strings.Contains(allStatus, constants.Failed) { | |||
| updateTask(gorm, taskId, constants.Failed) | |||
| } | |||
| if strings.Contains(allStatus, constants.Running) { | |||
| updateTask(gorm, taskId, constants.Running) | |||
| } | |||
| } | |||
| func updateTask(gorm *gorm.DB, taskId int64, status string) { | |||
| var task models.Task | |||
| gorm.Where("id = ? ", taskId).Find(&task) | |||
| if task.Status != status { | |||
| task.Status = status | |||
| gorm.Updates(&task) | |||
| } | |||
| } | |||
| func removeRepeatedElement(arr []string) (newArr []string) { | |||
| newArr = make([]string, 0) | |||
| for i := 0; i < len(arr); i++ { | |||
| repeat := false | |||
| for j := i + 1; j < len(arr); j++ { | |||
| if arr[i] == arr[j] { | |||
| repeat = true | |||
| break | |||
| } | |||
| } | |||
| if !repeat { | |||
| newArr = append(newArr, arr[i]) | |||
| } | |||
| } | |||
| return | |||
| } | |||
| @@ -42,18 +42,19 @@ message CloudInfo { | |||
| int64 runningTime = 9; | |||
| string result = 10; | |||
| string yamlString = 11; | |||
| int64 id = 12; | |||
| } | |||
| message VmInfo { | |||
| int64 participantId = 1; | |||
| int64 taskId = 2; | |||
| string name = 3; | |||
| string flavor_ref =4; | |||
| string image_ref =5; | |||
| string network_uuid=6; | |||
| string block_uuid=7; | |||
| string source_type=8; | |||
| bool delete_on_termination=9; | |||
| string flavor_ref = 4; | |||
| string image_ref = 5; | |||
| string network_uuid = 6; | |||
| string block_uuid = 7; | |||
| string source_type = 8; | |||
| bool delete_on_termination = 9; | |||
| string state = 10; | |||
| } | |||
| @@ -323,6 +323,7 @@ type CloudInfo struct { | |||
| RunningTime int64 `protobuf:"varint,9,opt,name=runningTime,proto3" json:"runningTime,omitempty"` | |||
| Result string `protobuf:"bytes,10,opt,name=result,proto3" json:"result,omitempty"` | |||
| YamlString string `protobuf:"bytes,11,opt,name=yamlString,proto3" json:"yamlString,omitempty"` | |||
| Id int64 `protobuf:"varint,12,opt,name=id,proto3" json:"id,omitempty"` | |||
| } | |||
| func (x *CloudInfo) Reset() { | |||
| @@ -434,6 +435,13 @@ func (x *CloudInfo) GetYamlString() string { | |||
| return "" | |||
| } | |||
| func (x *CloudInfo) GetId() int64 { | |||
| if x != nil { | |||
| return x.Id | |||
| } | |||
| return 0 | |||
| } | |||
| type VmInfo struct { | |||
| state protoimpl.MessageState | |||
| sizeCache protoimpl.SizeCache | |||
| @@ -2542,7 +2550,7 @@ var file_pcmCore_proto_rawDesc = []byte{ | |||
| 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x24, 0x0a, 0x0d, 0x69, 0x74, | |||
| 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x0f, 0x20, 0x01, 0x28, | |||
| 0x09, 0x52, 0x0d, 0x69, 0x74, 0x65, 0x6d, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, | |||
| 0x22, 0xbb, 0x02, 0x0a, 0x09, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x20, | |||
| 0x22, 0xcb, 0x02, 0x0a, 0x09, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x20, | |||
| 0x0a, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x18, 0x01, 0x20, | |||
| 0x01, 0x28, 0x03, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, | |||
| 0x12, 0x16, 0x0a, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, | |||
| @@ -2561,7 +2569,8 @@ var file_pcmCore_proto_rawDesc = []byte{ | |||
| 0x6e, 0x67, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, | |||
| 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1e, | |||
| 0x0a, 0x0a, 0x79, 0x61, 0x6d, 0x6c, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x18, 0x0b, 0x20, 0x01, | |||
| 0x28, 0x09, 0x52, 0x0a, 0x79, 0x61, 0x6d, 0x6c, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x22, 0xc3, | |||
| 0x28, 0x09, 0x52, 0x0a, 0x79, 0x61, 0x6d, 0x6c, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x12, 0x0e, | |||
| 0x0a, 0x02, 0x69, 0x64, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x03, 0x52, 0x02, 0x69, 0x64, 0x22, 0xc3, | |||
| 0x02, 0x0a, 0x06, 0x56, 0x6d, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x24, 0x0a, 0x0d, 0x70, 0x61, 0x72, | |||
| 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, | |||
| 0x52, 0x0d, 0x70, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69, 0x70, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x12, | |||
| @@ -21,13 +21,11 @@ import ( | |||
| "github.com/zeromicro/go-zero/core/service" | |||
| "github.com/zeromicro/go-zero/zrpc" | |||
| "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/config" | |||
| "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/cron" | |||
| participantserviceServer "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/server/participantservice" | |||
| pcmcoreServer "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/server/pcmcore" | |||
| "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/internal/svc" | |||
| "gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/pcmCore" | |||
| "gitlink.org.cn/jcce-pcm/utils/interceptor/rpcserver" | |||
| commonConfig "gitlink.org.cn/jcce-pcm/utils/nacos" | |||
| "google.golang.org/grpc" | |||
| "google.golang.org/grpc/reflection" | |||
| ) | |||
| @@ -38,28 +36,8 @@ func main() { | |||
| flag.Parse() | |||
| var bootstrapConfig commonConfig.BootstrapConfig | |||
| conf.MustLoad(*configFile, &bootstrapConfig) | |||
| //解析业务配置 | |||
| var c config.Config | |||
| nacosConfig := bootstrapConfig.NacosConfig | |||
| serviceConfigContent := nacosConfig.InitConfig(func(data string) { | |||
| err := conf.LoadFromYamlBytes([]byte(data), &c) | |||
| if err != nil { | |||
| panic(err) | |||
| } | |||
| }) | |||
| err := conf.LoadFromYamlBytes([]byte(serviceConfigContent), &c) | |||
| if err != nil { | |||
| panic(err) | |||
| } | |||
| // start log component | |||
| logx.MustSetup(c.LogConf) | |||
| // 注册到nacos | |||
| nacosConfig.Discovery(&c.RpcServerConf) | |||
| conf.MustLoad(*configFile, &c) | |||
| ctx := svc.NewServiceContext(c) | |||
| s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) { | |||
| @@ -74,8 +52,6 @@ func main() { | |||
| s.AddUnaryInterceptors(rpcserver.LoggerInterceptor) | |||
| defer s.Stop() | |||
| // 初始化定时任务 | |||
| cron.InitCron(ctx) | |||
| logx.Infof("Starting rpc server at %s...\n", c.ListenOn) | |||
| s.Start() | |||
| } | |||