You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

incrementallearningjob.go 38 kB


  1. /*
  2. Copyright 2021 The KubeEdge Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package incrementallearning
  14. import (
  15. "bufio"
  16. "encoding/json"
  17. "fmt"
  18. "io/ioutil"
  19. "os"
  20. "path"
  21. "path/filepath"
  22. "strconv"
  23. "strings"
  24. "sync"
  25. "time"
  26. "k8s.io/klog/v2"
  27. "github.com/kubeedge/sedna/cmd/sedna-lc/app/options"
  28. sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
  29. gmtypes "github.com/kubeedge/sedna/pkg/globalmanager/controllers/incrementallearning"
  30. "github.com/kubeedge/sedna/pkg/globalmanager/runtime"
  31. "github.com/kubeedge/sedna/pkg/localcontroller/db"
  32. clienttypes "github.com/kubeedge/sedna/pkg/localcontroller/gmclient"
  33. "github.com/kubeedge/sedna/pkg/localcontroller/managers/dataset"
  34. "github.com/kubeedge/sedna/pkg/localcontroller/managers/model"
  35. "github.com/kubeedge/sedna/pkg/localcontroller/storage"
  36. "github.com/kubeedge/sedna/pkg/localcontroller/trigger"
  37. "github.com/kubeedge/sedna/pkg/localcontroller/util"
  38. workertypes "github.com/kubeedge/sedna/pkg/localcontroller/worker"
  39. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  40. )
// Job is an incremental-learning-job as tracked by the local controller: the
// IncrementalLearningJob resource plus its runtime configuration.
type Job struct {
	sednav1.IncrementalLearningJob
	// JobConfig is the runtime state of the job; initJob allocates it.
	JobConfig *JobConfig
}
// JobConfig defines the runtime config/state for an incremental-learning-job.
type JobConfig struct {
	// UniqueIdentifier is the name the job is saved under in the db.
	UniqueIdentifier string
	// Rounds is incremented each time a train task is triggered.
	Rounds int
	// TrainTrigger and DeployTrigger are built from the job spec by initJob.
	TrainTrigger  trigger.Base
	DeployTrigger trigger.Base
	// TriggerTime records when the latest train task was triggered.
	TriggerTime time.Time
	// The trigger statuses hold TriggerReadyStatus or TriggerCompletedStatus.
	TrainTriggerStatus                string
	EvalTriggerStatus                 string
	DeployTriggerStatus               string
	HotModelUpdateDeployTriggerStatus string
	// TrainDataURL and EvalDataURL are the sample files written for the
	// latest train/eval task.
	TrainDataURL string
	EvalDataURL  string
	// OutputDir is the job output dir; for local storage it carries the
	// manager's volume mount prefix.
	OutputDir    string
	OutputConfig *OutputConfig
	DataSamples  *DataSamples
	TrainModel   *Model
	DeployModel  *Model
	EvalModel    *Model
	EvalResult   []Model
	Lock         sync.Mutex
	Dataset      *dataset.Dataset
	Storage      storage.Storage
	// Done is closed by Delete to stop the loop run by startJob.
	Done chan struct{}
}
// Model is an alias of the gmclient Model type.
type Model = clienttypes.Model
// OutputConfig defines config for job output.
type OutputConfig struct {
	// SamplesOutput maps "train" and "eval" to the dirs holding sample files.
	SamplesOutput map[string]string `json:"trainData"`
	// TrainOutput is the base output dir of train tasks.
	TrainOutput string `json:"trainOutput"`
	// EvalOutput is the base output dir of eval tasks.
	EvalOutput string `json:"evalOutput"`
}
// DataSamples defines samples information.
type DataSamples struct {
	// PreviousNumbers is persisted in the job annotations under
	// AnnotationsNumberOfSamplesKey.
	PreviousNumbers int
	// TrainSamples are the samples handed to the next train task.
	TrainSamples []string
	// EvalVersionSamples holds one slice of eval samples per version.
	EvalVersionSamples [][]string
	// EvalSamples are the samples handed to the next eval task.
	EvalSamples []string
}
// Manager defines the incremental-learning-job manager.
type Manager struct {
	Client               clienttypes.ClientI
	WorkerMessageChannel chan workertypes.MessageContent
	DatasetManager       *dataset.Manager
	ModelManager         *model.Manager
	// IncrementalJobMap holds the known jobs keyed by their unique identifier.
	IncrementalJobMap map[string]*Job
	// VolumeMountPrefix is added to local host paths before they are accessed.
	VolumeMountPrefix string
}
const (
	// JobIterationIntervalSeconds is interval time of each iteration of job
	JobIterationIntervalSeconds = 10
	// DatasetHandlerIntervalSeconds is interval time of handling dataset
	DatasetHandlerIntervalSeconds = 10
	// EvalSamplesCapacity is capacity of eval samples
	EvalSamplesCapacity = 5
	// KindName is kind of incremental-learning-job resource
	KindName = "incrementallearningjob"

	// TriggerReadyStatus is the ready status about trigger
	TriggerReadyStatus = "ready"
	// TriggerCompletedStatus is the completed status about trigger
	TriggerCompletedStatus = "completed"

	// AnnotationsRoundsKey is the annotation key storing the job's rounds.
	AnnotationsRoundsKey = "sedna.io/rounds"
	// AnnotationsNumberOfSamplesKey is the annotation key storing the
	// previous number of samples.
	AnnotationsNumberOfSamplesKey = "sedna.io/number-of-samples"
	// AnnotationsDataFileOfEvalKey is the annotation key storing the URL of
	// the eval data file.
	AnnotationsDataFileOfEvalKey = "sedna.io/data-file-of-eval"
)
  111. // New creates a incremental-learning-job manager
  112. func New(client clienttypes.ClientI, datasetManager *dataset.Manager,
  113. modelManager *model.Manager, options *options.LocalControllerOptions) *Manager {
  114. im := Manager{
  115. Client: client,
  116. WorkerMessageChannel: make(chan workertypes.MessageContent, workertypes.MessageChannelCacheSize),
  117. DatasetManager: datasetManager,
  118. ModelManager: modelManager,
  119. IncrementalJobMap: make(map[string]*Job),
  120. VolumeMountPrefix: options.VolumeMountPrefix,
  121. }
  122. return &im
  123. }
// Start starts the incremental-learning-job manager by launching the worker
// monitor in its own goroutine. It always returns nil.
func (im *Manager) Start() error {
	go im.monitorWorker()
	return nil
}
  129. // trainTask starts training task
  130. func (im *Manager) trainTask(job *Job) error {
  131. jobConfig := job.JobConfig
  132. latestCond := im.getLatestCondition(job)
  133. jobStage := latestCond.Stage
  134. currentType := latestCond.Type
  135. if currentType == sednav1.ILJobStageCondWaiting {
  136. var err error
  137. err = im.loadDataset(job)
  138. if err != nil || jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
  139. return fmt.Errorf("job(%s) failed to load dataset, and waiting it: %w",
  140. jobConfig.UniqueIdentifier, err)
  141. }
  142. if jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
  143. return fmt.Errorf("job(%s)'s dataset not ready", jobConfig.UniqueIdentifier)
  144. }
  145. err = im.loadTrainModel(job)
  146. if err != nil {
  147. return fmt.Errorf("failed to sync train model, and waiting it: %w", err)
  148. }
  149. initTriggerStatus(jobConfig)
  150. if jobConfig.TrainTriggerStatus == TriggerReadyStatus {
  151. payload, ok, err := im.triggerTrainTask(job)
  152. if !ok {
  153. return nil
  154. }
  155. if err != nil {
  156. klog.Errorf("job(%s) failed to complete the %sing phase triggering task: %v",
  157. jobConfig.UniqueIdentifier, jobStage, err)
  158. job.JobConfig.Rounds--
  159. return err
  160. }
  161. err = im.Client.WriteMessage(payload, job.getHeader())
  162. if err != nil {
  163. klog.Errorf("job(%s) failed to write message: %v", jobConfig.UniqueIdentifier, err)
  164. job.JobConfig.Rounds--
  165. return err
  166. }
  167. forwardSamples(jobConfig, jobStage)
  168. err = im.saveJobToDB(job)
  169. if err != nil {
  170. klog.Errorf("job(%s) failed to save job to db: %v",
  171. jobConfig.UniqueIdentifier, err)
  172. // continue anyway
  173. }
  174. jobConfig.TrainTriggerStatus = TriggerCompletedStatus
  175. klog.Infof("job(%s) completed the %sing phase triggering task successfully",
  176. jobConfig.UniqueIdentifier, jobStage)
  177. }
  178. }
  179. return nil
  180. }
  181. // evalTask starts eval task
  182. func (im *Manager) evalTask(job *Job) error {
  183. jobConfig := job.JobConfig
  184. latestCond := im.getLatestCondition(job)
  185. jobStage := latestCond.Stage
  186. currentType := latestCond.Type
  187. if currentType == sednav1.ILJobStageCondWaiting {
  188. var err error
  189. err = im.loadDataset(job)
  190. if err != nil || jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
  191. return fmt.Errorf("job(%s) failed to load dataset, and waiting it: %w",
  192. jobConfig.UniqueIdentifier, err)
  193. }
  194. err = im.loadDeployModel(job)
  195. if err != nil {
  196. return fmt.Errorf("failed to sync deploy model, and waiting it: %w", err)
  197. }
  198. if job.Spec.EvalSpec.InitialModel != nil {
  199. err = im.loadEvalModel(job)
  200. if err != nil {
  201. return fmt.Errorf("failed to sync initial eval model, and waiting it: %w", err)
  202. }
  203. } else {
  204. if jobConfig.EvalModel == nil {
  205. jobConfig.EvalModel = jobConfig.DeployModel
  206. }
  207. }
  208. if jobConfig.EvalTriggerStatus == TriggerReadyStatus {
  209. payload, err := im.triggerEvalTask(job)
  210. if err != nil {
  211. klog.Errorf("job(%s) completed the %sing phase triggering task failed: %v",
  212. jobConfig.UniqueIdentifier, jobStage, err)
  213. return err
  214. }
  215. err = im.Client.WriteMessage(payload, job.getHeader())
  216. if err != nil {
  217. klog.Errorf("job(%s) failed to write message: %v", jobConfig.UniqueIdentifier, err)
  218. return err
  219. }
  220. forwardSamples(jobConfig, jobStage)
  221. jobConfig.EvalTriggerStatus = TriggerCompletedStatus
  222. klog.Infof("job(%s) completed the %sing phase triggering task successfully",
  223. jobConfig.UniqueIdentifier, jobStage)
  224. }
  225. }
  226. return nil
  227. }
  228. // hotModelUpdateDeployTask starts deploy task when job supports hot model update
  229. func (im *Manager) hotModelUpdateDeployTask(job *Job) error {
  230. var localModelConfigFile string
  231. if v, ok := job.ObjectMeta.Annotations[runtime.ModelHotUpdateAnnotationsKey]; ok {
  232. localModelConfigFile = v
  233. } else {
  234. return nil
  235. }
  236. if job.JobConfig.HotModelUpdateDeployTriggerStatus == TriggerReadyStatus {
  237. var err error
  238. err = im.loadDeployModel(job)
  239. if err != nil {
  240. return fmt.Errorf("failed to sync deploy model, and waiting it: %w", err)
  241. }
  242. if job.Spec.EvalSpec.InitialModel != nil {
  243. err = im.loadEvalModel(job)
  244. if err != nil {
  245. return fmt.Errorf("failed to sync initial eval model, and waiting it: %w", err)
  246. }
  247. }
  248. trainedModel := im.getModelFromJobConditions(job, sednav1.ILJobDeploy)
  249. deployModel := job.JobConfig.DeployModel
  250. trainedModelURL := trainedModel.URL
  251. modelName := filepath.Base(trainedModelURL)
  252. localHostDir := filepath.Dir(localModelConfigFile)
  253. localHostModelFile := filepath.Join(localHostDir, modelName)
  254. modelFile := util.AddPrefixPath(im.VolumeMountPrefix, localHostModelFile)
  255. if err := im.updateDeployModelFile(job, trainedModelURL, modelFile); err != nil {
  256. return err
  257. }
  258. deployModelURL := deployModel.URL
  259. if err := im.updateDeployModelFile(job, trainedModelURL, deployModelURL); err != nil {
  260. return err
  261. }
  262. evalModel := job.JobConfig.EvalModel
  263. if evalModel != nil {
  264. newEvalModel := im.getModelFromJobConditions(job, sednav1.ILJobEval)
  265. if err := im.updateDeployModelFile(job, newEvalModel.URL, evalModel.URL); err != nil {
  266. return err
  267. }
  268. }
  269. config := map[string]map[string]string{
  270. "model_config": {
  271. "model_path": strings.Replace(localHostModelFile, localHostDir,
  272. runtime.ModelHotUpdateContainerPrefix, 1),
  273. "model_update_time": time.Now().String(),
  274. },
  275. }
  276. jsonConfig, err := json.MarshalIndent(config, "", " ")
  277. if err != nil {
  278. return err
  279. }
  280. modelConfigFile := util.AddPrefixPath(im.VolumeMountPrefix, localModelConfigFile)
  281. // overwrite file
  282. err = ioutil.WriteFile(modelConfigFile, jsonConfig, 0644)
  283. if err != nil {
  284. klog.Errorf("job(%s) write model config file(url=%s) failed in deploy phase: %v",
  285. job.JobConfig.UniqueIdentifier, modelConfigFile, err)
  286. return err
  287. }
  288. job.JobConfig.HotModelUpdateDeployTriggerStatus = TriggerCompletedStatus
  289. klog.V(4).Infof("job(%s) write model config file(url=%s) successfully in deploy phase",
  290. job.JobConfig.UniqueIdentifier, modelConfigFile)
  291. klog.Infof("job(%s) completed the %s task successfully", job.JobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
  292. }
  293. return nil
  294. }
  295. // deployTask starts deploy task
  296. func (im *Manager) deployTask(job *Job) error {
  297. if job.JobConfig.DeployTriggerStatus == TriggerReadyStatus {
  298. if err := im.loadDeployModel(job); err != nil {
  299. return fmt.Errorf("failed to sync deploy model, and waiting it: %w", err)
  300. }
  301. if !job.Spec.DeploySpec.Model.HotUpdateEnabled && job.Spec.EvalSpec.InitialModel != nil {
  302. err := im.loadEvalModel(job)
  303. if err != nil {
  304. return fmt.Errorf("failed to sync initial eval model, and waiting it: %w", err)
  305. }
  306. }
  307. jobConfig := job.JobConfig
  308. var err error
  309. var neededDeploy bool
  310. neededDeploy, err = im.triggerDeployTask(job)
  311. status := clienttypes.UpstreamMessage{Phase: string(sednav1.ILJobDeploy)}
  312. if err == nil && neededDeploy {
  313. var models []Model
  314. trainedModel := im.getModelFromJobConditions(job, sednav1.ILJobDeploy)
  315. deployModel := jobConfig.DeployModel
  316. models = append(models, *trainedModel, *deployModel)
  317. if !job.Spec.DeploySpec.Model.HotUpdateEnabled {
  318. err = im.updateDeployModelFile(job, trainedModel.URL, deployModel.URL)
  319. if err != nil {
  320. status.Status = string(sednav1.ILJobStageCondFailed)
  321. klog.Errorf("failed to update model for job(%s): %v", jobConfig.UniqueIdentifier, err)
  322. } else {
  323. status.Status = string(sednav1.ILJobStageCondReady)
  324. klog.Infof("update model for job(%s) successfully", jobConfig.UniqueIdentifier)
  325. }
  326. evalModel := job.JobConfig.EvalModel
  327. if evalModel != nil {
  328. newEvalModel := im.getModelFromJobConditions(job, sednav1.ILJobEval)
  329. if err := im.updateDeployModelFile(job, newEvalModel.URL, evalModel.URL); err != nil {
  330. return err
  331. }
  332. }
  333. } else {
  334. status.Status = string(sednav1.ILJobStageCondReady)
  335. }
  336. status.Input = &clienttypes.Input{
  337. Models: models,
  338. }
  339. klog.Infof("job(%s) completed the %sing phase triggering task successfully",
  340. jobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
  341. } else {
  342. // No need to deploy, just report completed status
  343. // TODO: instead of reporting deploy-completed, another more reasonable status
  344. klog.Infof("job(%s) isn't need to deploy model", jobConfig.UniqueIdentifier)
  345. status.Status = string(sednav1.ILJobStageCondCompleted)
  346. }
  347. err = im.Client.WriteMessage(status, job.getHeader())
  348. if err != nil {
  349. klog.Errorf("job(%s) completed the %s task failed: %v",
  350. jobConfig.UniqueIdentifier, sednav1.ILJobDeploy, err)
  351. }
  352. job.JobConfig.DeployTriggerStatus = TriggerCompletedStatus
  353. }
  354. return nil
  355. }
  356. // startJob starts a job
  357. func (im *Manager) startJob(name string) {
  358. var err error
  359. job := im.IncrementalJobMap[name]
  360. err = im.initJob(job, name)
  361. if err != nil {
  362. klog.Errorf("failed to init job (name=%s): %+v", name)
  363. return
  364. }
  365. klog.Infof("incremental job(%s) was started", name)
  366. defer klog.Infof("incremental learning job(%s) was stopped", name)
  367. // handle data from dataset
  368. go im.handleData(job)
  369. tick := time.NewTicker(JobIterationIntervalSeconds * time.Second)
  370. for {
  371. select {
  372. case <-job.JobConfig.Done:
  373. return
  374. default:
  375. }
  376. cond := im.getLatestCondition(job)
  377. jobStage := cond.Stage
  378. switch jobStage {
  379. case sednav1.ILJobTrain:
  380. err = im.trainTask(job)
  381. case sednav1.ILJobEval:
  382. err = im.evalTask(job)
  383. case sednav1.ILJobDeploy:
  384. if cond.Type == sednav1.ILJobStageCondWaiting {
  385. err = im.deployTask(job)
  386. } else if cond.Type == sednav1.ILJobStageCondRunning && job.Spec.DeploySpec.Model.HotUpdateEnabled {
  387. err = im.hotModelUpdateDeployTask(job)
  388. }
  389. default:
  390. klog.Errorf("invalid phase: %s", jobStage)
  391. continue
  392. }
  393. if err != nil {
  394. klog.Errorf("job(%s) failed to complete the %s task: %v", name, jobStage, err)
  395. }
  396. <-tick.C
  397. }
  398. }
  399. // Insert inserts incremental-learning-job config to db
  400. func (im *Manager) Insert(message *clienttypes.Message) error {
  401. name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)
  402. first := false
  403. job, ok := im.IncrementalJobMap[name]
  404. if !ok {
  405. job = &Job{}
  406. im.IncrementalJobMap[name] = job
  407. first = true
  408. }
  409. if err := json.Unmarshal(message.Content, &job); err != nil {
  410. return err
  411. }
  412. if err := db.SaveResource(name, job.TypeMeta, job.ObjectMeta, job.Spec); err != nil {
  413. return err
  414. }
  415. if first {
  416. go im.startJob(name)
  417. }
  418. return nil
  419. }
  420. // deleteModelHotUpdateData deletes the local data of model hot update
  421. func (im *Manager) deleteModelHotUpdateData(job *Job) error {
  422. if configFile, ok := job.ObjectMeta.Annotations[runtime.ModelHotUpdateAnnotationsKey]; ok {
  423. localHostDir := filepath.Dir(configFile)
  424. dir := util.AddPrefixPath(im.VolumeMountPrefix, localHostDir)
  425. if err := os.RemoveAll(dir); err != nil {
  426. return fmt.Errorf("failed to delete the dir(%s): %w", dir, err)
  427. }
  428. }
  429. return nil
  430. }
  431. // Delete deletes incremental-learning-job config in db
  432. func (im *Manager) Delete(message *clienttypes.Message) error {
  433. name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)
  434. if job, ok := im.IncrementalJobMap[name]; ok && job.JobConfig.Done != nil {
  435. close(job.JobConfig.Done)
  436. if err := im.deleteModelHotUpdateData(job); err != nil {
  437. klog.Errorf("job(%s) failed to delete data of model hot update: %v", name, err)
  438. // continue anyway
  439. }
  440. }
  441. delete(im.IncrementalJobMap, name)
  442. if err := db.DeleteResource(name); err != nil {
  443. return err
  444. }
  445. return nil
  446. }
  447. // updateJobFromDB updates job from db
  448. func (im *Manager) updateJobFromDB(job *Job) error {
  449. var err error
  450. previousJob, err := db.GetResource(job.JobConfig.UniqueIdentifier)
  451. if err != nil {
  452. return err
  453. }
  454. m := metav1.ObjectMeta{}
  455. if err != json.Unmarshal([]byte(previousJob.ObjectMeta), &m) {
  456. return err
  457. }
  458. rounds, ok := m.Annotations[AnnotationsRoundsKey]
  459. if !ok {
  460. return nil
  461. }
  462. if job.JobConfig.Rounds, err = strconv.Atoi(rounds); err != nil {
  463. return err
  464. }
  465. numberOfSamples, ok := m.Annotations[AnnotationsNumberOfSamplesKey]
  466. if !ok {
  467. return nil
  468. }
  469. if job.JobConfig.DataSamples.PreviousNumbers, err = strconv.Atoi(numberOfSamples); err != nil {
  470. return err
  471. }
  472. dataFileOfEval, ok := m.Annotations[AnnotationsDataFileOfEvalKey]
  473. if !ok {
  474. return nil
  475. }
  476. localURL, err := job.JobConfig.Storage.Download(dataFileOfEval, "")
  477. if !job.JobConfig.Storage.IsLocalStorage {
  478. defer os.RemoveAll(localURL)
  479. }
  480. if err != nil {
  481. return err
  482. }
  483. samples, err := dataset.GetSamples(dataFileOfEval)
  484. if err != nil {
  485. klog.Errorf("read file %s failed: %v", dataFileOfEval, err)
  486. return err
  487. }
  488. job.JobConfig.DataSamples.EvalVersionSamples = append(job.JobConfig.DataSamples.EvalVersionSamples, samples)
  489. return nil
  490. }
  491. // saveJobToDB saves job info to db
  492. func (im *Manager) saveJobToDB(job *Job) error {
  493. ann := job.ObjectMeta.Annotations
  494. if ann == nil {
  495. ann = make(map[string]string)
  496. }
  497. ann[AnnotationsRoundsKey] = strconv.Itoa(job.JobConfig.Rounds)
  498. ann[AnnotationsNumberOfSamplesKey] = strconv.Itoa(job.JobConfig.DataSamples.PreviousNumbers)
  499. ann[AnnotationsDataFileOfEvalKey] = job.JobConfig.EvalDataURL
  500. return db.SaveResource(job.JobConfig.UniqueIdentifier, job.TypeMeta, job.ObjectMeta, job.Spec)
  501. }
  502. // initJob inits the job object
  503. func (im *Manager) initJob(job *Job, name string) error {
  504. job.JobConfig = new(JobConfig)
  505. jobConfig := job.JobConfig
  506. jobConfig.UniqueIdentifier = name
  507. jobConfig.Storage = storage.Storage{IsLocalStorage: false}
  508. credential := job.ObjectMeta.Annotations[runtime.SecretAnnotationKey]
  509. if credential != "" {
  510. if err := job.JobConfig.Storage.SetCredential(credential); err != nil {
  511. return fmt.Errorf("failed to set storage credential: %w", err)
  512. }
  513. }
  514. jobConfig.Done = make(chan struct{})
  515. jobConfig.Lock = sync.Mutex{}
  516. jobConfig.Rounds = 0
  517. jobConfig.DataSamples = &DataSamples{
  518. PreviousNumbers: 0,
  519. TrainSamples: make([]string, 0),
  520. EvalVersionSamples: make([][]string, 0),
  521. EvalSamples: make([]string, 0),
  522. }
  523. trainTrigger, err := newTrigger(job.Spec.TrainSpec.Trigger)
  524. if err != nil {
  525. return fmt.Errorf("failed to init train trigger: %+w", err)
  526. }
  527. deployTrigger, err := newTrigger(job.Spec.DeploySpec.Trigger)
  528. if err != nil {
  529. return fmt.Errorf("failed to init deploy trigger: %+w", err)
  530. }
  531. jobConfig.TrainTrigger = trainTrigger
  532. jobConfig.DeployTrigger = deployTrigger
  533. outputDir := job.Spec.OutputDir
  534. isLocalURL, err := jobConfig.Storage.IsLocalURL(outputDir)
  535. if err != nil {
  536. return fmt.Errorf("job(%s)'s output dir(%s) is invalid: %+w", job.Name, outputDir, err)
  537. }
  538. if isLocalURL {
  539. jobConfig.Storage.IsLocalStorage = true
  540. outputDir = util.AddPrefixPath(im.VolumeMountPrefix, outputDir)
  541. }
  542. jobConfig.OutputDir = outputDir
  543. if err := job.createOutputDir(jobConfig); err != nil {
  544. return err
  545. }
  546. if err := im.updateJobFromDB(job); err != nil {
  547. klog.Errorf("job(%s) failed to update job from db: %v", name, err)
  548. }
  549. initTriggerStatus(jobConfig)
  550. return nil
  551. }
  552. func initTriggerStatus(jobConfig *JobConfig) {
  553. jobConfig.TrainTriggerStatus = TriggerReadyStatus
  554. jobConfig.EvalTriggerStatus = TriggerReadyStatus
  555. jobConfig.DeployTriggerStatus = TriggerReadyStatus
  556. jobConfig.HotModelUpdateDeployTriggerStatus = TriggerReadyStatus
  557. }
  558. func newTrigger(t sednav1.Trigger) (trigger.Base, error) {
  559. // convert trigger to map
  560. triggerMap := make(map[string]interface{})
  561. c, err := json.Marshal(t)
  562. if err != nil {
  563. return nil, err
  564. }
  565. err = json.Unmarshal(c, &triggerMap)
  566. if err != nil {
  567. return nil, err
  568. }
  569. return trigger.NewTrigger(triggerMap)
  570. }
  571. // getModelsFromJobConditions gets models from job condition
  572. func (im *Manager) getModelsFromJobConditions(jobConditions []sednav1.ILJobCondition, stage sednav1.ILJobStage, currentType sednav1.ILJobStageConditionType, dataType string) []Model {
  573. // TODO: runtime.type changes to common.type for gm and lc
  574. for i := len(jobConditions) - 1; i >= 0; i-- {
  575. var cond gmtypes.IncrementalCondData
  576. jobCond := jobConditions[i]
  577. if jobCond.Stage == stage && jobCond.Type == currentType {
  578. if err := (&cond).Unmarshal([]byte(jobCond.Data)); err != nil {
  579. continue
  580. }
  581. if dataType == "input" {
  582. if cond.Input == nil {
  583. continue
  584. }
  585. return cond.Input.Models
  586. } else if dataType == "output" {
  587. if cond.Output == nil {
  588. continue
  589. }
  590. return cond.Output.Models
  591. }
  592. }
  593. }
  594. return nil
  595. }
  596. // getEvalResult gets eval result from job conditions
  597. func (im *Manager) getEvalResult(job *Job) ([]map[string][]float64, error) {
  598. jobConditions := job.Status.Conditions
  599. models := im.getModelsFromJobConditions(jobConditions, sednav1.ILJobEval, sednav1.ILJobStageCondCompleted, "output")
  600. var result []map[string][]float64
  601. var err error
  602. for _, m := range models {
  603. bytes, err := json.Marshal(m.Metrics)
  604. if err != nil {
  605. return nil, err
  606. }
  607. data := make(map[string][]float64)
  608. if err = json.Unmarshal(bytes, &data); err != nil {
  609. return nil, err
  610. }
  611. result = append(result, data)
  612. }
  613. return result, err
  614. }
  615. // getModelFromJobConditions gets model from job conditions for train/eval/deploy
  616. func (im *Manager) getModelFromJobConditions(job *Job, jobStage sednav1.ILJobStage) *Model {
  617. jobConditions := job.Status.Conditions
  618. jobConfig := job.JobConfig
  619. getModel := func(initModel *Model, models []Model) *Model {
  620. for _, m := range models {
  621. if m.Format == initModel.Format {
  622. if initModel.Devices != nil && len(m.Devices) == 1 {
  623. for _, d := range initModel.Devices {
  624. if m.Devices[0] == d {
  625. return &m
  626. }
  627. }
  628. } else {
  629. return &m
  630. }
  631. }
  632. }
  633. return nil
  634. }
  635. models := im.getModelsFromJobConditions(jobConditions, sednav1.ILJobTrain, sednav1.ILJobStageCondCompleted, "output")
  636. if models == nil {
  637. return nil
  638. }
  639. var model *Model
  640. switch jobStage {
  641. case sednav1.ILJobTrain:
  642. model = jobConfig.TrainModel
  643. case sednav1.ILJobEval:
  644. model = jobConfig.EvalModel
  645. case sednav1.ILJobDeploy:
  646. model = jobConfig.DeployModel
  647. }
  648. if model == nil {
  649. return nil
  650. }
  651. return getModel(model, models)
  652. }
  653. // triggerTrainTask triggers the train task
  654. func (im *Manager) triggerTrainTask(job *Job) (interface{}, bool, error) {
  655. var err error
  656. jobConfig := job.JobConfig
  657. const numOfSamples = "num_of_samples"
  658. samples := map[string]interface{}{
  659. numOfSamples: len(jobConfig.DataSamples.TrainSamples),
  660. }
  661. isTrigger := jobConfig.TrainTrigger.Trigger(samples)
  662. if !isTrigger {
  663. return nil, false, nil
  664. }
  665. job.JobConfig.Rounds++
  666. var m *Model
  667. rounds := jobConfig.Rounds
  668. if rounds <= 1 {
  669. m = jobConfig.TrainModel
  670. } else {
  671. m = im.getModelFromJobConditions(job, sednav1.ILJobTrain)
  672. }
  673. var dataIndexURL string
  674. jobConfig.TrainDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.TrainSamples,
  675. jobConfig.OutputConfig.SamplesOutput["train"], rounds, jobConfig.Dataset.Spec.Format, jobConfig.Dataset.URLPrefix)
  676. if err != nil {
  677. job.JobConfig.Rounds--
  678. klog.Errorf("job(%s) train phase: write samples to the file(%s) is failed: %v",
  679. jobConfig.UniqueIdentifier, jobConfig.TrainDataURL, err)
  680. return nil, false, err
  681. }
  682. dataURL := jobConfig.TrainDataURL
  683. outputDir := strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(rounds)}, "/")
  684. if jobConfig.Storage.IsLocalStorage {
  685. dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataURL)
  686. dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL)
  687. outputDir = util.TrimPrefixPath(im.VolumeMountPrefix, outputDir)
  688. }
  689. input := clienttypes.Input{
  690. Models: []Model{*m},
  691. DataURL: dataURL,
  692. DataIndexURL: dataIndexURL,
  693. OutputDir: outputDir,
  694. }
  695. msg := clienttypes.UpstreamMessage{
  696. Phase: string(sednav1.ILJobTrain),
  697. Status: string(sednav1.ILJobStageCondReady),
  698. Input: &input,
  699. }
  700. jobConfig.TriggerTime = time.Now()
  701. return &msg, true, nil
  702. }
  703. // triggerEvalTask triggers the eval task
  704. func (im *Manager) triggerEvalTask(job *Job) (*clienttypes.UpstreamMessage, error) {
  705. jobConfig := job.JobConfig
  706. var err error
  707. m := im.getModelFromJobConditions(job, sednav1.ILJobEval)
  708. var models []Model
  709. models = append(models, *m, *jobConfig.EvalModel)
  710. var dataIndexURL string
  711. jobConfig.EvalDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"],
  712. job.JobConfig.Rounds, jobConfig.Dataset.Spec.Format, jobConfig.Dataset.URLPrefix)
  713. if err != nil {
  714. klog.Errorf("job(%s) eval phase: write samples to the file(%s) is failed: %v",
  715. jobConfig.UniqueIdentifier, jobConfig.EvalDataURL, err)
  716. return nil, err
  717. }
  718. jobConfig.DataSamples.EvalSamples = []string{}
  719. dataURL := jobConfig.EvalDataURL
  720. if jobConfig.Storage.IsLocalStorage {
  721. dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataURL)
  722. dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL)
  723. }
  724. input := clienttypes.Input{
  725. Models: models,
  726. DataURL: dataURL,
  727. DataIndexURL: dataIndexURL,
  728. }
  729. msg := &clienttypes.UpstreamMessage{
  730. Phase: string(sednav1.ILJobEval),
  731. Status: string(sednav1.ILJobStageCondReady),
  732. Input: &input,
  733. }
  734. return msg, nil
  735. }
  736. // triggerDeployTask triggers the deploy task
  737. func (im *Manager) triggerDeployTask(job *Job) (bool, error) {
  738. jobConfig := job.JobConfig
  739. evalResult, err := im.getEvalResult(job)
  740. // EvalResult must has two models info, first is trained model, second is deployed model.
  741. if len(evalResult) != 2 {
  742. return false, fmt.Errorf("expected 2 evaluation results, actual: %d", len(jobConfig.EvalResult))
  743. }
  744. newMetrics := evalResult[0]
  745. oldMetrics := evalResult[1]
  746. metricDelta := make(map[string]interface{})
  747. for metric := range newMetrics {
  748. // keep the full metrics
  749. metricDelta[metric] = newMetrics[metric]
  750. var l []float64
  751. for i := range newMetrics[metric] {
  752. l = append(l, newMetrics[metric][i]-oldMetrics[metric][i])
  753. }
  754. metricDelta[metric+"_delta"] = l
  755. }
  756. tt := job.Spec.DeploySpec.Trigger
  757. // convert tt to map
  758. triggerMap := make(map[string]interface{})
  759. c, err := json.Marshal(tt)
  760. if err != nil {
  761. return false, err
  762. }
  763. err = json.Unmarshal(c, &triggerMap)
  764. if err != nil {
  765. return false, err
  766. }
  767. return jobConfig.DeployTrigger.Trigger(metricDelta), nil
  768. }
  769. // updateDeployModelFile updates deploy model file
  770. func (im *Manager) updateDeployModelFile(job *Job, trainedModel string, deployModel string) error {
  771. if job.JobConfig.Storage.IsLocalStorage {
  772. trainedModel = util.AddPrefixPath(im.VolumeMountPrefix, trainedModel)
  773. }
  774. if err := job.JobConfig.Storage.CopyFile(trainedModel, deployModel); err != nil {
  775. return fmt.Errorf("failed to copy trained model(url=%s) to the deploy model(url=%s): %w",
  776. trainedModel, deployModel, err)
  777. }
  778. klog.V(4).Infof("copy trained model(url=%s) to the deploy model(url=%s) successfully", trainedModel, deployModel)
  779. return nil
  780. }
  781. // createOutputDir creates the job output dir
  782. func (job *Job) createOutputDir(jobConfig *JobConfig) error {
  783. outputDir := jobConfig.OutputDir
  784. dirNames := []string{"data/train", "data/eval", "train", "eval"}
  785. if job.JobConfig.Storage.IsLocalStorage {
  786. if err := util.CreateFolder(outputDir); err != nil {
  787. klog.Errorf("job(%s) failed to create folder %s: %v", jobConfig.UniqueIdentifier, outputDir, err)
  788. return err
  789. }
  790. for _, v := range dirNames {
  791. dir := path.Join(outputDir, v)
  792. if err := util.CreateFolder(dir); err != nil {
  793. klog.Errorf("job(%s) failed to create folder %s: %v", jobConfig.UniqueIdentifier, dir, err)
  794. return err
  795. }
  796. }
  797. }
  798. outputConfig := OutputConfig{
  799. SamplesOutput: map[string]string{
  800. "train": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[0]}, "/"),
  801. "eval": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[1]}, "/"),
  802. },
  803. TrainOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[2]}, "/"),
  804. EvalOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[3]}, "/"),
  805. }
  806. jobConfig.OutputConfig = &outputConfig
  807. return nil
  808. }
  809. func (im *Manager) getLatestCondition(job *Job) sednav1.ILJobCondition {
  810. jobConditions := job.Status.Conditions
  811. var latestCondition sednav1.ILJobCondition = sednav1.ILJobCondition{}
  812. if len(jobConditions) > 0 {
  813. // get latest pod and pod status
  814. latestCondition = jobConditions[len(jobConditions)-1]
  815. }
  816. return latestCondition
  817. }
  818. func (im *Manager) getModel(namespace string, name string) (sednav1.Model, error) {
  819. modelName := util.GetUniqueIdentifier(namespace, name, model.KindName)
  820. model, ok := im.ModelManager.GetModel(modelName)
  821. if !ok {
  822. return model, fmt.Errorf("not exists model(name=%s)", modelName)
  823. }
  824. return model, nil
  825. }
  826. // loadTrainModel loads initial model information for training.
  827. func (im *Manager) loadTrainModel(job *Job) error {
  828. jobConfig := job.JobConfig
  829. if jobConfig.TrainModel == nil {
  830. initialModel, err := im.getModel(job.Namespace, job.Spec.InitialModel.Name)
  831. if err != nil {
  832. return err
  833. }
  834. jobConfig.TrainModel = new(Model)
  835. jobConfig.TrainModel.Format = initialModel.Spec.Format
  836. jobConfig.TrainModel.URL = initialModel.Spec.URL
  837. jobConfig.TrainModel.Devices = initialModel.Spec.Devices
  838. }
  839. return nil
  840. }
  841. // loadEvalModel loads initial model information for eval.
  842. func (im *Manager) loadEvalModel(job *Job) error {
  843. jobConfig := job.JobConfig
  844. if jobConfig.EvalModel == nil {
  845. initialModel, err := im.getModel(job.Namespace, job.Spec.EvalSpec.InitialModel.Name)
  846. if err != nil {
  847. return err
  848. }
  849. jobConfig.EvalModel = new(Model)
  850. jobConfig.EvalModel.Format = initialModel.Spec.Format
  851. jobConfig.EvalModel.URL = initialModel.Spec.URL
  852. jobConfig.EvalModel.Devices = initialModel.Spec.Devices
  853. }
  854. return nil
  855. }
  856. // loadDeployModel loads model information for deploying.
  857. func (im *Manager) loadDeployModel(job *Job) error {
  858. jobConfig := job.JobConfig
  859. if jobConfig.DeployModel == nil {
  860. deployModel, err := im.getModel(job.Namespace, job.Spec.DeploySpec.Model.Name)
  861. if err != nil {
  862. return err
  863. }
  864. jobConfig.DeployModel = new(Model)
  865. jobConfig.DeployModel.Format = deployModel.Spec.Format
  866. jobConfig.DeployModel.URL = deployModel.Spec.URL
  867. jobConfig.DeployModel.Devices = deployModel.Spec.Devices
  868. }
  869. return nil
  870. }
  871. // loadDataset loads dataset information
  872. func (im *Manager) loadDataset(job *Job) error {
  873. if job.JobConfig.Dataset != nil {
  874. // already loaded
  875. return nil
  876. }
  877. datasetName := util.GetUniqueIdentifier(job.Namespace, job.Spec.Dataset.Name, dataset.KindName)
  878. dataset, ok := im.DatasetManager.GetDataset(datasetName)
  879. if !ok || dataset == nil {
  880. return fmt.Errorf("not exists dataset(name=%s)", datasetName)
  881. }
  882. job.JobConfig.Dataset = dataset
  883. return nil
  884. }
  885. // handleData updates samples information
  886. func (im *Manager) handleData(job *Job) {
  887. tick := time.NewTicker(DatasetHandlerIntervalSeconds * time.Second)
  888. jobConfig := job.JobConfig
  889. iterCount := 0
  890. for {
  891. select {
  892. case <-jobConfig.Done:
  893. return
  894. default:
  895. }
  896. if iterCount%100 == 0 {
  897. klog.V(4).Infof("job(%s) is handling dataset", jobConfig.UniqueIdentifier)
  898. }
  899. iterCount++
  900. if jobConfig.Dataset == nil || jobConfig.Dataset.DataSource == nil {
  901. // already loaded dataset
  902. <-tick.C
  903. continue
  904. }
  905. dataset := jobConfig.Dataset
  906. currentNumberOfSamples := dataset.DataSource.NumberOfSamples
  907. previousNumberOfSamples := jobConfig.DataSamples.PreviousNumbers
  908. if dataset.DataSource != nil && currentNumberOfSamples > previousNumberOfSamples {
  909. samples := dataset.DataSource.TrainSamples
  910. newNumberOfSamples := currentNumberOfSamples - previousNumberOfSamples
  911. trainNum := int(job.Spec.Dataset.TrainProb * float64(newNumberOfSamples))
  912. jobConfig.Lock.Lock()
  913. jobConfig.DataSamples.TrainSamples = append(jobConfig.DataSamples.TrainSamples,
  914. samples[previousNumberOfSamples:previousNumberOfSamples+trainNum]...)
  915. klog.Infof("job(%s)'s current train samples nums is %d", jobConfig.UniqueIdentifier, trainNum)
  916. jobConfig.DataSamples.EvalVersionSamples = append(jobConfig.DataSamples.EvalVersionSamples,
  917. samples[previousNumberOfSamples+trainNum:])
  918. jobConfig.Lock.Unlock()
  919. for _, v := range jobConfig.DataSamples.EvalVersionSamples {
  920. jobConfig.DataSamples.EvalSamples = append(jobConfig.DataSamples.EvalSamples, v...)
  921. }
  922. klog.Infof("job(%s)'s current eval samples nums is %d", jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.EvalSamples))
  923. jobConfig.DataSamples.PreviousNumbers = currentNumberOfSamples
  924. }
  925. <-tick.C
  926. }
  927. }
  928. // createFile creates data file and data index file
  929. func createFile(dir string, format string, isLocalStorage bool) (string, string) {
  930. switch format {
  931. case dataset.TXTFormat:
  932. if isLocalStorage {
  933. return path.Join(dir, "data.txt"), ""
  934. }
  935. return strings.Join([]string{dir, "data.txt"}, "/"), strings.Join([]string{dir, "dataIndex.txt"}, "/")
  936. }
  937. return "", ""
  938. }
  939. // writeSamples writes samples information to a file
  940. func (im *Manager) writeSamples(job *Job, samples []string, dir string, rounds int, format string, urlPrefix string) (string, string, error) {
  941. if samples == nil {
  942. return "", "", fmt.Errorf("not samples")
  943. }
  944. jobConfig := job.JobConfig
  945. subDir := strings.Join([]string{dir, strconv.Itoa(rounds)}, "/")
  946. fileURL, absURLFile := createFile(subDir, format, jobConfig.Dataset.Storage.IsLocalStorage)
  947. if jobConfig.Storage.IsLocalStorage {
  948. if err := util.CreateFolder(subDir); err != nil {
  949. return "", "", err
  950. }
  951. if err := im.writeByLine(samples, fileURL); err != nil {
  952. return "", "", err
  953. }
  954. if !jobConfig.Dataset.Storage.IsLocalStorage {
  955. tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)
  956. if err := im.writeByLine(tempSamples, absURLFile); err != nil {
  957. return "", "", err
  958. }
  959. }
  960. return fileURL, absURLFile, nil
  961. }
  962. temporaryDir, err := util.CreateTemporaryDir()
  963. if err != nil {
  964. return "", "", err
  965. }
  966. localFileURL, localAbsURLFile := createFile(temporaryDir, format, jobConfig.Dataset.Storage.IsLocalStorage)
  967. if err := im.writeByLine(samples, localFileURL); err != nil {
  968. return "", "", err
  969. }
  970. if err := jobConfig.Storage.Upload(localFileURL, fileURL); err != nil {
  971. return "", "", err
  972. }
  973. tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)
  974. if err := im.writeByLine(tempSamples, localAbsURLFile); err != nil {
  975. return "", "", err
  976. }
  977. if err := jobConfig.Storage.Upload(localAbsURLFile, absURLFile); err != nil {
  978. return "", "", err
  979. }
  980. defer os.RemoveAll(localFileURL)
  981. defer os.RemoveAll(localAbsURLFile)
  982. return fileURL, absURLFile, nil
  983. }
  984. // writeByLine writes file by line
  985. func (im *Manager) writeByLine(samples []string, fileURL string) error {
  986. file, err := os.Create(fileURL)
  987. if err != nil {
  988. klog.Errorf("create file(%s) failed", fileURL)
  989. return err
  990. }
  991. w := bufio.NewWriter(file)
  992. for _, line := range samples {
  993. _, _ = fmt.Fprintln(w, line)
  994. }
  995. if err := w.Flush(); err != nil {
  996. klog.Errorf("failed to write file(%s): %v", fileURL, err)
  997. return err
  998. }
  999. if err := file.Close(); err != nil {
  1000. klog.Errorf("failed to close file(%s): %v", fileURL, err)
  1001. return err
  1002. }
  1003. return nil
  1004. }
  1005. // monitorWorker monitors message from worker
  1006. func (im *Manager) monitorWorker() {
  1007. for {
  1008. workerMessageChannel := im.WorkerMessageChannel
  1009. workerMessage, ok := <-workerMessageChannel
  1010. if !ok {
  1011. break
  1012. }
  1013. klog.V(4).Infof("handling worker message %+v", workerMessage)
  1014. name := util.GetUniqueIdentifier(workerMessage.Namespace, workerMessage.OwnerName, workerMessage.OwnerKind)
  1015. job, ok := im.IncrementalJobMap[name]
  1016. if !ok {
  1017. continue
  1018. }
  1019. // TODO: filter some worker messages out
  1020. wo := clienttypes.Output{}
  1021. wo.Models = workerMessage.Results
  1022. wo.OwnerInfo = workerMessage.OwnerInfo
  1023. msg := &clienttypes.UpstreamMessage{
  1024. Phase: workerMessage.Kind,
  1025. Status: workerMessage.Status,
  1026. Output: &wo,
  1027. }
  1028. if err := im.Client.WriteMessage(msg, job.getHeader()); err != nil {
  1029. klog.Errorf("job(%s) failed to write message: %v", name, err)
  1030. continue
  1031. }
  1032. }
  1033. }
  1034. // forwardSamples deletes the samples information in the memory
  1035. func forwardSamples(jobConfig *JobConfig, jobStage sednav1.ILJobStage) {
  1036. switch jobStage {
  1037. case sednav1.ILJobTrain:
  1038. jobConfig.Lock.Lock()
  1039. jobConfig.DataSamples.TrainSamples = jobConfig.DataSamples.TrainSamples[:0]
  1040. jobConfig.Lock.Unlock()
  1041. case sednav1.ILJobEval:
  1042. if len(jobConfig.DataSamples.EvalVersionSamples) > EvalSamplesCapacity {
  1043. jobConfig.DataSamples.EvalVersionSamples = jobConfig.DataSamples.EvalVersionSamples[1:]
  1044. }
  1045. }
  1046. }
  1047. // AddWorkerMessage adds worker messages
  1048. func (im *Manager) AddWorkerMessage(message workertypes.MessageContent) {
  1049. im.WorkerMessageChannel <- message
  1050. }
  1051. // GetName returns name of the manager
  1052. func (im *Manager) GetName() string {
  1053. return KindName
  1054. }
  1055. func (job *Job) getHeader() clienttypes.MessageHeader {
  1056. return clienttypes.MessageHeader{
  1057. Namespace: job.Namespace,
  1058. ResourceKind: job.Kind,
  1059. ResourceName: job.Name,
  1060. Operation: clienttypes.StatusOperation,
  1061. }
  1062. }