diff options
author | Mridul Muralidharan <mridul@gmail.com> | 2013-04-16 02:57:43 +0530 |
---|---|---|
committer | Mridul Muralidharan <mridul@gmail.com> | 2013-04-16 02:57:43 +0530 |
commit | 5540ab8243a8488e30a21e1d4bb1720f1a9a555f (patch) | |
tree | 09d2c6bbcbdf72c9ba63e415a5302f86173d17cc | |
parent | eb7e95e833376904bea4a9e6d1cc67c00fcfb06c (diff) | |
download | spark-5540ab8243a8488e30a21e1d4bb1720f1a9a555f.tar.gz spark-5540ab8243a8488e30a21e1d4bb1720f1a9a555f.tar.bz2 spark-5540ab8243a8488e30a21e1d4bb1720f1a9a555f.zip |
Use hostname instead of hostport for executor, fix creation of workdir
4 files changed, 10 insertions, 8 deletions
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index dfcb9f0d05..04a774658e 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -70,7 +70,7 @@ private[spark] class ExecutorRunner( /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */ def substituteVariables(argument: String): String = argument match { case "{{EXECUTOR_ID}}" => execId.toString - case "{{HOSTPORT}}" => hostPort + case "{{HOSTNAME}}" => Utils.parseHostPort(hostPort)._1 case "{{CORES}}" => cores.toString case other => other } diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index cf4babc892..1a7da0f7bf 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -54,10 +54,11 @@ private[spark] class Worker( def createWorkDir() { workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work")) try { - if (!workDir.exists() && !workDir.mkdirs()) { + if ( (workDir.exists() && !workDir.isDirectory) || (!workDir.exists() && !workDir.mkdirs()) ) { logError("Failed to create work directory " + workDir) System.exit(1) } + assert (workDir.isDirectory) } catch { case e: Exception => logError("Failed to create work directory " + workDir, e) diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala index 49e1f3f07a..ebe2ac68d8 100644 --- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -75,17 +75,18 @@ private[spark] object StandaloneExecutorBackend { def run0(args: Product) { assert(4 == args.productArity) runImpl(args.productElement(0).asInstanceOf[String], - args.productElement(0).asInstanceOf[String], - args.productElement(0).asInstanceOf[String], - args.productElement(0).asInstanceOf[Int]) + args.productElement(1).asInstanceOf[String], + args.productElement(2).asInstanceOf[String], + args.productElement(3).asInstanceOf[Int]) } private def runImpl(driverUrl: String, executorId: String, hostname: String, cores: Int) { + // Debug code + Utils.checkHost(hostname) + // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor // before getting started with all our system properties, etc val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0) - // Debug code - Utils.checkHost(hostname) // set it val sparkHostPort = hostname + ":" + boundPort System.setProperty("spark.hostPort", sparkHostPort) diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 6b61152ed0..0b8922d139 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -27,7 +27,7 @@ private[spark] class SparkDeploySchedulerBackend( val driverUrl = "akka://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), StandaloneSchedulerBackend.ACTOR_NAME) - val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTPORT}}", "{{CORES}}") + val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}") val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs) val sparkHome = sc.getSparkHome().getOrElse( throw new IllegalArgumentException("must supply spark home for spark standalone")) |