You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

storage_initializer_injector.go 7.6 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. /*
  2. Copyright 2021 The KubeEdge Authors.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package globalmanager
  14. import (
  15. "net/url"
  16. "path/filepath"
  17. "strings"
  18. v1 "k8s.io/api/core/v1"
  19. "k8s.io/apimachinery/pkg/api/resource"
  20. "k8s.io/klog/v2"
  21. )
  // Constants used when wiring storage volumes and the download
  // init-container into a worker pod.
  const (
  	// Name, image, mount prefix and volume name of the init-container that
  	// downloads remote urls before the worker containers start.
  	downloadInitalizerContainerName = "storage-initializer"
  	downloadInitalizerImage         = "kubeedge/sedna-storage-initializer:v0.1.0"
  	downloadInitalizerPrefix        = "/downloads"
  	downloadInitalizerVolumeName    = "sedna-storage-initializer"

  	// hostPathPrefix is the in-container directory under which node host
  	// paths are mounted.
  	hostPathPrefix = "/hostpath"

  	// urlsFieldSep joins several container paths into one environment value.
  	urlsFieldSep = ";"

  	// volumeNamePrefix is not referenced in this part of the file;
  	// presumably it prefixes generated volume names (verify against
  	// ConvertK8SValidName).
  	volumeNamePrefix = "sedna-"
  )
  // supportStorageInitializerURLSchemes lists the url schemes that are
  // fetched by the storage-initializer init-container.
  var supportStorageInitializerURLSchemes = [...]string{
  	"s3",    // s3 compatible storage
  	"http",  // http server, download only
  	"https", // https server, download only
  }
  // supportURLSchemes lists every url scheme accepted for a dataset/model url.
  var supportURLSchemes = [...]string{
  	"s3",    // s3 compatible storage
  	"http",  // http server, download only
  	"https", // https server, download only
  	"",      // host path of the node, e.g. "/opt/data/model.pb"; kept for compatibility only
  	"file",  // local path inside the worker container
  }
  // workerMountMode describes how a worker accesses a mounted url.
  type workerMountMode string

  // Supported mount modes. There is no read-write mode for a mounted
  // url/directory yet.
  const (
  	// workerMountReadOnly marks a url the worker only reads from.
  	workerMountReadOnly workerMountMode = "readonly"
  	// workerMountWriteOnly marks a url the worker only writes to.
  	workerMountWriteOnly workerMountMode = "writeonly"
  )
  54. type MountURL struct {
  55. // URL is the url of dataset/model
  56. URL string
  57. // Mode indicates the url mode, default is workerMountReadOnly
  58. Mode workerMountMode
  59. // IsDir indicates that url is directory
  60. IsDir bool
  61. // if true, only mounts when url is hostpath
  62. CheckHostPath bool
  63. // the container path
  64. ContainerPath string
  65. // indicates the path this url will be mounted into container.
  66. // can be containerPath or its parent dir
  67. MountPath string
  68. // for host path, we just need to mount without downloading
  69. HostPath string
  70. // for storage initializer
  71. DownloadSrcURL string
  72. DownloadDstDir string
  73. // if Disable, then no mount
  74. Disable bool
  75. // parsed for the parent of url
  76. u *url.URL
  77. }
  78. func (m *MountURL) Parse() {
  79. u, _ := url.Parse(m.URL)
  80. m.u = u
  81. m.injectDownloadPath()
  82. m.injectHostPath()
  83. }
  84. func (m *MountURL) injectDownloadPath() {
  85. if m.Mode == workerMountWriteOnly {
  86. // no storage-initializer for write only
  87. // leave the write operation to worker
  88. return
  89. }
  90. for _, scheme := range supportStorageInitializerURLSchemes {
  91. if m.u.Scheme == scheme {
  92. m.MountPath = downloadInitalizerPrefix
  93. // here use u.Host + u.Path to avoid conflict
  94. m.ContainerPath = filepath.Join(m.MountPath, m.u.Host+m.u.Path)
  95. m.DownloadSrcURL = m.URL
  96. m.DownloadDstDir, _ = filepath.Split(m.ContainerPath)
  97. break
  98. }
  99. }
  100. }
  101. func (m *MountURL) injectHostPath() {
  102. // for compatibility, hostpath of a node is supported.
  103. // e.g. the url of a dataset: /datasets/d1/label.txt
  104. if m.u.Scheme != "" {
  105. if m.CheckHostPath {
  106. m.Disable = true
  107. }
  108. return
  109. }
  110. if m.IsDir {
  111. m.HostPath = m.URL
  112. m.MountPath = filepath.Join(hostPathPrefix, m.u.Path)
  113. m.ContainerPath = m.MountPath
  114. } else {
  115. // if file, here mount its directory
  116. m.HostPath, _ = filepath.Split(m.URL)
  117. m.ContainerPath = filepath.Join(hostPathPrefix, m.u.Path)
  118. m.MountPath, _ = filepath.Split(m.ContainerPath)
  119. }
  120. }
  121. func injectHostPathMount(pod *v1.Pod, workerParam *WorkerParam) {
  122. var volumes []v1.Volume
  123. var volumeMounts []v1.VolumeMount
  124. uniqVolumeName := make(map[string]bool)
  125. hostPathType := v1.HostPathDirectory
  126. for _, mount := range workerParam.mounts {
  127. for _, m := range mount.URLs {
  128. if m.HostPath != "" {
  129. volumeName := ConvertK8SValidName(m.HostPath)
  130. if volumeName == "" {
  131. klog.Warningf("failed to convert volume name from the url and skipped: %s", m.URL)
  132. continue
  133. }
  134. if _, ok := uniqVolumeName[volumeName]; !ok {
  135. volumes = append(volumes, v1.Volume{
  136. Name: volumeName,
  137. VolumeSource: v1.VolumeSource{
  138. HostPath: &v1.HostPathVolumeSource{
  139. Path: m.HostPath,
  140. Type: &hostPathType,
  141. },
  142. },
  143. })
  144. uniqVolumeName[volumeName] = true
  145. }
  146. vm := v1.VolumeMount{
  147. MountPath: m.MountPath,
  148. Name: volumeName,
  149. }
  150. volumeMounts = append(volumeMounts, vm)
  151. }
  152. }
  153. }
  154. injectVolume(pod, volumes, volumeMounts)
  155. }
  156. func injectDownloadInitializer(pod *v1.Pod, workerParam *WorkerParam) {
  157. var volumes []v1.Volume
  158. var volumeMounts []v1.VolumeMount
  159. var downloadPairs []string
  160. for _, mount := range workerParam.mounts {
  161. for _, m := range mount.URLs {
  162. if m.DownloadSrcURL != "" && m.DownloadDstDir != "" {
  163. // srcURL dstDir
  164. // need to add srcURL first
  165. downloadPairs = append(downloadPairs, m.DownloadSrcURL, m.DownloadDstDir)
  166. }
  167. }
  168. }
  169. // no need to download
  170. if len(downloadPairs) == 0 {
  171. return
  172. }
  173. // use one empty directory
  174. storageVolume := v1.Volume{
  175. Name: downloadInitalizerVolumeName,
  176. VolumeSource: v1.VolumeSource{
  177. EmptyDir: &v1.EmptyDirVolumeSource{},
  178. },
  179. }
  180. storageVolumeMounts := v1.VolumeMount{
  181. Name: storageVolume.Name,
  182. MountPath: downloadInitalizerPrefix,
  183. ReadOnly: true,
  184. }
  185. volumes = append(volumes, storageVolume)
  186. volumeMounts = append(volumeMounts, storageVolumeMounts)
  187. initVolumeMounts := []v1.VolumeMount{
  188. {
  189. Name: storageVolume.Name,
  190. MountPath: downloadInitalizerPrefix,
  191. ReadOnly: false,
  192. },
  193. }
  194. initContainer := v1.Container{
  195. Name: downloadInitalizerContainerName,
  196. Image: downloadInitalizerImage,
  197. ImagePullPolicy: v1.PullIfNotPresent,
  198. Args: downloadPairs,
  199. TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
  200. Resources: v1.ResourceRequirements{
  201. Limits: map[v1.ResourceName]resource.Quantity{
  202. // limit one cpu
  203. v1.ResourceCPU: resource.MustParse("1"),
  204. // limit 1Gi memory
  205. v1.ResourceMemory: resource.MustParse("1Gi"),
  206. },
  207. },
  208. VolumeMounts: initVolumeMounts,
  209. }
  210. pod.Spec.InitContainers = append(pod.Spec.InitContainers, initContainer)
  211. injectVolume(pod, volumes, volumeMounts)
  212. }
  213. // InjectStorageInitializer injects these storage related volumes and envs into pod in-place
  214. func InjectStorageInitializer(pod *v1.Pod, workerParam *WorkerParam) {
  215. var mounts []WorkerMount
  216. // parse the mounts and environment key
  217. for _, mount := range workerParam.mounts {
  218. var envPaths []string
  219. if mount.URL != nil {
  220. mount.URLs = append(mount.URLs, *mount.URL)
  221. }
  222. var mountURLs []MountURL
  223. for _, m := range mount.URLs {
  224. m.Parse()
  225. if m.Disable {
  226. continue
  227. }
  228. mountURLs = append(mountURLs, m)
  229. envPaths = append(envPaths, m.ContainerPath)
  230. }
  231. if len(mountURLs) > 0 {
  232. mount.URLs = mountURLs
  233. mounts = append(mounts, mount)
  234. }
  235. if mount.EnvName != "" {
  236. workerParam.env[mount.EnvName] = strings.Join(
  237. envPaths, urlsFieldSep,
  238. )
  239. }
  240. }
  241. workerParam.mounts = mounts
  242. injectHostPathMount(pod, workerParam)
  243. injectDownloadInitializer(pod, workerParam)
  244. }
  245. func injectVolume(pod *v1.Pod, volumes []v1.Volume, volumeMounts []v1.VolumeMount) {
  246. if len(volumes) > 0 {
  247. pod.Spec.Volumes = append(pod.Spec.Volumes, volumes...)
  248. }
  249. if len(volumeMounts) > 0 {
  250. for idx := range pod.Spec.Containers {
  251. // inject every containers
  252. pod.Spec.Containers[idx].VolumeMounts = append(
  253. pod.Spec.Containers[idx].VolumeMounts, volumeMounts...,
  254. )
  255. }
  256. }
  257. }