Spark on Kubernetes - basic submission client #545
Conversation
This will be the follow-up to apache#19468

This seems like a large diff, but a quick scan shows everything included is necessary. We need the driver service bootstrap because of changes to master. I think we can reduce the fanciness of the credentials step, but that doesn't reduce the complexity by a significant amount.

One TODO: add the unit test in #542 to this PR.

Changes from #542 merged in.
private[spark] object ClientArguments {
  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
    var mainAppResource: Option[MainAppResource] = None
    val otherPyFiles = Seq.empty[String]
Don't think we're using this here.
Removed.
addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
}
} catch {
Don't think we want to catch Throwable here - look into NonFatal.
Changed to NonFatal(e).
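For reference, a minimal sketch of the pattern being discussed (the helper name is hypothetical): `NonFatal` deliberately does not match JVM errors such as `OutOfMemoryError` or `InterruptedException`, so those still propagate instead of being swallowed by the cleanup path.

```scala
import scala.util.control.NonFatal

// Hypothetical sketch: run a submission step, cleaning up on recoverable
// failures only; fatal errors (OutOfMemoryError, etc.) propagate untouched.
def submitWithCleanup[T](submit: () => T)(cleanup: () => Unit): Either[Throwable, T] =
  try {
    Right(submit())
  } catch {
    case NonFatal(e) =>
      cleanup()
      Left(e)
  }
```

Catching `Throwable` directly would also trap thread interrupts and VM errors, which is exactly what the review comment warns against.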
.addAllToEnv(driverCustomEnvs.asJava)
.addToEnv(driverExtraClasspathEnv.toSeq: _*)
.addNewEnv()
  .withName(ENV_DRIVER_MEMORY)
These environment variable keys don't make much sense without the Dockerfile, which describes the contract the submission client must fulfill.
It might make sense to include the Dockerfile for the driver and the executor in this PR. We shouldn't add the poms that build them - that would make this diff unnecessarily large.
Should we still put them under docker-minimum-bundle?
Yes. Though, for a while now I've been thinking that there's probably a better name for this submodule =)
Should we simply call it docker?
Think that should be fine.
Done.
A few comments, but otherwise this captures the spirit of what we want to have upstream.
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY conf /opt/spark/conf
COPY dockerfiles/spark-base/entrypoint.sh /opt/
Think we still want to include it to show the projected contents of the image.
@@ -0,0 +1,43 @@
# |
Think we want src/main/dockerfiles as the top-level directory.
Done.
masterWithoutK8sPrefix
} else {
  val resolvedURL = s"https://$masterWithoutK8sPrefix"
  logDebug(s"No scheme specified for kubernetes master URL, so defaulting to https. Resolved" +
Do not use s" here - plain text suffices.
perhaps logInfo? this sounds useful
Done.
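To illustrate the recurring review point (the values here are made up): `s"..."` is only needed on the fragments that actually interpolate a variable; constant fragments can stay plain string literals.

```scala
// Hypothetical values, for illustration only.
val masterWithoutK8sPrefix = "my-cluster:443"
val resolvedURL = s"https://$masterWithoutK8sPrefix"  // interpolation needed here
val message = "No scheme specified for kubernetes master URL, so defaulting to https. " +
  s"Resolved URL is $resolvedURL."                    // only this fragment interpolates
```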
submissionSparkConf,
KUBERNETES_DRIVER_LABEL_PREFIX,
"label")
require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
Do not use s" here - plain text suffices.
Done.
"label")
require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
  s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping" +
  s" operations.")
Do not use s" here - plain text suffices.
Done.
"annotation")
require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
  s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
  s" Spark bookkeeping operations.")
Do not use s" here - plain text suffices. Also align this with the previous line.
Done.
}
val caCertDataBase64 = safeFileConfToBase64(
  s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
  s"Driver CA cert file provided at %s does not exist or is not a file.")
Do not use s" here - plain text suffices.
Done.
import scala.collection.mutable
import scala.util.control.NonFatal

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVar, EnvVarBuilder, HasMetadata, OwnerReferenceBuilder, Pod, PodBuilder}
More than 6 imports from the same package - use a wildcard import (._) instead.
Done.
val resolvedDriverPod = new PodBuilder(currentDriverSpec.driverPod)
  .editSpec()
  .addToContainers(resolvedDriverContainer)
  .endSpec()
align these?
The indentation probably makes sense, as editSpec returns an object different from PodBuilder.
.build())
.endEnv()
.withNewResources()
  .addToRequests("cpu", driverCpuQuantity)
What happens when we request more than what the agent has? E.g., more cores than the number of cores in the agent node?
This will cause the driver to not be scheduled by the k8s scheduler onto a node until a node with that many cores becomes available in the cluster.
In general YARN doesn't handle this case nicely either, but I wonder if in k8s we could do better when it is hooked up to an autoscaler or something?
.addToAnnotations(allDriverAnnotations.asJava)
.endMetadata()
.withNewSpec()
  .withRestartPolicy("Never")
this might be configurable in the future? some sort of driver HA in cluster mode?
Good point. I think we can make it configurable.
We should have an issue to track this. HA driver would be useful in a streaming context in the future.
+1000x
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.steps.{DriverConfigurationStep, KubernetesDriverSpec}

private[spark] class ClientSuite extends SparkFunSuite with BeforeAndAfter {
Don't do private[spark] with the Suite classes - they won't be run by Jenkins. See c052212.
Done.
If there's no objection, I will squash the commits and push to upstream for review by EOD today. @apache-spark-on-k8s/contributors

SGTM! We should see how we can make it less confusing for reviewers, because this PR encompasses changes in spark-kubernetes-3.

When pushing upstream, I'm going to remove code for the first PR so this is less confusing.
Force-pushed from a461c37 to ac03b7e.
Initial batch of comments, mostly about readability. I still need to look at the various submission/configuration steps.
case (KUBERNETES, CLIENT) =>
  printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (KUBERNETES, CLUSTER) if args.isPython =>
  printErrorAndExit("Cluster deploy mode is currently not supported for python " +
I wonder if this message could mislead users to think python client mode is supported. Users will try python client mode and will get the error in line 302, which is not a good experience. Is it possible to pattern match case (KUBERNETES, _) if args.isPython instead, before line 301? Then the error message can say "python is not supported for Kubernetes".
Done.
case (KUBERNETES, CLUSTER) if args.isPython =>
  printErrorAndExit("Cluster deploy mode is currently not supported for python " +
    "applications on Kubernetes clusters.")
case (KUBERNETES, CLUSTER) if args.isR =>
Ditto. Wonder if we can match case (KUBERNETES, _) if args.isR before line 301 and say "R is not supported in Kubernetes".
Done.
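A sketch of the suggested ordering, with simplified, hypothetical types standing in for the real SparkSubmit match: putting the language checks before the deploy-mode checks means Python and R users get the accurate message regardless of mode.

```scala
// Simplified, hypothetical model of the SparkSubmit match in question.
sealed trait DeployMode
case object ClientMode extends DeployMode
case object ClusterMode extends DeployMode
final case class Args(isPython: Boolean = false, isR: Boolean = false)

def validateKubernetes(mode: DeployMode, args: Args): Option[String] =
  (mode, args) match {
    // Language checks first, so they win for both client and cluster mode.
    case (_, a) if a.isPython => Some("Python applications are currently not supported for Kubernetes.")
    case (_, a) if a.isR      => Some("R applications are currently not supported for Kubernetes.")
    case (ClientMode, _)      => Some("Client mode is currently not supported for Kubernetes.")
    case (ClusterMode, _)     => None
  }
```

With the original ordering, a Python job in client mode would fall through to the generic client-mode error instead of the language-specific one.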
childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
args.childArgs.foreach { arg =>
I see a null check on args.childArgs is done at line 695 and a few others:

    if (args.childArgs != null) {
      args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
    }

Maybe we should do the same?
Done.
@@ -466,6 +473,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case USAGE_ERROR =>
  printUsageAndExit(1)

case KUBERNETES_NAMESPACE =>
Nit. I would expect a significant parameter like this to come before line 464; lines 464-473 are mostly about help and usage errors, it seems.
Done.
mainClass: String,
driverArgs: Array[String])

private[spark] object ClientArguments {
I personally prefer one empty line between class/object header and the body, but I don't know if this is standard.
Done.
val resolvedDriverJavaOpts = currentDriverSpec
  .driverSparkConf
  // We don't need this anymore since we just set the JVM options on the environment
Where is the code that this comment is referring to?
Actually it seems it was added in #365. I think we should keep the comment but rephrase it to make it clearer.
Comment still applies because the lines below take the driver JVM options and append them to the JVM options of the SparkConf. For example, we don't want SPARK_DRIVER_JAVA_OPTS to have a value of -Dspark.driver.extraJavaOptions=-XX:HeapDumpOnOutOfMemoryError - we just want the -XX:HeapDumpOn... to be set directly on the driver process.
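A sketch of that behavior, with a toy config map standing in for SparkConf (all keys and values here are illustrative): the spark.driver.extraJavaOptions entry is dropped from the -D list and its value is appended as raw JVM flags instead.

```scala
// Toy stand-in for SparkConf; keys/values are illustrative only.
val conf = Map(
  "spark.app.name" -> "demo",
  "spark.driver.extraJavaOptions" -> "-XX:+HeapDumpOnOutOfMemoryError")

val extraOptsKey = "spark.driver.extraJavaOptions"
val extraOpts = conf.get(extraOptsKey).toSeq
// Everything else becomes a -D system property...
val systemProps = (conf - extraOptsKey).toSeq.map { case (k, v) => s"-D$k=$v" }
// ...while the extra options pass through as-is, not re-wrapped in a -D.
val driverJavaOpts = (systemProps ++ extraOpts).mkString(" ")
```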
The comment should be rephrased to clarify the intent of removing extraJavaOptions, along the lines of what Matt said.
Done.
}

private[spark] object Client {
  def run(sparkConf: SparkConf, clientArguments: ClientArguments): Unit = {
Same comment about one empty line before the class body.
Done.
val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
val master = getK8sMasterUrl(sparkConf.get("spark.master"))
val loggingInterval = Option(sparkConf.get(REPORT_INTERVAL)).filter( _ => waitForAppCompletion)
Nit. Don't need a space before _?
Done.
clientArguments.driverArgs,
sparkConf)

Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
This particular line does not read well because the word "KubernetesClient" appears here twice meaning two different things. The reader may fail to distinguish "Spark client" (SparkKubernetesClientFactory) vs "K8s API client" (createKubernetesClient).
Renamed SparkKubernetesClientFactory to KubernetesClientFactory and renamed the method to create.
I see. I myself misread. The two clients meant the same thing :-)
Force-pushed from 6f065a9 to af3f315.
The latest commit seems to address my comments so far. Thanks!
Force-pushed from 3ad6d7b to 37c7ad6.
Squashed the commits and removed scheduler backend code and relevant changes in Yarn-related code.

@kimoonkim any more comments on the submission steps?

This is under review at apache#19717.
Submission steps look good to me. Left a few minor comments. PTAL.
import org.apache.spark.SparkConf

private[spark] object ConfigurationUtils {
Put an empty line before the body?
Done.
kubernetesClient,
waitForAppCompletion,
appName,
loggingPodStatusWatcher).run()
I almost missed .run() at the end. Maybe we can use a val here:

    kubernetesClient =>
      val sparkClient = new Client(
        configurationStepsOrchestrator.getAllConfigurationSteps(),
        sparkConf,
        kubernetesClient,
        waitForAppCompletion,
        appName,
        loggingPodStatusWatcher)
      sparkClient.run()
Done.
Force-pushed from 37c7ad6 to 6d50369.
Force-pushed from f38144b to 60234a2.
## What changes were proposed in this pull request?

apache#19696 replaced the deprecated usages for `Date` and `Waiter`, but a few methods were missed. The PR fixes the forgotten deprecated usages.

## How was this patch tested?

Existing UTs.

Author: Marco Gaido <[email protected]>
Closes apache#19875 from mgaido91/SPARK-22473_FOLLOWUP.
…n the RDD commit protocol

I have modified SparkHadoopWriter so that executors and the driver always use consistent JobIds during the hadoop commit. Before SPARK-18191, spark always used the rddId; it just incorrectly named the variable stageId. After SPARK-18191, it used the rddId as the jobId on the driver's side, and the stageId as the jobId on the executors' side. With this change, executors and the driver consistently use rddId as the jobId. Also with this change, during the hadoop commit protocol spark uses the actual stageId to check whether a stage can be committed, unlike before, when it was using the executors' jobId for this check.

In addition to the existing unit tests, a test has been added to check whether executors and the driver are using the same JobId. The test failed before this change and passed after applying this fix.

Author: Reza Safi <[email protected]>
Closes apache#19848 from rezasafi/stagerddsimple.
Force-pushed from 05f528a to cfcf2a7.
The main goal of this change is to allow multiple cluster-mode submissions from the same JVM, without having them end up with mixed configuration. That is done by extending the SparkApplication trait, and doing so was reasonably trivial for standalone and mesos modes.

For YARN mode, there was a complication. YARN used a "SPARK_YARN_MODE" system property to control behavior indirectly in a whole bunch of places, mainly in the SparkHadoopUtil / YarnSparkHadoopUtil classes. Most of the changes here are removing that. Since we removed support for Hadoop 1.x, some methods that lived in YarnSparkHadoopUtil can now live in SparkHadoopUtil. The remaining methods don't need to be part of the class, and can be called directly from the YarnSparkHadoopUtil object, so now there's a single implementation of SparkHadoopUtil.

There were two places in the code that relied on SPARK_YARN_MODE to make decisions about YARN-specific functionality, and now explicitly check the master from the configuration for that instead:

* fetching the external shuffle service port, which can come from the YARN configuration.
* propagation of the authentication secret using Hadoop credentials. This also was cleaned up a little to not need so many methods in `SparkHadoopUtil`.

With those out of the way, actually changing the YARN client to extend SparkApplication was easy.

Tested with existing unit tests, and also by running YARN apps with auth and kerberos both on and off in a real cluster.

Author: Marcelo Vanzin <[email protected]>
Closes apache#19631 from vanzin/SPARK-22372.
Force-pushed from cfcf2a7 to faa2849.
Force-pushed from 51844cc to 0936fbe.
Force-pushed from 0936fbe to 0e8ca01.
The PR has been merged upstream. Closing this.
Second draft upstreaming PR that contains the basic submission client implementation and unit tests. Branch spark-kubernetes-3-updated is a clone of spark-kubernetes-3 with the latest changes from upstream/master merged in. spark-kubernetes-4 includes all our changes in spark-kubernetes-3.

cc @foxish @mccheah @apache-spark-on-k8s/contributors