diff options
author | Matei Zaharia <matei.zaharia@gmail.com> | 2013-07-31 14:18:16 -0700 |
---|---|---|
committer | Matei Zaharia <matei.zaharia@gmail.com> | 2013-07-31 14:18:16 -0700 |
commit | 14bf2fe039725013b5539e511ba1778407b09b86 (patch) | |
tree | e73bbec3788ed3bb9575409dcab2feb9e585fc9c /core | |
parent | 4ba4c3fe1a7053d1a7414bda9e218d76af8c97fc (diff) | |
parent | 529ac81195505f59228fd212a5bad154ab316683 (diff) | |
download | spark-14bf2fe039725013b5539e511ba1778407b09b86.tar.gz spark-14bf2fe039725013b5539e511ba1778407b09b86.tar.bz2 spark-14bf2fe039725013b5539e511ba1778407b09b86.zip |
Merge pull request #749 from benh/spark-executor-uri
Added property 'spark.executor.uri' for launching on Mesos.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala | 27 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala | 17 |
2 files changed, 32 insertions, 12 deletions
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala index 7bc6040544..6ebbb5ec9b 100644 --- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala @@ -110,12 +110,6 @@ private[spark] class CoarseMesosSchedulerBackend( } def createCommand(offer: Offer, numCores: Int): CommandInfo = { - val runScript = new File(sparkHome, "run").getCanonicalPath - val driverUrl = "akka://spark@%s:%s/user/%s".format( - System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), - StandaloneSchedulerBackend.ACTOR_NAME) - val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format( - runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores) val environment = Environment.newBuilder() sc.executorEnvs.foreach { case (key, value) => environment.addVariables(Environment.Variable.newBuilder() @@ -123,7 +117,26 @@ private[spark] class CoarseMesosSchedulerBackend( .setValue(value) .build()) } - return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build() + val command = CommandInfo.newBuilder() + .setEnvironment(environment) + val driverUrl = "akka://spark@%s:%s/user/%s".format( + System.getProperty("spark.driver.host"), + System.getProperty("spark.driver.port"), + StandaloneSchedulerBackend.ACTOR_NAME) + val uri = System.getProperty("spark.executor.uri") + if (uri == null) { + val runScript = new File(sparkHome, "run").getCanonicalPath + command.setValue("\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format( + runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) + } else { + // Grab everything to the first '.'. We'll use that and '*' to + // glob the directory "correctly". + val basename = uri.split('/').last.split('.').head + command.setValue("cd %s*; ./run spark.executor.StandaloneExecutorBackend %s %s %s %d".format( + basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) + command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) + } + return command.build() } override def offerRescinded(d: SchedulerDriver, o: OfferID) {} diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala index 75b8268b55..f6069a5775 100644 --- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala @@ -89,7 +89,6 @@ private[spark] class MesosSchedulerBackend( val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException( "Spark home is not set; set it through the spark.home system " + "property, the SPARK_HOME environment variable or the SparkContext constructor")) - val execScript = new File(sparkHome, "spark-executor").getCanonicalPath val environment = Environment.newBuilder() sc.executorEnvs.foreach { case (key, value) => environment.addVariables(Environment.Variable.newBuilder() @@ -97,15 +96,23 @@ private[spark] class MesosSchedulerBackend( .setValue(value) .build()) } + val command = CommandInfo.newBuilder() + .setEnvironment(environment) + val uri = System.getProperty("spark.executor.uri") + if (uri == null) { + command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath) + } else { + // Grab everything to the first '.'. We'll use that and '*' to + // glob the directory "correctly". + val basename = uri.split('/').last.split('.').head + command.setValue("cd %s*; ./spark-executor".format(basename)) + command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) + } val memory = Resource.newBuilder() .setName("mem") .setType(Value.Type.SCALAR) .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build()) .build() - val command = CommandInfo.newBuilder() - .setValue(execScript) - .setEnvironment(environment) - .build() ExecutorInfo.newBuilder() .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) .setCommand(command) |