Skip to content

Commit 74a470f

Browse files
mccheahPuneet Loya
authored and
Puneet Loya
committed
Submission client redesign to use a step-based builder pattern (apache-spark-on-k8s#365)
* Submission client redesign to use a step-based builder pattern. This change overhauls the underlying architecture of the submission client, but it is intended to entirely preserve existing behavior of Spark applications. Therefore users will find this to be an invisible change. The philosophy behind this design is to reconsider the breakdown of the submission process. It operates off the abstraction of "submission steps", which are transformation functions that take the previous state of the driver and return the new state of the driver. The driver's state includes its Spark configurations and the Kubernetes resources that will be used to deploy it. Such a refactor moves away from a features-first API design, which considers different containers to serve a set of features. The previous design, for example, had a container files resolver API object that returned different resolutions of the dependencies added by the user. However, it was up to the main Client to know how to intelligently invoke all of those APIs. Therefore the API surface area of the file resolver became untenably large and it was not intuitive of how it was to be used or extended. This design changes the encapsulation layout; every module is now responsible for changing the driver specification directly. An orchestrator builds the correct chain of steps and hands it to the client, which then calls it verbatim. The main client then makes any final modifications that put the different pieces of the driver together, particularly to attach the driver container itself to the pod and to apply the Spark configuration as command-line arguments. * Add a unit test for BaseSubmissionStep. * Add unit test for kubernetes credentials mounting. * Add unit test for InitContainerBootstrapStep. * unit tests for initContainer * Add a unit test for DependencyResolutionStep. * further modifications to InitContainer unit tests * Use of resolver in PythonStep and unit tests for PythonStep * refactoring of init unit tests and pythonstep resolver logic * Add unit test for KubernetesSubmissionStepsOrchestrator. * refactoring and addition of secret trustStore+Cert checks in a SubmissionStepSuite * added SparkPodInitContainerBootstrapSuite * Added InitContainerResourceStagingServerSecretPluginSuite * style in Unit tests * extremely minor style fix in variable naming * Address comments. * Rename class for consistency. * Attempt to make spacing consistent. Multi-line methods should have four-space indentation for arguments that aren't on the same line as the method call itself... but this is difficult to do consistently given how IDEs handle Scala multi-line indentation in most cases.
1 parent 12c9f6e commit 74a470f

File tree

56 files changed

+2946
-2911
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+2946
-2911
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

+9-6
Original file line numberDiff line numberDiff line change
@@ -657,14 +657,17 @@ object SparkSubmit extends CommandLineUtils {
657657
if (isKubernetesCluster) {
658658
childMainClass = "org.apache.spark.deploy.kubernetes.submit.Client"
659659
if (args.isPython) {
660-
childArgs += args.primaryResource
661-
childArgs += "org.apache.spark.deploy.PythonRunner"
662-
childArgs += args.pyFiles
660+
childArgs ++= Array("--primary-py-file", args.primaryResource)
661+
childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
662+
childArgs ++= Array("--other-py-files", args.pyFiles)
663663
} else {
664-
childArgs += args.primaryResource
665-
childArgs += args.mainClass
664+
childArgs ++= Array("--primary-java-resource", args.primaryResource)
665+
childArgs ++= Array("--main-class", args.mainClass)
666+
}
667+
args.childArgs.foreach { arg =>
668+
childArgs += "--arg"
669+
childArgs += arg
666670
}
667-
childArgs ++= args.childArgs
668671
}
669672

670673
// Load any properties specified through --conf and the default properties file

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/InitContainerResourceStagingServerSecretPlugin.scala

+21-17
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
package org.apache.spark.deploy.kubernetes
1818

19-
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret}
19+
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder, Secret}
2020

2121
import org.apache.spark.deploy.kubernetes.constants._
2222

@@ -27,13 +27,13 @@ private[spark] trait InitContainerResourceStagingServerSecretPlugin {
2727
* from a resource staging server.
2828
*/
2929
def mountResourceStagingServerSecretIntoInitContainer(
30-
initContainer: ContainerBuilder): ContainerBuilder
30+
initContainer: Container): Container
3131

3232
/**
3333
* Configure the pod to attach a Secret volume which hosts secret files allowing the
3434
* init-container to retrieve dependencies from the resource staging server.
3535
*/
36-
def addResourceStagingServerSecretVolumeToPod(basePod: PodBuilder): PodBuilder
36+
def addResourceStagingServerSecretVolumeToPod(basePod: Pod): Pod
3737
}
3838

