You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

objectsearchservice.go 20 kB


  1. /*
  2. Copyright 2021 The KubeEdge Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package objectsearch
  14. import (
  15. "context"
  16. "fmt"
  17. "time"
  18. appsv1 "k8s.io/api/apps/v1"
  19. v1 "k8s.io/api/core/v1"
  20. "k8s.io/apimachinery/pkg/api/errors"
  21. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  22. utilrand "k8s.io/apimachinery/pkg/util/rand"
  23. utilruntime "k8s.io/apimachinery/pkg/util/runtime"
  24. "k8s.io/apimachinery/pkg/util/wait"
  25. "k8s.io/client-go/kubernetes"
  26. "k8s.io/client-go/kubernetes/scheme"
  27. v1core "k8s.io/client-go/kubernetes/typed/core/v1"
  28. appslisters "k8s.io/client-go/listers/apps/v1"
  29. corelisters "k8s.io/client-go/listers/core/v1"
  30. "k8s.io/client-go/tools/cache"
  31. "k8s.io/client-go/tools/record"
  32. "k8s.io/client-go/util/workqueue"
  33. "k8s.io/klog/v2"
  34. k8scontroller "k8s.io/kubernetes/pkg/controller"
  35. sednav1 "github.com/kubeedge/sedna/pkg/apis/sedna/v1alpha1"
  36. sednaclientset "github.com/kubeedge/sedna/pkg/client/clientset/versioned/typed/sedna/v1alpha1"
  37. sednav1listers "github.com/kubeedge/sedna/pkg/client/listers/sedna/v1alpha1"
  38. "github.com/kubeedge/sedna/pkg/globalmanager/config"
  39. "github.com/kubeedge/sedna/pkg/globalmanager/runtime"
  40. )
  41. const (
  42. // Name is this controller name
  43. Name = "ObjectSearch"
  44. // KindName is the kind name of CR this controller controls
  45. KindName = "ObjectSearchService"
  46. )
  47. const (
  48. objectSearchUserWorker = "userworker"
  49. objectSearchTrackingWorker = "trackingworker"
  50. objectSearchReidWorker = "reidworker"
  51. reidServicePort = 9378
  52. userWorkerPort = 9379
  53. )
// Kind contains the schema.GroupVersionKind for this controller type.
// It is built from the sedna v1alpha1 scheme group version and KindName, and
// is used to recognise owner references and to stamp the kind onto services
// before they are processed.
var Kind = sednav1.SchemeGroupVersion.WithKind(KindName)
  56. // Controller ensures that all ObjectSearchService objects
  57. // have corresponding pods to run their configured workload.
  58. type Controller struct {
  59. kubeClient kubernetes.Interface
  60. client sednaclientset.SednaV1alpha1Interface
  61. // podStoreSynced returns true if the pod store has been synced at least once.
  62. podStoreSynced cache.InformerSynced
  63. // A store of pods
  64. podStore corelisters.PodLister
  65. // deploymentsSynced returns true if the deployment store has been synced at least once.
  66. deploymentsSynced cache.InformerSynced
  67. // A store of deployment
  68. deploymentsLister appslisters.DeploymentLister
  69. // serviceStoreSynced returns true if the ObjectSearchService store has been synced at least once.
  70. serviceStoreSynced cache.InformerSynced
  71. // A store of service
  72. serviceLister sednav1listers.ObjectSearchServiceLister
  73. // ObjectSearchServices that need to be updated
  74. queue workqueue.RateLimitingInterface
  75. recorder record.EventRecorder
  76. cfg *config.ControllerConfig
  77. sendToEdgeFunc runtime.DownstreamSendFunc
  78. }
  79. // Run starts the main goroutine responsible for watching and syncing services.
  80. func (c *Controller) Run(stopCh <-chan struct{}) {
  81. workers := 1
  82. defer utilruntime.HandleCrash()
  83. defer c.queue.ShutDown()
  84. klog.Infof("Starting %s controller", Name)
  85. defer klog.Infof("Shutting down %s controller", Name)
  86. if !cache.WaitForNamedCacheSync(Name, stopCh, c.podStoreSynced, c.serviceStoreSynced) {
  87. klog.Errorf("failed to wait for %s caches to sync", Name)
  88. return
  89. }
  90. klog.Infof("Starting %s workers", Name)
  91. for i := 0; i < workers; i++ {
  92. go wait.Until(c.worker, time.Second, stopCh)
  93. }
  94. <-stopCh
  95. }
  96. // enqueueByPod enqueues the ObjectSearchService object of the specified pod.
  97. func (c *Controller) enqueueByPod(pod *v1.Pod, immediate bool) {
  98. controllerRef := metav1.GetControllerOf(pod)
  99. if controllerRef == nil {
  100. return
  101. }
  102. if controllerRef.Kind != Kind.Kind {
  103. return
  104. }
  105. service, err := c.serviceLister.ObjectSearchServices(pod.Namespace).Get(controllerRef.Name)
  106. if err != nil {
  107. return
  108. }
  109. if service.UID != controllerRef.UID {
  110. return
  111. }
  112. c.enqueueController(service, immediate)
  113. }
  114. // When a pod is created, enqueue the controller that manages it and update it's expectations.
  115. func (c *Controller) addPod(obj interface{}) {
  116. pod := obj.(*v1.Pod)
  117. if pod.DeletionTimestamp != nil {
  118. // on a restart of the controller, it's possible a new pod shows up in a state that
  119. // is already pending deletion. Prevent the pod from being a creation observation.
  120. c.deletePod(pod)
  121. return
  122. }
  123. // backoff to queue when PodFailed
  124. immediate := pod.Status.Phase != v1.PodFailed
  125. c.enqueueByPod(pod, immediate)
  126. }
  127. // When a pod is updated, figure out what object search service manage it and wake them up.
  128. func (c *Controller) updatePod(old, cur interface{}) {
  129. curPod := cur.(*v1.Pod)
  130. oldPod := old.(*v1.Pod)
  131. // no pod update, no queue
  132. if curPod.ResourceVersion == oldPod.ResourceVersion {
  133. return
  134. }
  135. c.addPod(curPod)
  136. }
  137. // deletePod enqueues the ObjectSearchService obj When a pod is deleted
  138. func (c *Controller) deletePod(obj interface{}) {
  139. pod, ok := obj.(*v1.Pod)
  140. // comment from https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/job/job_controller.go
  141. // When a delete is dropped, the relist will notice a pod in the store not
  142. // in the list, leading to the insertion of a tombstone object which contains
  143. // the deleted key/value. Note that this value might be stale. If the pod
  144. // changed labels the new ObjectSearchService will not be woken up till the periodic resync.
  145. if !ok {
  146. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  147. if !ok {
  148. klog.Warningf("couldn't get object from tombstone %+v", obj)
  149. return
  150. }
  151. pod, ok = tombstone.Obj.(*v1.Pod)
  152. if !ok {
  153. klog.Warningf("tombstone contained object that is not a pod %+v", obj)
  154. return
  155. }
  156. }
  157. c.enqueueByPod(pod, true)
  158. }
  159. // enqueueByDeployment enqueues the ObjectSearchService object of the specified deployment.
  160. func (c *Controller) enqueueByDeployment(deployment *appsv1.Deployment, immediate bool) {
  161. controllerRef := metav1.GetControllerOf(deployment)
  162. if controllerRef == nil {
  163. return
  164. }
  165. if controllerRef.Kind != Kind.Kind {
  166. return
  167. }
  168. service, err := c.serviceLister.ObjectSearchServices(deployment.Namespace).Get(controllerRef.Name)
  169. if err != nil {
  170. return
  171. }
  172. if service.UID != controllerRef.UID {
  173. return
  174. }
  175. c.enqueueController(service, immediate)
  176. }
  177. // When a deployment is created, enqueue the controller that manages it and update it's expectations.
  178. func (c *Controller) addDeployment(obj interface{}) {
  179. deployment := obj.(*appsv1.Deployment)
  180. c.enqueueByDeployment(deployment, true)
  181. }
  182. // deleteDeployment enqueues the ObjectSearchService obj When a deleteDeployment is deleted
  183. func (c *Controller) deleteDeployment(obj interface{}) {
  184. deployment, ok := obj.(*appsv1.Deployment)
  185. // comment from https://github.com/kubernetes/kubernetes/blob/master/pkg/controller/deployment/deployment_controller.go
  186. if !ok {
  187. tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
  188. if !ok {
  189. klog.Warningf("couldn't get object from tombstone %+v", obj)
  190. return
  191. }
  192. deployment, ok = tombstone.Obj.(*appsv1.Deployment)
  193. if !ok {
  194. klog.Warningf("tombstone contained object that is not a Deployment %+v", obj)
  195. return
  196. }
  197. }
  198. c.enqueueByDeployment(deployment, true)
  199. }
  200. // When a deployment is updated, figure out what object search service manage it and wake them up.
  201. func (c *Controller) updateDeployment(old, cur interface{}) {
  202. oldD := old.(*appsv1.Deployment)
  203. curD := cur.(*appsv1.Deployment)
  204. // no deployment update, no queue
  205. if curD.ResourceVersion == oldD.ResourceVersion {
  206. return
  207. }
  208. c.addDeployment(curD)
  209. }
  210. // obj could be an *sednav1.ObjectSearchService, or a DeletionFinalStateUnknown marker item,
  211. // immediate tells the controller to update the status right away, and should
  212. // happen ONLY when there was a successful pod run.
  213. func (c *Controller) enqueueController(obj interface{}, immediate bool) {
  214. key, err := k8scontroller.KeyFunc(obj)
  215. if err != nil {
  216. klog.Warningf("Couldn't get key for object %+v: %v", obj, err)
  217. return
  218. }
  219. backoff := time.Duration(0)
  220. if !immediate {
  221. backoff = runtime.GetBackoff(c.queue, key)
  222. }
  223. c.queue.AddAfter(key, backoff)
  224. }
  225. // worker runs a worker thread that just dequeues items, processes them, and marks them done.
  226. // It enforces that the sync is never invoked concurrently with the same key.
  227. func (c *Controller) worker() {
  228. for c.processNextWorkItem() {
  229. }
  230. }
  231. func (c *Controller) processNextWorkItem() bool {
  232. key, quit := c.queue.Get()
  233. if quit {
  234. return false
  235. }
  236. defer c.queue.Done(key)
  237. forget, err := c.sync(key.(string))
  238. if err == nil {
  239. if forget {
  240. c.queue.Forget(key)
  241. }
  242. return true
  243. }
  244. klog.Warningf("Error syncing objectsearch service: %v", err)
  245. c.queue.AddRateLimited(key)
  246. return true
  247. }
  248. // sync will sync the objectsearchservice with the given key.
  249. // This function is not meant to be invoked concurrently with the same key.
  250. func (c *Controller) sync(key string) (bool, error) {
  251. startTime := time.Now()
  252. defer func() {
  253. klog.V(4).Infof("Finished syncing objectsearch service %q (%v)", key, time.Since(startTime))
  254. }()
  255. ns, name, err := cache.SplitMetaNamespaceKey(key)
  256. if err != nil {
  257. return false, err
  258. }
  259. if len(ns) == 0 || len(name) == 0 {
  260. return false, fmt.Errorf("invalid objectsearch service key %q: either namespace or name is missing", key)
  261. }
  262. sharedService, err := c.serviceLister.ObjectSearchServices(ns).Get(name)
  263. if err != nil {
  264. if errors.IsNotFound(err) {
  265. klog.V(4).Infof("ObjectSearchService has been deleted: %v", key)
  266. return true, nil
  267. }
  268. return false, err
  269. }
  270. service := *sharedService
  271. // if service was finished previously, we don't want to redo the termination.
  272. if isServiceFinished(&service) {
  273. return true, nil
  274. }
  275. // set kind for service in case that the kind is None.
  276. // more details at https://github.com/kubernetes/kubernetes/issues/3030
  277. service.SetGroupVersionKind(Kind)
  278. selectorPods, _ := runtime.GenerateWorkerSelector(&service, objectSearchTrackingWorker)
  279. selectorDeployments, _ := runtime.GenerateSelector(&service)
  280. pods, err := c.podStore.Pods(service.Namespace).List(selectorPods)
  281. deployments, err := c.deploymentsLister.Deployments(service.Namespace).List(selectorDeployments)
  282. if err != nil {
  283. return false, err
  284. }
  285. latestConditionLen := len(service.Status.Conditions)
  286. var podFailed int32 = 0
  287. var deploymentFailed int32 = 0
  288. // neededPodCounts indicates the num of tracking worker pods should be created successfully in a objectsearch service currently.
  289. // neededDeploymentCounts indicates the num of deployments should be created successfully in a objectsearch service currently,
  290. // and one deployment is for userWorker and the other deployment is for reidWorkers.
  291. var neededPodCounts = int32(len(service.Spec.TrackingWorkers))
  292. var neededDeploymentCounts int32 = 2
  293. activePods := runtime.CalcActivePodCount(pods)
  294. activeDeployments := runtime.CalcActiveDeploymentCount(deployments)
  295. if service.Status.StartTime == nil {
  296. now := metav1.Now()
  297. service.Status.StartTime = &now
  298. } else {
  299. podFailed = neededPodCounts - activePods
  300. deploymentFailed = neededDeploymentCounts - activeDeployments
  301. }
  302. var manageServiceErr error
  303. serviceFailed := false
  304. var latestConditionType sednav1.ObjectSearchServiceConditionType = ""
  305. // get the latest condition type
  306. // based on that condition updated is appended, not inserted.
  307. jobConditions := service.Status.Conditions
  308. if len(jobConditions) > 0 {
  309. latestConditionType = (jobConditions)[len(jobConditions)-1].Type
  310. }
  311. var newCondtionType sednav1.ObjectSearchServiceConditionType
  312. var reason string
  313. var message string
  314. switch {
  315. case podFailed > 0:
  316. serviceFailed = true
  317. reason = "podFailed"
  318. message = "the worker of service failed"
  319. newCondtionType = sednav1.ObjectSearchServiceCondFailed
  320. c.recorder.Event(&service, v1.EventTypeWarning, reason, message)
  321. case deploymentFailed > 0:
  322. serviceFailed = true
  323. reason = "deploymentFailed"
  324. message = "the worker of service failed"
  325. newCondtionType = sednav1.ObjectSearchServiceCondFailed
  326. c.recorder.Event(&service, v1.EventTypeWarning, reason, message)
  327. default:
  328. if len(pods) == 0 && len(deployments) == 0 {
  329. activePods, activeDeployments, manageServiceErr = c.createWorkers(&service)
  330. }
  331. if manageServiceErr != nil {
  332. klog.V(2).Infof("failed to create worker: %v", manageServiceErr)
  333. serviceFailed = true
  334. message = error.Error(manageServiceErr)
  335. newCondtionType = sednav1.ObjectSearchServiceCondFailed
  336. podFailed = neededPodCounts - activePods
  337. deploymentFailed = neededDeploymentCounts - activeDeployments
  338. } else {
  339. // TODO: handle the case that the pod phase is PodSucceeded
  340. newCondtionType = sednav1.ObjectSearchServiceCondRunning
  341. }
  342. }
  343. if newCondtionType != latestConditionType {
  344. service.Status.Conditions = append(service.Status.Conditions, newServiceCondition(newCondtionType, reason, message))
  345. }
  346. forget := false
  347. // calculate the number of active pods and deployments
  348. active := activePods + activeDeployments
  349. failed := podFailed + deploymentFailed
  350. // no need to update the objectsearchservice if the status hasn't changed since last time
  351. if service.Status.Active != active || service.Status.Failed != failed || len(service.Status.Conditions) != latestConditionLen {
  352. service.Status.Active = active
  353. service.Status.Failed = failed
  354. if err := c.updateStatus(&service); err != nil {
  355. return forget, err
  356. }
  357. if serviceFailed && !isServiceFinished(&service) {
  358. // returning an error will re-enqueue objectsearchservice after the backoff period
  359. return forget, fmt.Errorf("failed pod(s) detected for objectsearch service key %q", key)
  360. }
  361. forget = true
  362. }
  363. return forget, manageServiceErr
  364. }
  365. // newServiceCondition creates a new condition
  366. func newServiceCondition(conditionType sednav1.ObjectSearchServiceConditionType, reason, message string) sednav1.ObjectSearchServiceCondition {
  367. return sednav1.ObjectSearchServiceCondition{
  368. Type: conditionType,
  369. Status: v1.ConditionTrue,
  370. LastHeartbeatTime: metav1.Now(),
  371. LastTransitionTime: metav1.Now(),
  372. Reason: reason,
  373. Message: message,
  374. }
  375. }
  376. // countPods returns number of succeeded and failed pods
  377. func countPods(pods []*v1.Pod) (failed int32) {
  378. failed = int32(filterPods(pods, v1.PodFailed))
  379. return
  380. }
  381. // filterPods returns pods based on their phase.
  382. func filterPods(pods []*v1.Pod, phase v1.PodPhase) int {
  383. result := 0
  384. for i := range pods {
  385. if phase == pods[i].Status.Phase {
  386. result++
  387. }
  388. }
  389. return result
  390. }
  391. func (c *Controller) updateStatus(service *sednav1.ObjectSearchService) error {
  392. client := c.client.ObjectSearchServices(service.Namespace)
  393. return runtime.RetryUpdateStatus(service.Name, service.Namespace, func() error {
  394. newService, err := client.Get(context.TODO(), service.Name, metav1.GetOptions{})
  395. if err != nil {
  396. return err
  397. }
  398. newService.Status = service.Status
  399. _, err = client.UpdateStatus(context.TODO(), newService, metav1.UpdateOptions{})
  400. return err
  401. })
  402. }
  403. func isServiceFinished(j *sednav1.ObjectSearchService) bool {
  404. for _, c := range j.Status.Conditions {
  405. if (c.Type == sednav1.ObjectSearchServiceCondFailed) && c.Status == v1.ConditionTrue {
  406. return true
  407. }
  408. }
  409. return false
  410. }
  411. func (c *Controller) createWorkers(service *sednav1.ObjectSearchService) (activePods int32, activeDeployments int32, err error) {
  412. activePods = 0
  413. activeDeployments = 0
  414. // create reid worker deployment
  415. var reidWorkerParam runtime.WorkerParam
  416. reidWorkerParam.WorkerType = objectSearchReidWorker
  417. _, err = runtime.CreateDeploymentWithTemplate(c.kubeClient, service, &service.Spec.ReidWorkers.DeploymentSpec, &reidWorkerParam, reidServicePort)
  418. if err != nil {
  419. return activePods, activeDeployments, fmt.Errorf("failed to create reid worker deployment: %w", err)
  420. }
  421. activeDeployments++
  422. // create reid worker edgemesh service
  423. reidServiceHost, err := runtime.CreateEdgeMeshService(c.kubeClient, service, objectSearchReidWorker, reidServicePort)
  424. if err != nil {
  425. return activePods, activeDeployments, fmt.Errorf("failed to create reid worker edgemesh service: %w", err)
  426. }
  427. reidServiceURL := fmt.Sprintf("%s:%d", reidServiceHost, reidServicePort)
  428. // create user worker deployment
  429. userWorkerReplicas := int32(1)
  430. userWorkerDeployment := &appsv1.DeploymentSpec{
  431. Replicas: &userWorkerReplicas,
  432. Template: service.Spec.UserWorker.Template,
  433. }
  434. var userWorkerParam runtime.WorkerParam
  435. userWorkerParam.WorkerType = objectSearchUserWorker
  436. userWorkerParam.Env = map[string]string{
  437. "NAMESPACE": service.Namespace,
  438. "SERVICE_NAME": service.Name,
  439. "WORKER_NAME": "userworker-" + utilrand.String(5),
  440. }
  441. _, err = runtime.CreateDeploymentWithTemplate(c.kubeClient, service, userWorkerDeployment, &userWorkerParam, userWorkerPort)
  442. if err != nil {
  443. return activePods, activeDeployments, fmt.Errorf("failed to create user worker: %w", err)
  444. }
  445. activeDeployments++
  446. // create user worker service
  447. userWorkerHost, err := runtime.CreateEdgeMeshService(c.kubeClient, service, objectSearchUserWorker, userWorkerPort)
  448. if err != nil {
  449. return activePods, activeDeployments, fmt.Errorf("failed to create edgemesh service: %w", err)
  450. }
  451. userWorkerURL := fmt.Sprintf("%s:%d", userWorkerHost, userWorkerPort)
  452. // create tracking worker pods
  453. var trackingWorkerParam runtime.WorkerParam
  454. trackingWorkerParam.WorkerType = objectSearchTrackingWorker
  455. for i, trackingWorker := range service.Spec.TrackingWorkers {
  456. trackingWorkerParam.Env = map[string]string{
  457. "NAMESPACE": service.Namespace,
  458. "SERVICE_NAME": service.Name,
  459. "WORKER_NAME": "trackingworker-" + utilrand.String(5),
  460. "USERWORKER_URL": userWorkerURL,
  461. "EDGEMESH_URL": reidServiceURL,
  462. }
  463. _, err = runtime.CreatePodWithTemplate(c.kubeClient, service, &trackingWorker.Template, &trackingWorkerParam)
  464. if err != nil {
  465. return activePods, activeDeployments, fmt.Errorf("failed to create %dth tracking worker: %w", i, err)
  466. }
  467. activePods++
  468. }
  469. return activePods, activeDeployments, err
  470. }
  471. // New creates a new ObjectSearchService controller that keeps the relevant pods
  472. // in sync with their corresponding ObjectSearchService objects.
  473. func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) {
  474. cfg := cc.Config
  475. podInformer := cc.KubeInformerFactory.Core().V1().Pods()
  476. deploymentInformer := cc.KubeInformerFactory.Apps().V1().Deployments()
  477. serviceInformer := cc.SednaInformerFactory.Sedna().V1alpha1().ObjectSearchServices()
  478. eventBroadcaster := record.NewBroadcaster()
  479. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cc.KubeClient.CoreV1().Events("")})
  480. jc := &Controller{
  481. kubeClient: cc.KubeClient,
  482. client: cc.SednaClient.SednaV1alpha1(),
  483. queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), "objectsearchservice"),
  484. recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "objectsearch-controller"}),
  485. cfg: cfg,
  486. }
  487. serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  488. AddFunc: func(obj interface{}) {
  489. jc.enqueueController(obj, true)
  490. },
  491. UpdateFunc: func(old, cur interface{}) {
  492. jc.enqueueController(cur, true)
  493. },
  494. DeleteFunc: func(obj interface{}) {
  495. jc.enqueueController(obj, true)
  496. },
  497. })
  498. jc.serviceLister = serviceInformer.Lister()
  499. jc.serviceStoreSynced = serviceInformer.Informer().HasSynced
  500. podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  501. AddFunc: jc.addPod,
  502. UpdateFunc: jc.updatePod,
  503. DeleteFunc: jc.deletePod,
  504. })
  505. jc.podStore = podInformer.Lister()
  506. jc.podStoreSynced = podInformer.Informer().HasSynced
  507. deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
  508. AddFunc: jc.addDeployment,
  509. UpdateFunc: jc.updateDeployment,
  510. DeleteFunc: jc.deleteDeployment,
  511. })
  512. jc.deploymentsLister = deploymentInformer.Lister()
  513. jc.deploymentsSynced = deploymentInformer.Informer().HasSynced
  514. return jc, nil
  515. }