about | summary | refs | log | tree | commit | diff
path: root/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala | 30
1 file changed, 23 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 06f0e2881c..1067a7f1ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -370,16 +370,21 @@ private[spark] class MesosClusterScheduler(
val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
envBuilder.addVariables(
Variable.newBuilder().setName("SPARK_EXECUTOR_OPTS").setValue(executorOpts))
- val cmdOptions = generateCmdOption(desc)
+ val cmdOptions = generateCmdOption(desc).mkString(" ")
+ val dockerDefined = desc.schedulerProperties.contains("spark.mesos.executor.docker.image")
val executorUri = desc.schedulerProperties.get("spark.executor.uri")
.orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
val appArguments = desc.command.arguments.mkString(" ")
- val cmd = if (executorUri.isDefined) {
+ val (executable, jar) = if (dockerDefined) {
+ // Application jar is automatically downloaded in the mounted sandbox by Mesos,
+ // and the path to the mounted volume is stored in $MESOS_SANDBOX env variable.
+ ("./bin/spark-submit", s"$$MESOS_SANDBOX/${desc.jarUrl.split("/").last}")
+ } else if (executorUri.isDefined) {
builder.addUris(CommandInfo.URI.newBuilder().setValue(executorUri.get).build())
val folderBasename = executorUri.get.split('/').last.split('.').head
val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit"
val cmdJar = s"../${desc.jarUrl.split("/").last}"
- s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar $appArguments"
+ (cmdExecutable, cmdJar)
} else {
val executorSparkHome = desc.schedulerProperties.get("spark.mesos.executor.home")
.orElse(conf.getOption("spark.home"))
@@ -389,9 +394,9 @@ private[spark] class MesosClusterScheduler(
}
val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getCanonicalPath
val cmdJar = desc.jarUrl.split("/").last
- s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar $appArguments"
+ (cmdExecutable, cmdJar)
}
- builder.setValue(cmd)
+ builder.setValue(s"$executable $cmdOptions $jar $appArguments")
builder.setEnvironment(envBuilder.build())
builder.build()
}
@@ -458,9 +463,20 @@ private[spark] class MesosClusterScheduler(
.setCommand(commandInfo)
.addResources(cpuResource)
.addResources(memResource)
- .build()
+ submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
+ val container = taskInfo.getContainerBuilder()
+ val volumes = submission.schedulerProperties
+ .get("spark.mesos.executor.docker.volumes")
+ .map(MesosSchedulerBackendUtil.parseVolumesSpec)
+ val portmaps = submission.schedulerProperties
+ .get("spark.mesos.executor.docker.portmaps")
+ .map(MesosSchedulerBackendUtil.parsePortMappingsSpec)
+ MesosSchedulerBackendUtil.addDockerInfo(
+ container, image, volumes = volumes, portmaps = portmaps)
+ taskInfo.setContainer(container.build())
+ }
val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new ArrayBuffer[TaskInfo])
- queuedTasks += taskInfo
+ queuedTasks += taskInfo.build()
logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
submission.submissionId)
val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId,