You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

incrementallearningjob.go 27 kB


  1. /*
  2. Copyright 2021 The KubeEdge Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package incrementallearning
  14. import (
  15. "context"
  16. "encoding/json"
  17. "fmt"
  18. "strings"
  19. "time"
  20. v1 "k8s.io/api/core/v1"
  21. "k8s.io/apimachinery/pkg/api/errors"
  22. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  23. "k8s.io/apimachinery/pkg/types"
  24. utilrand "k8s.io/apimachinery/pkg/util/rand"
  25. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  26. "k8s.io/apimachinery/pkg/util/wait"
  27. "k8s.io/apimachinery/pkg/watch"
  28. "k8s.io/client-go/kubernetes"
  29. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  30. corelisters "k8s.io/client-go/listers/core/v1"
  31. "k8s.io/client-go/tools/cache"
  32. "k8s.io/client-go/tools/record"
  33. "k8s.io/client-go/util/workqueue"
  34. "k8s.io/klog/v2"
  35. sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
  36. sednaclientset "github.com/kubeedge/sedna/pkg/client/clientset/versioned/typed/sedna/v1alpha1"
  37. sednav1listers "github.com/kubeedge/sedna/pkg/client/listers/sedna/v1alpha1"
  38. "github.com/kubeedge/sedna/pkg/globalmanager/config"
  39. "github.com/kubeedge/sedna/pkg/globalmanager/runtime"
  40. )
  41. const (
  42. // Name is this controller name
  43. Name = "IncrementalLearning"
  44. // KindName is the kind name of CR this controller controls
  45. KindName = "IncrementalLearningJob"
  46. )
  47. // Kind contains the schema.GroupVersionKind for this controller type.
  48. var Kind = sednav1.SchemeGroupVersion.WithKind(KindName)
  49. // Controller ensures that all IncrementalLearningJob objects have corresponding pods to
  50. // run their configured workload.
  51. type Controller struct {
  52. kubeClient kubernetes.Interface
  53. client sednaclientset.SednaV1alpha1Interface
  54. // podStoreSynced returns true if the pod store has been synced at least once.
  55. // Added as a member to the struct to allow injection for testing.
  56. podStoreSynced cache.InformerSynced
  57. // jobStoreSynced returns true if the job store has been synced at least once.
  58. // Added as a member to the struct to allow injection for testing.
  59. jobStoreSynced cache.InformerSynced
  60. // A store of jobs
  61. jobLister sednav1listers.IncrementalLearningJobLister
  62. // A store of pods, populated by the podController
  63. podStore corelisters.PodLister
  64. // IncrementalLearningJobs that need to be updated
  65. queue workqueue.RateLimitingInterface
  66. cfg *config.ControllerConfig
  67. sendToEdgeFunc runtime.DownstreamSendFunc
  68. }
  69. // Run starts the main goroutine responsible for watching and syncing jobs.
  70. func (c *Controller) Run(stopCh <-chan struct{}) {
  71. // TODO: make workers parameter
  72. workers := 1
  73. defer utilruntime.HandleCrash()
  74. defer c.queue.ShutDown()
  75. klog.Infof("Starting %s controller", Name)
  76. defer klog.Infof("Shutting down %s controller", Name)
  77. if !cache.WaitForNamedCacheSync(Name, stopCh, c.podStoreSynced, c.jobStoreSynced) {
  78. klog.Errorf("failed to wait for %s caches to sync", Name)
  79. return
  80. }
  81. klog.Infof("Starting %s job workers", Name)
  82. for i := 0; i < workers; i++ {
  83. go wait.Until(c.worker, time.Second, stopCh)
  84. }
  85. <-stopCh
  86. }
  87. // enqueueByPod enqueues the jointInferenceService object of the specified pod.
  88. func (c *Controller) enqueueByPod(pod *v1.Pod, immediate bool) {
  89. controllerRef := metav1.GetControllerOf(pod)
  90. if controllerRef == nil {
  91. return
  92. }
  93. if controllerRef.Kind != Kind.Kind {
  94. return
  95. }
  96. service, err := c.jobLister.IncrementalLearningJobs(pod.Namespace).Get(controllerRef.Name)
  97. if err != nil {
  98. return
  99. }
  100. if service.UID != controllerRef.UID {
  101. return
  102. }
  103. c.enqueueController(service, immediate)
  104. }
  105. // When a pod is created, enqueue the controller that manages it and update it's expectations.
  106. func (c *Controller) addPod(obj interface{}) {
  107. pod := obj.(*v1.Pod)
  108. if pod.DeletionTimestamp != nil {
  109. // on a restart of the controller, it's possible a new pod shows up in a state that
  110. // is already pending deletion. Prevent the pod from being a creation observation.
  111. c.deletePod(pod)
  112. return
  113. }
  114. // backoff to queue when PodFailed
  115. immediate := pod.Status.Phase != v1.PodFailed
  116. c.enqueueByPod(pod, immediate)
  117. }
  118. // When a pod is updated, figure out what joint inference service manage it and wake them up.
  119. func (c *Controller) updatePod(old, cur interface{}) {
  120. curPod := cur.(*v1.Pod)
  121. oldPod := old.(*v1.Pod)
  122. // no pod update, no queue
  123. if curPod.ResourceVersion == oldPod.ResourceVersion {
  124. return
  125. }
  126. c.addPod(curPod)
  127. }
  128. // deletePod enqueues the jointinferenceservice obj When a pod is deleted
  129. func (c *Controller) deletePod(obj interface{}) {
  130. pod, ok := obj.(*v1.Pod)
  131. // comment from https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/job/job_controller.go
  132. // When a delete is dropped, the relist will notice a pod in the store not
  133. // in the list, leading to the insertion of a tombstone object which contains
  134. // the deleted key/value. Note that this value might be stale. If the pod
  135. // changed labels the new jointinferenceservice will not be woken up till the periodic resync.
  136. if !ok {
  137. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  138. if !ok {
  139. klog.Warningf("couldn't get object from tombstone %+v", obj)
  140. return
  141. }
  142. pod, ok = tombstone.Obj.(*v1.Pod)
  143. if !ok {
  144. klog.Warningf("tombstone contained object that is not a pod %+v", obj)
  145. return
  146. }
  147. }
  148. c.enqueueByPod(pod, true)
  149. }
  150. // obj could be an *sedna.IncrementalLearningJob, or a DeletionFinalStateUnknown marker item,
  151. // immediate tells the controller to update the status right away, and should
  152. // happen ONLY when there was a successful pod run.
  153. func (c *Controller) enqueueController(obj interface{}, immediate bool) {
  154. key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
  155. if err != nil {
  156. utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
  157. return
  158. }
  159. backoff := time.Duration(0)
  160. if !immediate {
  161. backoff = runtime.GetBackoff(c.queue, key)
  162. }
  163. c.queue.AddAfter(key, backoff)
  164. }
  165. // worker runs a worker thread that just dequeues items, processes them, and marks them done.
  166. // It enforces that the syncHandler is never invoked concurrently with the same key.
  167. func (c *Controller) worker() {
  168. for c.processNextWorkItem() {
  169. }
  170. }
  171. func (c *Controller) processNextWorkItem() bool {
  172. key, quit := c.queue.Get()
  173. if quit {
  174. return false
  175. }
  176. defer c.queue.Done(key)
  177. forget, err := c.sync(key.(string))
  178. if err == nil {
  179. if forget {
  180. c.queue.Forget(key)
  181. }
  182. return true
  183. }
  184. utilruntime.HandleError(fmt.Errorf("Error syncing incrementallearning job: %v", err))
  185. c.queue.AddRateLimited(key)
  186. return true
  187. }
  188. // sync will sync the incrementallearning job with the given key if it has had its expectations fulfilled, meaning
  189. // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
  190. // concurrently with the same key.
  191. func (c *Controller) sync(key string) (bool, error) {
  192. startTime := time.Now()
  193. defer func() {
  194. klog.V(4).Infof("Finished syncing incrementallearning job %q (%v)", key, time.Since(startTime))
  195. }()
  196. ns, name, err := cache.SplitMetaNamespaceKey(key)
  197. if err != nil {
  198. return false, err
  199. }
  200. if len(ns) == 0 || len(name) == 0 {
  201. return false, fmt.Errorf("invalid incrementallearning job key %q: either namespace or name is missing", key)
  202. }
  203. sharedJob, err := c.jobLister.IncrementalLearningJobs(ns).Get(name)
  204. if err != nil {
  205. if errors.IsNotFound(err) {
  206. klog.V(4).Infof("incrementallearning job has been deleted: %v", key)
  207. return true, nil
  208. }
  209. return false, err
  210. }
  211. job := *sharedJob
  212. // set kind in case that the kind is None
  213. job.SetGroupVersionKind(Kind)
  214. // when job is handled at first, create pod for inference
  215. if job.Status.StartTime == nil {
  216. now := metav1.Now()
  217. job.Status.StartTime = &now
  218. pod := c.getSpecifiedPods(&job, runtime.InferencePodType)
  219. if pod == nil {
  220. err = c.createInferPod(&job)
  221. } else {
  222. if pod.Status.Phase != v1.PodRunning && pod.Status.Phase != v1.PodPending {
  223. err = c.createInferPod(&job)
  224. }
  225. }
  226. if err != nil {
  227. return false, nil
  228. }
  229. }
  230. // if job was finished previously, we don't want to redo the termination
  231. if IsJobFinished(&job) {
  232. return true, nil
  233. }
  234. forget := false
  235. jobFailed := false
  236. needUpdated := false
  237. // transit this job's state machine
  238. needUpdated, err = c.transitJobState(&job)
  239. if err != nil {
  240. klog.V(2).Infof("incrementallearning job %v/%v failed to be updated, err:%s", job.Namespace, job.Name, err)
  241. }
  242. if needUpdated {
  243. if err := c.updateJobStatus(&job); err != nil {
  244. return forget, err
  245. }
  246. if jobFailed && !IsJobFinished(&job) {
  247. // returning an error will re-enqueue IncrementalLearningJob after the backoff period
  248. return forget, fmt.Errorf("failed pod(s) detected for incrementallearning job key %q", key)
  249. }
  250. forget = true
  251. }
  252. return forget, err
  253. }
  254. // setWorkerNodeNameOfJob sets the worker nodeName of the specified job
  255. // which is used for downstream to sync job info to the specified LC located in nodeName.
  256. func (c *Controller) setWorkerNodeNameOfJob(job *sednav1.IncrementalLearningJob, jobStage string, nodeName string) error {
  257. key := runtime.AnnotationsKeyPrefix + jobStage
  258. return c.addJobAnnotations(job, key, nodeName)
  259. }
  260. // addJobAnnotations adds info in job annotations
  261. func (c *Controller) addJobAnnotations(job *sednav1.IncrementalLearningJob, key string, value string) error {
  262. ann := job.GetAnnotations()
  263. if ann[key] == value {
  264. // already set
  265. return nil
  266. }
  267. patchData := metav1.PartialObjectMetadata{
  268. ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{key: value}}}
  269. patchDataBytes, err := json.Marshal(&patchData)
  270. if err != nil {
  271. return err
  272. }
  273. jobClient := c.client.IncrementalLearningJobs(job.Namespace)
  274. return runtime.RetryUpdateStatus(job.Name, job.Namespace, func() error {
  275. newJob, err := jobClient.Get(context.TODO(), job.Name, metav1.GetOptions{})
  276. if err != nil {
  277. return err
  278. }
  279. annotations := newJob.GetAnnotations()
  280. if annotations[key] == value {
  281. return nil
  282. }
  283. _, err = jobClient.Patch(context.TODO(), job.Name, types.MergePatchType, patchDataBytes, metav1.PatchOptions{})
  284. return err
  285. })
  286. }
  287. // transitJobState transit job to next state
  288. func (c *Controller) transitJobState(job *sednav1.IncrementalLearningJob) (bool, error) {
  289. var initialType sednav1.ILJobStageConditionType
  290. var latestCondition = sednav1.ILJobCondition{
  291. Stage: sednav1.ILJobTrain,
  292. Type: initialType,
  293. }
  294. var newConditionType sednav1.ILJobStageConditionType
  295. var needUpdated = false
  296. var podStatus = v1.PodUnknown
  297. var pod *v1.Pod
  298. jobConditions := job.Status.Conditions
  299. if len(jobConditions) > 0 {
  300. // get latest pod and pod status
  301. latestCondition = (jobConditions)[len(jobConditions)-1]
  302. klog.V(2).Infof("incrementallearning job %v/%v latest stage %v:", job.Namespace, job.Name,
  303. latestCondition.Stage)
  304. pod = c.getSpecifiedPods(job, string(latestCondition.Stage))
  305. if pod != nil {
  306. podStatus = pod.Status.Phase
  307. }
  308. }
  309. jobStage := latestCondition.Stage
  310. currentType := latestCondition.Type
  311. newConditionType = currentType
  312. modelHotUpdate := job.Spec.DeploySpec.Model.HotUpdateEnabled
  313. switch currentType {
  314. case initialType:
  315. newConditionType = sednav1.ILJobStageCondWaiting
  316. case sednav1.ILJobStageCondWaiting:
  317. // do nothing, waiting for LC to set type from waiting to ready
  318. case sednav1.ILJobStageCondReady:
  319. // create a pod, and set type from ready to starting
  320. // include train, eval, deploy pod
  321. var err error
  322. if jobStage == sednav1.ILJobDeploy {
  323. if !modelHotUpdate {
  324. err = c.restartInferPod(job)
  325. if err != nil {
  326. klog.V(2).Infof("incrementallearning job %v/%v inference pod failed to restart, err:%s", job.Namespace, job.Name, err)
  327. return needUpdated, err
  328. }
  329. klog.V(2).Infof("incrementallearning job %v/%v inference pod restarts successfully", job.Namespace, job.Name)
  330. newConditionType = sednav1.ILJobStageCondCompleted
  331. } else {
  332. newConditionType = sednav1.ILJobStageCondStarting
  333. }
  334. } else {
  335. if podStatus != v1.PodPending && podStatus != v1.PodRunning {
  336. err = c.createPod(job, jobStage)
  337. if err != nil {
  338. return needUpdated, err
  339. }
  340. }
  341. newConditionType = sednav1.ILJobStageCondStarting
  342. }
  343. case sednav1.ILJobStageCondStarting, sednav1.ILJobStageCondRunning:
  344. if podStatus == v1.PodRunning {
  345. if jobStage == sednav1.ILJobDeploy {
  346. if !modelHotUpdate {
  347. newConditionType = sednav1.ILJobStageCondCompleted
  348. } else {
  349. // add nodeName to job
  350. if err := c.setWorkerNodeNameOfJob(job, string(jobStage), pod.Spec.NodeName); err != nil {
  351. return needUpdated, err
  352. }
  353. // watch pod status, if pod running, set type running
  354. newConditionType = sednav1.ILJobStageCondRunning
  355. }
  356. } else {
  357. // add nodeName to job
  358. if err := c.setWorkerNodeNameOfJob(job, string(jobStage), pod.Spec.NodeName); err != nil {
  359. return needUpdated, err
  360. }
  361. // watch pod status, if pod running, set type running
  362. newConditionType = sednav1.ILJobStageCondRunning
  363. }
  364. } else if podStatus == v1.PodSucceeded {
  365. // watch pod status, if pod completed, set type completed
  366. newConditionType = sednav1.ILJobStageCondCompleted
  367. klog.V(2).Infof("incrementallearning job %v/%v %v stage completed!", job.Namespace, job.Name, jobStage)
  368. } else if podStatus == v1.PodFailed {
  369. newConditionType = sednav1.ILJobStageCondFailed
  370. klog.V(2).Infof("incrementallearning job %v/%v %v stage failed!", job.Namespace, job.Name, jobStage)
  371. }
  372. case sednav1.ILJobStageCondCompleted:
  373. jobStage = getNextStage(jobStage)
  374. newConditionType = sednav1.ILJobStageCondWaiting
  375. case sednav1.ILJobStageCondFailed:
  376. jobStage = sednav1.ILJobTrain
  377. newConditionType = sednav1.ILJobStageCondWaiting
  378. default:
  379. // do nothing when given other type out of cases
  380. }
  381. klog.V(2).Infof("incrementallearning job %v/%v, conditions: %v", job.Namespace, job.Name, jobConditions)
  382. if latestCondition.Type != newConditionType {
  383. job.Status.Conditions = append(job.Status.Conditions, NewIncrementalJobCondition(newConditionType, jobStage))
  384. needUpdated = true
  385. }
  386. return needUpdated, nil
  387. }
  388. // updateJobStatus ensures that job status can be updated rightly
  389. func (c *Controller) updateJobStatus(job *sednav1.IncrementalLearningJob) error {
  390. jobClient := c.client.IncrementalLearningJobs(job.Namespace)
  391. return runtime.RetryUpdateStatus(job.Name, job.Namespace, func() error {
  392. newJob, err := jobClient.Get(context.TODO(), job.Name, metav1.GetOptions{})
  393. if err != nil {
  394. return err
  395. }
  396. newJob.Status = job.Status
  397. _, err = jobClient.UpdateStatus(context.TODO(), newJob, metav1.UpdateOptions{})
  398. return err
  399. })
  400. }
  401. func NewIncrementalJobCondition(conditionType sednav1.ILJobStageConditionType, jobStage sednav1.ILJobStage) sednav1.ILJobCondition {
  402. return sednav1.ILJobCondition{
  403. Type: conditionType,
  404. Status: v1.ConditionTrue,
  405. LastHeartbeatTime: metav1.Now(),
  406. LastTransitionTime: metav1.Now(),
  407. Reason: "",
  408. Message: "",
  409. Stage: jobStage,
  410. }
  411. }
  412. func (c *Controller) generatePodName(jobName string, workerType string) string {
  413. return jobName + "-" + strings.ToLower(workerType) + "-" + utilrand.String(5)
  414. }
  415. func (c *Controller) getSpecifiedPods(job *sednav1.IncrementalLearningJob, podType string) *v1.Pod {
  416. var latestPod *v1.Pod
  417. selector, _ := runtime.GenerateSelector(job)
  418. pods, err := c.podStore.Pods(job.Namespace).List(selector)
  419. if len(pods) == 0 || err != nil {
  420. return nil
  421. }
  422. var matchTag = false
  423. latestPod = pods[0]
  424. if podType == "Deploy" {
  425. podType = runtime.InferencePodType
  426. }
  427. for _, pod := range pods {
  428. s := strings.Split(pod.Name, "-")
  429. currentPodType := s[len(s)-2]
  430. if (latestPod.CreationTimestamp.Before(&pod.CreationTimestamp) || latestPod.CreationTimestamp.Equal(&pod.CreationTimestamp)) && currentPodType == strings.ToLower(podType) {
  431. latestPod = pod
  432. matchTag = true
  433. }
  434. }
  435. if !matchTag {
  436. return nil
  437. }
  438. return latestPod
  439. }
  440. func (c *Controller) restartInferPod(job *sednav1.IncrementalLearningJob) error {
  441. inferPod := c.getSpecifiedPods(job, runtime.InferencePodType)
  442. if inferPod == nil {
  443. klog.V(2).Infof("No inferpod is running in incrementallearning job %v/%v", job.Namespace, job.Name)
  444. err := c.createInferPod(job)
  445. return err
  446. }
  447. ctx := context.Background()
  448. err := c.kubeClient.CoreV1().Pods(job.Namespace).Delete(ctx, inferPod.Name, metav1.DeleteOptions{})
  449. if err != nil {
  450. klog.Warningf("failed to delete inference pod %s for incrementallearning job %v/%v, err:%s", inferPod.Name, job.Namespace, job.Name, err)
  451. return err
  452. }
  453. err = c.createInferPod(job)
  454. if err != nil {
  455. klog.Warningf("failed to create inference pod %s for incrementallearning job %v/%v, err:%s", inferPod.Name, job.Namespace, job.Name, err)
  456. return err
  457. }
  458. return nil
  459. }
  460. func getNextStage(currentStage sednav1.ILJobStage) sednav1.ILJobStage {
  461. switch currentStage {
  462. case sednav1.ILJobTrain:
  463. return sednav1.ILJobEval
  464. case sednav1.ILJobEval:
  465. return sednav1.ILJobDeploy
  466. case sednav1.ILJobDeploy:
  467. return sednav1.ILJobTrain
  468. default:
  469. return sednav1.ILJobTrain
  470. }
  471. }
  472. func IsJobFinished(j *sednav1.IncrementalLearningJob) bool {
  473. // TODO
  474. return false
  475. }
  476. func (c *Controller) getSecret(namespace, name string, ownerStr string) (secret *v1.Secret, err error) {
  477. if name != "" {
  478. secret, err = c.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
  479. if err != nil {
  480. err = fmt.Errorf("failed to get the secret %s for %s: %w",
  481. name,
  482. ownerStr, err)
  483. }
  484. }
  485. return
  486. }
  487. func (c *Controller) createPod(job *sednav1.IncrementalLearningJob, podtype sednav1.ILJobStage) (err error) {
  488. ctx := context.Background()
  489. var podTemplate *v1.PodTemplateSpec
  490. incrementalDatasetName := job.Spec.Dataset.Name
  491. initialModelName := job.Spec.InitialModel.Name
  492. deployModelName := job.Spec.DeploySpec.Model.Name
  493. // check initial model name
  494. initialModel, err := c.client.Models(job.Namespace).Get(ctx, initialModelName, metav1.GetOptions{})
  495. if err != nil {
  496. return fmt.Errorf("failed to get initial model %s: %w",
  497. initialModelName, err)
  498. }
  499. _, err = c.client.Models(job.Namespace).Get(ctx, deployModelName, metav1.GetOptions{})
  500. if err != nil {
  501. return fmt.Errorf("failed to get deploy model %s: %w",
  502. deployModelName, err)
  503. }
  504. dataset, err := c.client.Datasets(job.Namespace).Get(ctx, incrementalDatasetName, metav1.GetOptions{})
  505. if err != nil {
  506. return fmt.Errorf("failed to get dataset %s: %w",
  507. incrementalDatasetName, err)
  508. }
  509. datasetSecret, err := c.getSecret(
  510. job.Namespace,
  511. dataset.Spec.CredentialName,
  512. fmt.Sprintf("dataset %s", dataset.Name),
  513. )
  514. if err != nil {
  515. return err
  516. }
  517. jobSecret, err := c.getSecret(
  518. job.Namespace,
  519. job.Spec.CredentialName,
  520. fmt.Sprintf("incremental job %s", job.Name),
  521. )
  522. if err != nil {
  523. return err
  524. }
  525. // get all url for train and eval from data in condition
  526. var cond IncrementalCondData
  527. condDataStr := job.Status.Conditions[len(job.Status.Conditions)-1].Data
  528. klog.V(2).Infof("incrementallearning job %v/%v data condition:%s", job.Namespace, job.Name, condDataStr)
  529. (&cond).Unmarshal([]byte(condDataStr))
  530. if cond.Input == nil {
  531. return fmt.Errorf("empty input from condData")
  532. }
  533. dataURL := cond.Input.DataURL
  534. inputmodelURLs := cond.GetInputModelURLs()
  535. var originalDataURLOrIndex string
  536. if cond.Input.DataIndexURL != "" {
  537. // this guarantee dataset.Spec.URL is not in host filesystem by LC,
  538. // but cond.Input.DataIndexURL could be in host filesystem.
  539. originalDataURLOrIndex = cond.Input.DataIndexURL
  540. } else {
  541. originalDataURLOrIndex = dataset.Spec.URL
  542. }
  543. var workerParam runtime.WorkerParam
  544. if podtype == sednav1.ILJobTrain {
  545. workerParam.WorkerType = runtime.TrainPodType
  546. podTemplate = &job.Spec.TrainSpec.Template
  547. // Env parameters for train
  548. workerParam.Env = map[string]string{
  549. "NAMESPACE": job.Namespace,
  550. "JOB_NAME": job.Name,
  551. "WORKER_NAME": "train-worker-" + utilrand.String(5),
  552. "LC_SERVER": c.cfg.LC.Server,
  553. }
  554. baseModelURL := inputmodelURLs[0]
  555. var baseModelSecret *v1.Secret
  556. if baseModelURL == initialModel.Spec.URL {
  557. baseModelSecret, err = c.getSecret(
  558. job.Namespace,
  559. initialModel.Spec.CredentialName,
  560. fmt.Sprintf("initial model %s", initialModelName),
  561. )
  562. if err != nil {
  563. return err
  564. }
  565. } else {
  566. baseModelSecret = jobSecret
  567. }
  568. workerParam.Mounts = append(workerParam.Mounts,
  569. runtime.WorkerMount{
  570. URL: &runtime.MountURL{
  571. URL: baseModelURL,
  572. Secret: baseModelSecret,
  573. DownloadByInitializer: true,
  574. },
  575. EnvName: "BASE_MODEL_URL",
  576. },
  577. runtime.WorkerMount{
  578. URL: &runtime.MountURL{
  579. URL: cond.Input.OutputDir,
  580. Secret: jobSecret,
  581. DownloadByInitializer: false,
  582. },
  583. EnvName: "MODEL_URL",
  584. },
  585. runtime.WorkerMount{
  586. URL: &runtime.MountURL{
  587. URL: dataURL,
  588. DownloadByInitializer: true,
  589. Secret: jobSecret,
  590. },
  591. EnvName: "TRAIN_DATASET_URL",
  592. },
  593. // see https://github.com/kubeedge/sedna/issues/35
  594. runtime.WorkerMount{
  595. URL: &runtime.MountURL{
  596. Secret: datasetSecret,
  597. URL: originalDataURLOrIndex,
  598. DownloadByInitializer: true,
  599. Indirect: dataset.Spec.URL != originalDataURLOrIndex,
  600. },
  601. EnvName: "ORIGINAL_DATASET_URL",
  602. },
  603. )
  604. } else {
  605. // Configure eval worker's mounts and envs
  606. podTemplate = &job.Spec.EvalSpec.Template
  607. workerParam.WorkerType = "Eval"
  608. workerParam.Env = map[string]string{
  609. "NAMESPACE": job.Namespace,
  610. "JOB_NAME": job.Name,
  611. "WORKER_NAME": "eval-worker-" + utilrand.String(5),
  612. "LC_SERVER": c.cfg.LC.Server,
  613. }
  614. var modelMountURLs []runtime.MountURL
  615. for _, url := range inputmodelURLs {
  616. var modelSecret *v1.Secret
  617. if url == initialModel.Spec.URL {
  618. modelSecret, err = c.getSecret(
  619. job.Namespace,
  620. initialModel.Spec.CredentialName,
  621. fmt.Sprintf("initial model %s", initialModelName),
  622. )
  623. if err != nil {
  624. return err
  625. }
  626. } else {
  627. modelSecret = jobSecret
  628. }
  629. modelMountURLs = append(modelMountURLs, runtime.MountURL{
  630. URL: url,
  631. Secret: modelSecret,
  632. DownloadByInitializer: true,
  633. })
  634. }
  635. workerParam.Mounts = append(workerParam.Mounts,
  636. runtime.WorkerMount{
  637. URLs: modelMountURLs,
  638. Name: "models",
  639. EnvName: "MODEL_URLS",
  640. },
  641. runtime.WorkerMount{
  642. URL: &runtime.MountURL{
  643. URL: dataURL,
  644. Secret: datasetSecret,
  645. DownloadByInitializer: true,
  646. },
  647. Name: "datasets",
  648. EnvName: "TEST_DATASET_URL",
  649. },
  650. runtime.WorkerMount{
  651. URL: &runtime.MountURL{
  652. Secret: datasetSecret,
  653. URL: originalDataURLOrIndex,
  654. DownloadByInitializer: true,
  655. Indirect: dataset.Spec.URL != originalDataURLOrIndex,
  656. },
  657. Name: "origin-dataset",
  658. EnvName: "ORIGINAL_DATASET_URL",
  659. },
  660. )
  661. }
  662. // set the default policy instead of Always policy
  663. workerParam.RestartPolicy = v1.RestartPolicyOnFailure
  664. workerParam.HostNetwork = true
  665. // create pod based on podtype
  666. _, err = runtime.CreatePodWithTemplate(c.kubeClient, job, podTemplate, &workerParam)
  667. return
  668. }
  669. func (c *Controller) createInferPod(job *sednav1.IncrementalLearningJob) error {
  670. infermodelName := job.Spec.DeploySpec.Model.Name
  671. inferModel, err := c.client.Models(job.Namespace).Get(context.TODO(), infermodelName, metav1.GetOptions{})
  672. if err != nil {
  673. return fmt.Errorf("failed to get infer model %s: %w",
  674. infermodelName, err)
  675. }
  676. inferModelURL := inferModel.Spec.URL
  677. HEMParameterJSON, _ := json.Marshal(job.Spec.DeploySpec.HardExampleMining.Parameters)
  678. HEMParameterString := string(HEMParameterJSON)
  679. modelSecret, err := c.getSecret(
  680. job.Namespace,
  681. inferModel.Spec.CredentialName,
  682. fmt.Sprintf("model %s", inferModel.Name),
  683. )
  684. // Configure inference worker's mounts and envs
  685. var workerParam runtime.WorkerParam
  686. workerParam.Mounts = append(workerParam.Mounts,
  687. runtime.WorkerMount{
  688. URL: &runtime.MountURL{
  689. URL: inferModelURL,
  690. Secret: modelSecret,
  691. DownloadByInitializer: true,
  692. },
  693. Name: "model",
  694. EnvName: "MODEL_URL",
  695. },
  696. )
  697. workerParam.Env = map[string]string{
  698. "NAMESPACE": job.Namespace,
  699. "JOB_NAME": job.Name,
  700. "WORKER_NAME": "inferworker-" + utilrand.String(5),
  701. "HEM_NAME": job.Spec.DeploySpec.HardExampleMining.Name,
  702. "HEM_PARAMETERS": HEMParameterString,
  703. "LC_SERVER": c.cfg.LC.Server,
  704. }
  705. modelHotUpdate := job.Spec.DeploySpec.Model.HotUpdateEnabled
  706. if modelHotUpdate {
  707. workerParam.ModelHotUpdate.Enable = true
  708. workerParam.ModelHotUpdate.PollPeriodSeconds = job.Spec.DeploySpec.Model.PollPeriodSeconds
  709. }
  710. workerParam.WorkerType = runtime.InferencePodType
  711. workerParam.HostNetwork = true
  712. // create the inference worker
  713. if _, err = runtime.CreatePodWithTemplate(c.kubeClient, job, &job.Spec.DeploySpec.Template, &workerParam); err != nil {
  714. return err
  715. }
  716. if modelHotUpdate {
  717. c.addJobAnnotations(job, runtime.ModelHotUpdateAnnotationsKey,
  718. runtime.GetModelHotUpdateConfigFile(job, runtime.ModelHotUpdateHostPrefix))
  719. }
  720. return err
  721. }
  722. // New creates a new incremental learning job controller that keeps the relevant pods
  723. // in sync with the corresponding IncrementalLearningJob objects.
  724. func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) {
  725. podInformer := cc.KubeInformerFactory.Core().V1().Pods()
  726. jobInformer := cc.SednaInformerFactory.Sedna().V1alpha1().IncrementalLearningJobs()
  727. eventBroadcaster := record.NewBroadcaster()
  728. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cc.KubeClient.CoreV1().Events("")})
  729. jc := &Controller{
  730. kubeClient: cc.KubeClient,
  731. client: cc.SednaClient.SednaV1alpha1(),
  732. queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), Name),
  733. cfg: cc.Config,
  734. }
  735. jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  736. AddFunc: func(obj interface{}) {
  737. jc.enqueueController(obj, true)
  738. jc.syncToEdge(watch.Added, obj)
  739. },
  740. UpdateFunc: func(old, cur interface{}) {
  741. jc.enqueueController(cur, true)
  742. jc.syncToEdge(watch.Added, cur)
  743. },
  744. DeleteFunc: func(obj interface{}) {
  745. jc.enqueueController(obj, true)
  746. jc.syncToEdge(watch.Deleted, obj)
  747. },
  748. })
  749. jc.jobLister = jobInformer.Lister()
  750. jc.jobStoreSynced = jobInformer.Informer().HasSynced
  751. podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  752. AddFunc: jc.addPod,
  753. UpdateFunc: jc.updatePod,
  754. DeleteFunc: jc.deletePod,
  755. })
  756. jc.podStore = podInformer.Lister()
  757. jc.podStoreSynced = podInformer.Informer().HasSynced
  758. return jc, nil
  759. }