diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala | 30 |
1 files changed, 23 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 06f0e2881c..1067a7f1ca 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -370,16 +370,21 @@ private[spark] class MesosClusterScheduler( val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ") envBuilder.addVariables( Variable.newBuilder().setName("SPARK_EXECUTOR_OPTS").setValue(executorOpts)) - val cmdOptions = generateCmdOption(desc) + val cmdOptions = generateCmdOption(desc).mkString(" ") + val dockerDefined = desc.schedulerProperties.contains("spark.mesos.executor.docker.image") val executorUri = desc.schedulerProperties.get("spark.executor.uri") .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI")) val appArguments = desc.command.arguments.mkString(" ") - val cmd = if (executorUri.isDefined) { + val (executable, jar) = if (dockerDefined) { + // Application jar is automatically downloaded in the mounted sandbox by Mesos, + // and the path to the mounted volume is stored in $MESOS_SANDBOX env variable. + ("./bin/spark-submit", s"$$MESOS_SANDBOX/${desc.jarUrl.split("/").last}") + } else if (executorUri.isDefined) { builder.addUris(CommandInfo.URI.newBuilder().setValue(executorUri.get).build()) val folderBasename = executorUri.get.split('/').last.split('.').head val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit" val cmdJar = s"../${desc.jarUrl.split("/").last}" - s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar $appArguments" + (cmdExecutable, cmdJar) } else { val executorSparkHome = desc.schedulerProperties.get("spark.mesos.executor.home") .orElse(conf.getOption("spark.home")) @@ -389,9 +394,9 @@ private[spark] class MesosClusterScheduler( } val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getCanonicalPath val cmdJar = desc.jarUrl.split("/").last - s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar $appArguments" + (cmdExecutable, cmdJar) } - builder.setValue(cmd) + builder.setValue(s"$executable $cmdOptions $jar $appArguments") builder.setEnvironment(envBuilder.build()) builder.build() } @@ -458,9 +463,20 @@ private[spark] class MesosClusterScheduler( .setCommand(commandInfo) .addResources(cpuResource) .addResources(memResource) - .build() + submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image => + val container = taskInfo.getContainerBuilder() + val volumes = submission.schedulerProperties + .get("spark.mesos.executor.docker.volumes") + .map(MesosSchedulerBackendUtil.parseVolumesSpec) + val portmaps = submission.schedulerProperties + .get("spark.mesos.executor.docker.portmaps") + .map(MesosSchedulerBackendUtil.parsePortMappingsSpec) + MesosSchedulerBackendUtil.addDockerInfo( + container, image, volumes = volumes, portmaps = portmaps) + taskInfo.setContainer(container.build()) + } val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new ArrayBuffer[TaskInfo]) - queuedTasks += taskInfo + queuedTasks += taskInfo.build() logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " + submission.submissionId) val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId, |