
jointinferenceservice.go

/*
Copyright 2021 The KubeEdge Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package jointinference

import (
	"context"
	"encoding/json"
	"fmt"
	"reflect"
	"strconv"
	"strings"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilrand "k8s.io/apimachinery/pkg/util/rand"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	appslisters "k8s.io/client-go/listers/apps/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
	k8scontroller "k8s.io/kubernetes/pkg/controller"

	sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
	sednaclientset "github.com/kubeedge/sedna/pkg/client/clientset/versioned/typed/sedna/v1alpha1"
	sednav1listers "github.com/kubeedge/sedna/pkg/client/listers/sedna/v1alpha1"

	"github.com/kubeedge/sedna/pkg/globalmanager/config"
	"github.com/kubeedge/sedna/pkg/globalmanager/runtime"
)

const (
	// Name is this controller name
	Name = "JointInference"

	// KindName is the kind name of CR this controller controls
	KindName = "JointInferenceService"
)

const (
	jointInferenceForEdge  = "Edge"
	jointInferenceForCloud = "Cloud"

	BigModelPort = 5000
)

// gvk contains the schema.GroupVersionKind for this controller type.
var gvk = sednav1.SchemeGroupVersion.WithKind(KindName)

// Controller ensures that all JointInferenceService objects
// have corresponding deployments to run their configured workload.
type Controller struct {
	kubeClient kubernetes.Interface
	client     sednaclientset.SednaV1alpha1Interface

	// deploymentsSynced returns true if the deployment store has been synced at least once.
	deploymentsSynced cache.InformerSynced
	// A store of deployments
	deploymentsLister appslisters.DeploymentLister

	// serviceStoreSynced returns true if the JointInferenceService store has been synced at least once.
	serviceStoreSynced cache.InformerSynced
	// A store of services
	serviceLister sednav1listers.JointInferenceServiceLister

	// JointInferenceServices that need to be updated
	queue workqueue.RateLimitingInterface

	recorder record.EventRecorder

	cfg *config.ControllerConfig

	sendToEdgeFunc runtime.DownstreamSendFunc
}

// Run starts the main goroutine responsible for watching and syncing services.
func (c *Controller) Run(stopCh <-chan struct{}) {
	workers := 1

	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()

	klog.Infof("Starting %s controller", Name)
	defer klog.Infof("Shutting down %s controller", Name)

	if !cache.WaitForNamedCacheSync(Name, stopCh, c.deploymentsSynced, c.serviceStoreSynced) {
		klog.Errorf("failed to wait for %s caches to sync", Name)
		return
	}

	klog.Infof("Starting %s workers", Name)
	for i := 0; i < workers; i++ {
		go wait.Until(c.worker, time.Second, stopCh)
	}

	<-stopCh
}

// enqueueController enqueues a service. obj could be an *sednav1.JointInferenceService
// or a DeletionFinalStateUnknown marker item. immediate tells the controller to enqueue
// the key right away instead of after a backoff, and should be used ONLY when there was
// a successful pod run.
func (c *Controller) enqueueController(obj interface{}, immediate bool) {
	key, err := k8scontroller.KeyFunc(obj)
	if err != nil {
		klog.Warningf("Couldn't get key for object %+v: %v", obj, err)
		return
	}

	backoff := time.Duration(0)
	if !immediate {
		backoff = runtime.GetBackoff(c.queue, key)
	}
	c.queue.AddAfter(key, backoff)
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the sync is never invoked concurrently with the same key.
func (c *Controller) worker() {
	for c.processNextWorkItem() {
	}
}
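
// processNextWorkItem dequeues one key from the work queue, runs sync for it, and
// requeues the key with rate limiting if the sync returns an error. It returns false
// only when the queue has been shut down.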
func (c *Controller) processNextWorkItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	forget, err := c.sync(key.(string))
	if err == nil {
		if forget {
			c.queue.Forget(key)
		}
		return true
	}

	klog.Warningf("Error syncing jointinference service: %v", err)
	c.queue.AddRateLimited(key)

	return true
}

// sync will sync the jointinferenceservice with the given key.
// This function is not meant to be invoked concurrently with the same key.
func (c *Controller) sync(key string) (bool, error) {
	startTime := time.Now()
	defer func() {
		klog.V(4).Infof("Finished syncing jointinference service %q (%v)", key, time.Since(startTime))
	}()

	ns, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return false, err
	}
	if len(ns) == 0 || len(name) == 0 {
		return false, fmt.Errorf("invalid jointinference service key %q: either namespace or name is missing", key)
	}

	// Use the lister (a cache reading mechanism) to obtain the JointInferenceService object.
	// If the service does not exist (has been deleted), log the message and return true,
	// indicating that this object no longer needs to be synchronized.
	// If the read fails for any other reason, return an error.
	sharedService, err := c.serviceLister.JointInferenceServices(ns).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			klog.V(4).Infof("JointInferenceService has been deleted: %v", key)
			return true, nil
		}
		return false, err
	}

	service := *sharedService

	// if service was finished previously, we don't want to redo the termination
	if isServiceFinished(&service) {
		return true, nil
	}

	// set kind for service in case that the kind is None
	// more details at https://github.com/kubernetes/kubernetes/issues/3030
	service.SetGroupVersionKind(gvk)

	selectorDeployments, _ := runtime.GenerateSelector(&service)
	deployments, err := c.deploymentsLister.Deployments(service.Namespace).List(selectorDeployments)
	if err != nil {
		return false, err
	}

	klog.V(4).Infof("list jointinference service %v/%v, %v deployments: %v", service.Namespace, service.Name, len(deployments), deployments)

	latestConditionLen := len(service.Status.Conditions)

	activeDeployments := runtime.CalcActiveDeploymentCount(deployments)
	var failed int32 = 0

	// neededCounts means that two deployments (the edge deployment and the cloud deployment)
	// should currently be created successfully in a jointinference service.
	var neededCounts int32 = 2

	if service.Status.StartTime == nil {
		now := metav1.Now()
		service.Status.StartTime = &now
	} else {
		failed = neededCounts - activeDeployments
	}

	var manageServiceErr error
	serviceFailed := false

	var latestConditionType sednav1.JointInferenceServiceConditionType = ""

	// get the latest condition type,
	// based on the fact that conditions are appended, not inserted.
	jobConditions := service.Status.Conditions
	if len(jobConditions) > 0 {
		latestConditionType = (jobConditions)[len(jobConditions)-1].Type
	}

	var newConditionType sednav1.JointInferenceServiceConditionType
	var reason string
	var message string

	if failed > 0 {
		serviceFailed = true
		// TODO: identify the failed worker, i.e. whether the edge or the cloud inference worker failed.
		reason = "workerFailed"
		message = "the worker of service failed"
		newConditionType = sednav1.JointInferenceServiceCondFailed
		c.recorder.Event(&service, v1.EventTypeWarning, reason, message)
	} else {
		if len(deployments) == 0 {
			activeDeployments, manageServiceErr = c.createWorkers(&service)
		}
		if manageServiceErr != nil {
			serviceFailed = true
			message = manageServiceErr.Error()
			newConditionType = sednav1.JointInferenceServiceCondFailed
			failed = neededCounts - activeDeployments
		} else {
			// TODO: handle the case that the pod phase is PodSucceeded
			newConditionType = sednav1.JointInferenceServiceCondRunning
		}
	}

	if newConditionType != latestConditionType {
		service.Status.Conditions = append(service.Status.Conditions, newServiceCondition(newConditionType, reason, message))
	}

	forget := false

	// no need to update the jointinferenceservice if the status hasn't changed since last time
	if service.Status.Active != activeDeployments || service.Status.Failed != failed || len(service.Status.Conditions) != latestConditionLen {
		service.Status.Active = activeDeployments
		service.Status.Failed = failed

		if err := c.updateStatus(&service); err != nil {
			return forget, err
		}

		if serviceFailed && !isServiceFinished(&service) {
			// returning an error will re-enqueue the jointinferenceservice after the backoff period
			return forget, fmt.Errorf("failed deployment(s) detected for jointinference service key %q", key)
		}

		forget = true
	}

	return forget, manageServiceErr
}

// newServiceCondition creates a new condition for a joint inference service
func newServiceCondition(conditionType sednav1.JointInferenceServiceConditionType, reason, message string) sednav1.JointInferenceServiceCondition {
	return sednav1.JointInferenceServiceCondition{
		Type:               conditionType,
		Status:             v1.ConditionTrue,
		LastHeartbeatTime:  metav1.Now(),
		LastTransitionTime: metav1.Now(),
		Reason:             reason,
		Message:            message,
	}
}
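
// updateStatus fetches the latest copy of the service from the API server, copies the
// locally computed status onto it, and updates the status subresource, retrying through
// runtime.RetryUpdateStatus on failure.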
func (c *Controller) updateStatus(service *sednav1.JointInferenceService) error {
	client := c.client.JointInferenceServices(service.Namespace)
	return runtime.RetryUpdateStatus(service.Name, service.Namespace, func() error {
		newService, err := client.Get(context.TODO(), service.Name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		newService.Status = service.Status
		_, err = client.UpdateStatus(context.TODO(), newService, metav1.UpdateOptions{})
		return err
	})
}
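
// isServiceFinished returns true when the service carries a Failed condition whose status is True.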
func isServiceFinished(j *sednav1.JointInferenceService) bool {
	for _, c := range j.Status.Conditions {
		if (c.Type == sednav1.JointInferenceServiceCondFailed) && c.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}
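
// createWorkers creates the cloud worker deployment, exposes it through an EdgeMesh
// service so the edge side can reach the big model, and then creates the edge worker
// deployment. It returns the number of deployments created successfully.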
func (c *Controller) createWorkers(service *sednav1.JointInferenceService) (active int32, err error) {
	active = 0

	var bigModelPort int32 = BigModelPort

	// create cloud worker
	err = c.createCloudWorker(service, bigModelPort)
	if err != nil {
		return active, err
	}
	active++

	// create k8s service for cloud deployment
	bigModelHost, err := runtime.CreateEdgeMeshService(c.kubeClient, service, jointInferenceForCloud, bigModelPort)
	if err != nil {
		return active, err
	}

	// create edge worker
	err = c.createEdgeWorker(service, bigModelHost, bigModelPort)
	if err != nil {
		return active, err
	}
	active++

	return active, err
}

// enqueueByDeployment enqueues the JointInferenceService object of the specified deployment.
func (c *Controller) enqueueByDeployment(deployment *appsv1.Deployment, immediate bool) {
	controllerRef := metav1.GetControllerOf(deployment)

	klog.Infof("Deployment enqueued %v", deployment.Kind)

	if controllerRef == nil {
		return
	}

	if controllerRef.Kind != gvk.Kind {
		return
	}

	service, err := c.serviceLister.JointInferenceServices(deployment.Namespace).Get(controllerRef.Name)
	if err != nil {
		return
	}

	if service.UID != controllerRef.UID {
		return
	}

	c.enqueueController(service, immediate)
}

// When a deployment is created, enqueue the controller that manages it and update its expectations.
func (c *Controller) addDeployment(obj interface{}) {
	deployment := obj.(*appsv1.Deployment)
	c.enqueueByDeployment(deployment, true)
}

// When a deployment is updated, figure out which jointinferenceservice manages it and wake it up.
func (c *Controller) updateDeployment(old, cur interface{}) {
	oldD := old.(*appsv1.Deployment)
	curD := cur.(*appsv1.Deployment)
	// no deployment update, no queue
	if curD.ResourceVersion == oldD.ResourceVersion {
		return
	}

	c.addDeployment(curD)
}

// deleteDeployment enqueues the owning jointinferenceservice when a deployment is deleted,
// and recreates the deployment if its service still exists.
func (c *Controller) deleteDeployment(obj interface{}) {
	deployment, ok := obj.(*appsv1.Deployment)
	if !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			klog.Warningf("couldn't get object from tombstone %+v", obj)
			return
		}
		deployment, ok = tombstone.Obj.(*appsv1.Deployment)
		if !ok {
			klog.Warningf("tombstone contained object that is not a Deployment %+v", obj)
			return
		}
	}

	// If the deployment is accidentally deleted, recreate the deployment.
	newDeployment := deployment.DeepCopy()
	serviceName := func(input string) string {
		return strings.Split(input, "-deployment")[0]
	}(newDeployment.Name)

	_, err := c.serviceLister.JointInferenceServices(newDeployment.Namespace).Get(serviceName)
	if !errors.IsNotFound(err) {
		// Remove unnecessary metadata.
		newDeployment.ResourceVersion = ""
		newDeployment.UID = ""
		// Create a new deployment.
		_, err := c.kubeClient.AppsV1().Deployments(newDeployment.Namespace).Create(context.TODO(), newDeployment, metav1.CreateOptions{})
		if err != nil {
			klog.Errorf("failed to recreate deployment %s: %v", deployment.Name, err)
			return
		}
		klog.Infof("Successfully recreated deployment %s", deployment.Name)
	}

	c.enqueueByDeployment(newDeployment, true)
}
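
// updateInferenceServices reconciles spec changes between the old and new JointInferenceService:
// it updates the cloud worker when its spec changed, looks up (or recreates) the cloud EdgeMesh
// service to obtain the big model address, and then updates the edge worker when its spec changed.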
func (c *Controller) updateInferenceServices(old, cur interface{}) error {
	oldService := old.(*sednav1.JointInferenceService)
	newService := cur.(*sednav1.JointInferenceService)

	// Check if the cloud worker configuration has changed
	if !reflect.DeepEqual(oldService.Spec.CloudWorker, newService.Spec.CloudWorker) {
		// Update cloud worker and log any errors
		if err := c.updateCloudWorker(newService); err != nil {
			klog.Errorf("Failed to update cloud worker for service %s/%s: %v", newService.Namespace, newService.Name, err)
		}
	}

	// Retrieve the address of the cloud inference service
	var bigModelHost string
	svc, err := c.kubeClient.CoreV1().Services(oldService.Namespace).Get(context.Background(),
		strings.ToLower(oldService.Name+"-"+jointInferenceForCloud), metav1.GetOptions{})
	if err != nil {
		if errors.IsNotFound(err) {
			// If the service does not exist, create a new one and retrieve its address
			klog.Info("Cloud service not found, creating new service...")
			bigModelHost, err = runtime.CreateEdgeMeshService(c.kubeClient, oldService, jointInferenceForCloud, BigModelPort)
			if err != nil {
				klog.Errorf("Failed to create EdgeMesh service for service %s/%s: %v", oldService.Namespace, oldService.Name, err)
			}
		} else {
			klog.Errorf("Failed to get cloud service %s/%s: %v", oldService.Namespace, oldService.Name, err)
		}
	} else {
		bigModelHost = fmt.Sprintf("%s.%s", svc.Name, svc.Namespace)
	}

	// Check if the edge worker configuration has changed
	if !reflect.DeepEqual(oldService.Spec.EdgeWorker, newService.Spec.EdgeWorker) {
		// Update edge worker and log any errors
		if err := c.updateEdgeWorker(newService, bigModelHost); err != nil {
			klog.Errorf("Failed to update edge worker for service %s/%s: %v", newService.Namespace, newService.Name, err)
		}
	}

	return nil
}
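
// createOrUpdateWorker builds the worker parameters for the given worker type (model,
// environment variables and model mount) and then either creates a new deployment for the
// worker or updates the existing one, depending on the create flag.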
func (c *Controller) createOrUpdateWorker(service *sednav1.JointInferenceService, workerType string, bigModelHost string, bigModelPort int32, create bool) error {
	var modelName string
	var modelTemplate v1.PodTemplateSpec
	var workerParam runtime.WorkerParam

	// Set the corresponding parameters according to the workerType.
	switch workerType {
	case jointInferenceForCloud:
		modelName = service.Spec.CloudWorker.Model.Name
		modelTemplate = *service.Spec.CloudWorker.Template.DeepCopy()

		workerParam.Env = map[string]string{
			"BIG_MODEL_BIND_PORT": strconv.Itoa(int(bigModelPort)),
		}

		workerParam.WorkerType = workerType
	case jointInferenceForEdge:
		modelName = service.Spec.EdgeWorker.Model.Name
		modelTemplate = *service.Spec.EdgeWorker.Template.DeepCopy()

		HEMParameterJSON, _ := json.Marshal(service.Spec.EdgeWorker.HardExampleMining.Parameters)
		HEMParameterString := string(HEMParameterJSON)

		workerParam.Env = map[string]string{
			"BIG_MODEL_IP":   bigModelHost,
			"BIG_MODEL_PORT": strconv.Itoa(int(bigModelPort)),
			"HEM_NAME":       service.Spec.EdgeWorker.HardExampleMining.Name,
			"HEM_PARAMETERS": HEMParameterString,

			"LC_SERVER": c.cfg.LC.Server,
		}

		workerParam.WorkerType = workerType
	}

	// get the model.
	model, err := c.client.Models(service.Namespace).Get(context.Background(), modelName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get model %s: %w", modelName, err)
	}

	secretName := model.Spec.CredentialName
	var modelSecret *v1.Secret
	if secretName != "" {
		modelSecret, _ = c.kubeClient.CoreV1().Secrets(service.Namespace).Get(context.TODO(), secretName, metav1.GetOptions{})
	}

	// Fill in the mounting configuration of workerParam.
	workerParam.Mounts = append(workerParam.Mounts, runtime.WorkerMount{
		URL: &runtime.MountURL{
			URL:                   model.Spec.URL,
			Secret:                modelSecret,
			DownloadByInitializer: true,
		},
		Name:    "model",
		EnvName: "MODEL_URL",
	})

	// Set other common environment variables.
	workerParam.Env["NAMESPACE"] = service.Namespace
	workerParam.Env["SERVICE_NAME"] = service.Name
	workerParam.Env["WORKER_NAME"] = strings.ToLower(workerType) + "worker-" + utilrand.String(5)

	// Create or update Deployment.
	if create {
		_, err = runtime.CreateDeploymentWithTemplate(c.kubeClient, service, &appsv1.DeploymentSpec{Template: modelTemplate}, &workerParam)
	} else {
		service.SetGroupVersionKind(gvk)
		workerName := service.Name + "-deployment-" + strings.ToLower(workerType)
		existingDeployment, getErr := c.deploymentsLister.Deployments(service.Namespace).Get(workerName)
		if getErr != nil {
			return fmt.Errorf("get %s deployment failed: %v", strings.ToLower(workerType), getErr)
		}
		newDeployment := existingDeployment.DeepCopy()
		newDeployment.Spec.Template = modelTemplate
		// assign the outer err so an update failure is returned to the caller
		_, err = runtime.UpdateDeploymentWithTemplate(c.kubeClient, service, newDeployment, &workerParam)
	}
	return err
}
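
// createCloudWorker creates the cloud inference worker deployment listening on bigModelPort.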
func (c *Controller) createCloudWorker(service *sednav1.JointInferenceService, bigModelPort int32) error {
	return c.createOrUpdateWorker(service, jointInferenceForCloud, "", bigModelPort, true)
}
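
// createEdgeWorker creates the edge inference worker deployment that reaches the cloud
// worker through bigModelHost and bigModelPort.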
func (c *Controller) createEdgeWorker(service *sednav1.JointInferenceService, bigModelHost string, bigModelPort int32) error {
	return c.createOrUpdateWorker(service, jointInferenceForEdge, bigModelHost, bigModelPort, true)
}
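
// updateCloudWorker updates the existing cloud worker deployment from the new service spec.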
func (c *Controller) updateCloudWorker(newservice *sednav1.JointInferenceService) error {
	return c.createOrUpdateWorker(newservice, jointInferenceForCloud, "", BigModelPort, false)
}
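
// updateEdgeWorker updates the existing edge worker deployment from the new service spec,
// keeping it pointed at bigModelHost.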
func (c *Controller) updateEdgeWorker(newservice *sednav1.JointInferenceService, bigModelHost string) error {
	return c.createOrUpdateWorker(newservice, jointInferenceForEdge, bigModelHost, BigModelPort, false)
}

// New creates a new JointInferenceService controller that keeps the relevant deployments
// in sync with their corresponding JointInferenceService objects.
func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) {
	cfg := cc.Config

	deploymentInformer := cc.KubeInformerFactory.Apps().V1().Deployments()

	serviceInformer := cc.SednaInformerFactory.Sedna().V1alpha1().JointInferenceServices()

	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cc.KubeClient.CoreV1().Events("")})

	jc := &Controller{
		kubeClient: cc.KubeClient,
		client:     cc.SednaClient.SednaV1alpha1(),

		queue:    workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), "jointinferenceservice"),
		recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "jointinferenceservice-controller"}),
		cfg:      cfg,
	}

	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			jc.enqueueController(obj, true)
			jc.syncToEdge(watch.Added, obj)
		},

		UpdateFunc: func(old, cur interface{}) {
			jc.enqueueController(cur, true)
			jc.updateInferenceServices(old, cur)
			jc.syncToEdge(watch.Modified, cur)
		},

		DeleteFunc: func(obj interface{}) {
			jc.enqueueController(obj, true)
			jc.syncToEdge(watch.Deleted, obj)
		},
	})
	jc.serviceLister = serviceInformer.Lister()
	jc.serviceStoreSynced = serviceInformer.Informer().HasSynced

	deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    jc.addDeployment,
		UpdateFunc: jc.updateDeployment,
		DeleteFunc: jc.deleteDeployment,
	})
	jc.deploymentsLister = deploymentInformer.Lister()
	jc.deploymentsSynced = deploymentInformer.Informer().HasSynced

	return jc, nil
}