You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

worker.go 12 kB


  1. package runtime
  2. import (
  3. "context"
  4. "fmt"
  5. "path/filepath"
  6. "strconv"
  7. "strings"
  8. appsv1 "k8s.io/api/apps/v1"
  9. v1 "k8s.io/api/core/v1"
  10. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  11. "k8s.io/apimachinery/pkg/labels"
  12. "k8s.io/apimachinery/pkg/util/intstr"
  13. "k8s.io/client-go/kubernetes"
  14. "k8s.io/klog/v2"
  15. k8scontroller "k8s.io/kubernetes/pkg/controller"
  16. )
  17. type WorkerMount struct {
  18. Name string
  19. // the url to be mounted
  20. URL *MountURL
  21. // for some cases, there are more than one url to be mounted
  22. URLs []MountURL
  23. // envName indicates the environment key of the mounts injected to the worker
  24. EnvName string
  25. }
  26. // WorkerParam describes the system-defined parameters of worker
  27. type WorkerParam struct {
  28. Mounts []WorkerMount
  29. Env map[string]string
  30. WorkerType string
  31. // if true, force to use hostNetwork
  32. HostNetwork bool
  33. ModelHotUpdate ModelHotUpdate
  34. RestartPolicy v1.RestartPolicy
  35. DNSPolicy v1.DNSPolicy
  36. }
  37. type ModelHotUpdate struct {
  38. Enable bool
  39. PollPeriodSeconds int64
  40. }
  41. // generateLabels generates labels for an object
  42. func generateLabels(object CommonInterface, workerType string) map[string]string {
  43. kind := object.GroupVersionKind().Kind
  44. group := object.GroupVersionKind().Group
  45. keyPrefix := strings.ToLower(kind + "." + group + "/")
  46. labels := make(map[string]string)
  47. labels[keyPrefix+"name"] = object.GetName()
  48. labels[keyPrefix+"uid"] = string(object.GetUID())
  49. if workerType != "" {
  50. labels[keyPrefix+"worker-type"] = strings.ToLower(workerType)
  51. }
  52. return labels
  53. }
  54. // GenerateSelector generates the selector of an object for worker
  55. func GenerateSelector(object CommonInterface) (labels.Selector, error) {
  56. ls := &metav1.LabelSelector{
  57. // select any type workers
  58. MatchLabels: generateLabels(object, ""),
  59. }
  60. return metav1.LabelSelectorAsSelector(ls)
  61. }
  62. // GenerateWorkerSelector generates the selector of an object for specific worker type
  63. func GenerateWorkerSelector(object CommonInterface, workerType string) (labels.Selector, error) {
  64. ls := &metav1.LabelSelector{
  65. // select any type workers
  66. MatchLabels: generateLabels(object, workerType),
  67. }
  68. return metav1.LabelSelectorAsSelector(ls)
  69. }
  70. // CreateKubernetesService creates a k8s service for an object given ip and port
  71. func CreateKubernetesService(kubeClient kubernetes.Interface, object CommonInterface, workerType string, inputPort int32, inputIP string) (int32, error) {
  72. ctx := context.Background()
  73. name := object.GetName()
  74. namespace := object.GetNamespace()
  75. kind := object.GroupVersionKind().Kind
  76. targePort := intstr.IntOrString{
  77. IntVal: inputPort,
  78. }
  79. serviceSpec := &v1.Service{
  80. ObjectMeta: metav1.ObjectMeta{
  81. Namespace: object.GetNamespace(),
  82. Name: strings.ToLower(name + "-" + workerType),
  83. OwnerReferences: []metav1.OwnerReference{
  84. *metav1.NewControllerRef(object, object.GroupVersionKind()),
  85. },
  86. Labels: generateLabels(object, workerType),
  87. },
  88. Spec: v1.ServiceSpec{
  89. Selector: generateLabels(object, workerType),
  90. ExternalIPs: []string{
  91. inputIP,
  92. },
  93. Type: v1.ServiceTypeNodePort,
  94. Ports: []v1.ServicePort{
  95. {
  96. Port: inputPort,
  97. TargetPort: targePort,
  98. },
  99. },
  100. },
  101. }
  102. service, err := kubeClient.CoreV1().Services(namespace).Create(ctx, serviceSpec, metav1.CreateOptions{})
  103. if err != nil {
  104. klog.Warningf("failed to create service for %v %v/%v, err:%s", kind, namespace, name, err)
  105. return 0, err
  106. }
  107. klog.V(2).Infof("Service %s is created successfully for %v %v/%v", service.Name, kind, namespace, name)
  108. return service.Spec.Ports[0].NodePort, nil
  109. }
  110. // injectWorkerParam modifies pod in-place
  111. func injectWorkerParam(pod *v1.Pod, workerParam *WorkerParam, object CommonInterface) {
  112. InjectStorageInitializer(pod, workerParam)
  113. if workerParam.WorkerType == InferencePodType && workerParam.ModelHotUpdate.Enable {
  114. injectModelHotUpdateMount(pod, object)
  115. setModelHotUpdateEnv(workerParam)
  116. }
  117. envs := createEnvVars(workerParam.Env)
  118. for idx := range pod.Spec.Containers {
  119. pod.Spec.Containers[idx].Env = append(
  120. pod.Spec.Containers[idx].Env, envs...,
  121. )
  122. }
  123. // inject our labels
  124. if pod.Labels == nil {
  125. pod.Labels = make(map[string]string)
  126. }
  127. for k, v := range generateLabels(object, workerParam.WorkerType) {
  128. pod.Labels[k] = v
  129. }
  130. pod.GenerateName = object.GetName() + "-" + strings.ToLower(workerParam.WorkerType) + "-"
  131. pod.Namespace = object.GetNamespace()
  132. if workerParam.HostNetwork {
  133. // FIXME
  134. // force to set hostnetwork
  135. pod.Spec.HostNetwork = true
  136. }
  137. if pod.Spec.RestartPolicy == "" {
  138. pod.Spec.RestartPolicy = workerParam.RestartPolicy
  139. }
  140. if workerParam.DNSPolicy != "" {
  141. pod.Spec.DNSPolicy = workerParam.DNSPolicy
  142. }
  143. }
  144. // CreatePodWithTemplate creates and returns a pod object given a crd object, pod template, and workerParam
  145. func CreatePodWithTemplate(client kubernetes.Interface, object CommonInterface, spec *v1.PodTemplateSpec, workerParam *WorkerParam) (*v1.Pod, error) {
  146. objectKind := object.GroupVersionKind()
  147. pod, _ := k8scontroller.GetPodFromTemplate(spec, object, metav1.NewControllerRef(object, objectKind))
  148. injectWorkerParam(pod, workerParam, object)
  149. createdPod, err := client.CoreV1().Pods(object.GetNamespace()).Create(context.TODO(), pod, metav1.CreateOptions{})
  150. objectName := object.GetNamespace() + "/" + object.GetName()
  151. if err != nil {
  152. klog.Warningf("failed to create pod(type=%s) for %s %s, err:%s", workerParam.WorkerType, objectKind, objectName, err)
  153. return nil, err
  154. }
  155. klog.V(2).Infof("pod %s is created successfully for %s %s", createdPod.Name, objectKind, objectName)
  156. return createdPod, nil
  157. }
  158. // CreateEdgeMeshService creates a kubeedge edgemesh service for an object, and returns an edgemesh service URL.
  159. // Since edgemesh can realize Cross-Edge-Cloud communication, the service can be created both on the cloud or edge side.
  160. func CreateEdgeMeshService(kubeClient kubernetes.Interface, object CommonInterface, workerType string, servicePort int32) (string, error) {
  161. ctx := context.Background()
  162. name := object.GetName()
  163. namespace := object.GetNamespace()
  164. kind := object.GroupVersionKind().Kind
  165. targetPort := intstr.IntOrString{
  166. IntVal: servicePort,
  167. }
  168. serviceSpec := &v1.Service{
  169. ObjectMeta: metav1.ObjectMeta{
  170. Namespace: namespace,
  171. Name: strings.ToLower(name + "-" + workerType),
  172. OwnerReferences: []metav1.OwnerReference{
  173. *metav1.NewControllerRef(object, object.GroupVersionKind()),
  174. },
  175. Labels: generateLabels(object, workerType),
  176. },
  177. Spec: v1.ServiceSpec{
  178. Selector: generateLabels(object, workerType),
  179. Ports: []v1.ServicePort{
  180. {
  181. // TODO: be clean, Port.Name is currently required by edgemesh(v1.8.0).
  182. // and should be <protocol>-<suffix>
  183. Name: "tcp-0",
  184. Protocol: "TCP",
  185. Port: servicePort,
  186. TargetPort: targetPort,
  187. },
  188. },
  189. },
  190. }
  191. service, err := kubeClient.CoreV1().Services(namespace).Create(ctx, serviceSpec, metav1.CreateOptions{})
  192. if err != nil {
  193. klog.Warningf("failed to create service for %v %v/%v, err:%s", kind, namespace, name, err)
  194. return "", err
  195. }
  196. klog.V(2).Infof("Service %s is created successfully for %v %v/%v", service.Name, kind, namespace, name)
  197. return fmt.Sprintf("%s.%s", service.Name, service.Namespace), nil
  198. }
  199. // CreateDeploymentWithTemplate creates and returns a deployment object given a crd object, deployment template
  200. func CreateDeploymentWithTemplate(client kubernetes.Interface, object CommonInterface, spec *appsv1.DeploymentSpec, workerParam *WorkerParam, port int32) (*appsv1.Deployment, error) {
  201. objectKind := object.GroupVersionKind()
  202. objectName := object.GetNamespace() + "/" + object.GetName()
  203. deployment := newDeployment(object, spec, workerParam)
  204. injectDeploymentParam(deployment, workerParam, object, port)
  205. createdDeployment, err := client.AppsV1().Deployments(object.GetNamespace()).Create(context.TODO(), deployment, metav1.CreateOptions{})
  206. if err != nil {
  207. klog.Warningf("failed to create deployment for %s %s, err:%s", objectKind, objectName, err)
  208. return nil, err
  209. }
  210. klog.V(2).Infof("deployment %s is created successfully for %s %s", createdDeployment.Name, objectKind, objectName)
  211. return createdDeployment, nil
  212. }
  213. func newDeployment(object CommonInterface, spec *appsv1.DeploymentSpec, workerParam *WorkerParam) *appsv1.Deployment {
  214. nameSpace := object.GetNamespace()
  215. deploymentName := object.GetName() + "-" + "deployment" + "-" + strings.ToLower(workerParam.WorkerType) + "-"
  216. matchLabel := make(map[string]string)
  217. return &appsv1.Deployment{
  218. ObjectMeta: metav1.ObjectMeta{
  219. GenerateName: deploymentName,
  220. Namespace: nameSpace,
  221. OwnerReferences: []metav1.OwnerReference{
  222. *metav1.NewControllerRef(object, object.GroupVersionKind()),
  223. },
  224. },
  225. Spec: appsv1.DeploymentSpec{
  226. Replicas: (*spec).Replicas,
  227. Template: (*spec).Template,
  228. Selector: &metav1.LabelSelector{
  229. MatchLabels: matchLabel,
  230. },
  231. },
  232. }
  233. }
  234. // injectDeploymentParam modifies deployment in-place
  235. func injectDeploymentParam(deployment *appsv1.Deployment, workerParam *WorkerParam, object CommonInterface, _port int32) {
  236. var appLabelKey = "app.sedna.io"
  237. var appLabelValue = object.GetName() + "-" + workerParam.WorkerType + "-" + "svc"
  238. // Injection of the storage variables must be done before loading
  239. // the environment variables!
  240. if workerParam.Mounts != nil {
  241. InjectStorageInitializerDeployment(deployment, workerParam)
  242. }
  243. // inject our labels
  244. if deployment.Labels == nil {
  245. deployment.Labels = make(map[string]string)
  246. }
  247. if deployment.Spec.Template.Labels == nil {
  248. deployment.Spec.Template.Labels = make(map[string]string)
  249. }
  250. if deployment.Spec.Selector.MatchLabels == nil {
  251. deployment.Spec.Selector.MatchLabels = make(map[string]string)
  252. }
  253. for k, v := range generateLabels(object, workerParam.WorkerType) {
  254. deployment.Labels[k] = v
  255. deployment.Spec.Template.Labels[k] = v
  256. deployment.Spec.Selector.MatchLabels[k] = v
  257. }
  258. // Edgemesh part, useful for service mapping (not necessary!)
  259. deployment.Labels[appLabelKey] = appLabelValue
  260. deployment.Spec.Template.Labels[appLabelKey] = appLabelValue
  261. deployment.Spec.Selector.MatchLabels[appLabelKey] = appLabelValue
  262. // Env variables injection
  263. envs := createEnvVars(workerParam.Env)
  264. for idx := range deployment.Spec.Template.Spec.Containers {
  265. deployment.Spec.Template.Spec.Containers[idx].Env = append(
  266. deployment.Spec.Template.Spec.Containers[idx].Env, envs...,
  267. )
  268. }
  269. }
  270. // createEnvVars creates EnvMap for container
  271. // include EnvName and EnvValue map for stage of creating a pod
  272. func createEnvVars(envMap map[string]string) []v1.EnvVar {
  273. var envVars []v1.EnvVar
  274. for envName, envValue := range envMap {
  275. Env := v1.EnvVar{
  276. Name: envName,
  277. Value: envValue,
  278. }
  279. envVars = append(envVars, Env)
  280. }
  281. return envVars
  282. }
  283. // injectModelHotUpdateMount injects volume mounts when worker supports hot update of model
  284. func injectModelHotUpdateMount(pod *v1.Pod, object CommonInterface) {
  285. hostPathType := v1.HostPathDirectoryOrCreate
  286. var volumes []v1.Volume
  287. var volumeMounts []v1.VolumeMount
  288. modelHotUpdateHostDir, _ := filepath.Split(GetModelHotUpdateConfigFile(object, ModelHotUpdateHostPrefix))
  289. volumeName := ConvertK8SValidName(ModelHotUpdateVolumeName)
  290. volumes = append(volumes, v1.Volume{
  291. Name: volumeName,
  292. VolumeSource: v1.VolumeSource{
  293. HostPath: &v1.HostPathVolumeSource{
  294. Path: modelHotUpdateHostDir,
  295. Type: &hostPathType,
  296. },
  297. },
  298. })
  299. volumeMounts = append(volumeMounts, v1.VolumeMount{
  300. MountPath: ModelHotUpdateContainerPrefix,
  301. Name: volumeName,
  302. })
  303. injectVolume(pod, volumes, volumeMounts)
  304. }
  305. func GetModelHotUpdateConfigFile(object CommonInterface, prefix string) string {
  306. return strings.ToLower(filepath.Join(prefix, object.GetNamespace(), object.GetObjectKind().GroupVersionKind().Kind,
  307. object.GetName(), ModelHotUpdateConfigFile))
  308. }
  309. // setModelHotUpdateEnv sets envs of model hot update
  310. func setModelHotUpdateEnv(workerParam *WorkerParam) {
  311. workerParam.Env["MODEL_HOT_UPDATE"] = "true"
  312. workerParam.Env["MODEL_POLL_PERIOD_SECONDS"] = strconv.FormatInt(workerParam.ModelHotUpdate.PollPeriodSeconds, 10)
  313. workerParam.Env["MODEL_HOT_UPDATE_CONFIG"] = filepath.Join(ModelHotUpdateContainerPrefix, ModelHotUpdateConfigFile)
  314. }