3939
private[spark] class InitContainerResourceStagingServerSecretPluginImpl(
@@ -42,21 +42,25 @@ private[spark] class InitContainerResourceStagingServerSecretPluginImpl(
4242
extends InitContainerResourceStagingServerSecretPlugin {
4343

4444
override def mountResourceStagingServerSecretIntoInitContainer(
45-
initContainer: ContainerBuilder): ContainerBuilder = {
46-
initContainer.addNewVolumeMount()
47-
.withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
48-
.withMountPath(initContainerSecretMountPath)
49-
.endVolumeMount()
45+
initContainer: Container): Container = {
46+
new ContainerBuilder(initContainer)
47+
.addNewVolumeMount()
48+
.withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
49+
.withMountPath(initContainerSecretMountPath)
50+
.endVolumeMount()
51+
.build()
5052
}
5153

52-
override def addResourceStagingServerSecretVolumeToPod(basePod: PodBuilder): PodBuilder = {
53-
basePod.editSpec()
54-
.addNewVolume()
55-
.withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
56-
.withNewSecret()
57-
.withSecretName(initContainerSecretName)
58-
.endSecret()
59-
.endVolume()
60-
.endSpec()
54+
override def addResourceStagingServerSecretVolumeToPod(basePod: Pod): Pod = {
55+
new PodBuilder(basePod)
56+
.editSpec()
57+
.addNewVolume()
58+
.withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
59+
.withNewSecret()
60+
.withSecretName(initContainerSecretName)
61+
.endSecret()
62+
.endVolume()
63+
.endSpec()
64+
.build()
6165
}
6266
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.kubernetes
18+
19+
import io.fabric8.kubernetes.api.model.{Container, Pod}
20+
21+
private[spark] case class PodWithDetachedInitContainer(
22+
pod: Pod,
23+
initContainer: Container,
24+
mainContainer: Container)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkPodInitContainerBootstrap.scala

+27-23
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,25 @@ package org.apache.spark.deploy.kubernetes
1919
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeMount, VolumeMountBuilder}
2020

2121
import org.apache.spark.deploy.kubernetes.constants._
22-
import org.apache.spark.deploy.kubernetes.submit.{ContainerNameEqualityPredicate, InitContainerUtil}
2322

23+
/**
24+
* This is separated out from the init-container steps API because this component can be reused to
25+
* set up the init-container for executors as well.
26+
*/
2427
private[spark] trait SparkPodInitContainerBootstrap {
2528
/**
2629
* Bootstraps an init-container that downloads dependencies to be used by a main container.
2730
* Note that this primarily assumes that the init-container's configuration is being provided
2831
* by a ConfigMap that was installed by some other component; that is, the implementation
2932
* here makes no assumptions about how the init-container is specifically configured. For
3033
* example, this class is unaware if the init-container is fetching remote dependencies or if
31-
* it is fetching dependencies from a resource staging server.
34+
* it is fetching dependencies from a resource staging server. Additionally, the container itself
35+
* is not actually attached to the pod, but the init container is returned so it can be attached
36+
* by InitContainerUtil after the caller has decided to make any changes to it.
3237
*/
3338
def bootstrapInitContainerAndVolumes(
34-
mainContainerName: String, originalPodSpec: PodBuilder): PodBuilder
39+
originalPodWithUnattachedInitContainer: PodWithDetachedInitContainer)
40+
: PodWithDetachedInitContainer
3541
}
3642

3743
private[spark] class SparkPodInitContainerBootstrapImpl(
@@ -41,13 +47,11 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
4147
filesDownloadPath: String,
4248
downloadTimeoutMinutes: Long,
4349
initContainerConfigMapName: String,
44-
initContainerConfigMapKey: String,
45-
resourceStagingServerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin])
50+
initContainerConfigMapKey: String)
4651
extends SparkPodInitContainerBootstrap {
4752

4853
override def bootstrapInitContainerAndVolumes(
49-
mainContainerName: String,
50-
originalPodSpec: PodBuilder): PodBuilder = {
54+
podWithDetachedInitContainer: PodWithDetachedInitContainer): PodWithDetachedInitContainer = {
5155
val sharedVolumeMounts = Seq[VolumeMount](
5256
new VolumeMountBuilder()
5357
.withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
@@ -58,7 +62,7 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
5862
.withMountPath(filesDownloadPath)
5963
.build())
6064

61-
val initContainer = new ContainerBuilder()
65+
val initContainer = new ContainerBuilder(podWithDetachedInitContainer.initContainer)
6266
.withName(s"spark-init")
6367
.withImage(initContainerImage)
6468
.withImagePullPolicy(dockerImagePullPolicy)
@@ -68,11 +72,8 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
6872
.endVolumeMount()
6973
.addToVolumeMounts(sharedVolumeMounts: _*)
7074
.addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
71-
val resolvedInitContainer = resourceStagingServerSecretPlugin.map { plugin =>
72-
plugin.mountResourceStagingServerSecretIntoInitContainer(initContainer)
73-
}.getOrElse(initContainer).build()
74-
val podWithBasicVolumes = InitContainerUtil.appendInitContainer(
75-
originalPodSpec, resolvedInitContainer)
75+
.build()
76+
val podWithBasicVolumes = new PodBuilder(podWithDetachedInitContainer.pod)
7677
.editSpec()
7778
.addNewVolume()
7879
.withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
@@ -92,17 +93,20 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
9293
.withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
9394
.withEmptyDir(new EmptyDirVolumeSource())
9495
.endVolume()
95-
.editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName))
96-
.addToVolumeMounts(sharedVolumeMounts: _*)
97-
.addNewEnv()
98-
.withName(ENV_MOUNTED_FILES_DIR)
99-
.withValue(filesDownloadPath)
100-
.endEnv()
101-
.endContainer()
10296
.endSpec()
103-
resourceStagingServerSecretPlugin.map { plugin =>
104-
plugin.addResourceStagingServerSecretVolumeToPod(podWithBasicVolumes)
105-
}.getOrElse(podWithBasicVolumes)
97+
.build()
98+
val mainContainerWithMountedFiles = new ContainerBuilder(
99+
podWithDetachedInitContainer.mainContainer)
100+
.addToVolumeMounts(sharedVolumeMounts: _*)
101+
.addNewEnv()
102+
.withName(ENV_MOUNTED_FILES_DIR)
103+
.withValue(filesDownloadPath)
104+
.endEnv()
105+
.build()
106+
PodWithDetachedInitContainer(
107+
podWithBasicVolumes,
108+
initContainer,
109+
mainContainerWithMountedFiles)
106110
}
107111

108112
}

0 commit comments

Comments
 (0)