You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

common.go 8.0 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. /*
  2. Copyright 2021 The KubeEdge Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package globalmanager
import (
	"context"
	"fmt"
	"math"
	"sort"
	"strings"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
	k8scontroller "k8s.io/kubernetes/pkg/controller"
)
  29. const (
  30. // DefaultBackOff is the default backoff period
  31. DefaultBackOff = 10 * time.Second
  32. // MaxBackOff is the max backoff period
  33. MaxBackOff = 360 * time.Second
  34. statusUpdateRetries = 3
  35. // setting some prefix for container path, include data and code prefix
  36. dataPrefix = "/home/data"
  37. bigModelPort int32 = 5000
  38. )
  39. // CreateVolumeMap creates volumeMap for container and
  40. // returns volumeMounts and volumes for stage of creating pod
  41. func CreateVolumeMap(workerPara *WorkerPara) ([]v1.VolumeMount, []v1.Volume) {
  42. var volumeMounts []v1.VolumeMount
  43. var volumes []v1.Volume
  44. volumetype := v1.HostPathDirectory
  45. mountPathMap := make(map[string]bool)
  46. duplicateIdx := make(map[int]bool)
  47. for i, v := range workerPara.volumeMountList {
  48. if mountPathMap[v] {
  49. duplicateIdx[i] = true
  50. continue
  51. }
  52. mountPathMap[v] = true
  53. tempVolumeMount := v1.VolumeMount{
  54. MountPath: v,
  55. Name: workerPara.volumeMapName[i],
  56. }
  57. volumeMounts = append(volumeMounts, tempVolumeMount)
  58. }
  59. for i, v := range workerPara.volumeList {
  60. if duplicateIdx[i] {
  61. continue
  62. }
  63. tempVolume := v1.Volume{
  64. Name: workerPara.volumeMapName[i],
  65. VolumeSource: v1.VolumeSource{
  66. HostPath: &v1.HostPathVolumeSource{
  67. Path: v,
  68. Type: &volumetype,
  69. },
  70. },
  71. }
  72. volumes = append(volumes, tempVolume)
  73. }
  74. return volumeMounts, volumes
  75. }
  76. // CreateEnvVars creates EnvMap for container
  77. // include EnvName and EnvValue map for stage of creating a pod
  78. func CreateEnvVars(envMap map[string]string) []v1.EnvVar {
  79. var envVars []v1.EnvVar
  80. for envName, envValue := range envMap {
  81. Env := v1.EnvVar{
  82. Name: envName,
  83. Value: envValue,
  84. }
  85. envVars = append(envVars, Env)
  86. }
  87. return envVars
  88. }
  89. // GetNodeIPByName get node ip by node name
  90. func GetNodeIPByName(kubeClient kubernetes.Interface, name string) (string, error) {
  91. n, err := kubeClient.CoreV1().Nodes().Get(context.Background(), name, metav1.GetOptions{})
  92. if err != nil {
  93. return "", err
  94. }
  95. typeToAddress := make(map[v1.NodeAddressType]string)
  96. for _, addr := range n.Status.Addresses {
  97. typeToAddress[addr.Type] = addr.Address
  98. }
  99. address, found := typeToAddress[v1.NodeExternalIP]
  100. if found {
  101. return address, nil
  102. }
  103. address, found = typeToAddress[v1.NodeInternalIP]
  104. if found {
  105. return address, nil
  106. }
  107. return "", fmt.Errorf("can't found node ip for node %s", name)
  108. }
  109. // GenerateLabels generates labels for an object
  110. func GenerateLabels(object CommonInterface) map[string]string {
  111. kind := object.GroupVersionKind().Kind
  112. group := object.GroupVersionKind().Group
  113. name := object.GetName()
  114. key := strings.ToLower(kind) + "." + group + "/name"
  115. labels := make(map[string]string)
  116. labels[key] = name
  117. return labels
  118. }
  119. // GenerateSelector generates selector for an object
  120. func GenerateSelector(object CommonInterface) (labels.Selector, error) {
  121. ls := &metav1.LabelSelector{
  122. MatchLabels: GenerateLabels(object),
  123. }
  124. return metav1.LabelSelectorAsSelector(ls)
  125. }
  126. // CreateKubernetesService creates a k8s service for an object given ip and port
  127. func CreateKubernetesService(kubeClient kubernetes.Interface, object CommonInterface, inputPort int32, inputIP string) (int32, error) {
  128. ctx := context.Background()
  129. name := object.GetName()
  130. namespace := object.GetNamespace()
  131. kind := object.GroupVersionKind().Kind
  132. targePort := intstr.IntOrString{
  133. IntVal: inputPort,
  134. }
  135. serviceSpec := &v1.Service{
  136. ObjectMeta: metav1.ObjectMeta{
  137. Namespace: object.GetNamespace(),
  138. GenerateName: name + "-" + "service" + "-",
  139. OwnerReferences: []metav1.OwnerReference{
  140. *metav1.NewControllerRef(object, object.GroupVersionKind()),
  141. },
  142. Labels: GenerateLabels(object),
  143. },
  144. Spec: v1.ServiceSpec{
  145. Selector: GenerateLabels(object),
  146. ExternalIPs: []string{
  147. inputIP,
  148. },
  149. Type: v1.ServiceTypeNodePort,
  150. Ports: []v1.ServicePort{
  151. {
  152. Port: inputPort,
  153. TargetPort: targePort,
  154. },
  155. },
  156. },
  157. }
  158. service, err := kubeClient.CoreV1().Services(namespace).Create(ctx, serviceSpec, metav1.CreateOptions{})
  159. if err != nil {
  160. klog.Warningf("failed to create service for %v %v/%v, err:%s", kind, namespace, name, err)
  161. return 0, err
  162. }
  163. klog.V(2).Infof("Service %s is created successfully for %v %v/%v", service.Name, kind, namespace, name)
  164. return service.Spec.Ports[0].NodePort, nil
  165. }
  166. // getBackoff calc the next wait time for the key
  167. func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Duration {
  168. exp := queue.NumRequeues(key)
  169. if exp <= 0 {
  170. return time.Duration(0)
  171. }
  172. // The backoff is capped such that 'calculated' value never overflows.
  173. backoff := float64(DefaultBackOff.Nanoseconds()) * math.Pow(2, float64(exp-1))
  174. if backoff > math.MaxInt64 {
  175. return MaxBackOff
  176. }
  177. calculated := time.Duration(backoff)
  178. if calculated > MaxBackOff {
  179. return MaxBackOff
  180. }
  181. return calculated
  182. }
  183. func calcActivePodCount(pods []*v1.Pod) int32 {
  184. var result int32 = 0
  185. for _, p := range pods {
  186. if v1.PodSucceeded != p.Status.Phase &&
  187. v1.PodFailed != p.Status.Phase &&
  188. p.DeletionTimestamp == nil {
  189. result++
  190. }
  191. }
  192. return result
  193. }
  194. // injectWorkerPara modifies pod in-place
  195. func injectWorkerPara(pod *v1.Pod, workerPara *WorkerPara, object CommonInterface) {
  196. // inject our predefined volumes/envs
  197. volumeMounts, volumes := CreateVolumeMap(workerPara)
  198. envs := CreateEnvVars(workerPara.env)
  199. pod.Spec.Volumes = append(pod.Spec.Volumes, volumes...)
  200. for idx := range pod.Spec.Containers {
  201. pod.Spec.Containers[idx].Env = append(
  202. pod.Spec.Containers[idx].Env, envs...,
  203. )
  204. pod.Spec.Containers[idx].VolumeMounts = append(
  205. pod.Spec.Containers[idx].VolumeMounts, volumeMounts...,
  206. )
  207. }
  208. // inject our labels
  209. if pod.Labels == nil {
  210. pod.Labels = make(map[string]string)
  211. }
  212. for k, v := range GenerateLabels(object) {
  213. pod.Labels[k] = v
  214. }
  215. pod.GenerateName = object.GetName() + "-" + strings.ToLower(workerPara.workerType) + "-"
  216. pod.Namespace = object.GetNamespace()
  217. if workerPara.hostNetwork {
  218. // FIXME
  219. // force to set hostnetwork
  220. pod.Spec.HostNetwork = true
  221. }
  222. if pod.Spec.RestartPolicy == "" {
  223. pod.Spec.RestartPolicy = workerPara.restartPolicy
  224. }
  225. }
  226. // createPodWithTemplate creates and returns a pod object given a crd object, pod template, and workerPara
  227. func createPodWithTemplate(client kubernetes.Interface, object CommonInterface, spec *v1.PodTemplateSpec, workerPara *WorkerPara) (*v1.Pod, error) {
  228. objectKind := object.GroupVersionKind()
  229. pod, _ := k8scontroller.GetPodFromTemplate(spec, object, metav1.NewControllerRef(object, objectKind))
  230. injectWorkerPara(pod, workerPara, object)
  231. createdPod, err := client.CoreV1().Pods(object.GetNamespace()).Create(context.TODO(), pod, metav1.CreateOptions{})
  232. objectName := object.GetNamespace() + "/" + object.GetName()
  233. if err != nil {
  234. klog.Warningf("failed to create pod(type=%s) for %s %s, err:%s", workerPara.workerType, objectKind, objectName, err)
  235. return nil, err
  236. }
  237. klog.V(2).Infof("pod %s is created successfully for %s %s", createdPod.Name, objectKind, objectName)
  238. return createdPod, nil
  239. }