 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala                            | 13 +++++++++----
 core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala | 42 ++++++++++++++++++++------------
 docs/running-on-mesos.md                                                                 |  2 ++
 3 files changed, 41 insertions(+), 16 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 36e9750b86..ad92f5635a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -319,9 +319,6 @@ object SparkSubmit {
// The following modes are not supported or applicable
(clusterManager, deployMode) match {
- case (MESOS, CLUSTER) if args.isPython =>
- printErrorAndExit("Cluster deploy mode is currently not supported for python " +
- "applications on Mesos clusters.")
case (MESOS, CLUSTER) if args.isR =>
printErrorAndExit("Cluster deploy mode is currently not supported for R " +
"applications on Mesos clusters.")
@@ -554,7 +551,15 @@ object SparkSubmit {
if (isMesosCluster) {
assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
- childArgs += (args.primaryResource, args.mainClass)
+ if (args.isPython) {
+ // The second argument is the main class; Python applications pass an empty string
+ childArgs += (args.primaryResource, "")
+ if (args.pyFiles != null) {
+ sysProps("spark.submit.pyFiles") = args.pyFiles
+ }
+ } else {
+ childArgs += (args.primaryResource, args.mainClass)
+ }
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
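
For context before the scheduler diff: a minimal standalone sketch (helper name and shape assumed, not the committed code) of the child-argument layout the hunk above now produces for the REST submission client. The empty main-class slot is the marker the scheduler later uses to detect a Python application:

    import scala.collection.mutable.ArrayBuffer

    // Sketch of the argument layout handed to RestSubmissionClient in
    // Mesos cluster mode: (primary resource, main class, app args...).
    def mesosClusterChildArgs(
        primaryResource: String,
        mainClass: String,
        isPython: Boolean,
        appArgs: Seq[String]): Seq[String] = {
      val childArgs = ArrayBuffer[String]()
      if (isPython) {
        // The second slot is reserved for the main class; Python apps
        // have none, so an empty string is passed through instead.
        childArgs ++= Seq(primaryResource, "")
      } else {
        childArgs ++= Seq(primaryResource, mainClass)
      }
      childArgs ++= appArgs
      childArgs.toSeq
    }

    // mesosClusterChildArgs("http://repo/pi.py", "", isPython = true, Seq("100"))
    //   => Seq("http://repo/pi.py", "", "100")
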
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 07da9242b9..a6d9374eb9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -29,7 +29,6 @@ import org.apache.mesos.Protos.Environment.Variable
import org.apache.mesos.Protos.TaskStatus.Reason
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.{Scheduler, SchedulerDriver}
-
import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
import org.apache.spark.metrics.MetricsSystem
@@ -375,21 +374,20 @@ private[spark] class MesosClusterScheduler(
val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
envBuilder.addVariables(
Variable.newBuilder().setName("SPARK_EXECUTOR_OPTS").setValue(executorOpts))
- val cmdOptions = generateCmdOption(desc).mkString(" ")
val dockerDefined = desc.schedulerProperties.contains("spark.mesos.executor.docker.image")
val executorUri = desc.schedulerProperties.get("spark.executor.uri")
.orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
- val appArguments = desc.command.arguments.mkString(" ")
- val (executable, jar) = if (dockerDefined) {
+ // Gets the path to run spark-submit, and the path to the Mesos sandbox.
+ val (executable, sandboxPath) = if (dockerDefined) {
// Application jar is automatically downloaded in the mounted sandbox by Mesos,
// and the path to the mounted volume is stored in $MESOS_SANDBOX env variable.
- ("./bin/spark-submit", s"$$MESOS_SANDBOX/${desc.jarUrl.split("/").last}")
+ ("./bin/spark-submit", "$MESOS_SANDBOX")
} else if (executorUri.isDefined) {
builder.addUris(CommandInfo.URI.newBuilder().setValue(executorUri.get).build())
val folderBasename = executorUri.get.split('/').last.split('.').head
val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit"
- val cmdJar = s"../${desc.jarUrl.split("/").last}"
- (cmdExecutable, cmdJar)
+ // Sandbox path points to the parent folder as we chdir into the folderBasename.
+ (cmdExecutable, "..")
} else {
val executorSparkHome = desc.schedulerProperties.get("spark.mesos.executor.home")
.orElse(conf.getOption("spark.home"))
@@ -398,30 +396,50 @@ private[spark] class MesosClusterScheduler(
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
}
val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getCanonicalPath
- val cmdJar = desc.jarUrl.split("/").last
- (cmdExecutable, cmdJar)
+ // Sandbox points to the current directory by default with Mesos.
+ (cmdExecutable, ".")
}
- builder.setValue(s"$executable $cmdOptions $jar $appArguments")
+ val primaryResource = new File(sandboxPath, desc.jarUrl.split("/").last).toString()
+ val cmdOptions = generateCmdOption(desc, sandboxPath).mkString(" ")
+ val appArguments = desc.command.arguments.mkString(" ")
+ builder.setValue(s"$executable $cmdOptions $primaryResource $appArguments")
builder.setEnvironment(envBuilder.build())
conf.getOption("spark.mesos.uris").map { uris =>
setupUris(uris, builder)
}
+ desc.schedulerProperties.get("spark.mesos.uris").map { uris =>
+ setupUris(uris, builder)
+ }
+ desc.schedulerProperties.get("spark.submit.pyFiles").map { pyFiles =>
+ setupUris(pyFiles, builder)
+ }
builder.build()
}
- private def generateCmdOption(desc: MesosDriverDescription): Seq[String] = {
+ private def generateCmdOption(desc: MesosDriverDescription, sandboxPath: String): Seq[String] = {
var options = Seq(
"--name", desc.schedulerProperties("spark.app.name"),
- "--class", desc.command.mainClass,
"--master", s"mesos://${conf.get("spark.master")}",
"--driver-cores", desc.cores.toString,
"--driver-memory", s"${desc.mem}M")
+
+ // Assume an empty main class means a Python application
+ if (!desc.command.mainClass.equals("")) {
+ options ++= Seq("--class", desc.command.mainClass)
+ }
+
desc.schedulerProperties.get("spark.executor.memory").map { v =>
options ++= Seq("--executor-memory", v)
}
desc.schedulerProperties.get("spark.cores.max").map { v =>
options ++= Seq("--total-executor-cores", v)
}
+ desc.schedulerProperties.get("spark.submit.pyFiles").map { pyFiles =>
+ val formattedFiles = pyFiles.split(",")
+ .map { path => new File(sandboxPath, path.split("/").last).toString() }
+ .mkString(",")
+ options ++= Seq("--py-files", formattedFiles)
+ }
options
}
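
To make the path handling concrete: a standalone sketch (helper names assumed) of the URI rewriting that the command builder and generateCmdOption now share. Mesos fetches every listed URI into the task sandbox, so only the basename of each remote file exists locally, joined against whichever sandbox path the executor setup implies:

    import java.io.File

    // Rewrite a remote URI to its fetched location inside the sandbox.
    def localize(sandboxPath: String, uri: String): String =
      new File(sandboxPath, uri.split("/").last).toString

    // Same rewrite for a comma-separated spark.submit.pyFiles list.
    def localizePyFiles(sandboxPath: String, pyFiles: String): String =
      pyFiles.split(",").map(localize(sandboxPath, _)).mkString(",")

    // Docker executor: sandbox mounted at $MESOS_SANDBOX (shell-expanded).
    localize("$MESOS_SANDBOX", "http://repo/app.py")  // "$MESOS_SANDBOX/app.py"
    // spark.executor.uri: the command chdirs into the unpacked distribution,
    // so the sandbox is one level up.
    localize("..", "http://repo/app.py")              // "../app.py"
    // spark.mesos.executor.home: the task already runs in the sandbox root.
    localizePyFiles(".", "http://repo/a.py,http://repo/b.py")  // "./a.py,./b.py"
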
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index f36921ae30..247e6ecfbd 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -157,6 +157,8 @@ From the client, you can submit a job to Mesos cluster by running `spark-submit`
to the url of the MesosClusterDispatcher (e.g: mesos://dispatcher:7077). You can view driver statuses on the
Spark cluster Web UI.
+Note that jars or Python files passed to `spark-submit` should be URIs reachable by Mesos slaves.
+
# Mesos Run Modes
Spark can run over Mesos in two modes: "fine-grained" (default) and "coarse-grained".
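
As an illustration of the note added in the docs hunk above (host and file names hypothetical), a Python application could be submitted to the dispatcher like this, with every URI fetchable by the Mesos slaves:

    ./bin/spark-submit \
      --master mesos://dispatcher:7077 \
      --deploy-mode cluster \
      --py-files http://repo.example.com/deps.py \
      http://repo.example.com/app.py 100
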