@@ -21,6 +21,7 @@ import (
"path/filepath"
"strings"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2"
@@ -171,58 +172,60 @@ func (m *MountURL) parseSecret() {
}
func injectHostPathMount(pod *v1.Pod, workerParam *WorkerParam) {
var volumes []v1.Volume
var volumeMounts []v1.VolumeMount
var initContainerVolumeMounts []v1.VolumeMount
volumes, volumeMounts, initContainerVolumeMounts := PrepareHostPath(workerParam)
uniqVolumeName := make(map[string]bool)
uniqMountPath := make(map[string]bool)
injectVolume(pod, volumes, volumeMounts)
hostPathType := v1.HostPathDirectory
if len(volumeMounts) > 0 {
hostPathEnvs := []v1.EnvVar{
{
Name: hostPathPrefixEnvKey,
Value: hostPathPrefix,
},
}
injectEnvs(pod, hostPathEnvs)
}
if len(initContainerVolumeMounts) > 0 {
initIdx := len(pod.Spec.InitContainers) - 1
pod.Spec.InitContainers[initIdx].VolumeMounts = append(
pod.Spec.InitContainers[initIdx].VolumeMounts,
initContainerVolumeMounts...,
)
}
}
func injectWorkerSecrets(pod *v1.Pod, workerParam *WorkerParam) {
var secretEnvs []v1.EnvVar
for _, mount := range workerParam.Mounts {
for _, m := range mount.URLs {
if m.HostPath == "" {
if m.Disable || m.DownloadByInitializer {
continue
}
volumeName := ConvertK8SValidName(m.HostPath)
if len(volumeName) == 0 {
volumeName = defaultVolumeName
klog.Warningf("failed to get name from url(%s), fallback to default name(%s)", m.URL, volumeName)
}
if _, ok := uniqVolumeName[volumeName]; !ok {
volumes = append(volumes, v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: m.HostPath,
Type: &hostPathType,
},
},
})
uniqVolumeName[volumeName] = true
if len(m.SecretEnvs) > 0 {
secretEnvs = MergeSecretEnvs(secretEnvs, m.SecretEnvs, false)
}
}
}
injectEnvs(pod, secretEnvs)
}
if _, ok := uniqMountPath[m.MountPath]; !ok {
vm := v1.VolumeMount{
MountPath: m.MountPath,
Name: volumeName,
}
func injectInitializerContainer(pod *v1.Pod, workerParam *WorkerParam) {
volumes, volumeMounts, initContainer := PrepareInitContainer(workerParam)
if m.Indirect {
initContainerVolumeMounts = append(initContainerVolumeMounts, vm)
} else {
volumeMounts = append(volumeMounts, vm)
}
uniqMountPath[m.MountPath] = true
}
}
if (len(volumes) > 0) && (len(volumeMounts) > 0) && &initContainer != nil {
pod.Spec.InitContainers = append(pod.Spec.InitContainers, *initContainer)
injectVolume(pod, volumes, volumeMounts)
}
}
injectVolume(pod, volumes, volumeMounts)
/*
Deployment Storage Hooks
*/
func injectHostPathMountDeployment(deployment *appsv1.Deployment, workerParam *WorkerParam) {
volumes, volumeMounts, initContainerVolumeMounts := PrepareHostPath(workerParam)
injectVolumeDeployment(deployment, volumes, volumeMounts)
if len(volumeMounts) > 0 {
hostPathEnvs := []v1.EnvVar{
@@ -231,19 +234,147 @@ func injectHostPathMount(pod *v1.Pod, workerParam *WorkerParam) {
Value: hostPathPrefix,
},
}
injectEnvs(pod , hostPathEnvs)
injectEnvsDeployment(deployment , hostPathEnvs)
}
if len(initContainerVolumeMounts) > 0 {
initIdx := len(pod .Spec.InitContainers) - 1
pod .Spec.InitContainers[initIdx].VolumeMounts = append(
pod .Spec.InitContainers[initIdx].VolumeMounts,
initIdx := len(deployment.Spec.Template .Spec.InitContainers) - 1
deployment.Spec.Template .Spec.InitContainers[initIdx].VolumeMounts = append(
deployment.Spec.Template .Spec.InitContainers[initIdx].VolumeMounts,
initContainerVolumeMounts...,
)
}
}
func injectWorkerSecrets(pod *v1.Pod, workerParam *WorkerParam) {
func injectWorkerSecretsDeployment(deployment *appsv1.Deployment, workerParam *WorkerParam) {
secretEnvs := PrepareSecret(workerParam)
injectEnvsDeployment(deployment, secretEnvs)
}
func injectInitializerContainerDeployment(deployment *appsv1.Deployment, workerParam *WorkerParam) {
volumes, volumeMounts, initContainer := PrepareInitContainer(workerParam)
if (len(volumes) > 0) && (len(volumeMounts) > 0) && &initContainer != nil {
deployment.Spec.Template.Spec.InitContainers = append(deployment.Spec.Template.Spec.InitContainers, *initContainer)
injectVolumeDeployment(deployment, volumes, volumeMounts)
}
}
// InjectStorageInitializer injects these storage related volumes and envs into deployment in-place
func InjectStorageInitializerDeployment(deployment *appsv1.Deployment, workerParam *WorkerParam) {
PrepareStorage(workerParam)
// need to call injectInitializerContainer before injectHostPathMount
// since injectHostPathMount could inject volumeMount to init container
injectInitializerContainerDeployment(deployment, workerParam)
injectHostPathMountDeployment(deployment, workerParam)
injectWorkerSecretsDeployment(deployment, workerParam)
}
func injectVolumeDeployment(deployment *appsv1.Deployment, volumes []v1.Volume, volumeMounts []v1.VolumeMount) {
if len(volumes) > 0 {
deployment.Spec.Template.Spec.Volumes = append(deployment.Spec.Template.Spec.Volumes, volumes...)
}
if len(volumeMounts) > 0 {
for idx := range deployment.Spec.Template.Spec.Containers {
// inject every containers
deployment.Spec.Template.Spec.Containers[idx].VolumeMounts = append(
deployment.Spec.Template.Spec.Containers[idx].VolumeMounts, volumeMounts...,
)
}
}
}
func injectEnvsDeployment(deployment *appsv1.Deployment, envs []v1.EnvVar) {
if len(envs) > 0 {
for idx := range deployment.Spec.Template.Spec.Containers {
// inject every containers
deployment.Spec.Template.Spec.Containers[idx].Env = append(
deployment.Spec.Template.Spec.Containers[idx].Env, envs...,
)
}
}
}
// InjectStorageInitializer injects these storage related volumes and envs into pod in-place
func InjectStorageInitializer(pod *v1.Pod, workerParam *WorkerParam) {
PrepareStorage(workerParam)
// need to call injectInitializerContainer before injectHostPathMount
// since injectHostPathMount could inject volumeMount to init container
injectInitializerContainer(pod, workerParam)
injectHostPathMount(pod, workerParam)
injectWorkerSecrets(pod, workerParam)
}
func injectVolume(pod *v1.Pod, volumes []v1.Volume, volumeMounts []v1.VolumeMount) {
if len(volumes) > 0 {
pod.Spec.Volumes = append(pod.Spec.Volumes, volumes...)
}
if len(volumeMounts) > 0 {
for idx := range pod.Spec.Containers {
// inject every containers
pod.Spec.Containers[idx].VolumeMounts = append(
pod.Spec.Containers[idx].VolumeMounts, volumeMounts...,
)
}
}
}
func injectEnvs(pod *v1.Pod, envs []v1.EnvVar) {
if len(envs) > 0 {
for idx := range pod.Spec.Containers {
// inject every containers
pod.Spec.Containers[idx].Env = append(
pod.Spec.Containers[idx].Env, envs...,
)
}
}
}
func PrepareStorage(workerParam *WorkerParam) {
var mounts []WorkerMount
// parse the mounts and environment key
for _, mount := range workerParam.Mounts {
var envPaths []string
if mount.URL != nil {
mount.URLs = append(mount.URLs, *mount.URL)
}
var mountURLs []MountURL
for _, m := range mount.URLs {
m.Parse()
if m.Disable {
continue
}
mountURLs = append(mountURLs, m)
if m.ContainerPath != "" {
envPaths = append(envPaths, m.ContainerPath)
} else {
// keep the original URL if no container path
envPaths = append(envPaths, m.URL)
}
}
if len(mountURLs) > 0 {
mount.URLs = mountURLs
mounts = append(mounts, mount)
}
if mount.EnvName != "" {
workerParam.Env[mount.EnvName] = strings.Join(
envPaths, urlsFieldSep,
)
}
}
workerParam.Mounts = mounts
}
func PrepareSecret(workerParam *WorkerParam) []v1.EnvVar {
var secretEnvs []v1.EnvVar
for _, mount := range workerParam.Mounts {
for _, m := range mount.URLs {
@@ -255,10 +386,59 @@ func injectWorkerSecrets(pod *v1.Pod, workerParam *WorkerParam) {
}
}
}
injectEnvs(pod, secretEnvs)
return secretEnvs
}
func injectInitializerContainer(pod *v1.Pod, workerParam *WorkerParam) {
func PrepareHostPath(workerParam *WorkerParam) ([]v1.Volume, []v1.VolumeMount, []v1.VolumeMount) {
var volumes []v1.Volume
var volumeMounts []v1.VolumeMount
var initContainerVolumeMounts []v1.VolumeMount
uniqVolumeName := make(map[string]bool)
hostPathType := v1.HostPathDirectory
for _, mount := range workerParam.Mounts {
for _, m := range mount.URLs {
if m.HostPath == "" {
continue
}
volumeName := ConvertK8SValidName(m.HostPath)
if len(volumeName) == 0 {
volumeName = defaultVolumeName
klog.Warningf("failed to get name from url(%s), fallback to default name(%s)", m.URL, volumeName)
}
if _, ok := uniqVolumeName[volumeName]; !ok {
volumes = append(volumes, v1.Volume{
Name: volumeName,
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: m.HostPath,
Type: &hostPathType,
},
},
})
uniqVolumeName[volumeName] = true
}
vm := v1.VolumeMount{
MountPath: m.MountPath,
Name: volumeName,
}
if m.Indirect {
initContainerVolumeMounts = append(initContainerVolumeMounts, vm)
} else {
volumeMounts = append(volumeMounts, vm)
}
}
}
return volumes, volumeMounts, initContainerVolumeMounts
}
func PrepareInitContainer(workerParam *WorkerParam) ([]v1.Volume, []v1.VolumeMount, *v1.Container) {
var volumes []v1.Volume
var volumeMounts []v1.VolumeMount
@@ -289,7 +469,7 @@ func injectInitializerContainer(pod *v1.Pod, workerParam *WorkerParam) {
// no need to download
if len(downloadPairs) == 0 {
return
return nil, nil, nil
}
envs := secretEnvs
@@ -343,80 +523,5 @@ func injectInitializerContainer(pod *v1.Pod, workerParam *WorkerParam) {
Env: envs,
}
pod.Spec.InitContainers = append(pod.Spec.InitContainers, initContainer)
injectVolume(pod, volumes, volumeMounts)
}
// InjectStorageInitializer injects these storage related volumes and envs into pod in-place
func InjectStorageInitializer(pod *v1.Pod, workerParam *WorkerParam) {
var mounts []WorkerMount
// parse the mounts and environment key
for _, mount := range workerParam.Mounts {
var envPaths []string
if mount.URL != nil {
mount.URLs = append(mount.URLs, *mount.URL)
}
var mountURLs []MountURL
for _, m := range mount.URLs {
m.Parse()
if m.Disable {
continue
}
mountURLs = append(mountURLs, m)
if m.ContainerPath != "" {
envPaths = append(envPaths, m.ContainerPath)
} else {
// keep the original URL if no container path
envPaths = append(envPaths, m.URL)
}
}
if len(mountURLs) > 0 {
mount.URLs = mountURLs
mounts = append(mounts, mount)
}
if mount.EnvName != "" {
workerParam.Env[mount.EnvName] = strings.Join(
envPaths, urlsFieldSep,
)
}
}
workerParam.Mounts = mounts
// need to call injectInitializerContainer before injectHostPathMount
// since injectHostPathMount could inject volumeMount to init container
injectInitializerContainer(pod, workerParam)
injectHostPathMount(pod, workerParam)
injectWorkerSecrets(pod, workerParam)
}
func injectVolume(pod *v1.Pod, volumes []v1.Volume, volumeMounts []v1.VolumeMount) {
if len(volumes) > 0 {
pod.Spec.Volumes = append(pod.Spec.Volumes, volumes...)
}
if len(volumeMounts) > 0 {
for idx := range pod.Spec.Containers {
// inject every containers
pod.Spec.Containers[idx].VolumeMounts = append(
pod.Spec.Containers[idx].VolumeMounts, volumeMounts...,
)
}
}
}
func injectEnvs(pod *v1.Pod, envs []v1.EnvVar) {
if len(envs) > 0 {
for idx := range pod.Spec.Containers {
// inject every containers
pod.Spec.Containers[idx].Env = append(
pod.Spec.Containers[idx].Env, envs...,
)
}
}
return volumes, volumeMounts, &initContainer
}