Submission client redesign to use a step-based builder pattern #365
Changes from 1 commit
SparkSubmit.scala
@@ -621,14 +621,17 @@ object SparkSubmit {
     if (isKubernetesCluster) {
       childMainClass = "org.apache.spark.deploy.kubernetes.submit.Client"
       if (args.isPython) {
-        childArgs += args.primaryResource
-        childArgs += "org.apache.spark.deploy.PythonRunner"
-        childArgs += args.pyFiles
+        childArgs ++= Array("--primary-py-file", args.primaryResource)
Review comment: Wondering if it makes sense for …
Review reply: We don't have many better options here because …
Review reply: In …
+        childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
+        childArgs ++= Array("--other-py-files", args.pyFiles)
       } else {
-        childArgs += args.primaryResource
-        childArgs += args.mainClass
+        childArgs ++= Array("--primary-java-resource", args.primaryResource)
+        childArgs ++= Array("--main-class", args.mainClass)
       }
-      childArgs ++= args.childArgs
+      args.childArgs.foreach { arg =>
+        childArgs += "--arg"
+        childArgs += arg
+      }
     }

     // Load any properties specified through --conf and the default properties file
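With this hunk, SparkSubmit hands the Kubernetes submission client a list of named flags (--primary-py-file, --primary-java-resource, --main-class, --other-py-files, --arg) instead of positional arguments. Below is a rough sketch of how such (flag, value) pairs could be consumed on the client side; ClientArgumentsSketch and its field names are illustrative assumptions, not the actual ClientArguments class in this PR.

```scala
// Illustrative sketch only: fold the (flag, value) pairs emitted by SparkSubmit
// into a structured arguments object. All names here are hypothetical.
private[spark] case class ClientArgumentsSketch(
    mainAppResource: Option[String] = None,
    mainClass: Option[String] = None,
    otherPyFiles: Seq[String] = Nil,
    driverArgs: Seq[String] = Nil)

private[spark] object ClientArgumentsSketch {
  def fromCommandLine(args: Array[String]): ClientArgumentsSketch = {
    // Flags always arrive in (flag, value) pairs, so fold over consecutive pairs.
    args.grouped(2).foldLeft(ClientArgumentsSketch()) {
      case (parsed, Array("--primary-py-file", value)) =>
        parsed.copy(mainAppResource = Some(value))
      case (parsed, Array("--primary-java-resource", value)) =>
        parsed.copy(mainAppResource = Some(value))
      case (parsed, Array("--main-class", value)) =>
        parsed.copy(mainClass = Some(value))
      case (parsed, Array("--other-py-files", value)) =>
        parsed.copy(otherPyFiles = value.split(",").toSeq)
      case (parsed, Array("--arg", value)) =>
        parsed.copy(driverArgs = parsed.driverArgs :+ value)
      case (parsed, _) => parsed
    }
  }
}
```

Passing named pairs rather than positions is what lets optional pieces such as --other-py-files simply be omitted, and lets each driver argument travel in its own --arg pair.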
PodWithDetachedInitContainer.scala (new file)
@@ -0,0 +1,24 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.api.model.{Container, Pod}

private[spark] case class PodWithDetachedInitContainer(
    pod: Pod,
    initContainer: Container,
    mainContainer: Container)
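This case class is just a carrier for the three pieces a bootstrap step may want to decorate independently: the pod, the not-yet-attached init-container, and the main container. A minimal sketch of a step threading it through, assuming a hypothetical volume name and mount path:

```scala
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, PodBuilder}

// Sketch only: decorate the init-container and the pod, but keep the init-container
// detached so a later component can still edit it before attaching it.
def addDownloadVolumeSketch(original: PodWithDetachedInitContainer): PodWithDetachedInitContainer = {
  val decoratedInitContainer = new ContainerBuilder(original.initContainer)
    .addNewVolumeMount()
      .withName("download-jars")              // hypothetical volume name
      .withMountPath("/var/spark-data/jars")  // hypothetical mount path
      .endVolumeMount()
    .build()
  val decoratedPod = new PodBuilder(original.pod)
    .editOrNewSpec()
      .addNewVolume()
        .withName("download-jars")
        .withEmptyDir(new EmptyDirVolumeSource())
        .endVolume()
      .endSpec()
    .build()
  original.copy(pod = decoratedPod, initContainer = decoratedInitContainer)
}
```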
SparkPodInitContainerBootstrap.scala
@@ -19,19 +19,25 @@ package org.apache.spark.deploy.kubernetes
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeMount, VolumeMountBuilder}

 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.{ContainerNameEqualityPredicate, InitContainerUtil}

+/**
+ * This is separated out from the init-container steps API because this component can be reused to
+ * set up the init-container for executors as well.
+ */
 private[spark] trait SparkPodInitContainerBootstrap {
   /**
    * Bootstraps an init-container that downloads dependencies to be used by a main container.
    * Note that this primarily assumes that the init-container's configuration is being provided
    * by a ConfigMap that was installed by some other component; that is, the implementation
    * here makes no assumptions about how the init-container is specifically configured. For
    * example, this class is unaware if the init-container is fetching remote dependencies or if
-   * it is fetching dependencies from a resource staging server.
+   * it is fetching dependencies from a resource staging server. Additionally, the container itself
+   * is not actually attached to the pod, but the init container is returned so it can be attached
+   * by InitContainerUtil after the caller has decided to make any changes to it.
    */
   def bootstrapInitContainerAndVolumes(
-      mainContainerName: String, originalPodSpec: PodBuilder): PodBuilder
+      originalPodWithUnattachedInitContainer: PodWithDetachedInitContainer)
+    : PodWithDetachedInitContainer
 }
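In other words, callers now hand the bootstrap a PodWithDetachedInitContainer and get one back, which leaves room to keep editing the init-container before anything is attached. A sketch of that calling pattern, with the secret mount and the deferred attach step as assumptions rather than code from this PR:

```scala
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod}

// Sketch only: bootstrap, then keep decorating the still-detached init-container.
def bootstrapThenDecorate(
    bootstrap: SparkPodInitContainerBootstrap,
    basePod: Pod,
    baseInitContainer: Container,
    baseMainContainer: Container): PodWithDetachedInitContainer = {
  val bootstrapped = bootstrap.bootstrapInitContainerAndVolumes(
    PodWithDetachedInitContainer(basePod, baseInitContainer, baseMainContainer))
  val initContainerWithSecret = new ContainerBuilder(bootstrapped.initContainer)
    .addNewVolumeMount()
      .withName("resource-staging-server-secret")   // hypothetical secret volume
      .withMountPath("/mnt/secrets/spark-staging")  // hypothetical mount path
      .endVolumeMount()
    .build()
  // Attaching the init-container (e.g. via InitContainerUtil) and re-adding the main
  // container to the pod spec are deliberately left to a later step.
  bootstrapped.copy(initContainer = initContainerWithSecret)
}
```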
@@ -41,13 +47,12 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
     filesDownloadPath: String,
     downloadTimeoutMinutes: Long,
     initContainerConfigMapName: String,
-    initContainerConfigMapKey: String,
-    resourceStagingServerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin])
+    initContainerConfigMapKey: String)
   extends SparkPodInitContainerBootstrap {

   override def bootstrapInitContainerAndVolumes(
-      mainContainerName: String,
-      originalPodSpec: PodBuilder): PodBuilder = {
+      originalPodWithUnattachedInitContainer: PodWithDetachedInitContainer)
+    : PodWithDetachedInitContainer = {
Review comment: this syntax looks a bit weird -- should the …
Review reply: It doesn't fit on one line in under 100 characters.
     val sharedVolumeMounts = Seq[VolumeMount](
       new VolumeMountBuilder()
         .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)

@@ -58,7 +63,7 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
         .withMountPath(filesDownloadPath)
         .build())

-    val initContainer = new ContainerBuilder()
+    val initContainer = new ContainerBuilder(originalPodWithUnattachedInitContainer.initContainer)
       .withName(s"spark-init")
       .withImage(initContainerImage)
       .withImagePullPolicy(dockerImagePullPolicy)
@@ -68,11 +73,8 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
         .endVolumeMount()
       .addToVolumeMounts(sharedVolumeMounts: _*)
       .addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
-    val resolvedInitContainer = resourceStagingServerSecretPlugin.map { plugin =>
-      plugin.mountResourceStagingServerSecretIntoInitContainer(initContainer)
-    }.getOrElse(initContainer).build()
-    val podWithBasicVolumes = InitContainerUtil.appendInitContainer(
-      originalPodSpec, resolvedInitContainer)
+      .build()
+    val podWithBasicVolumes = new PodBuilder(originalPodWithUnattachedInitContainer.pod)
       .editSpec()
         .addNewVolume()
           .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
@@ -92,17 +94,20 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
           .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
           .withEmptyDir(new EmptyDirVolumeSource())
           .endVolume()
-        .editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName))
-          .addToVolumeMounts(sharedVolumeMounts: _*)
-          .addNewEnv()
-            .withName(ENV_MOUNTED_FILES_DIR)
-            .withValue(filesDownloadPath)
-            .endEnv()
-          .endContainer()
         .endSpec()
-    resourceStagingServerSecretPlugin.map { plugin =>
-      plugin.addResourceStagingServerSecretVolumeToPod(podWithBasicVolumes)
-    }.getOrElse(podWithBasicVolumes)
+      .build()
+    val mainContainerWithMountedFiles = new ContainerBuilder(
+        originalPodWithUnattachedInitContainer.mainContainer)
+      .addToVolumeMounts(sharedVolumeMounts: _*)
+      .addNewEnv()
+        .withName(ENV_MOUNTED_FILES_DIR)
+        .withValue(filesDownloadPath)
+        .endEnv()
+      .build()
+    PodWithDetachedInitContainer(
+      podWithBasicVolumes,
+      initContainer,
+      mainContainerWithMountedFiles)
   }

 }
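The implementation above leans on fabric8's copy-constructing builders: wrapping an existing Pod or Container in a builder, chaining edits, and calling build() yields a modified copy, so a step can layer changes onto objects it did not create. A minimal illustration of that pattern (not code from this PR):

```scala
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder}

// Copy-and-edit: the original container is left untouched; build() returns a modified copy.
def withExtraEnv(container: Container, name: String, value: String): Container =
  new ContainerBuilder(container)
    .addNewEnv()
      .withName(name)
      .withValue(value)
      .endEnv()
    .build()
```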
config.scala
@@ -418,6 +418,14 @@ package object config extends Logging {
       .stringConf
       .createOptional

+  private[spark] val INIT_CONTAINER_REMOTE_PYSPARK_FILES =
+    ConfigBuilder("spark.kubernetes.initcontainer.remotePyFiles")
Review comment: why is this newly-required from this refactor? I expected there to be no change in user-visible behavior.
Review reply: Good catch - I don't think this is necessary, this is an artifact of something I was trying before.
.doc("Comma-separated list of Python file URIs to download in the init-container. This is" + | ||
" calculated given the list of python files sent to spark-submit.") | ||
.internal() | ||
.stringConf | ||
.createOptional | ||
|
||
private[spark] val INIT_CONTAINER_DOCKER_IMAGE = | ||
ConfigBuilder("spark.kubernetes.initcontainer.docker.image") | ||
.doc("Image for the driver and executor's init-container that downloads dependencies.") | ||
|
Review comment: :) thanks for the --primary-py-file