You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

incrementallearningjob.go 26 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737
  1. /*
  2. Copyright 2021 The KubeEdge Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package globalmanager
  14. import (
  15. "context"
  16. "fmt"
  17. "path/filepath"
  18. "strings"
  19. "time"
  20. v1 "k8s.io/api/core/v1"
  21. "k8s.io/apimachinery/pkg/api/errors"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. utilrand "k8s.io/apimachinery/pkg/util/rand"
  24. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  25. "k8s.io/apimachinery/pkg/util/wait"
  26. kubeinformers "k8s.io/client-go/informers"
  27. "k8s.io/client-go/kubernetes"
  28. "k8s.io/client-go/kubernetes/scheme"
  29. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  30. corelisters "k8s.io/client-go/listers/core/v1"
  31. "k8s.io/client-go/tools/cache"
  32. "k8s.io/client-go/tools/record"
  33. "k8s.io/client-go/util/workqueue"
  34. "k8s.io/klog/v2"
  35. k8scontroller "k8s.io/kubernetes/pkg/controller"
  36. sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
  37. clientset "github.com/kubeedge/sedna/pkg/client/clientset/versioned"
  38. sednaclientset "github.com/kubeedge/sedna/pkg/client/clientset/versioned/typed/sedna/v1alpha1"
  39. informers "github.com/kubeedge/sedna/pkg/client/informers/externalversions"
  40. sednav1listers "github.com/kubeedge/sedna/pkg/client/listers/sedna/v1alpha1"
  41. "github.com/kubeedge/sedna/pkg/globalmanager/config"
  42. messageContext "github.com/kubeedge/sedna/pkg/globalmanager/messagelayer/ws"
  43. "github.com/kubeedge/sedna/pkg/globalmanager/utils"
  44. )
  45. // ijControllerKind contains the schema.GroupVersionKind for this controller type.
  46. var ijControllerKind = sednav1.SchemeGroupVersion.WithKind("IncrementalLearningJob")
  47. // IncrementalJobController ensures that all IncrementalLearningJob objects have corresponding pods to
  48. // run their configured workload.
  49. type IncrementalJobController struct {
  50. kubeClient kubernetes.Interface
  51. client sednaclientset.SednaV1alpha1Interface
  52. podControl k8scontroller.PodControlInterface
  53. // podStoreSynced returns true if the pod store has been synced at least once.
  54. // Added as a member to the struct to allow injection for testing.
  55. podStoreSynced cache.InformerSynced
  56. // jobStoreSynced returns true if the incrementaljob store has been synced at least once.
  57. // Added as a member to the struct to allow injection for testing.
  58. jobStoreSynced cache.InformerSynced
  59. // A store of jobs
  60. jobLister sednav1listers.IncrementalLearningJobLister
  61. // A store of pods, populated by the podController
  62. podStore corelisters.PodLister
  63. // IncrementalLearningJobs that need to be updated
  64. queue workqueue.RateLimitingInterface
  65. recorder record.EventRecorder
  66. cfg *config.ControllerConfig
  67. }
  68. // Run the main goroutine responsible for watching and syncing jobs.
  69. func (jc *IncrementalJobController) Start() error {
  70. workers := 1
  71. stopCh := messageContext.Done()
  72. go func() {
  73. defer utilruntime.HandleCrash()
  74. defer jc.queue.ShutDown()
  75. klog.Infof("Starting incrementallearning job controller")
  76. defer klog.Infof("Shutting down incrementallearning job controller")
  77. if !cache.WaitForNamedCacheSync("incrementallearningjob", stopCh, jc.podStoreSynced, jc.jobStoreSynced) {
  78. klog.Errorf("failed to wait for caches to sync")
  79. return
  80. }
  81. klog.Infof("Starting incrementallearning job workers")
  82. for i := 0; i < workers; i++ {
  83. go wait.Until(jc.worker, time.Second, stopCh)
  84. }
  85. <-stopCh
  86. }()
  87. return nil
  88. }
  89. // enqueueByPod enqueues the jointInferenceService object of the specified pod.
  90. func (jc *IncrementalJobController) enqueueByPod(pod *v1.Pod, immediate bool) {
  91. controllerRef := metav1.GetControllerOf(pod)
  92. if controllerRef == nil {
  93. return
  94. }
  95. if controllerRef.Kind != jointServiceControllerKind.Kind {
  96. return
  97. }
  98. service, err := jc.jobLister.IncrementalLearningJobs(pod.Namespace).Get(controllerRef.Name)
  99. if err != nil {
  100. return
  101. }
  102. if service.UID != controllerRef.UID {
  103. return
  104. }
  105. jc.enqueueController(service, immediate)
  106. }
  107. // When a pod is created, enqueue the controller that manages it and update it's expectations.
  108. func (jc *IncrementalJobController) addPod(obj interface{}) {
  109. pod := obj.(*v1.Pod)
  110. if pod.DeletionTimestamp != nil {
  111. // on a restart of the controller, it's possible a new pod shows up in a state that
  112. // is already pending deletion. Prevent the pod from being a creation observation.
  113. jc.deletePod(pod)
  114. return
  115. }
  116. // backoff to queue when PodFailed
  117. immediate := pod.Status.Phase != v1.PodFailed
  118. jc.enqueueByPod(pod, immediate)
  119. }
  120. // When a pod is updated, figure out what joint inference service manage it and wake them up.
  121. func (jc *IncrementalJobController) updatePod(old, cur interface{}) {
  122. curPod := cur.(*v1.Pod)
  123. oldPod := old.(*v1.Pod)
  124. // no pod update, no queue
  125. if curPod.ResourceVersion == oldPod.ResourceVersion {
  126. return
  127. }
  128. jc.addPod(curPod)
  129. }
  130. // deletePod enqueues the jointinferenceservice obj When a pod is deleted
  131. func (jc *IncrementalJobController) deletePod(obj interface{}) {
  132. pod, ok := obj.(*v1.Pod)
  133. // comment from https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/job/job_controller.go
  134. // When a delete is dropped, the relist will notice a pod in the store not
  135. // in the list, leading to the insertion of a tombstone object which contains
  136. // the deleted key/value. Note that this value might be stale. If the pod
  137. // changed labels the new jointinferenceservice will not be woken up till the periodic resync.
  138. if !ok {
  139. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  140. if !ok {
  141. klog.Warningf("couldn't get object from tombstone %+v", obj)
  142. return
  143. }
  144. pod, ok = tombstone.Obj.(*v1.Pod)
  145. if !ok {
  146. klog.Warningf("tombstone contained object that is not a pod %+v", obj)
  147. return
  148. }
  149. }
  150. jc.enqueueByPod(pod, true)
  151. }
  152. // obj could be an *sedna.IncrementalLearningJob, or a DeletionFinalStateUnknown marker item,
  153. // immediate tells the controller to update the status right away, and should
  154. // happen ONLY when there was a successful pod run.
  155. func (jc *IncrementalJobController) enqueueController(obj interface{}, immediate bool) {
  156. key, err := k8scontroller.KeyFunc(obj)
  157. if err != nil {
  158. utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
  159. return
  160. }
  161. backoff := time.Duration(0)
  162. if !immediate {
  163. backoff = getBackoff(jc.queue, key)
  164. }
  165. jc.queue.AddAfter(key, backoff)
  166. }
  167. // worker runs a worker thread that just dequeues items, processes them, and marks them done.
  168. // It enforces that the syncHandler is never invoked concurrently with the same key.
  169. func (jc *IncrementalJobController) worker() {
  170. for jc.processNextWorkItem() {
  171. }
  172. }
  173. func (jc *IncrementalJobController) processNextWorkItem() bool {
  174. key, quit := jc.queue.Get()
  175. if quit {
  176. return false
  177. }
  178. defer jc.queue.Done(key)
  179. forget, err := jc.sync(key.(string))
  180. if err == nil {
  181. if forget {
  182. jc.queue.Forget(key)
  183. }
  184. return true
  185. }
  186. utilruntime.HandleError(fmt.Errorf("Error syncing incrementallearning job: %v", err))
  187. jc.queue.AddRateLimited(key)
  188. return true
  189. }
  190. // sync will sync the incrementallearning job with the given key if it has had its expectations fulfilled, meaning
  191. // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
  192. // concurrently with the same key.
  193. func (jc *IncrementalJobController) sync(key string) (bool, error) {
  194. startTime := time.Now()
  195. defer func() {
  196. klog.V(4).Infof("Finished syncing incrementallearning job %q (%v)", key, time.Since(startTime))
  197. }()
  198. ns, name, err := cache.SplitMetaNamespaceKey(key)
  199. if err != nil {
  200. return false, err
  201. }
  202. if len(ns) == 0 || len(name) == 0 {
  203. return false, fmt.Errorf("invalid incrementallearning job key %q: either namespace or name is missing", key)
  204. }
  205. sharedIncrementalJob, err := jc.jobLister.IncrementalLearningJobs(ns).Get(name)
  206. if err != nil {
  207. if errors.IsNotFound(err) {
  208. klog.V(4).Infof("incrementallearning job has been deleted: %v", key)
  209. return true, nil
  210. }
  211. return false, err
  212. }
  213. incrementaljob := *sharedIncrementalJob
  214. // set kind for incrementaljob in case that the kind is None
  215. incrementaljob.SetGroupVersionKind(sednav1.SchemeGroupVersion.WithKind("IncrementalLearningJob"))
  216. // incrementaljob first start, create pod for inference
  217. if incrementaljob.Status.StartTime == nil {
  218. now := metav1.Now()
  219. incrementaljob.Status.StartTime = &now
  220. pod := jc.getSpecifiedPods(&incrementaljob, "inference")
  221. if pod == nil {
  222. err = jc.createInferPod(&incrementaljob)
  223. } else {
  224. if pod.Status.Phase != v1.PodRunning && pod.Status.Phase != v1.PodPending {
  225. err = jc.createInferPod(&incrementaljob)
  226. }
  227. }
  228. if err != nil {
  229. return false, nil
  230. }
  231. }
  232. // if incrementaljob was finished previously, we don't want to redo the termination
  233. if IsIncrementalJobFinished(&incrementaljob) {
  234. return true, nil
  235. }
  236. forget := false
  237. jobFailed := false
  238. needUpdated := false
  239. // update conditions of incremental job
  240. needUpdated, err = jc.updateIncrementalJobConditions(&incrementaljob)
  241. if err != nil {
  242. klog.V(2).Infof("incrementallearning job %v/%v faied to be updated, err:%s", incrementaljob.Namespace, incrementaljob.Name, err)
  243. }
  244. if needUpdated {
  245. if err := jc.updateIncrementalJobStatus(&incrementaljob); err != nil {
  246. return forget, err
  247. }
  248. if jobFailed && !IsIncrementalJobFinished(&incrementaljob) {
  249. // returning an error will re-enqueue IncrementalJob after the backoff period
  250. return forget, fmt.Errorf("failed pod(s) detected for incrementaljob key %q", key)
  251. }
  252. forget = true
  253. }
  254. return forget, err
  255. }
  256. // updateIncrementalJobConditions ensures that conditions of incrementallearning job can be changed by podstatus
  257. func (jc *IncrementalJobController) updateIncrementalJobConditions(incrementaljob *sednav1.IncrementalLearningJob) (bool, error) {
  258. var initialType sednav1.ILJobStageConditionType
  259. var latestCondition sednav1.ILJobCondition = sednav1.ILJobCondition{
  260. Stage: sednav1.ILJobTrain,
  261. Type: initialType,
  262. }
  263. var newConditionType sednav1.ILJobStageConditionType
  264. latestCondition.Stage = sednav1.ILJobTrain
  265. var needUpdated = false
  266. jobConditions := incrementaljob.Status.Conditions
  267. var podStatus v1.PodPhase = v1.PodUnknown
  268. if len(jobConditions) > 0 {
  269. // get latest pod and pod status
  270. latestCondition = (jobConditions)[len(jobConditions)-1]
  271. klog.V(2).Infof("incrementallearning job %v/%v latest stage %v:", incrementaljob.Namespace, incrementaljob.Name,
  272. latestCondition.Stage)
  273. pod := jc.getSpecifiedPods(incrementaljob, string(latestCondition.Stage))
  274. if pod != nil {
  275. podStatus = pod.Status.Phase
  276. }
  277. }
  278. jobStage := latestCondition.Stage
  279. currentType := latestCondition.Type
  280. newConditionType = currentType
  281. switch currentType {
  282. case initialType:
  283. newConditionType = sednav1.ILJobStageCondWaiting
  284. case sednav1.ILJobStageCondWaiting:
  285. // do nothing, waiting for LC to set type from waiting to ready
  286. case sednav1.ILJobStageCondReady:
  287. // create a pod, and set type from ready to starting
  288. // include train, eval, deploy pod
  289. var err error
  290. if jobStage == sednav1.ILJobDeploy {
  291. err = jc.restartInferPod(incrementaljob)
  292. if err != nil {
  293. klog.V(2).Infof("incrementallearning job %v/%v inference pod failed to restart, err:%s", incrementaljob.Namespace, incrementaljob.Name, err)
  294. } else {
  295. klog.V(2).Infof("incrementallearning job %v/%v inference pod restarts successfully", incrementaljob.Namespace, incrementaljob.Name)
  296. }
  297. } else if podStatus != v1.PodPending && podStatus != v1.PodRunning {
  298. err = jc.createPod(incrementaljob, jobStage)
  299. }
  300. if err != nil {
  301. return needUpdated, err
  302. }
  303. newConditionType = sednav1.ILJobStageCondStarting
  304. case sednav1.ILJobStageCondStarting, sednav1.ILJobStageCondRunning:
  305. if podStatus == v1.PodRunning {
  306. if jobStage == sednav1.ILJobDeploy {
  307. newConditionType = sednav1.ILJobStageCondCompleted
  308. } else {
  309. // watch pod status, if pod running, set type running
  310. newConditionType = sednav1.ILJobStageCondRunning
  311. }
  312. } else if podStatus == v1.PodSucceeded {
  313. // watch pod status, if pod completed, set type completed
  314. newConditionType = sednav1.ILJobStageCondCompleted
  315. klog.V(2).Infof("incrementallearning job %v/%v %v stage completed!", incrementaljob.Namespace, incrementaljob.Name, jobStage)
  316. } else if podStatus == v1.PodFailed {
  317. newConditionType = sednav1.ILJobStageCondFailed
  318. klog.V(2).Infof("incrementallearning job %v/%v %v stage failed!", incrementaljob.Namespace, incrementaljob.Name, jobStage)
  319. }
  320. case sednav1.ILJobStageCondCompleted:
  321. jobStage = getNextStage(jobStage)
  322. newConditionType = sednav1.ILJobStageCondWaiting
  323. case sednav1.ILJobStageCondFailed:
  324. jobStage = sednav1.ILJobTrain
  325. newConditionType = sednav1.ILJobStageCondWaiting
  326. default:
  327. // do nothing when given other type out of cases
  328. }
  329. klog.V(2).Infof("incrementallearning job %v/%v, conditions: %v", incrementaljob.Namespace, incrementaljob.Name, jobConditions)
  330. if latestCondition.Type != newConditionType {
  331. incrementaljob.Status.Conditions = append(incrementaljob.Status.Conditions, NewIncrementalJobCondition(newConditionType, jobStage))
  332. needUpdated = true
  333. return needUpdated, nil
  334. }
  335. return needUpdated, nil
  336. }
  337. // updateIncrementalJobStatus ensures that jobstatus can be updated rightly
  338. func (jc *IncrementalJobController) updateIncrementalJobStatus(incrementaljob *sednav1.IncrementalLearningJob) error {
  339. jobClient := jc.client.IncrementalLearningJobs(incrementaljob.Namespace)
  340. var err error
  341. for i := 0; i <= statusUpdateRetries; i = i + 1 {
  342. var newIncrementalJob *sednav1.IncrementalLearningJob
  343. newIncrementalJob, err = jobClient.Get(context.TODO(), incrementaljob.Name, metav1.GetOptions{})
  344. if err != nil {
  345. break
  346. }
  347. newIncrementalJob.Status = incrementaljob.Status
  348. if _, err = jobClient.UpdateStatus(context.TODO(), newIncrementalJob, metav1.UpdateOptions{}); err == nil {
  349. break
  350. }
  351. }
  352. return err
  353. }
  354. func NewIncrementalJobCondition(conditionType sednav1.ILJobStageConditionType, jobStage sednav1.ILJobStage) sednav1.ILJobCondition {
  355. return sednav1.ILJobCondition{
  356. Type: conditionType,
  357. Status: v1.ConditionTrue,
  358. LastHeartbeatTime: metav1.Now(),
  359. LastTransitionTime: metav1.Now(),
  360. Reason: "",
  361. Message: "",
  362. Stage: jobStage,
  363. }
  364. }
  365. func (jc *IncrementalJobController) generatePodName(jobName string, workerType string) string {
  366. return jobName + "-" + strings.ToLower(workerType) + "-" + utilrand.String(5)
  367. }
  368. func (jc *IncrementalJobController) getSpecifiedPods(job *sednav1.IncrementalLearningJob, podType string) *v1.Pod {
  369. if podType == "Deploy" {
  370. podType = "inference"
  371. }
  372. var latestPod *v1.Pod
  373. selector, _ := GenerateSelector(job)
  374. pods, err := jc.podStore.Pods(job.Namespace).List(selector)
  375. if len(pods) == 0 || err != nil {
  376. return nil
  377. }
  378. var matchTag = false
  379. latestPod = pods[0]
  380. for _, pod := range pods {
  381. s := strings.Split(pod.Name, "-")
  382. CurrentPodType := s[len(s)-2]
  383. if (latestPod.CreationTimestamp.Before(&pod.CreationTimestamp) || latestPod.CreationTimestamp.Equal(&pod.CreationTimestamp)) && CurrentPodType == strings.ToLower(podType) {
  384. latestPod = pod
  385. matchTag = true
  386. }
  387. }
  388. if !matchTag {
  389. return nil
  390. }
  391. return latestPod
  392. }
  393. func (jc *IncrementalJobController) restartInferPod(job *sednav1.IncrementalLearningJob) error {
  394. inferPod := jc.getSpecifiedPods(job, "inference")
  395. if inferPod == nil {
  396. klog.V(2).Infof("No inferpod is running in incrementallearning job %v/%v", job.Namespace, job.Name)
  397. err := jc.createInferPod(job)
  398. return err
  399. }
  400. ctx := context.Background()
  401. err := jc.kubeClient.CoreV1().Pods(job.Namespace).Delete(ctx, inferPod.Name, metav1.DeleteOptions{})
  402. if err != nil {
  403. klog.Warningf("failed to delete inference pod %s for incrementallearning job %v/%v, err:%s", inferPod.Name, job.Namespace, job.Name, err)
  404. return err
  405. }
  406. err = jc.createInferPod(job)
  407. if err != nil {
  408. klog.Warningf("failed to create inference pod %s for incrementallearning job %v/%v, err:%s", inferPod.Name, job.Namespace, job.Name, err)
  409. return err
  410. }
  411. return nil
  412. }
  413. func getNextStage(currentStage sednav1.ILJobStage) sednav1.ILJobStage {
  414. switch currentStage {
  415. case sednav1.ILJobTrain:
  416. return sednav1.ILJobEval
  417. case sednav1.ILJobEval:
  418. return sednav1.ILJobDeploy
  419. case sednav1.ILJobDeploy:
  420. return sednav1.ILJobTrain
  421. default:
  422. return sednav1.ILJobTrain
  423. }
  424. }
  425. func IsIncrementalJobFinished(j *sednav1.IncrementalLearningJob) bool {
  426. // TODO
  427. return false
  428. }
  429. func (jc *IncrementalJobController) createPod(job *sednav1.IncrementalLearningJob, podtype sednav1.ILJobStage) (err error) {
  430. ctx := context.Background()
  431. var podTemplate *v1.PodTemplateSpec
  432. incrementalDatasetName := job.Spec.Dataset.Name
  433. initialModelName := job.Spec.InitialModel.Name
  434. deployModelName := job.Spec.DeploySpec.Model.Name
  435. // get basemodel URL, deploymodel, dataset URL
  436. var basemodelPath string
  437. var deploymodelPath string
  438. var datasetPath string
  439. basemodel, err := jc.client.Models(job.Namespace).Get(ctx, initialModelName, metav1.GetOptions{})
  440. if err != nil {
  441. return fmt.Errorf("failed to get initial model %s: %w",
  442. initialModelName, err)
  443. }
  444. basemodelPath = filepath.Dir(basemodel.Spec.URL)
  445. deploymodel, err := jc.client.Models(job.Namespace).Get(ctx, deployModelName, metav1.GetOptions{})
  446. if err != nil {
  447. return fmt.Errorf("failed to get deploy model %s: %w",
  448. deployModelName, err)
  449. }
  450. deploymodelPath = filepath.Dir(deploymodel.Spec.URL)
  451. dataset, err := jc.client.Datasets(job.Namespace).Get(ctx, incrementalDatasetName, metav1.GetOptions{})
  452. if err != nil {
  453. return fmt.Errorf("failed to get dataset %s: %w",
  454. incrementalDatasetName, err)
  455. }
  456. datasetPath = dataset.Spec.URL
  457. outputDir := job.Spec.OutputDir
  458. datasetParent := filepath.Dir(datasetPath)
  459. // get all url for train and eval from data in condition
  460. condDataStr := job.Status.Conditions[len(job.Status.Conditions)-1].Data
  461. klog.V(2).Infof("incrementallearning job %v/%v data condition:%s", job.Namespace, job.Name, condDataStr)
  462. var cond IncrementalCondData
  463. (&cond).Unmarshal([]byte(condDataStr))
  464. if cond.Input == nil {
  465. return fmt.Errorf("empty input from condData")
  466. }
  467. dataURL := cond.Input.DataURL
  468. inputmodelURLs := cond.GetInputModelURLs()
  469. outputmodelURL := cond.Input.OutputDir
  470. // convert user inputs into its form in the container
  471. var inputmodelURLList []string
  472. var inputmodelURLContain string
  473. var outputmodelURLContain string
  474. var dataURLContain string
  475. // process inputmodelURLs, add dataPrefix to ench inputmodelURL, return inputmodelURLList
  476. for _, URL := range inputmodelURLs {
  477. inputmodelURLList = append(inputmodelURLList, dataPrefix+URL)
  478. }
  479. // three container Url for data, inputmodel, outputmodel
  480. inputmodelURLContain = strings.Join(inputmodelURLList, ";")
  481. outputmodelURLContain = dataPrefix + outputmodelURL
  482. dataURLContain = dataPrefix + dataURL
  483. // Container VolumeMounts parameters
  484. dataConPath := dataPrefix + datasetParent
  485. basemodelConPath := dataPrefix + basemodelPath
  486. deploymodelConPath := dataPrefix + deploymodelPath
  487. outputConPath := dataPrefix + outputDir
  488. originalDatasetPathInContainer := dataPrefix + datasetPath
  489. var workerPara *WorkerPara = new(WorkerPara)
  490. if podtype == sednav1.ILJobTrain {
  491. workerPara.workerType = "Train"
  492. podTemplate = &job.Spec.TrainSpec.Template
  493. // Env parameters for train
  494. preModelURL := inputmodelURLContain // premodel savepath before increase
  495. outputModelURL := outputmodelURLContain // outputmodel savepath after increase, should be under outputdir
  496. trainDataURL := dataURLContain
  497. // Configure container mounting and Env information for train by initial WorkerPara
  498. workerPara.volumeMountList = []string{dataConPath, basemodelConPath, deploymodelConPath, outputConPath}
  499. workerPara.volumeList = []string{datasetParent, basemodelPath, deploymodelPath, outputDir}
  500. workerPara.volumeMapName = []string{"data", "base-model", "deploy-model", "output-dir"}
  501. workerPara.env = map[string]string{
  502. // see https://github.com/kubeedge/sedna/issues/35
  503. "ORIGINAL_DATASET_URL": originalDatasetPathInContainer,
  504. "TRAIN_DATASET_URL": trainDataURL,
  505. "MODEL_URL": outputModelURL,
  506. "BASE_MODEL_URL": preModelURL,
  507. "NAMESPACE": job.Namespace,
  508. "JOB_NAME": job.Name,
  509. "WORKER_NAME": "train-worker-" + utilrand.String(5),
  510. "LC_SERVER": jc.cfg.LC.Server,
  511. }
  512. } else {
  513. podTemplate = &job.Spec.EvalSpec.Template
  514. workerPara.workerType = "Eval"
  515. // Env parameters for eval
  516. evalDataURL := dataURLContain
  517. modelForEval := inputmodelURLContain // can be single or multi models
  518. // Configure container mounting and Env information for eval by initial WorkerPara
  519. workerPara.volumeMountList = []string{dataConPath, basemodelConPath, deploymodelConPath, outputConPath}
  520. workerPara.volumeList = []string{datasetParent, basemodelPath, deploymodelPath, outputDir}
  521. workerPara.volumeMapName = []string{"data", "base-model", "deploy-model", "output-dir"}
  522. workerPara.env = map[string]string{
  523. "ORIGINAL_DATASET_URL": originalDatasetPathInContainer,
  524. "TEST_DATASET_URL": evalDataURL,
  525. "MODEL_URLS": modelForEval,
  526. "NAMESPACE": job.Namespace,
  527. "JOB_NAME": job.Name,
  528. "WORKER_NAME": "eval-worker-" + utilrand.String(5),
  529. "LC_SERVER": jc.cfg.LC.Server,
  530. }
  531. }
  532. // create pod based on podtype
  533. _, err = createPodWithTemplate(jc.kubeClient, job, podTemplate, workerPara)
  534. if err != nil {
  535. return err
  536. }
  537. return
  538. }
  539. func (jc *IncrementalJobController) createInferPod(job *sednav1.IncrementalLearningJob) error {
  540. infermodelName := job.Spec.DeploySpec.Model.Name
  541. inferModel, err := jc.client.Models(job.Namespace).Get(context.TODO(), infermodelName, metav1.GetOptions{})
  542. if err != nil {
  543. return fmt.Errorf("failed to get infer model %s: %w",
  544. infermodelName, err)
  545. }
  546. inferModelPath := inferModel.Spec.URL
  547. // convert crd to JSON, and put them into env of container
  548. inferModelParent := filepath.Dir(inferModelPath)
  549. // Container VolumeMounts parameters
  550. inferModelConPath := dataPrefix + inferModelParent
  551. // Env parameters for edge
  552. inferModelURL := dataPrefix + inferModelPath
  553. // Configure container mounting and Env information by initial WorkerPara
  554. var inferContainer *WorkerPara = new(WorkerPara)
  555. inferContainer.volumeMountList = []string{inferModelConPath}
  556. inferContainer.volumeList = []string{inferModelParent}
  557. inferContainer.volumeMapName = []string{"model"}
  558. inferContainer.env = map[string]string{
  559. "WORKER_NAME": "inferworker-" + utilrand.String(5),
  560. "MODEL_URL": inferModelURL,
  561. "NAMESPACE": job.Namespace,
  562. "HARD_SAMPLE_ALGORITHM": job.Spec.DeploySpec.HardExampleMining.Name,
  563. "LC_SERVER": jc.cfg.LC.Server,
  564. }
  565. inferContainer.workerType = "inference"
  566. inferContainer.hostNetwork = true
  567. // create edge pod
  568. _, err = createPodWithTemplate(jc.kubeClient, job, &job.Spec.DeploySpec.Template, inferContainer)
  569. return err
  570. }
  571. // GetName returns the name of the incrementallearning job controller
  572. func (jc *IncrementalJobController) GetName() string {
  573. return "IncrementalLearningJobController"
  574. }
  575. // NewIncrementalJobController creates a new IncrementalJob controller that keeps the relevant pods
  576. // in sync with their corresponding IncrementalJob objects.
  577. func NewIncrementalJobController(cfg *config.ControllerConfig) (FeatureControllerI, error) {
  578. namespace := cfg.Namespace
  579. if namespace == "" {
  580. namespace = metav1.NamespaceAll
  581. }
  582. kubeClient, err := utils.KubeClient()
  583. if err != nil {
  584. return nil, err
  585. }
  586. kubecfg, err := utils.KubeConfig()
  587. if err != nil {
  588. return nil, err
  589. }
  590. crdclient, err := clientset.NewForConfig(kubecfg)
  591. if err != nil {
  592. return nil, err
  593. }
  594. kubeInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, time.Second*30, kubeinformers.WithNamespace(namespace))
  595. podInformer := kubeInformerFactory.Core().V1().Pods()
  596. jobInformerFactory := informers.NewSharedInformerFactoryWithOptions(crdclient, time.Second*30, informers.WithNamespace(namespace))
  597. jobInformer := jobInformerFactory.Sedna().V1alpha1().IncrementalLearningJobs()
  598. eventBroadcaster := record.NewBroadcaster()
  599. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
  600. jc := &IncrementalJobController{
  601. kubeClient: kubeClient,
  602. client: crdclient.SednaV1alpha1(),
  603. podControl: k8scontroller.RealPodControl{
  604. KubeClient: kubeClient,
  605. Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "incrementallearningjob-controller"}),
  606. },
  607. queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultBackOff, MaxBackOff), "incrementallearningjob"),
  608. recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "incrementallearningjob-controller"}),
  609. cfg: cfg,
  610. }
  611. jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  612. AddFunc: func(obj interface{}) {
  613. jc.enqueueController(obj, true)
  614. },
  615. UpdateFunc: func(old, cur interface{}) {
  616. jc.enqueueController(cur, true)
  617. },
  618. DeleteFunc: func(obj interface{}) {
  619. jc.enqueueController(obj, true)
  620. },
  621. })
  622. jc.jobLister = jobInformer.Lister()
  623. jc.jobStoreSynced = jobInformer.Informer().HasSynced
  624. podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  625. AddFunc: jc.addPod,
  626. UpdateFunc: jc.updatePod,
  627. DeleteFunc: jc.deletePod,
  628. })
  629. jc.podStore = podInformer.Lister()
  630. jc.podStoreSynced = podInformer.Informer().HasSynced
  631. stopCh := make(chan struct{})
  632. kubeInformerFactory.Start(stopCh)
  633. jobInformerFactory.Start(stopCh)
  634. return jc, err
  635. }