author    Mridul Muralidharan <mridul@gmail.com>    2013-04-16 02:57:43 +0530
committer Mridul Muralidharan <mridul@gmail.com>    2013-04-16 02:57:43 +0530
commit    5540ab8243a8488e30a21e1d4bb1720f1a9a555f (patch)
tree      09d2c6bbcbdf72c9ba63e415a5302f86173d17cc
parent    eb7e95e833376904bea4a9e6d1cc67c00fcfb06c (diff)
Use hostname instead of hostport for executor, fix creation of workdir
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala                   2
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala                            3
-rw-r--r--  core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala             11
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala   2
4 files changed, 10 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index dfcb9f0d05..04a774658e 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -70,7 +70,7 @@ private[spark] class ExecutorRunner(
/** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */
def substituteVariables(argument: String): String = argument match {
case "{{EXECUTOR_ID}}" => execId.toString
- case "{{HOSTPORT}}" => hostPort
+ case "{{HOSTNAME}}" => Utils.parseHostPort(hostPort)._1
case "{{CORES}}" => cores.toString
case other => other
}
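
The substitution change above can be exercised in isolation. Below is a minimal sketch, with parseHostPort approximated by a simple split; the real spark.Utils helper also validates its input:

object HostnameSubstitutionSketch {
  // Stand-in for spark.Utils.parseHostPort: splits "host:port" into
  // (host, port). Illustrative only; the real helper does more checking.
  def parseHostPort(hostPort: String): (String, Int) = {
    val idx = hostPort.lastIndexOf(':')
    require(idx > 0, "expected host:port, got " + hostPort)
    (hostPort.substring(0, idx), hostPort.substring(idx + 1).toInt)
  }

  // Mirrors the patched substituteVariables: "{{HOSTNAME}}" now yields
  // only the host component, never the trailing ":port".
  def substituteVariables(hostPort: String)(argument: String): String = argument match {
    case "{{HOSTNAME}}" => parseHostPort(hostPort)._1
    case other          => other
  }

  def main(args: Array[String]): Unit = {
    println(substituteVariables("worker1.example.com:7078")("{{HOSTNAME}}"))
    // prints: worker1.example.com
  }
}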
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index cf4babc892..1a7da0f7bf 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -54,10 +54,11 @@ private[spark] class Worker(
def createWorkDir() {
workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
try {
- if (!workDir.exists() && !workDir.mkdirs()) {
+ if ( (workDir.exists() && !workDir.isDirectory) || (!workDir.exists() && !workDir.mkdirs()) ) {
logError("Failed to create work directory " + workDir)
System.exit(1)
}
+ assert (workDir.isDirectory)
} catch {
case e: Exception =>
logError("Failed to create work directory " + workDir, e)
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index 49e1f3f07a..ebe2ac68d8 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -75,17 +75,18 @@ private[spark] object StandaloneExecutorBackend {
def run0(args: Product) {
assert(4 == args.productArity)
runImpl(args.productElement(0).asInstanceOf[String],
- args.productElement(0).asInstanceOf[String],
- args.productElement(0).asInstanceOf[String],
- args.productElement(0).asInstanceOf[Int])
+ args.productElement(1).asInstanceOf[String],
+ args.productElement(2).asInstanceOf[String],
+ args.productElement(3).asInstanceOf[Int])
}
private def runImpl(driverUrl: String, executorId: String, hostname: String, cores: Int) {
+ // Debug code
+ Utils.checkHost(hostname)
+
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
- // Debug code
- Utils.checkHost(hostname)
// set it
val sparkHostPort = hostname + ":" + boundPort
System.setProperty("spark.hostPort", sparkHostPort)
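
The run0 change fixes a copy-paste error: all four arguments were read from productElement(0), so the driver URL was passed as every parameter. A small sketch of the corrected unpacking, using a Tuple4 (which is a Product of arity 4) as the input:

object ProductUnpackSketch {
  // Mirrors the fixed run0: each positional element is read once,
  // at its own index, instead of productElement(0) four times.
  def unpack(args: Product): (String, String, String, Int) = {
    assert(args.productArity == 4)
    (args.productElement(0).asInstanceOf[String],
     args.productElement(1).asInstanceOf[String],
     args.productElement(2).asInstanceOf[String],
     args.productElement(3).asInstanceOf[Int])
  }

  def main(a: Array[String]): Unit = {
    val t = ("akka://spark@driver:7077/user/StandaloneScheduler", "0", "worker1", 4)
    println(unpack(t)) // driverUrl, executorId, hostname, cores: all distinct
  }
}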
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 6b61152ed0..0b8922d139 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -27,7 +27,7 @@ private[spark] class SparkDeploySchedulerBackend(
val driverUrl = "akka://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
- val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTPORT}}", "{{CORES}}")
+ val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome().getOrElse(
throw new IllegalArgumentException("must supply spark home for spark standalone"))
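
Taken together with the ExecutorRunner hunk, the driver now emits a {{HOSTNAME}} placeholder that the worker resolves to a bare host before launching the backend. A hedged end-to-end sketch of that round trip (the split again stands in for Utils.parseHostPort):

object TemplateRoundTripSketch {
  // Driver side: templated args, as built by the patched SparkDeploySchedulerBackend.
  val templatedArgs = Seq("akka://spark@driver:7077/user/StandaloneScheduler",
                          "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")

  // Worker side: substitution as in the patched ExecutorRunner.
  def substitute(execId: Int, hostPort: String, cores: Int)(arg: String): String =
    arg match {
      case "{{EXECUTOR_ID}}" => execId.toString
      case "{{HOSTNAME}}"    => hostPort.take(hostPort.lastIndexOf(':'))
      case "{{CORES}}"       => cores.toString
      case other             => other
    }

  def main(args: Array[String]): Unit = {
    println(templatedArgs.map(substitute(7, "worker1.example.com:7078", 4)))
    // List(akka://spark@driver:7077/user/StandaloneScheduler, 7, worker1.example.com, 4)
  }
}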