You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

incrementallearningjob.go 37 kB


  1. /*
  2. Copyright 2021 The KubeEdge Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package incrementallearning
  14. import (
  15. "bufio"
  16. "encoding/json"
  17. "fmt"
  18. "io/ioutil"
  19. "os"
  20. "path"
  21. "path/filepath"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "time"
  26. "k8s.io/klog/v2"
  27. "github.com/kubeedge/sedna/cmd/sedna-lc/app/options"
  28. sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
  29. gmtypes "github.com/kubeedge/sedna/pkg/globalmanager/controllers/incrementallearning"
  30. "github.com/kubeedge/sedna/pkg/globalmanager/runtime"
  31. "github.com/kubeedge/sedna/pkg/localcontroller/db"
  32. clienttypes "github.com/kubeedge/sedna/pkg/localcontroller/gmclient"
  33. "github.com/kubeedge/sedna/pkg/localcontroller/managers/dataset"
  34. "github.com/kubeedge/sedna/pkg/localcontroller/managers/model"
  35. "github.com/kubeedge/sedna/pkg/localcontroller/storage"
  36. "github.com/kubeedge/sedna/pkg/localcontroller/trigger"
  37. "github.com/kubeedge/sedna/pkg/localcontroller/util"
  38. workertypes "github.com/kubeedge/sedna/pkg/localcontroller/worker"
  39. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  40. )
  41. // IncrementalLearningJob defines config for incremental-learning-job
  42. type Job struct {
  43. sednav1.IncrementalLearningJob
  44. JobConfig *JobConfig
  45. }
  46. // JobConfig defines config for incremental-learning-job
  47. type JobConfig struct {
  48. UniqueIdentifier string
  49. Rounds int
  50. TrainTrigger trigger.Base
  51. DeployTrigger trigger.Base
  52. TriggerTime time.Time
  53. TrainTriggerStatus string
  54. EvalTriggerStatus string
  55. DeployTriggerStatus string
  56. TrainDataURL string
  57. EvalDataURL string
  58. OutputDir string
  59. OutputConfig *OutputConfig
  60. DataSamples *DataSamples
  61. TrainModel *Model
  62. DeployModel *Model
  63. EvalModels []Model
  64. EvalResult []Model
  65. Lock sync.Mutex
  66. Dataset *dataset.Dataset
  67. Storage storage.Storage
  68. Done chan struct{}
  69. }
  70. type Model = clienttypes.Model
  71. // OutputConfig defines config for job output
  72. type OutputConfig struct {
  73. SamplesOutput map[string]string `json:"trainData"`
  74. TrainOutput string `json:"trainOutput"`
  75. EvalOutput string `json:"evalOutput"`
  76. }
  77. // DataSamples defines samples information
  78. type DataSamples struct {
  79. PreviousNumbers int
  80. TrainSamples []string
  81. EvalVersionSamples [][]string
  82. EvalSamples []string
  83. }
  84. // IncrementalLearningJob defines incremental-learning-job manager
  85. type Manager struct {
  86. Client clienttypes.ClientI
  87. WorkerMessageChannel chan workertypes.MessageContent
  88. DatasetManager *dataset.Manager
  89. ModelManager *model.Manager
  90. IncrementalJobMap map[string]*Job
  91. VolumeMountPrefix string
  92. }
  93. const (
  94. // JobIterationIntervalSeconds is interval time of each iteration of job
  95. JobIterationIntervalSeconds = 10
  96. // DatasetHandlerIntervalSeconds is interval time of handling dataset
  97. DatasetHandlerIntervalSeconds = 10
  98. // EvalSamplesCapacity is capacity of eval samples
  99. EvalSamplesCapacity = 5
  100. //KindName is kind of incremental-learning-job resource
  101. KindName = "incrementallearningjob"
  102. // TriggerReadyStatus is the ready status about trigger
  103. TriggerReadyStatus = "ready"
  104. // TriggerCompletedStatus is the completed status about trigger
  105. TriggerCompletedStatus = "completed"
  106. AnnotationsRoundsKey = "sedna.io/rounds"
  107. AnnotationsNumberOfSamplesKey = "sedna.io/number-of-samples"
  108. AnnotationsDataFileOfEvalKey = "sedna.io/data-file-of-eval"
  109. )
  110. // New creates a incremental-learning-job manager
  111. func New(client clienttypes.ClientI, datasetManager *dataset.Manager,
  112. modelManager *model.Manager, options *options.LocalControllerOptions) *Manager {
  113. im := Manager{
  114. Client: client,
  115. WorkerMessageChannel: make(chan workertypes.MessageContent, workertypes.MessageChannelCacheSize),
  116. DatasetManager: datasetManager,
  117. ModelManager: modelManager,
  118. IncrementalJobMap: make(map[string]*Job),
  119. VolumeMountPrefix: options.VolumeMountPrefix,
  120. }
  121. return &im
  122. }
  123. // Start starts incremental-learning-job manager
  124. func (im *Manager) Start() error {
  125. go im.monitorWorker()
  126. return nil
  127. }
  128. // trainTask starts training task
  129. func (im *Manager) trainTask(job *Job) error {
  130. jobConfig := job.JobConfig
  131. latestCond := im.getLatestCondition(job)
  132. jobStage := latestCond.Stage
  133. currentType := latestCond.Type
  134. if currentType == sednav1.ILJobStageCondWaiting {
  135. var err error
  136. err = im.loadDataset(job)
  137. if err != nil || jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
  138. return fmt.Errorf("job(%s) failed to load dataset, and waiting it: %w",
  139. jobConfig.UniqueIdentifier, err)
  140. }
  141. if jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
  142. return fmt.Errorf("job(%s)'s dataset not ready", jobConfig.UniqueIdentifier)
  143. }
  144. err = im.loadTrainModel(job)
  145. if err != nil {
  146. return fmt.Errorf("failed to sync train model, and waiting it: %w", err)
  147. }
  148. initTriggerStatus(jobConfig)
  149. if jobConfig.TrainTriggerStatus == TriggerReadyStatus {
  150. payload, ok, err := im.triggerTrainTask(job)
  151. if !ok {
  152. return nil
  153. }
  154. if err != nil {
  155. klog.Errorf("job(%s) failed to complete the %sing phase triggering task: %v",
  156. jobConfig.UniqueIdentifier, jobStage, err)
  157. job.JobConfig.Rounds--
  158. return err
  159. }
  160. err = im.Client.WriteMessage(payload, job.getHeader())
  161. if err != nil {
  162. klog.Errorf("job(%s) failed to write message: %v", jobConfig.UniqueIdentifier, err)
  163. job.JobConfig.Rounds--
  164. return err
  165. }
  166. forwardSamples(jobConfig, jobStage)
  167. err = im.saveJobToDB(job)
  168. if err != nil {
  169. klog.Errorf("job(%s) failed to save job to db: %v",
  170. jobConfig.UniqueIdentifier, err)
  171. // continue anyway
  172. }
  173. jobConfig.TrainTriggerStatus = TriggerCompletedStatus
  174. klog.Infof("job(%s) completed the %sing phase triggering task successfully",
  175. jobConfig.UniqueIdentifier, jobStage)
  176. }
  177. }
  178. return nil
  179. }
  180. // evalTask starts eval task
  181. func (im *Manager) evalTask(job *Job) error {
  182. jobConfig := job.JobConfig
  183. latestCond := im.getLatestCondition(job)
  184. jobStage := latestCond.Stage
  185. currentType := latestCond.Type
  186. if currentType == sednav1.ILJobStageCondWaiting {
  187. var err error
  188. err = im.loadDataset(job)
  189. if err != nil || jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
  190. return fmt.Errorf("job(%s) failed to load dataset, and waiting it: %w",
  191. jobConfig.UniqueIdentifier, err)
  192. }
  193. err = im.loadDeployModel(job)
  194. if err != nil {
  195. return fmt.Errorf("failed to sync deploy model, and waiting it: %w", err)
  196. }
  197. if jobConfig.TrainTriggerStatus == TriggerReadyStatus {
  198. payload, err := im.triggerEvalTask(job)
  199. if err != nil {
  200. klog.Errorf("job(%s) completed the %sing phase triggering task failed: %v",
  201. jobConfig.UniqueIdentifier, jobStage, err)
  202. return err
  203. }
  204. err = im.Client.WriteMessage(payload, job.getHeader())
  205. if err != nil {
  206. klog.Errorf("job(%s) failed to write message: %v", jobConfig.UniqueIdentifier, err)
  207. return err
  208. }
  209. forwardSamples(jobConfig, jobStage)
  210. jobConfig.TrainTriggerStatus = TriggerCompletedStatus
  211. klog.Infof("job(%s) completed the %sing phase triggering task successfully",
  212. jobConfig.UniqueIdentifier, jobStage)
  213. }
  214. }
  215. return nil
  216. }
  217. // hotModelUpdateDeployTask starts deploy task when job supports hot model update
  218. func (im *Manager) hotModelUpdateDeployTask(job *Job) error {
  219. var localModelConfigFile string
  220. if v, ok := job.ObjectMeta.Annotations[runtime.ModelHotUpdateAnnotationsKey]; ok {
  221. localModelConfigFile = v
  222. } else {
  223. return nil
  224. }
  225. latestCond := im.getLatestCondition(job)
  226. currentType := latestCond.Type
  227. if currentType == sednav1.ILJobStageCondRunning && job.JobConfig.DeployTriggerStatus == TriggerReadyStatus {
  228. models := im.getModelFromJobConditions(job, sednav1.ILJobDeploy)
  229. if models == nil {
  230. return nil
  231. }
  232. trainedModel := models[0]
  233. deployModel := models[1]
  234. trainedModelURL := trainedModel.URL
  235. modelName := filepath.Base(trainedModelURL)
  236. localHostDir := filepath.Dir(localModelConfigFile)
  237. localHostModelFile := filepath.Join(localHostDir, modelName)
  238. modelFile := util.AddPrefixPath(im.VolumeMountPrefix, localHostModelFile)
  239. if err := im.updateDeployModelFile(job, trainedModelURL, modelFile); err != nil {
  240. return err
  241. }
  242. deployModelURL := deployModel.URL
  243. if err := im.updateDeployModelFile(job, trainedModelURL, deployModelURL); err != nil {
  244. return err
  245. }
  246. config := map[string]map[string]string{
  247. "model_config": {
  248. "model_path": strings.Replace(localHostModelFile, localHostDir,
  249. runtime.ModelHotUpdateContainerPrefix, 1),
  250. "model_update_time": time.Now().String(),
  251. },
  252. }
  253. jsonConfig, err := json.MarshalIndent(config, "", " ")
  254. if err != nil {
  255. return err
  256. }
  257. modelConfigFile := util.AddPrefixPath(im.VolumeMountPrefix, localModelConfigFile)
  258. // overwrite file
  259. err = ioutil.WriteFile(modelConfigFile, jsonConfig, 0644)
  260. if err != nil {
  261. klog.Errorf("job(%s) write model config file(url=%s) failed in deploy phase: %v",
  262. job.JobConfig.UniqueIdentifier, modelConfigFile, err)
  263. return err
  264. }
  265. job.JobConfig.DeployTriggerStatus = TriggerCompletedStatus
  266. klog.V(4).Infof("job(%s) write model config file(url=%s) successfully in deploy phase",
  267. job.JobConfig.UniqueIdentifier, modelConfigFile)
  268. }
  269. return nil
  270. }
  271. // deployTask starts deploy task
  272. func (im *Manager) deployTask(job *Job) {
  273. jobConfig := job.JobConfig
  274. var err error
  275. var neededDeploy bool
  276. neededDeploy, err = im.triggerDeployTask(job)
  277. status := clienttypes.UpstreamMessage{Phase: string(sednav1.ILJobDeploy)}
  278. models := im.getModelFromJobConditions(job, sednav1.ILJobDeploy)
  279. if err == nil && neededDeploy && models != nil {
  280. if !job.Spec.DeploySpec.Model.HotUpdateEnabled {
  281. trainedModel := models[0]
  282. deployModel := models[1]
  283. err = im.updateDeployModelFile(job, trainedModel.URL, deployModel.URL)
  284. if err != nil {
  285. status.Status = string(sednav1.ILJobStageCondFailed)
  286. klog.Errorf("failed to update model for job(%s): %v", jobConfig.UniqueIdentifier, err)
  287. } else {
  288. status.Status = string(sednav1.ILJobStageCondReady)
  289. klog.Infof("update model for job(%s) successfully", jobConfig.UniqueIdentifier)
  290. }
  291. } else {
  292. status.Status = string(sednav1.ILJobStageCondReady)
  293. }
  294. status.Input = &clienttypes.Input{
  295. Models: models,
  296. }
  297. klog.Infof("job(%s) completed the %sing phase triggering task successfully",
  298. jobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
  299. } else {
  300. // No need to deploy, just report completed status
  301. // TODO: instead of reporting deploy-completed, another more reasonable status
  302. klog.Infof("job(%s) isn't need to deploy model", jobConfig.UniqueIdentifier)
  303. status.Status = string(sednav1.ILJobStageCondCompleted)
  304. }
  305. err = im.Client.WriteMessage(status, job.getHeader())
  306. if err != nil {
  307. klog.Errorf("job(%s) completed the %s task failed: %v",
  308. jobConfig.UniqueIdentifier, sednav1.ILJobDeploy, err)
  309. }
  310. klog.Infof("job(%s) completed the %s task successfully", jobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
  311. }
  312. // startJob starts a job
  313. func (im *Manager) startJob(name string) {
  314. var err error
  315. job := im.IncrementalJobMap[name]
  316. err = im.initJob(job, name)
  317. if err != nil {
  318. klog.Errorf("failed to init job (name=%s): %+v", name)
  319. return
  320. }
  321. klog.Infof("incremental job(%s) was started", name)
  322. defer klog.Infof("incremental learning job(%s) was stopped", name)
  323. // handle data from dataset
  324. go im.handleData(job)
  325. tick := time.NewTicker(JobIterationIntervalSeconds * time.Second)
  326. for {
  327. select {
  328. case <-job.JobConfig.Done:
  329. return
  330. default:
  331. }
  332. cond := im.getLatestCondition(job)
  333. jobStage := cond.Stage
  334. switch jobStage {
  335. case sednav1.ILJobTrain:
  336. err = im.trainTask(job)
  337. case sednav1.ILJobEval:
  338. err = im.evalTask(job)
  339. case sednav1.ILJobDeploy:
  340. err = im.hotModelUpdateDeployTask(job)
  341. default:
  342. klog.Errorf("invalid phase: %s", jobStage)
  343. continue
  344. }
  345. if err != nil {
  346. klog.Errorf("job(%s) failed to complete the %s task: %v", name, jobStage, err)
  347. }
  348. <-tick.C
  349. }
  350. }
  351. // Insert inserts incremental-learning-job config to db
  352. func (im *Manager) Insert(message *clienttypes.Message) error {
  353. name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)
  354. first := false
  355. job, ok := im.IncrementalJobMap[name]
  356. if !ok {
  357. job = &Job{}
  358. im.IncrementalJobMap[name] = job
  359. first = true
  360. }
  361. if err := json.Unmarshal(message.Content, &job); err != nil {
  362. return err
  363. }
  364. if first {
  365. go im.startJob(name)
  366. }
  367. if err := db.SaveResource(name, job.TypeMeta, job.ObjectMeta, job.Spec); err != nil {
  368. return err
  369. }
  370. return nil
  371. }
  372. // deleteModelHotUpdateData deletes the local data of model hot update
  373. func (im *Manager) deleteModelHotUpdateData(job *Job) error {
  374. if configFile, ok := job.ObjectMeta.Annotations[runtime.ModelHotUpdateAnnotationsKey]; ok {
  375. localHostDir := filepath.Dir(configFile)
  376. dir := util.AddPrefixPath(im.VolumeMountPrefix, localHostDir)
  377. if err := os.RemoveAll(dir); err != nil {
  378. return fmt.Errorf("failed to delete the dir(%s): %w", dir, err)
  379. }
  380. }
  381. return nil
  382. }
  383. // Delete deletes incremental-learning-job config in db
  384. func (im *Manager) Delete(message *clienttypes.Message) error {
  385. name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)
  386. if job, ok := im.IncrementalJobMap[name]; ok && job.JobConfig.Done != nil {
  387. close(job.JobConfig.Done)
  388. if err := im.deleteModelHotUpdateData(job); err != nil {
  389. klog.Errorf("job(%s) failed to delete data of model hot update: %v", name, err)
  390. // continue anyway
  391. }
  392. }
  393. delete(im.IncrementalJobMap, name)
  394. if err := db.DeleteResource(name); err != nil {
  395. return err
  396. }
  397. return nil
  398. }
  399. // updateJobFromDB updates job from db
  400. func (im *Manager) updateJobFromDB(job *Job) error {
  401. var err error
  402. previousJob, err := db.GetResource(job.JobConfig.UniqueIdentifier)
  403. if err != nil {
  404. return err
  405. }
  406. m := metav1.ObjectMeta{}
  407. if err != json.Unmarshal([]byte(previousJob.ObjectMeta), &m) {
  408. return err
  409. }
  410. rounds, ok := m.Annotations[AnnotationsRoundsKey]
  411. if !ok {
  412. return nil
  413. }
  414. if job.JobConfig.Rounds, err = strconv.Atoi(rounds); err != nil {
  415. return err
  416. }
  417. numberOfSamples, ok := m.Annotations[AnnotationsNumberOfSamplesKey]
  418. if !ok {
  419. return nil
  420. }
  421. if job.JobConfig.DataSamples.PreviousNumbers, err = strconv.Atoi(numberOfSamples); err != nil {
  422. return err
  423. }
  424. dataFileOfEval, ok := m.Annotations[AnnotationsDataFileOfEvalKey]
  425. if !ok {
  426. return nil
  427. }
  428. localURL, err := job.JobConfig.Storage.Download(dataFileOfEval, "")
  429. if !job.JobConfig.Storage.IsLocalStorage {
  430. defer os.RemoveAll(localURL)
  431. }
  432. if err != nil {
  433. return err
  434. }
  435. samples, err := dataset.GetSamples(dataFileOfEval)
  436. if err != nil {
  437. klog.Errorf("read file %s failed: %v", dataFileOfEval, err)
  438. return err
  439. }
  440. job.JobConfig.DataSamples.EvalVersionSamples = append(job.JobConfig.DataSamples.EvalVersionSamples, samples)
  441. return nil
  442. }
  443. // saveJobToDB saves job info to db
  444. func (im *Manager) saveJobToDB(job *Job) error {
  445. ann := job.ObjectMeta.Annotations
  446. if ann == nil {
  447. ann = make(map[string]string)
  448. }
  449. ann[AnnotationsRoundsKey] = strconv.Itoa(job.JobConfig.Rounds)
  450. ann[AnnotationsNumberOfSamplesKey] = strconv.Itoa(job.JobConfig.DataSamples.PreviousNumbers)
  451. ann[AnnotationsDataFileOfEvalKey] = job.JobConfig.EvalDataURL
  452. return db.SaveResource(job.JobConfig.UniqueIdentifier, job.TypeMeta, job.ObjectMeta, job.Spec)
  453. }
  454. // initJob inits the job object
  455. func (im *Manager) initJob(job *Job, name string) error {
  456. job.JobConfig = new(JobConfig)
  457. jobConfig := job.JobConfig
  458. jobConfig.UniqueIdentifier = name
  459. jobConfig.Storage = storage.Storage{IsLocalStorage: false}
  460. credential := job.ObjectMeta.Annotations[runtime.SecretAnnotationKey]
  461. if credential != "" {
  462. if err := job.JobConfig.Storage.SetCredential(credential); err != nil {
  463. return fmt.Errorf("failed to set storage credential: %w", err)
  464. }
  465. }
  466. jobConfig.Done = make(chan struct{})
  467. jobConfig.Lock = sync.Mutex{}
  468. jobConfig.Rounds = 0
  469. jobConfig.DataSamples = &DataSamples{
  470. PreviousNumbers: 0,
  471. TrainSamples: make([]string, 0),
  472. EvalVersionSamples: make([][]string, 0),
  473. EvalSamples: make([]string, 0),
  474. }
  475. trainTrigger, err := newTrigger(job.Spec.TrainSpec.Trigger)
  476. if err != nil {
  477. return fmt.Errorf("failed to init train trigger: %+w", err)
  478. }
  479. deployTrigger, err := newTrigger(job.Spec.DeploySpec.Trigger)
  480. if err != nil {
  481. return fmt.Errorf("failed to init deploy trigger: %+w", err)
  482. }
  483. jobConfig.TrainTrigger = trainTrigger
  484. jobConfig.DeployTrigger = deployTrigger
  485. outputDir := job.Spec.OutputDir
  486. isLocalURL, err := jobConfig.Storage.IsLocalURL(outputDir)
  487. if err != nil {
  488. return fmt.Errorf("job(%s)'s output dir(%s) is invalid: %+w", job.Name, outputDir, err)
  489. }
  490. if isLocalURL {
  491. jobConfig.Storage.IsLocalStorage = true
  492. outputDir = util.AddPrefixPath(im.VolumeMountPrefix, outputDir)
  493. }
  494. jobConfig.OutputDir = outputDir
  495. if err := job.createOutputDir(jobConfig); err != nil {
  496. return err
  497. }
  498. if err := im.updateJobFromDB(job); err != nil {
  499. klog.Errorf("job(%s) failed to update job from db: %v", name, err)
  500. }
  501. initTriggerStatus(jobConfig)
  502. return nil
  503. }
  504. func initTriggerStatus(jobConfig *JobConfig) {
  505. jobConfig.TrainTriggerStatus = TriggerReadyStatus
  506. jobConfig.EvalTriggerStatus = TriggerReadyStatus
  507. jobConfig.DeployTriggerStatus = TriggerReadyStatus
  508. }
  509. func newTrigger(t sednav1.Trigger) (trigger.Base, error) {
  510. // convert trigger to map
  511. triggerMap := make(map[string]interface{})
  512. c, err := json.Marshal(t)
  513. if err != nil {
  514. return nil, err
  515. }
  516. err = json.Unmarshal(c, &triggerMap)
  517. if err != nil {
  518. return nil, err
  519. }
  520. return trigger.NewTrigger(triggerMap)
  521. }
  522. // getModelFromJobConditions gets model from job conditions for train/eval/deploy
  523. func (im *Manager) getModelFromJobConditions(job *Job, jobStage sednav1.ILJobStage) []Model {
  524. jobConditions := job.Status.Conditions
  525. getModels := func(stage sednav1.ILJobStage, currentType sednav1.ILJobStageConditionType, dataType string) []runtime.Model {
  526. // TODO: runtime.type changes to common.type for gm and lc
  527. for i := len(jobConditions) - 1; i >= 0; i-- {
  528. var cond gmtypes.IncrementalCondData
  529. jobCond := jobConditions[i]
  530. if jobCond.Stage == stage && jobCond.Type == currentType {
  531. if err := (&cond).Unmarshal([]byte(jobCond.Data)); err != nil {
  532. continue
  533. }
  534. if dataType == "input" {
  535. if cond.Input == nil {
  536. continue
  537. }
  538. return cond.Input.Models
  539. } else if dataType == "output" {
  540. if cond.Output == nil {
  541. continue
  542. }
  543. return cond.Output.Models
  544. }
  545. }
  546. }
  547. return nil
  548. }
  549. switch jobStage {
  550. case sednav1.ILJobTrain:
  551. // the second model is the pre-trained model of train stage.
  552. models := getModels(sednav1.ILJobTrain, sednav1.ILJobStageCondCompleted, "output")
  553. if models != nil {
  554. return []Model{{Format: models[1].Format, URL: models[1].URL}}
  555. }
  556. case sednav1.ILJobEval:
  557. // the first model is the output model of train stage.
  558. models := getModels(sednav1.ILJobTrain, sednav1.ILJobStageCondCompleted, "output")
  559. if models != nil {
  560. return []Model{{Format: models[0].Format, URL: models[0].URL}}
  561. }
  562. case sednav1.ILJobDeploy:
  563. // two models for deploy stage:
  564. // the first model is the output model of train stage, which was evaluated as better than the second model in eval stage.
  565. // the second model is the serving model used in the inference worker.
  566. var deployModels []Model
  567. models := getModels(sednav1.ILJobEval, sednav1.ILJobStageCondReady, "input")
  568. for _, m := range models {
  569. deployModels = append(deployModels, Model{Format: m.Format, URL: m.URL})
  570. }
  571. return deployModels
  572. }
  573. return nil
  574. }
  575. // triggerTrainTask triggers the train task
  576. func (im *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) {
  577. var err error
  578. jobConfig := job.JobConfig
  579. const numOfSamples = "num_of_samples"
  580. samples := map[string]interface{}{
  581. numOfSamples: len(jobConfig.DataSamples.TrainSamples),
  582. }
  583. isTrigger := jobConfig.TrainTrigger.Trigger(samples)
  584. if !isTrigger {
  585. return nil, false, nil
  586. }
  587. job.JobConfig.Rounds++
  588. var m *Model
  589. latestCondition := im.getLatestCondition(job)
  590. rounds := jobConfig.Rounds
  591. if rounds <= 1 {
  592. m = jobConfig.TrainModel
  593. } else {
  594. models := im.getModelFromJobConditions(job, latestCondition.Stage)
  595. if models != nil {
  596. m = &models[0]
  597. }
  598. }
  599. var dataIndexURL string
  600. jobConfig.TrainDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.TrainSamples,
  601. jobConfig.OutputConfig.SamplesOutput["train"], rounds, jobConfig.Dataset.Spec.Format, jobConfig.Dataset.URLPrefix)
  602. if err != nil {
  603. job.JobConfig.Rounds--
  604. klog.Errorf("job(%s) train phase: write samples to the file(%s) is failed: %v",
  605. jobConfig.UniqueIdentifier, jobConfig.TrainDataURL, err)
  606. return nil, false, err
  607. }
  608. dataURL := jobConfig.TrainDataURL
  609. outputDir := strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(rounds)}, "/")
  610. if jobConfig.Storage.IsLocalStorage {
  611. dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataURL)
  612. dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL)
  613. outputDir = util.TrimPrefixPath(im.VolumeMountPrefix, outputDir)
  614. }
  615. input := clienttypes.Input{
  616. Models: []Model{*m},
  617. DataURL: dataURL,
  618. DataIndexURL: dataIndexURL,
  619. OutputDir: outputDir,
  620. }
  621. msg := clienttypes.UpstreamMessage{
  622. Phase: string(sednav1.ILJobTrain),
  623. Status: string(sednav1.ILJobStageCondReady),
  624. Input: &input,
  625. }
  626. jobConfig.TriggerTime = time.Now()
  627. return &msg, true, nil
  628. }
  629. // triggerEvalTask triggers the eval task
  630. func (im *Manager) triggerEvalTask(job *Job) (*clienttypes.UpstreamMessage, error) {
  631. jobConfig := job.JobConfig
  632. var err error
  633. latestCondition := im.getLatestCondition(job)
  634. ms := im.getModelFromJobConditions(job, latestCondition.Stage)
  635. if ms == nil {
  636. return nil, err
  637. }
  638. models := []Model{ms[0], {
  639. Format: jobConfig.DeployModel.Format,
  640. URL: jobConfig.DeployModel.URL,
  641. }}
  642. // EvalModels has two models, first is trained model, second is deployed model
  643. jobConfig.EvalModels = models
  644. var dataIndexURL string
  645. jobConfig.EvalDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"],
  646. job.JobConfig.Rounds, jobConfig.Dataset.Spec.Format, jobConfig.Dataset.URLPrefix)
  647. if err != nil {
  648. klog.Errorf("job(%s) eval phase: write samples to the file(%s) is failed: %v",
  649. jobConfig.UniqueIdentifier, jobConfig.EvalDataURL, err)
  650. return nil, err
  651. }
  652. dataURL := jobConfig.EvalDataURL
  653. if jobConfig.Storage.IsLocalStorage {
  654. dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataURL)
  655. dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL)
  656. }
  657. input := clienttypes.Input{
  658. Models: models,
  659. DataURL: dataURL,
  660. DataIndexURL: dataIndexURL,
  661. }
  662. msg := &clienttypes.UpstreamMessage{
  663. Phase: string(sednav1.ILJobEval),
  664. Status: string(sednav1.ILJobStageCondReady),
  665. Input: &input,
  666. }
  667. return msg, nil
  668. }
  669. // triggerDeployTask triggers the deploy task
  670. func (im *Manager) triggerDeployTask(job *Job) (bool, error) {
  671. jobConfig := job.JobConfig
  672. // EvalResult must has two models info, first is trained model, second is deployed model.
  673. if len(jobConfig.EvalResult) != 2 {
  674. return false, fmt.Errorf("expected 2 evaluation results, actual: %d", len(jobConfig.EvalResult))
  675. }
  676. getMetrics := func(metrics map[string]interface{}) (map[string][]float64, error) {
  677. var err error
  678. bytes, err := json.Marshal(metrics)
  679. if err != nil {
  680. return nil, err
  681. }
  682. data := make(map[string][]float64)
  683. if err := json.Unmarshal(bytes, &data); err != nil {
  684. return nil, err
  685. }
  686. return data, err
  687. }
  688. newMetrics, err := getMetrics(jobConfig.EvalResult[0].Metrics)
  689. if err != nil {
  690. return false, err
  691. }
  692. oldMetrics, err := getMetrics(jobConfig.EvalResult[1].Metrics)
  693. if err != nil {
  694. return false, err
  695. }
  696. metricDelta := make(map[string]interface{})
  697. for metric := range newMetrics {
  698. // keep the full metrics
  699. metricDelta[metric] = newMetrics[metric]
  700. var l []float64
  701. for i := range newMetrics[metric] {
  702. l = append(l, newMetrics[metric][i]-oldMetrics[metric][i])
  703. }
  704. metricDelta[metric+"_delta"] = l
  705. }
  706. tt := job.Spec.DeploySpec.Trigger
  707. // convert tt to map
  708. triggerMap := make(map[string]interface{})
  709. c, err := json.Marshal(tt)
  710. if err != nil {
  711. return false, err
  712. }
  713. err = json.Unmarshal(c, &triggerMap)
  714. if err != nil {
  715. return false, err
  716. }
  717. return jobConfig.DeployTrigger.Trigger(metricDelta), nil
  718. }
  719. // updateDeployModelFile updates deploy model file
  720. func (im *Manager) updateDeployModelFile(job *Job, trainedModel string, deployModel string) error {
  721. if job.JobConfig.Storage.IsLocalStorage {
  722. trainedModel = util.AddPrefixPath(im.VolumeMountPrefix, trainedModel)
  723. }
  724. if err := job.JobConfig.Storage.CopyFile(trainedModel, deployModel); err != nil {
  725. return fmt.Errorf("failed to copy trained model(url=%s) to the deploy model(url=%s): %w",
  726. trainedModel, deployModel, err)
  727. }
  728. klog.V(4).Infof("copy trained model(url=%s) to the deploy model(url=%s) successfully", trainedModel, deployModel)
  729. return nil
  730. }
  731. // createOutputDir creates the job output dir
  732. func (job *Job) createOutputDir(jobConfig *JobConfig) error {
  733. outputDir := jobConfig.OutputDir
  734. dirNames := []string{"data/train", "data/eval", "train", "eval"}
  735. if job.JobConfig.Storage.IsLocalStorage {
  736. if err := util.CreateFolder(outputDir); err != nil {
  737. klog.Errorf("job(%s) failed to create folder %s: %v", jobConfig.UniqueIdentifier, outputDir, err)
  738. return err
  739. }
  740. for _, v := range dirNames {
  741. dir := path.Join(outputDir, v)
  742. if err := util.CreateFolder(dir); err != nil {
  743. klog.Errorf("job(%s) failed to create folder %s: %v", jobConfig.UniqueIdentifier, dir, err)
  744. return err
  745. }
  746. }
  747. }
  748. outputConfig := OutputConfig{
  749. SamplesOutput: map[string]string{
  750. "train": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[0]}, "/"),
  751. "eval": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[1]}, "/"),
  752. },
  753. TrainOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[2]}, "/"),
  754. EvalOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[3]}, "/"),
  755. }
  756. jobConfig.OutputConfig = &outputConfig
  757. return nil
  758. }
  759. func (im *Manager) getLatestCondition(job *Job) sednav1.ILJobCondition {
  760. jobConditions := job.Status.Conditions
  761. var latestCondition sednav1.ILJobCondition = sednav1.ILJobCondition{}
  762. if len(jobConditions) > 0 {
  763. // get latest pod and pod status
  764. latestCondition = jobConditions[len(jobConditions)-1]
  765. }
  766. return latestCondition
  767. }
  768. func (im *Manager) getModel(namespace string, name string) (sednav1.Model, error) {
  769. modelName := util.GetUniqueIdentifier(namespace, name, model.KindName)
  770. model, ok := im.ModelManager.GetModel(modelName)
  771. if !ok {
  772. return model, fmt.Errorf("not exists model(name=%s)", modelName)
  773. }
  774. return model, nil
  775. }
  776. // loadTrainModel loads initial model information for training.
  777. func (im *Manager) loadTrainModel(job *Job) error {
  778. jobConfig := job.JobConfig
  779. if jobConfig.TrainModel == nil {
  780. initialModel, err := im.getModel(job.Namespace, job.Spec.InitialModel.Name)
  781. if err != nil {
  782. return err
  783. }
  784. jobConfig.TrainModel = new(Model)
  785. format := initialModel.Spec.Format
  786. url := initialModel.Spec.URL
  787. jobConfig.TrainModel.Format = format
  788. jobConfig.TrainModel.URL = url
  789. }
  790. return nil
  791. }
  792. // loadDeployModel loads model information for deploying.
  793. func (im *Manager) loadDeployModel(job *Job) error {
  794. jobConfig := job.JobConfig
  795. if jobConfig.DeployModel == nil {
  796. evalModel, err := im.getModel(job.Namespace, job.Spec.DeploySpec.Model.Name)
  797. if err != nil {
  798. return err
  799. }
  800. jobConfig.DeployModel = new(Model)
  801. jobConfig.DeployModel.Format = evalModel.Spec.Format
  802. jobConfig.DeployModel.URL = evalModel.Spec.URL
  803. }
  804. return nil
  805. }
  806. // loadDataset loads dataset information
  807. func (im *Manager) loadDataset(job *Job) error {
  808. if job.JobConfig.Dataset != nil {
  809. // already loaded
  810. return nil
  811. }
  812. datasetName := util.GetUniqueIdentifier(job.Namespace, job.Spec.Dataset.Name, dataset.KindName)
  813. dataset, ok := im.DatasetManager.GetDataset(datasetName)
  814. if !ok || dataset == nil {
  815. return fmt.Errorf("not exists dataset(name=%s)", datasetName)
  816. }
  817. job.JobConfig.Dataset = dataset
  818. return nil
  819. }
  820. // handleData updates samples information
  821. func (im *Manager) handleData(job *Job) {
  822. tick := time.NewTicker(DatasetHandlerIntervalSeconds * time.Second)
  823. jobConfig := job.JobConfig
  824. iterCount := 0
  825. for {
  826. select {
  827. case <-jobConfig.Done:
  828. return
  829. default:
  830. }
  831. if iterCount%100 == 0 {
  832. klog.V(4).Infof("job(%s) is handling dataset", jobConfig.UniqueIdentifier)
  833. }
  834. iterCount++
  835. if jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
  836. // already loaded dataset
  837. <-tick.C
  838. continue
  839. }
  840. dataset := jobConfig.Dataset
  841. currentNumberOfSamples := dataset.DataSource.NumberOfSamples
  842. previousNumberOfSamples := jobConfig.DataSamples.PreviousNumbers
  843. if dataset.DataSource != nil && currentNumberOfSamples > previousNumberOfSamples {
  844. samples := dataset.DataSource.TrainSamples
  845. newNumberOfSamples := currentNumberOfSamples - previousNumberOfSamples
  846. trainNum := int(job.Spec.Dataset.TrainProb * float64(newNumberOfSamples))
  847. jobConfig.Lock.Lock()
  848. jobConfig.DataSamples.TrainSamples = append(jobConfig.DataSamples.TrainSamples,
  849. samples[previousNumberOfSamples:previousNumberOfSamples+trainNum]...)
  850. klog.Infof("job(%s)'s current train samples nums is %d", jobConfig.UniqueIdentifier, trainNum)
  851. jobConfig.DataSamples.EvalVersionSamples = append(jobConfig.DataSamples.EvalVersionSamples,
  852. samples[previousNumberOfSamples+trainNum:])
  853. jobConfig.Lock.Unlock()
  854. for _, v := range jobConfig.DataSamples.EvalVersionSamples {
  855. jobConfig.DataSamples.EvalSamples = append(jobConfig.DataSamples.EvalSamples, v...)
  856. }
  857. klog.Infof("job(%s)'s current eval samples nums is %d", jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.EvalSamples))
  858. jobConfig.DataSamples.PreviousNumbers = currentNumberOfSamples
  859. }
  860. <-tick.C
  861. }
  862. }
  863. // createFile creates data file and data index file
  864. func createFile(dir string, format string, isLocalStorage bool) (string, string) {
  865. switch format {
  866. case dataset.TXTFormat:
  867. if isLocalStorage {
  868. return path.Join(dir, "data.txt"), ""
  869. }
  870. return strings.Join([]string{dir, "data.txt"}, "/"), strings.Join([]string{dir, "dataIndex.txt"}, "/")
  871. }
  872. return "", ""
  873. }
  874. // writeSamples writes samples information to a file
  875. func (im *Manager) writeSamples(job *Job, samples []string, dir string, rounds int, format string, urlPrefix string) (string, string, error) {
  876. if samples == nil {
  877. return "", "", fmt.Errorf("not samples")
  878. }
  879. jobConfig := job.JobConfig
  880. subDir := strings.Join([]string{dir, strconv.Itoa(rounds)}, "/")
  881. fileURL, absURLFile := createFile(subDir, format, jobConfig.Dataset.Storage.IsLocalStorage)
  882. if jobConfig.Storage.IsLocalStorage {
  883. if err := util.CreateFolder(subDir); err != nil {
  884. return "", "", err
  885. }
  886. if err := im.writeByLine(samples, fileURL); err != nil {
  887. return "", "", err
  888. }
  889. if !jobConfig.Dataset.Storage.IsLocalStorage {
  890. tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)
  891. if err := im.writeByLine(tempSamples, absURLFile); err != nil {
  892. return "", "", err
  893. }
  894. }
  895. return fileURL, absURLFile, nil
  896. }
  897. temporaryDir, err := util.CreateTemporaryDir()
  898. if err != nil {
  899. return "", "", err
  900. }
  901. localFileURL, localAbsURLFile := createFile(temporaryDir, format, jobConfig.Dataset.Storage.IsLocalStorage)
  902. if err := im.writeByLine(samples, localFileURL); err != nil {
  903. return "", "", err
  904. }
  905. if err := jobConfig.Storage.Upload(localFileURL, fileURL); err != nil {
  906. return "", "", err
  907. }
  908. tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)
  909. if err := im.writeByLine(tempSamples, localAbsURLFile); err != nil {
  910. return "", "", err
  911. }
  912. if err := jobConfig.Storage.Upload(localAbsURLFile, absURLFile); err != nil {
  913. return "", "", err
  914. }
  915. defer os.RemoveAll(localFileURL)
  916. defer os.RemoveAll(localAbsURLFile)
  917. return fileURL, absURLFile, nil
  918. }
  919. // writeByLine writes file by line
  920. func (im *Manager) writeByLine(samples []string, fileURL string) error {
  921. file, err := os.Create(fileURL)
  922. if err != nil {
  923. klog.Errorf("create file(%s) failed", fileURL)
  924. return err
  925. }
  926. w := bufio.NewWriter(file)
  927. for _, line := range samples {
  928. _, _ = fmt.Fprintln(w, line)
  929. }
  930. if err := w.Flush(); err != nil {
  931. klog.Errorf("failed to write file(%s): %v", fileURL, err)
  932. return err
  933. }
  934. if err := file.Close(); err != nil {
  935. klog.Errorf("failed to close file(%s): %v", fileURL, err)
  936. return err
  937. }
  938. return nil
  939. }
  940. // monitorWorker monitors message from worker
  941. func (im *Manager) monitorWorker() {
  942. for {
  943. workerMessageChannel := im.WorkerMessageChannel
  944. workerMessage, ok := <-workerMessageChannel
  945. if !ok {
  946. break
  947. }
  948. klog.V(4).Infof("handling worker message %+v", workerMessage)
  949. name := util.GetUniqueIdentifier(workerMessage.Namespace, workerMessage.OwnerName, workerMessage.OwnerKind)
  950. job, ok := im.IncrementalJobMap[name]
  951. if !ok {
  952. continue
  953. }
  954. // TODO: filter some worker messages out
  955. wo := clienttypes.Output{}
  956. wo.Models = workerMessage.Results
  957. wo.OwnerInfo = workerMessage.OwnerInfo
  958. msg := &clienttypes.UpstreamMessage{
  959. Phase: workerMessage.Kind,
  960. Status: workerMessage.Status,
  961. Output: &wo,
  962. }
  963. if err := im.Client.WriteMessage(msg, job.getHeader()); err != nil {
  964. klog.Errorf("job(%s) failed to write message: %v", name, err)
  965. continue
  966. }
  967. im.handleWorkerMessage(job, workerMessage)
  968. }
  969. }
  970. // handleWorkerMessage handles message from worker
  971. func (im *Manager) handleWorkerMessage(job *Job, workerMessage workertypes.MessageContent) {
  972. latestCond := im.getLatestCondition(job)
  973. jobStage := strings.ToLower(string(latestCond.Stage))
  974. workerKind := strings.ToLower(workerMessage.Kind)
  975. if jobStage != workerKind {
  976. klog.Warningf("job(%s)'s %s phase get worker(kind=%s)", job.JobConfig.UniqueIdentifier,
  977. jobStage, workerKind)
  978. return
  979. }
  980. var models []Model
  981. for _, result := range workerMessage.Results {
  982. metrics := make(map[string]interface{})
  983. if m, ok := result["metrics"]; ok {
  984. bytes, err := json.Marshal(m)
  985. if err != nil {
  986. return
  987. }
  988. err = json.Unmarshal(bytes, &metrics)
  989. if err != nil {
  990. klog.Warningf("failed to unmarshal the worker(name=%s) metrics %v, err: %v",
  991. workerMessage.Name,
  992. m,
  993. err)
  994. }
  995. }
  996. model := Model{
  997. Format: result["format"].(string),
  998. URL: result["url"].(string),
  999. Metrics: metrics}
  1000. models = append(models, model)
  1001. }
  1002. workerStatus := workerMessage.Status
  1003. jobName := job.JobConfig.UniqueIdentifier
  1004. if workerStatus == workertypes.CompletedStatus {
  1005. klog.Infof("job(%s) completed the %s task successfully", jobName, jobStage)
  1006. switch latestCond.Stage {
  1007. case sednav1.ILJobEval:
  1008. job.JobConfig.EvalResult = models
  1009. // when eval worker is completed status, the deploy task will starts immediately without waiting for the notification of GM.
  1010. im.deployTask(job)
  1011. }
  1012. }
  1013. }
  1014. // forwardSamples deletes the samples information in the memory
  1015. func forwardSamples(jobConfig *JobConfig, jobStage sednav1.ILJobStage) {
  1016. switch jobStage {
  1017. case sednav1.ILJobTrain:
  1018. jobConfig.Lock.Lock()
  1019. jobConfig.DataSamples.TrainSamples = jobConfig.DataSamples.TrainSamples[:0]
  1020. jobConfig.Lock.Unlock()
  1021. case sednav1.ILJobEval:
  1022. if len(jobConfig.DataSamples.EvalVersionSamples) > EvalSamplesCapacity {
  1023. jobConfig.DataSamples.EvalVersionSamples = jobConfig.DataSamples.EvalVersionSamples[1:]
  1024. }
  1025. }
  1026. }
  1027. // AddWorkerMessage adds worker messages
  1028. func (im *Manager) AddWorkerMessage(message workertypes.MessageContent) {
  1029. im.WorkerMessageChannel <- message
  1030. }
  1031. // GetName returns name of the manager
  1032. func (im *Manager) GetName() string {
  1033. return KindName
  1034. }
  1035. func (job *Job) getHeader() clienttypes.MessageHeader {
  1036. return clienttypes.MessageHeader{
  1037. Namespace: job.Namespace,
  1038. ResourceKind: job.Kind,
  1039. ResourceName: job.Name,
  1040. Operation: clienttypes.StatusOperation,
  1041. }
  1042. }