You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

incrementallearningjob.go 30 kB


  1. /*
  2. Copyright 2021 The KubeEdge Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package manager
  14. import (
  15. "bufio"
  16. "encoding/json"
  17. "fmt"
  18. "os"
  19. "path"
  20. "strconv"
  21. "strings"
  22. "sync"
  23. "time"
  24. "k8s.io/klog/v2"
  25. "github.com/kubeedge/sedna/cmd/sedna-lc/app/options"
  26. sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
  27. gmtypes "github.com/kubeedge/sedna/pkg/globalmanager/controllers/incrementallearning"
  28. "github.com/kubeedge/sedna/pkg/globalmanager/runtime"
  29. "github.com/kubeedge/sedna/pkg/localcontroller/db"
  30. "github.com/kubeedge/sedna/pkg/localcontroller/gmclient"
  31. "github.com/kubeedge/sedna/pkg/localcontroller/storage"
  32. "github.com/kubeedge/sedna/pkg/localcontroller/trigger"
  33. "github.com/kubeedge/sedna/pkg/localcontroller/util"
  34. )
  35. // IncrementalLearningJob defines config for incremental-learning-job
  36. type IncrementalLearningJob struct {
  37. sednav1.IncrementalLearningJob
  38. JobConfig *JobConfig
  39. Dataset *Dataset
  40. Done chan struct{}
  41. Storage storage.Storage
  42. }
  43. // JobConfig defines config for incremental-learning-job
  44. type JobConfig struct {
  45. UniqueIdentifier string
  46. Rounds int
  47. TrainTrigger trigger.Base
  48. DeployTrigger trigger.Base
  49. TriggerTime time.Time
  50. TrainTriggerStatus string
  51. EvalTriggerStatus string
  52. TrainDataURL string
  53. EvalDataURL string
  54. OutputDir string
  55. OutputConfig *OutputConfig
  56. DataSamples *DataSamples
  57. TrainModel *ModelInfo
  58. DeployModel *ModelInfo
  59. EvalModels []ModelInfo
  60. EvalResult []ModelInfo
  61. Lock sync.Mutex
  62. }
  63. // OutputConfig defines config for job output
  64. type OutputConfig struct {
  65. SamplesOutput map[string]string `json:"trainData"`
  66. TrainOutput string `json:"trainOutput"`
  67. EvalOutput string `json:"evalOutput"`
  68. }
  69. // DataSamples defines samples information
  70. type DataSamples struct {
  71. Numbers int
  72. TrainSamples []string
  73. EvalVersionSamples [][]string
  74. EvalSamples []string
  75. }
  76. // IncrementalLearningJob defines incremental-learning-job manager
  77. type IncrementalJobManager struct {
  78. Client gmclient.ClientI
  79. WorkerMessageChannel chan WorkerMessage
  80. DatasetManager *DatasetManager
  81. ModelManager *ModelManager
  82. IncrementalJobMap map[string]*IncrementalLearningJob
  83. VolumeMountPrefix string
  84. }
  85. const (
  86. // JobIterationIntervalSeconds is interval time of each iteration of job
  87. JobIterationIntervalSeconds = 10
  88. // DatasetHandlerIntervalSeconds is interval time of handling dataset
  89. DatasetHandlerIntervalSeconds = 10
  90. // EvalSamplesCapacity is capacity of eval samples
  91. EvalSamplesCapacity = 5
  92. //IncrementalLearningJobKind is kind of incremental-learning-job resource
  93. IncrementalLearningJobKind = "incrementallearningjob"
  94. )
  95. // NewIncrementalJobManager creates a incremental-learning-job manager
  96. func NewIncrementalJobManager(client gmclient.ClientI, datasetManager *DatasetManager,
  97. modelManager *ModelManager, options *options.LocalControllerOptions) *IncrementalJobManager {
  98. im := IncrementalJobManager{
  99. Client: client,
  100. WorkerMessageChannel: make(chan WorkerMessage, WorkerMessageChannelCacheSize),
  101. DatasetManager: datasetManager,
  102. ModelManager: modelManager,
  103. IncrementalJobMap: make(map[string]*IncrementalLearningJob),
  104. VolumeMountPrefix: options.VolumeMountPrefix,
  105. }
  106. return &im
  107. }
  108. // Start starts incremental-learning-job manager
  109. func (im *IncrementalJobManager) Start() error {
  110. go im.monitorWorker()
  111. return nil
  112. }
  113. // trainTask starts training task
  114. func (im *IncrementalJobManager) trainTask(job *IncrementalLearningJob, currentRound int) error {
  115. jobConfig := job.JobConfig
  116. latestCond := im.getLatestCondition(job)
  117. jobStage := latestCond.Stage
  118. currentType := latestCond.Type
  119. if currentType == sednav1.ILJobStageCondWaiting {
  120. if job.Dataset == nil {
  121. return fmt.Errorf("job(name=%s) dataset not ready", jobConfig.UniqueIdentifier)
  122. }
  123. err := im.loadTrainModel(job)
  124. if err != nil {
  125. return fmt.Errorf("job(name=%s) failed to sync train model, and waiting it: %v",
  126. jobConfig.UniqueIdentifier, err)
  127. }
  128. if currentRound < jobConfig.Rounds {
  129. currentRound = jobConfig.Rounds
  130. initTriggerStatus(jobConfig)
  131. }
  132. }
  133. if currentType == sednav1.ILJobStageCondWaiting && jobConfig.TrainTriggerStatus == TriggerReadyStatus {
  134. payload, ok, err := im.triggerTrainTask(job)
  135. if !ok {
  136. return nil
  137. }
  138. if err != nil {
  139. klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v",
  140. jobConfig.UniqueIdentifier, jobStage, err)
  141. return err
  142. }
  143. err = im.Client.WriteMessage(payload, job.getHeader())
  144. if err != nil {
  145. klog.Errorf("job(name=%s) failed to write message: %v",
  146. jobConfig.UniqueIdentifier, err)
  147. return err
  148. }
  149. jobConfig.TrainTriggerStatus = TriggerCompletedStatus
  150. jobConfig.Rounds++
  151. forwardSamples(jobConfig, jobStage)
  152. klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
  153. jobConfig.UniqueIdentifier, jobStage)
  154. }
  155. return nil
  156. }
  157. // evalTask starts eval task
  158. func (im *IncrementalJobManager) evalTask(job *IncrementalLearningJob) error {
  159. jobConfig := job.JobConfig
  160. latestCond := im.getLatestCondition(job)
  161. jobStage := latestCond.Stage
  162. currentType := latestCond.Type
  163. if currentType == sednav1.ILJobStageCondWaiting {
  164. err := im.loadDeployModel(job)
  165. if err != nil {
  166. klog.Warningf("job(name=%s) failed to sync deploy model, and waiting it: %v",
  167. jobConfig.UniqueIdentifier, err)
  168. }
  169. }
  170. if currentType == sednav1.ILJobStageCondWaiting && jobConfig.EvalTriggerStatus == TriggerReadyStatus {
  171. payload, err := im.triggerEvalTask(job)
  172. if err != nil {
  173. klog.Errorf("job(name=%s) complete the %sing phase triggering task failed, error: %v",
  174. jobConfig.UniqueIdentifier, jobStage, err)
  175. return err
  176. }
  177. err = im.Client.WriteMessage(payload, job.getHeader())
  178. if err != nil {
  179. return err
  180. }
  181. jobConfig.EvalTriggerStatus = TriggerCompletedStatus
  182. forwardSamples(jobConfig, jobStage)
  183. klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
  184. jobConfig.UniqueIdentifier, jobStage)
  185. }
  186. return nil
  187. }
  188. // deployTask starts deploy task
  189. func (im *IncrementalJobManager) deployTask(job *IncrementalLearningJob) {
  190. jobConfig := job.JobConfig
  191. var err error
  192. var neededDeploy bool
  193. neededDeploy, err = im.triggerDeployTask(job)
  194. status := UpstreamMessage{Phase: string(sednav1.ILJobDeploy)}
  195. if err == nil && neededDeploy {
  196. deployModel, err := im.deployModel(job)
  197. if err != nil {
  198. klog.Errorf("failed to deploy model for job(name=%s): %v", jobConfig.UniqueIdentifier, err)
  199. } else {
  200. klog.Infof("deployed model for job(name=%s) successfully", jobConfig.UniqueIdentifier)
  201. }
  202. if err != nil || deployModel == nil {
  203. status.Status = string(sednav1.ILJobStageCondFailed)
  204. } else {
  205. status.Status = string(sednav1.ILJobStageCondReady)
  206. status.Input = &WorkerInput{
  207. Models: []ModelInfo{
  208. *deployModel,
  209. },
  210. }
  211. }
  212. klog.Infof("job(name=%s) complete the %sing phase triggering task successfully",
  213. jobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
  214. } else {
  215. // No need to deploy, just report completed status
  216. // TODO: instead of reporting deploy-completed, another more reasonable status
  217. klog.Infof("no need to deploy model for job(name=%s)", jobConfig.UniqueIdentifier)
  218. status.Status = string(sednav1.ILJobStageCondCompleted)
  219. }
  220. err = im.Client.WriteMessage(status, job.getHeader())
  221. if err != nil {
  222. klog.Errorf("job(name=%s) complete the %s task failed, error: %v",
  223. jobConfig.UniqueIdentifier, sednav1.ILJobDeploy, err)
  224. }
  225. klog.Infof("job(name=%s) complete the %s task successfully", jobConfig.UniqueIdentifier, sednav1.ILJobDeploy)
  226. }
  227. // startJob starts a job
  228. func (im *IncrementalJobManager) startJob(name string) {
  229. var err error
  230. job := im.IncrementalJobMap[name]
  231. job.JobConfig = new(JobConfig)
  232. jobConfig := job.JobConfig
  233. jobConfig.UniqueIdentifier = name
  234. err = im.initJob(job)
  235. if err != nil {
  236. klog.Errorf("failed to init job (name=%s): %+v", jobConfig.UniqueIdentifier)
  237. return
  238. }
  239. klog.Infof("incremental job(name=%s) is started", name)
  240. defer klog.Infof("incremental learning job(name=%s) is stopped", name)
  241. cond := im.getLatestCondition(job)
  242. currentType := cond.Type
  243. jobStage := cond.Stage
  244. if jobStage == sednav1.ILJobTrain && currentType == sednav1.ILJobStageCondWaiting {
  245. go im.handleData(job)
  246. }
  247. currentRound := jobConfig.Rounds
  248. tick := time.NewTicker(JobIterationIntervalSeconds * time.Second)
  249. for {
  250. select {
  251. case <-job.Done:
  252. return
  253. default:
  254. }
  255. latestCond := im.getLatestCondition(job)
  256. jobStage := latestCond.Stage
  257. switch jobStage {
  258. case sednav1.ILJobTrain:
  259. err = im.trainTask(job, currentRound)
  260. case sednav1.ILJobEval:
  261. err = im.evalTask(job)
  262. default:
  263. klog.Errorf("invalid phase: %s", jobStage)
  264. continue
  265. }
  266. if err != nil {
  267. klog.Errorf("job(name=%s) complete the %s task failed, error: %v",
  268. jobConfig.UniqueIdentifier, jobStage, err)
  269. }
  270. <-tick.C
  271. }
  272. }
  273. // Insert inserts incremental-learning-job config to db
  274. func (im *IncrementalJobManager) Insert(message *gmclient.Message) error {
  275. name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)
  276. first := false
  277. job, ok := im.IncrementalJobMap[name]
  278. if !ok {
  279. job = &IncrementalLearningJob{}
  280. job.Storage = storage.Storage{IsLocalStorage: false}
  281. job.Done = make(chan struct{})
  282. im.IncrementalJobMap[name] = job
  283. first = true
  284. }
  285. if err := json.Unmarshal(message.Content, &job); err != nil {
  286. return err
  287. }
  288. credential := job.ObjectMeta.Annotations[CredentialAnnotationKey]
  289. if credential != "" {
  290. if err := job.Storage.SetCredential(credential); err != nil {
  291. return fmt.Errorf("failed to set job(name=%s)'s storage credential, error: %+v", name, err)
  292. }
  293. }
  294. if first {
  295. go im.startJob(name)
  296. }
  297. if err := db.SaveResource(name, job.TypeMeta, job.ObjectMeta, job.Spec); err != nil {
  298. return err
  299. }
  300. return nil
  301. }
  302. // Delete deletes incremental-learning-job config in db
  303. func (im *IncrementalJobManager) Delete(message *gmclient.Message) error {
  304. name := util.GetUniqueIdentifier(message.Header.Namespace, message.Header.ResourceName, message.Header.ResourceKind)
  305. if job, ok := im.IncrementalJobMap[name]; ok && job.Done != nil {
  306. close(job.Done)
  307. }
  308. delete(im.IncrementalJobMap, name)
  309. if err := db.DeleteResource(name); err != nil {
  310. return err
  311. }
  312. return nil
  313. }
  314. // initJob inits the job object
  315. func (im *IncrementalJobManager) initJob(job *IncrementalLearningJob) error {
  316. jobConfig := job.JobConfig
  317. jobConfig.Lock = sync.Mutex{}
  318. jobConfig.Rounds = 1
  319. initTriggerStatus(jobConfig)
  320. trainTrigger, err := newTrigger(job.Spec.TrainSpec.Trigger)
  321. if err != nil {
  322. return fmt.Errorf("failed to init train trigger: %+w", err)
  323. }
  324. deployTrigger, err := newTrigger(job.Spec.DeploySpec.Trigger)
  325. if err != nil {
  326. return fmt.Errorf("failed to init deploy trigger: %+w", err)
  327. }
  328. jobConfig.TrainTrigger = trainTrigger
  329. jobConfig.DeployTrigger = deployTrigger
  330. outputDir := job.Spec.OutputDir
  331. isLocalURL, err := job.Storage.IsLocalURL(outputDir)
  332. if err != nil {
  333. return fmt.Errorf("job(name=%s)'s output dir is invalid, error: %+v", job.Name, outputDir)
  334. }
  335. if isLocalURL {
  336. job.Storage.IsLocalStorage = true
  337. outputDir = util.AddPrefixPath(im.VolumeMountPrefix, outputDir)
  338. }
  339. jobConfig.OutputDir = outputDir
  340. if err := job.createOutputDir(jobConfig); err != nil {
  341. return err
  342. }
  343. return nil
  344. }
  345. func initTriggerStatus(jobConfig *JobConfig) {
  346. jobConfig.TrainTriggerStatus = TriggerReadyStatus
  347. jobConfig.EvalTriggerStatus = TriggerReadyStatus
  348. }
  349. func newTrigger(t sednav1.Trigger) (trigger.Base, error) {
  350. // convert trigger to map
  351. triggerMap := make(map[string]interface{})
  352. c, err := json.Marshal(t)
  353. if err != nil {
  354. return nil, err
  355. }
  356. err = json.Unmarshal(c, &triggerMap)
  357. if err != nil {
  358. return nil, err
  359. }
  360. return trigger.NewTrigger(triggerMap)
  361. }
  362. // getTrainOrEvalModel gets train model or eval model from job conditions
  363. func (im *IncrementalJobManager) getTrainOrEvalModel(job *IncrementalLearningJob, jobStage sednav1.ILJobStage) *ModelInfo {
  364. jobConditions := job.Status.Conditions
  365. // TODO: runtime.type changes to common.type for gm and lc
  366. var models []runtime.Model
  367. for i := len(jobConditions) - 1; i >= 0; i-- {
  368. var cond gmtypes.IncrementalCondData
  369. jobCond := jobConditions[i]
  370. if jobCond.Stage == sednav1.ILJobTrain && jobCond.Type == sednav1.ILJobStageCondCompleted {
  371. if err := (&cond).Unmarshal([]byte(jobCond.Data)); err != nil {
  372. continue
  373. }
  374. if cond.Output == nil {
  375. continue
  376. }
  377. // models list has two model, first is deploy model, second is trained model
  378. models = cond.Output.Models
  379. break
  380. }
  381. }
  382. // models must have two model file info which are output of train,
  383. // first model will be used for inference if it evaluated as excellent, second model will be used for retaining.
  384. if len(models) != 2 {
  385. return nil
  386. }
  387. switch jobStage {
  388. case sednav1.ILJobTrain:
  389. return &ModelInfo{Format: models[1].Format, URL: models[1].URL}
  390. case sednav1.ILJobEval:
  391. return &ModelInfo{Format: models[0].Format, URL: models[0].URL}
  392. }
  393. return nil
  394. }
  395. // triggerTrainTask triggers the train task
  396. func (im *IncrementalJobManager) triggerTrainTask(job *IncrementalLearningJob) (interface{}, bool, error) {
  397. var err error
  398. jobConfig := job.JobConfig
  399. const numOfSamples = "num_of_samples"
  400. samples := map[string]interface{}{
  401. numOfSamples: len(jobConfig.DataSamples.TrainSamples),
  402. }
  403. isTrigger := jobConfig.TrainTrigger.Trigger(samples)
  404. if !isTrigger {
  405. return nil, false, nil
  406. }
  407. var m *ModelInfo
  408. latestCondition := im.getLatestCondition(job)
  409. rounds := jobConfig.Rounds
  410. if rounds <= 1 {
  411. m = jobConfig.TrainModel
  412. } else {
  413. m = im.getTrainOrEvalModel(job, latestCondition.Stage)
  414. if m == nil {
  415. return nil, false, err
  416. }
  417. }
  418. var dataIndexURL string
  419. jobConfig.TrainDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.TrainSamples,
  420. jobConfig.OutputConfig.SamplesOutput["train"], rounds, job.Dataset.Spec.Format, job.Dataset.URLPrefix)
  421. if err != nil {
  422. klog.Errorf("job(name=%s) train phase: write samples to the file(%s) is failed, error: %v",
  423. jobConfig.UniqueIdentifier, jobConfig.TrainDataURL, err)
  424. return nil, false, err
  425. }
  426. dataURL := jobConfig.TrainDataURL
  427. outputDir := strings.Join([]string{jobConfig.OutputConfig.TrainOutput, strconv.Itoa(rounds)}, "/")
  428. if job.Storage.IsLocalStorage {
  429. dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataURL)
  430. dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL)
  431. outputDir = util.TrimPrefixPath(im.VolumeMountPrefix, outputDir)
  432. }
  433. input := WorkerInput{
  434. Models: []ModelInfo{*m},
  435. DataURL: dataURL,
  436. DataIndexURL: dataIndexURL,
  437. OutputDir: outputDir,
  438. }
  439. msg := UpstreamMessage{
  440. Phase: string(sednav1.ILJobTrain),
  441. Status: string(sednav1.ILJobStageCondReady),
  442. Input: &input,
  443. }
  444. jobConfig.TriggerTime = time.Now()
  445. return &msg, true, nil
  446. }
  447. // triggerEvalTask triggers the eval task
  448. func (im *IncrementalJobManager) triggerEvalTask(job *IncrementalLearningJob) (*UpstreamMessage, error) {
  449. jobConfig := job.JobConfig
  450. var err error
  451. latestCondition := im.getLatestCondition(job)
  452. m := im.getTrainOrEvalModel(job, latestCondition.Stage)
  453. if m == nil {
  454. return nil, err
  455. }
  456. models := []ModelInfo{*m, {
  457. Format: jobConfig.DeployModel.Format,
  458. URL: jobConfig.DeployModel.URL,
  459. }}
  460. // EvalModels has two models, first is trained model, second is deployed model
  461. jobConfig.EvalModels = models
  462. var dataIndexURL string
  463. rounds := jobConfig.Rounds
  464. jobConfig.EvalDataURL, dataIndexURL, err = im.writeSamples(job, jobConfig.DataSamples.EvalSamples, jobConfig.OutputConfig.SamplesOutput["eval"],
  465. rounds, job.Dataset.Spec.Format, job.Dataset.URLPrefix)
  466. if err != nil {
  467. klog.Errorf("job(name=%s) eval phase: write samples to the file(%s) is failed, error: %v",
  468. jobConfig.UniqueIdentifier, jobConfig.EvalDataURL, err)
  469. return nil, err
  470. }
  471. dataURL := jobConfig.EvalDataURL
  472. if job.Storage.IsLocalStorage {
  473. dataURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataURL)
  474. dataIndexURL = util.TrimPrefixPath(im.VolumeMountPrefix, dataIndexURL)
  475. }
  476. input := WorkerInput{
  477. Models: models,
  478. DataURL: dataURL,
  479. DataIndexURL: dataIndexURL,
  480. }
  481. msg := &UpstreamMessage{
  482. Phase: string(sednav1.ILJobEval),
  483. Status: string(sednav1.ILJobStageCondReady),
  484. Input: &input,
  485. }
  486. return msg, nil
  487. }
  488. // triggerDeployTask triggers the deploy task
  489. func (im *IncrementalJobManager) triggerDeployTask(job *IncrementalLearningJob) (bool, error) {
  490. jobConfig := job.JobConfig
  491. // EvalResult must has two models info, first is trained model, second is deployed model.
  492. if len(jobConfig.EvalResult) != 2 {
  493. return false, fmt.Errorf("expected 2 evaluation results, actual: %d", len(jobConfig.EvalResult))
  494. }
  495. newMetrics, oldMetrics := jobConfig.EvalResult[0].Metrics, jobConfig.EvalResult[1].Metrics
  496. metricDelta := make(map[string]interface{})
  497. for metric := range newMetrics {
  498. // keep the full metrics
  499. metricDelta[metric] = newMetrics[metric]
  500. var l []float64
  501. for i := range newMetrics[metric] {
  502. l = append(l, newMetrics[metric][i]-oldMetrics[metric][i])
  503. }
  504. metricDelta[metric+"_delta"] = l
  505. }
  506. tt := job.Spec.DeploySpec.Trigger
  507. // convert tt to map
  508. triggerMap := make(map[string]interface{})
  509. c, err := json.Marshal(tt)
  510. if err != nil {
  511. return false, err
  512. }
  513. err = json.Unmarshal(c, &triggerMap)
  514. if err != nil {
  515. return false, err
  516. }
  517. return jobConfig.DeployTrigger.Trigger(metricDelta), nil
  518. }
  519. // deployModel deploys model
  520. func (im *IncrementalJobManager) deployModel(job *IncrementalLearningJob) (*ModelInfo, error) {
  521. jobConfig := job.JobConfig
  522. trainedModel := jobConfig.EvalModels[0].URL
  523. deployModel := jobConfig.EvalModels[1].URL
  524. if job.Storage.IsLocalStorage {
  525. trainedModel = util.AddPrefixPath(im.VolumeMountPrefix, trainedModel)
  526. }
  527. if err := job.updateDeployModel(deployModel, trainedModel); err != nil {
  528. return nil, err
  529. }
  530. klog.Infof("job(name=%s) deploys model(url=%s) successfully", jobConfig.UniqueIdentifier, trainedModel)
  531. return &jobConfig.EvalModels[0], nil
  532. }
  533. func (job *IncrementalLearningJob) updateDeployModel(deployModel string, newModel string) error {
  534. if err := job.Storage.CopyFile(newModel, deployModel); err != nil {
  535. return fmt.Errorf("copy model(url=%s) to the deploy model(url=%s) failed, error: %+v",
  536. newModel, deployModel, err)
  537. }
  538. klog.Infof("copy model(url=%s) to the deploy model(url=%s) successfully", newModel, deployModel)
  539. return nil
  540. }
  541. // createOutputDir creates the job output dir
  542. func (job *IncrementalLearningJob) createOutputDir(jobConfig *JobConfig) error {
  543. outputDir := jobConfig.OutputDir
  544. dirNames := []string{"data/train", "data/eval", "train", "eval"}
  545. if job.Storage.IsLocalStorage {
  546. if err := util.CreateFolder(outputDir); err != nil {
  547. klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, outputDir)
  548. return err
  549. }
  550. for _, v := range dirNames {
  551. dir := path.Join(outputDir, v)
  552. if err := util.CreateFolder(dir); err != nil {
  553. klog.Errorf("job(name=%s) create fold %s failed", jobConfig.UniqueIdentifier, dir)
  554. return err
  555. }
  556. }
  557. }
  558. outputConfig := OutputConfig{
  559. SamplesOutput: map[string]string{
  560. "train": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[0]}, "/"),
  561. "eval": strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[1]}, "/"),
  562. },
  563. TrainOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[2]}, "/"),
  564. EvalOutput: strings.Join([]string{strings.TrimRight(outputDir, "/"), dirNames[3]}, "/"),
  565. }
  566. jobConfig.OutputConfig = &outputConfig
  567. return nil
  568. }
  569. func (im *IncrementalJobManager) getLatestCondition(job *IncrementalLearningJob) sednav1.ILJobCondition {
  570. jobConditions := job.Status.Conditions
  571. var latestCondition sednav1.ILJobCondition = sednav1.ILJobCondition{}
  572. if len(jobConditions) > 0 {
  573. // get latest pod and pod status
  574. latestCondition = jobConditions[len(jobConditions)-1]
  575. }
  576. return latestCondition
  577. }
  578. func (im *IncrementalJobManager) getModel(namespace string, name string) (sednav1.Model, error) {
  579. modelName := util.GetUniqueIdentifier(namespace, name, ModelResourceKind)
  580. model, ok := im.ModelManager.GetModel(modelName)
  581. if !ok {
  582. return model, fmt.Errorf("not exists model(name=%s)", modelName)
  583. }
  584. return model, nil
  585. }
  586. // loadTrainModel loads initial model information for training.
  587. func (im *IncrementalJobManager) loadTrainModel(job *IncrementalLearningJob) error {
  588. jobConfig := job.JobConfig
  589. if jobConfig.TrainModel == nil {
  590. initialModel, err := im.getModel(job.Namespace, job.Spec.InitialModel.Name)
  591. if err != nil {
  592. return err
  593. }
  594. jobConfig.TrainModel = new(ModelInfo)
  595. format := initialModel.Spec.Format
  596. url := initialModel.Spec.URL
  597. jobConfig.TrainModel.Format = format
  598. jobConfig.TrainModel.URL = url
  599. }
  600. return nil
  601. }
  602. // loadDeployModel loads model information for deploying.
  603. func (im *IncrementalJobManager) loadDeployModel(job *IncrementalLearningJob) error {
  604. jobConfig := job.JobConfig
  605. if jobConfig.DeployModel == nil {
  606. evalModel, err := im.getModel(job.Namespace, job.Spec.DeploySpec.Model.Name)
  607. if err != nil {
  608. return err
  609. }
  610. jobConfig.DeployModel = new(ModelInfo)
  611. jobConfig.DeployModel.Format = evalModel.Spec.Format
  612. jobConfig.DeployModel.URL = evalModel.Spec.URL
  613. }
  614. return nil
  615. }
  616. // loadDataset loads dataset information
  617. func (im *IncrementalJobManager) loadDataset(job *IncrementalLearningJob) error {
  618. if job.Dataset != nil {
  619. // already loaded
  620. return nil
  621. }
  622. datasetName := util.GetUniqueIdentifier(job.Namespace, job.Spec.Dataset.Name, DatasetResourceKind)
  623. dataset, ok := im.DatasetManager.GetDataset(datasetName)
  624. if !ok || dataset == nil {
  625. return fmt.Errorf("not exists dataset(name=%s)", datasetName)
  626. }
  627. jobConfig := job.JobConfig
  628. jobConfig.DataSamples = &DataSamples{
  629. Numbers: 0,
  630. TrainSamples: make([]string, 0),
  631. EvalVersionSamples: make([][]string, 0),
  632. EvalSamples: make([]string, 0),
  633. }
  634. job.Dataset = dataset
  635. return nil
  636. }
  637. // handleData updates samples information
  638. func (im *IncrementalJobManager) handleData(job *IncrementalLearningJob) {
  639. tick := time.NewTicker(DatasetHandlerIntervalSeconds * time.Second)
  640. jobConfig := job.JobConfig
  641. iterCount := 0
  642. for {
  643. select {
  644. case <-job.Done:
  645. return
  646. default:
  647. }
  648. // in case dataset is not synced to LC before job synced to LC
  649. // here call loadDataset in each period
  650. err := im.loadDataset(job)
  651. if iterCount%100 == 0 {
  652. klog.Infof("job(name=%s) handling dataset", jobConfig.UniqueIdentifier)
  653. }
  654. iterCount++
  655. if err != nil {
  656. klog.Warningf("job(name=%s) failed to load dataset, and waiting it: %v",
  657. jobConfig.UniqueIdentifier,
  658. err)
  659. <-tick.C
  660. continue
  661. }
  662. dataset := job.Dataset
  663. if dataset.DataSource != nil && len(dataset.DataSource.TrainSamples) > jobConfig.DataSamples.Numbers {
  664. samples := dataset.DataSource.TrainSamples
  665. trainNum := int(job.Spec.Dataset.TrainProb * float64(len(samples)-jobConfig.DataSamples.Numbers))
  666. jobConfig.Lock.Lock()
  667. jobConfig.DataSamples.TrainSamples = append(jobConfig.DataSamples.TrainSamples,
  668. samples[(jobConfig.DataSamples.Numbers+1):(jobConfig.DataSamples.Numbers+trainNum+1)]...)
  669. klog.Infof("job(name=%s) current train samples nums is %d",
  670. jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.TrainSamples))
  671. jobConfig.DataSamples.EvalVersionSamples = append(jobConfig.DataSamples.EvalVersionSamples,
  672. samples[(jobConfig.DataSamples.Numbers+trainNum+1):])
  673. jobConfig.Lock.Unlock()
  674. for _, v := range jobConfig.DataSamples.EvalVersionSamples {
  675. jobConfig.DataSamples.EvalSamples = append(jobConfig.DataSamples.EvalSamples, v...)
  676. }
  677. klog.Infof("job(name=%s) current eval samples nums is %d",
  678. jobConfig.UniqueIdentifier, len(jobConfig.DataSamples.EvalSamples))
  679. jobConfig.DataSamples.Numbers = len(samples)
  680. }
  681. <-tick.C
  682. }
  683. }
  684. // createFile creates data file and data index file
  685. func createFile(dir string, format string, isLocalStorage bool) (string, string) {
  686. switch format {
  687. case "txt":
  688. if isLocalStorage {
  689. return path.Join(dir, "data.txt"), ""
  690. }
  691. return strings.Join([]string{dir, "data.txt"}, "/"), strings.Join([]string{dir, "dataIndex.txt"}, "/")
  692. }
  693. return "", ""
  694. }
  695. // writeSamples writes samples information to a file
  696. func (im *IncrementalJobManager) writeSamples(job *IncrementalLearningJob, samples []string, dir string, rounds int, format string, urlPrefix string) (string, string, error) {
  697. subDir := strings.Join([]string{dir, strconv.Itoa(rounds)}, "/")
  698. fileURL, absURLFile := createFile(subDir, format, job.Dataset.Storage.IsLocalStorage)
  699. if job.Storage.IsLocalStorage {
  700. if err := util.CreateFolder(subDir); err != nil {
  701. return "", "", err
  702. }
  703. if err := im.writeByLine(samples, fileURL); err != nil {
  704. return "", "", err
  705. }
  706. if !job.Dataset.Storage.IsLocalStorage {
  707. tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)
  708. if err := im.writeByLine(tempSamples, absURLFile); err != nil {
  709. return "", "", err
  710. }
  711. }
  712. return fileURL, absURLFile, nil
  713. }
  714. temporaryDir, err := util.CreateTemporaryDir()
  715. if err != nil {
  716. return "", "", err
  717. }
  718. localFileURL, localAbsURLFile := createFile(temporaryDir, format, job.Dataset.Storage.IsLocalStorage)
  719. if err := im.writeByLine(samples, localFileURL); err != nil {
  720. return "", "", err
  721. }
  722. if err := job.Storage.Upload(localFileURL, fileURL); err != nil {
  723. return "", "", err
  724. }
  725. tempSamples := util.ParsingDatasetIndex(samples, urlPrefix)
  726. if err := im.writeByLine(tempSamples, localAbsURLFile); err != nil {
  727. return "", "", err
  728. }
  729. if err := job.Storage.Upload(localAbsURLFile, absURLFile); err != nil {
  730. return "", "", err
  731. }
  732. defer os.RemoveAll(localFileURL)
  733. defer os.RemoveAll(localAbsURLFile)
  734. return fileURL, absURLFile, nil
  735. }
  736. // writeByLine writes file by line
  737. func (im *IncrementalJobManager) writeByLine(samples []string, fileURL string) error {
  738. file, err := os.Create(fileURL)
  739. if err != nil {
  740. klog.Errorf("create file(%s) failed", fileURL)
  741. return err
  742. }
  743. w := bufio.NewWriter(file)
  744. for _, line := range samples {
  745. _, _ = fmt.Fprintln(w, line)
  746. }
  747. if err := w.Flush(); err != nil {
  748. klog.Errorf("write file(%s) failed", fileURL)
  749. return err
  750. }
  751. if err := file.Close(); err != nil {
  752. klog.Errorf("close file failed, error: %v", err)
  753. return err
  754. }
  755. return nil
  756. }
  757. // monitorWorker monitors message from worker
  758. func (im *IncrementalJobManager) monitorWorker() {
  759. for {
  760. workerMessageChannel := im.WorkerMessageChannel
  761. workerMessage, ok := <-workerMessageChannel
  762. if !ok {
  763. break
  764. }
  765. klog.V(4).Infof("handling worker message %+v", workerMessage)
  766. name := util.GetUniqueIdentifier(workerMessage.Namespace, workerMessage.OwnerName, workerMessage.OwnerKind)
  767. job, ok := im.IncrementalJobMap[name]
  768. if !ok {
  769. continue
  770. }
  771. // TODO: filter some worker messages out
  772. wo := WorkerOutput{}
  773. wo.Models = workerMessage.Results
  774. wo.OwnerInfo = workerMessage.OwnerInfo
  775. msg := &UpstreamMessage{
  776. Phase: workerMessage.Kind,
  777. Status: workerMessage.Status,
  778. Output: &wo,
  779. }
  780. im.Client.WriteMessage(msg, job.getHeader())
  781. im.handleWorkerMessage(job, workerMessage)
  782. }
  783. }
  784. // handleWorkerMessage handles message from worker
  785. func (im *IncrementalJobManager) handleWorkerMessage(job *IncrementalLearningJob, workerMessage WorkerMessage) {
  786. latestCond := im.getLatestCondition(job)
  787. jobStage := strings.ToLower(string(latestCond.Stage))
  788. workerKind := strings.ToLower(workerMessage.Kind)
  789. if jobStage != workerKind {
  790. klog.Warningf("job(name=%s) %s phase get worker(kind=%s)", job.JobConfig.UniqueIdentifier,
  791. jobStage, workerKind)
  792. return
  793. }
  794. var models []ModelInfo
  795. for _, result := range workerMessage.Results {
  796. metrics := map[string][]float64{}
  797. if m, ok := result["metrics"]; ok {
  798. bytes, err := json.Marshal(m)
  799. if err != nil {
  800. return
  801. }
  802. err = json.Unmarshal(bytes, &metrics)
  803. if err != nil {
  804. klog.Warningf("failed to unmarshal the worker(name=%s) metrics %v, err: %v",
  805. workerMessage.Name,
  806. m,
  807. err)
  808. }
  809. }
  810. model := ModelInfo{
  811. result["format"].(string),
  812. result["url"].(string),
  813. metrics}
  814. models = append(models, model)
  815. }
  816. workerStatus := workerMessage.Status
  817. jobName := job.JobConfig.UniqueIdentifier
  818. if workerStatus == WorkerCompletedStatus {
  819. klog.Infof("job(name=%s) complete the %s task successfully", jobName, jobStage)
  820. switch latestCond.Stage {
  821. case sednav1.ILJobEval:
  822. job.JobConfig.EvalResult = models
  823. // when eval worker is complete, the deploy task starts immediately without waiting for the notification of GM.
  824. im.deployTask(job)
  825. }
  826. }
  827. }
  828. // forwardSamples deletes the samples information in the memory
  829. func forwardSamples(jobConfig *JobConfig, jobStage sednav1.ILJobStage) {
  830. switch jobStage {
  831. case sednav1.ILJobTrain:
  832. jobConfig.Lock.Lock()
  833. jobConfig.DataSamples.TrainSamples = jobConfig.DataSamples.TrainSamples[:0]
  834. jobConfig.Lock.Unlock()
  835. case sednav1.ILJobEval:
  836. if len(jobConfig.DataSamples.EvalVersionSamples) > EvalSamplesCapacity {
  837. jobConfig.DataSamples.EvalVersionSamples = jobConfig.DataSamples.EvalVersionSamples[1:]
  838. }
  839. }
  840. }
  841. // AddWorkerMessage adds worker messages
  842. func (im *IncrementalJobManager) AddWorkerMessage(message WorkerMessage) {
  843. im.WorkerMessageChannel <- message
  844. }
  845. // GetName returns name of the manager
  846. func (im *IncrementalJobManager) GetName() string {
  847. return IncrementalLearningJobKind
  848. }
  849. func (job *IncrementalLearningJob) getHeader() gmclient.MessageHeader {
  850. return gmclient.MessageHeader{
  851. Namespace: job.Namespace,
  852. ResourceKind: job.Kind,
  853. ResourceName: job.Name,
  854. Operation: gmclient.StatusOperation,
  855. }
  856. }