| @@ -44,4 +44,10 @@ type Config struct { | |||||
| Username string | Username string | ||||
| Password string | Password string | ||||
| } | } | ||||
| SnowflakeConf SnowflakeConf | |||||
| } | |||||
| // SnowflakeConf 雪花算法机器id配置 | |||||
| type SnowflakeConf struct { | |||||
| MachineId int64 `json:"machineId"` | |||||
| } | } | ||||
| @@ -65,15 +65,15 @@ func UnMarshalK8sStruct(yamlString string, taskId int64) model.Cloud { | |||||
| func (l *ScheduleCloudMq) Consume(_, val string) error { | func (l *ScheduleCloudMq) Consume(_, val string) error { | ||||
| // 接受消息, 根据标签筛选过滤 | // 接受消息, 根据标签筛选过滤 | ||||
| cloudSchdl := scheduler.NewCloudScheduler() | |||||
| schdl, err := scheduler.NewScheduler(cloudSchdl, val) | |||||
| cloudScheduler := scheduler.NewCloudScheduler() | |||||
| scheduler, err := scheduler.NewScheduler(cloudScheduler, val) | |||||
| if err != nil { | if err != nil { | ||||
| return err | return err | ||||
| } | } | ||||
| schdl.MatchLabels(l.svcCtx.DbEngin) | |||||
| scheduler.MatchLabels(l.svcCtx.DbEngin) | |||||
| // 存储数据 | // 存储数据 | ||||
| err = schdl.SaveToDb(l.svcCtx.DbEngin) | |||||
| err = scheduler.SaveToDb(l.svcCtx.DbEngin) | |||||
| if err != nil { | if err != nil { | ||||
| return err | return err | ||||
| } | } | ||||
| @@ -15,7 +15,7 @@ func NewAiScheduler(val string) *aiScheduler { | |||||
| return &aiScheduler{yamlString: val} | return &aiScheduler{yamlString: val} | ||||
| } | } | ||||
| func (cs aiScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) { | |||||
| func (cs *aiScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) { | |||||
| ai := model.Ai{ | ai := model.Ai{ | ||||
| ParticipantId: participantIds[0], | ParticipantId: participantIds[0], | ||||
| TaskId: task.TaskId, | TaskId: task.TaskId, | ||||
| @@ -26,6 +26,6 @@ func (cs aiScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []i | |||||
| return ai, nil | return ai, nil | ||||
| } | } | ||||
| func (cs aiScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) { | |||||
| func (cs *aiScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) { | |||||
| return nil, nil | return nil, nil | ||||
| } | } | ||||
| @@ -6,6 +6,7 @@ import ( | |||||
| "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/algo" | "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/algo" | ||||
| "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" | "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" | ||||
| "gitlink.org.cn/jcce-pcm/pcm-coordinator/model" | "gitlink.org.cn/jcce-pcm/pcm-coordinator/model" | ||||
| "gitlink.org.cn/jcce-pcm/utils/tool" | |||||
| "io" | "io" | ||||
| "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" | ||||
| "k8s.io/apimachinery/pkg/runtime" | "k8s.io/apimachinery/pkg/runtime" | ||||
| @@ -20,7 +21,7 @@ func NewCloudScheduler() *cloudScheduler { | |||||
| return &cloudScheduler{} | return &cloudScheduler{} | ||||
| } | } | ||||
| func (cs cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) { | |||||
| func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) { | |||||
| strategy := algo.NewK8sStrategy(task, providers...) | strategy := algo.NewK8sStrategy(task, providers...) | ||||
| taskResult, err := algo.ScheduleWithFullCollaboration(strategy, strategy.ProviderList) | taskResult, err := algo.ScheduleWithFullCollaboration(strategy, strategy.ProviderList) | ||||
| if err != nil { | if err != nil { | ||||
| @@ -29,12 +30,13 @@ func (cs cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo | |||||
| return taskResult, nil | return taskResult, nil | ||||
| } | } | ||||
| func (cs cloudScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) { | |||||
| func (cs *cloudScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) { | |||||
| bytes, err := json.Marshal(task.Metadata) | bytes, err := json.Marshal(task.Metadata) | ||||
| if err != nil { | if err != nil { | ||||
| return nil, err | return nil, err | ||||
| } | } | ||||
| cloud := cs.UnMarshalK8sStruct(string(bytes), task.TaskId) | cloud := cs.UnMarshalK8sStruct(string(bytes), task.TaskId) | ||||
| cloud.Id = tool.GenSnowflakeID() | |||||
| cloud.YamlString = string(bytes) | cloud.YamlString = string(bytes) | ||||
| if len(participantIds) != 0 { | if len(participantIds) != 0 { | ||||
| cloud.ParticipantId = participantIds[0] | cloud.ParticipantId = participantIds[0] | ||||
| @@ -42,7 +44,7 @@ func (cs cloudScheduler) getNewStructForDb(task *types.TaskInfo, participantIds | |||||
| return cloud, nil | return cloud, nil | ||||
| } | } | ||||
| func (cs cloudScheduler) UnMarshalK8sStruct(yamlString string, taskId int64) model.Cloud { | |||||
| func (cs *cloudScheduler) UnMarshalK8sStruct(yamlString string, taskId int64) model.Cloud { | |||||
| var cloud model.Cloud | var cloud model.Cloud | ||||
| d := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096) | d := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096) | ||||
| var err error | var err error | ||||
| @@ -15,7 +15,7 @@ func NewHpcScheduler(val string) *hpcScheduler { | |||||
| return &hpcScheduler{yamlString: val} | return &hpcScheduler{yamlString: val} | ||||
| } | } | ||||
| func (h hpcScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) { | |||||
| func (h *hpcScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []int64) (interface{}, error) { | |||||
| hpc := model.Hpc{ | hpc := model.Hpc{ | ||||
| TaskId: task.TaskId, | TaskId: task.TaskId, | ||||
| Status: "Saved", | Status: "Saved", | ||||
| @@ -26,6 +26,6 @@ func (h hpcScheduler) getNewStructForDb(task *types.TaskInfo, participantIds []i | |||||
| return hpc, nil | return hpc, nil | ||||
| } | } | ||||
| func (cs hpcScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) { | |||||
| func (cs *hpcScheduler) pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Task, error) { | |||||
| return nil, nil | return nil, nil | ||||
| } | } | ||||
| @@ -23,7 +23,7 @@ func NewScheduler(scheduleService scheduleService, val string) (*scheduler, erro | |||||
| return &scheduler{task: task, scheduleService: scheduleService}, nil | return &scheduler{task: task, scheduleService: scheduleService}, nil | ||||
| } | } | ||||
| func (s scheduler) MatchLabels(dbEngin *gorm.DB) { | |||||
| func (s *scheduler) MatchLabels(dbEngin *gorm.DB) { | |||||
| //if len(task.MatchLabels) != 0 { | //if len(task.MatchLabels) != 0 { | ||||
| // participantId, err := scheduler.MatchLabels(l.svcCtx.DbEngin, task) | // participantId, err := scheduler.MatchLabels(l.svcCtx.DbEngin, task) | ||||
| // if err != nil { | // if err != nil { | ||||
| @@ -47,11 +47,11 @@ func (s scheduler) MatchLabels(dbEngin *gorm.DB) { | |||||
| s.participantIds = micsSlice(ids, 1) | s.participantIds = micsSlice(ids, 1) | ||||
| } | } | ||||
| func (s scheduler) AssignAndSchedule() { | |||||
| func (s *scheduler) AssignAndSchedule() { | |||||
| } | } | ||||
| func (s scheduler) SaveToDb(dbEngin *gorm.DB) error { | |||||
| func (s *scheduler) SaveToDb(dbEngin *gorm.DB) error { | |||||
| if len(s.participantIds) == 0 { | if len(s.participantIds) == 0 { | ||||
| return errors.New("participantIds 为空") | return errors.New("participantIds 为空") | ||||
| } | } | ||||
| @@ -17,6 +17,7 @@ import ( | |||||
| "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelartsclient" | "gitlink.org.cn/jcce-pcm/pcm-participant-modelarts/modelartsclient" | ||||
| "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient" | "gitlink.org.cn/jcce-pcm/pcm-participant-octopus/octopusclient" | ||||
| "gitlink.org.cn/jcce-pcm/pcm-participant-slurm/hpcthclient" | "gitlink.org.cn/jcce-pcm/pcm-participant-slurm/hpcthclient" | ||||
| "gitlink.org.cn/jcce-pcm/utils/tool" | |||||
| "gorm.io/driver/mysql" | "gorm.io/driver/mysql" | ||||
| "gorm.io/gorm" | "gorm.io/gorm" | ||||
| "gorm.io/gorm/schema" | "gorm.io/gorm/schema" | ||||
| @@ -49,6 +50,12 @@ func NewServiceContext(c config.Config) *ServiceContext { | |||||
| DisableSSL: aws.Bool(false), //是否禁用https,这里表示不禁用,即使用HTTPS | DisableSSL: aws.Bool(false), //是否禁用https,这里表示不禁用,即使用HTTPS | ||||
| S3ForcePathStyle: aws.Bool(true), //使用路径样式而非虚拟主机样式,区别请参考:https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html | S3ForcePathStyle: aws.Bool(true), //使用路径样式而非虚拟主机样式,区别请参考:https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html | ||||
| }) | }) | ||||
| //添加snowflake支持 | |||||
| err := tool.InitSnowflake(c.SnowflakeConf.MachineId) | |||||
| if err != nil { | |||||
| logx.Errorf("InitSnowflake err: ", err) | |||||
| panic("InitSnowflake err") | |||||
| } | |||||
| downloader := s3manager.NewDownloader(session) | downloader := s3manager.NewDownloader(session) | ||||
| uploader := s3manager.NewUploader(session) | uploader := s3manager.NewUploader(session) | ||||
| //启动Gorm支持 | //启动Gorm支持 | ||||