author     Imran Rashid <imran@quantifind.com>       2013-01-18 13:24:37 -0800
committer  Matei Zaharia <matei@eecs.berkeley.edu>   2013-01-21 11:21:22 -0800
commit     f116d6b5c6029c2f96160bd84829a6fe8b73cccf (patch)
tree       e78640af6947c637aeb8ca8bbea68e698266073e
parent     414b41e1d8f431585d842407958076e747b0438e (diff)
executor can use a different sparkHome from Worker
-rw-r--r--  core/src/main/scala/spark/deploy/DeployMessage.scala                           | 4
-rw-r--r--  core/src/main/scala/spark/deploy/JobDescription.scala                          | 5
-rw-r--r--  core/src/main/scala/spark/deploy/client/TestClient.scala                       | 3
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala                           | 9
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala                           | 4
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala  | 3
6 files changed, 18 insertions, 10 deletions
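
Taken together, the patch threads a per-job sparkHome from JobDescription through the Master's LaunchExecutor message down to the Worker's ExecutorRunner, so a job's executors can run from a Spark installation other than the one the Worker itself was started from. A minimal sketch of a client building a job description with the new field (the job name, core and memory figures, and path are placeholders, not part of the patch):

    import java.io.File
    import spark.deploy.{Command, JobDescription}

    // Hypothetical submission: executors for this job launch out of
    // /opt/spark-exec on each worker, whatever the Worker's own sparkHome is.
    val desc = new JobDescription(
      "MyJob",          // job name
      4,                // cores
      512,              // memory per slave, in MB
      Command("spark.executor.StandaloneExecutorBackend", Seq(), Map()),
      new File("/opt/spark-exec"))  // new field: per-job sparkHome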
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index 457122745b..7ee3e63429 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -5,6 +5,7 @@ import spark.deploy.master.{WorkerInfo, JobInfo}
import spark.deploy.worker.ExecutorRunner
import scala.collection.immutable.List
import scala.collection.mutable.HashMap
+import java.io.File
private[spark] sealed trait DeployMessage extends Serializable
@@ -42,7 +43,8 @@ private[spark] case class LaunchExecutor(
execId: Int,
jobDesc: JobDescription,
cores: Int,
- memory: Int)
+ memory: Int,
+ sparkHome: File)
extends DeployMessage
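
With this hunk applied, LaunchExecutor carries the job's sparkHome alongside the existing fields. Reconstructed from the hunk plus its call site in Master.scala below (the jobId field, and its String type, come from the exec.job.id argument there), the full message reads roughly:

    import java.io.File

    // Reconstruction of the message after the patch, not verbatim source.
    private[spark] case class LaunchExecutor(
        jobId: String,
        execId: Int,
        jobDesc: JobDescription,
        cores: Int,
        memory: Int,
        sparkHome: File)  // new: Spark installation the executor runs from
      extends DeployMessage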
diff --git a/core/src/main/scala/spark/deploy/JobDescription.scala b/core/src/main/scala/spark/deploy/JobDescription.scala
index 20879c5f11..7f8f9af417 100644
--- a/core/src/main/scala/spark/deploy/JobDescription.scala
+++ b/core/src/main/scala/spark/deploy/JobDescription.scala
@@ -1,10 +1,13 @@
package spark.deploy
+import java.io.File
+
private[spark] class JobDescription(
val name: String,
val cores: Int,
val memoryPerSlave: Int,
- val command: Command)
+ val command: Command,
+ val sparkHome: File)
extends Serializable {
val user = System.getProperty("user.name", "<unknown>")
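
JobDescription is Serializable and travels from the client through the Master to the Worker, so the new File field is carried as a path and only resolved against a filesystem on the machine that receives it. A quick round-trip sketch of that behaviour (a standalone illustration, not code from the patch):

    import java.io._

    // java.io.File implements Serializable; its serialized form is
    // essentially the path, which the receiving JVM resolves against its
    // own filesystem, exactly what the Worker needs here.
    val buf = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(buf)
    oos.writeObject(new File("/opt/spark-exec"))
    oos.flush()
    val restored = new ObjectInputStream(
      new ByteArrayInputStream(buf.toByteArray)).readObject().asInstanceOf[File]
    assert(restored.getPath == "/opt/spark-exec")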
diff --git a/core/src/main/scala/spark/deploy/client/TestClient.scala b/core/src/main/scala/spark/deploy/client/TestClient.scala
index 57a7e123b7..dc743b1fbf 100644
--- a/core/src/main/scala/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/spark/deploy/client/TestClient.scala
@@ -3,6 +3,7 @@ package spark.deploy.client
import spark.util.AkkaUtils
import spark.{Logging, Utils}
import spark.deploy.{Command, JobDescription}
+import java.io.File
private[spark] object TestClient {
@@ -25,7 +26,7 @@ private[spark] object TestClient {
val url = args(0)
val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0)
val desc = new JobDescription(
- "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()))
+ "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), new File("dummy-spark-home"))
val listener = new TestListener
val client = new Client(actorSystem, url, desc, listener)
client.start()
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 6ecebe626a..f0bee67159 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -6,6 +6,7 @@ import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, Remote
import java.text.SimpleDateFormat
import java.util.Date
+import java.io.File
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -173,7 +174,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
for (pos <- 0 until numUsable) {
if (assigned(pos) > 0) {
val exec = job.addExecutor(usableWorkers(pos), assigned(pos))
- launchExecutor(usableWorkers(pos), exec)
+ launchExecutor(usableWorkers(pos), exec, job.desc.sparkHome)
job.state = JobState.RUNNING
}
}
@@ -186,7 +187,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
val coresToUse = math.min(worker.coresFree, job.coresLeft)
if (coresToUse > 0) {
val exec = job.addExecutor(worker, coresToUse)
- launchExecutor(worker, exec)
+ launchExecutor(worker, exec, job.desc.sparkHome)
job.state = JobState.RUNNING
}
}
@@ -195,10 +196,10 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
}
- def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
+ def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: File) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
- worker.actor ! LaunchExecutor(exec.job.id, exec.id, exec.job.desc, exec.cores, exec.memory)
+ worker.actor ! LaunchExecutor(exec.job.id, exec.id, exec.job.desc, exec.cores, exec.memory, sparkHome)
exec.job.actor ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
}
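
Both call sites above pass job.desc.sparkHome, and launchExecutor already receives the ExecutorInfo whose job carries that same description. An equivalent formulation, a possible simplification rather than what the patch does, would read the path inside the method (sketched as it would sit in Master.scala):

    import java.io.File

    // Possible alternative (not in the patch): derive sparkHome from exec
    // itself instead of threading it through as an extra parameter.
    def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
      val sparkHome: File = exec.job.desc.sparkHome
      logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
      worker.addExecutor(exec)
      worker.actor ! LaunchExecutor(
        exec.job.id, exec.id, exec.job.desc, exec.cores, exec.memory, sparkHome)
      exec.job.actor ! ExecutorAdded(
        exec.id, worker.id, worker.host, exec.cores, exec.memory)
    }

Keeping the explicit parameter, as the patch does, makes the dependence on the job description visible at each call site.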
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 7c9e588ea2..078b2d8037 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -119,10 +119,10 @@ private[spark] class Worker(
logError("Worker registration failed: " + message)
System.exit(1)
- case LaunchExecutor(jobId, execId, jobDesc, cores_, memory_) =>
+ case LaunchExecutor(jobId, execId, jobDesc, cores_, memory_, execSparkHome_) =>
logInfo("Asked to launch executor %s/%d for %s".format(jobId, execId, jobDesc.name))
val manager = new ExecutorRunner(
- jobId, execId, jobDesc, cores_, memory_, self, workerId, ip, sparkHome, workDir)
+ jobId, execId, jobDesc, cores_, memory_, self, workerId, ip, execSparkHome_, workDir)
executors(jobId + "/" + execId) = manager
manager.start()
coresUsed += cores_
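
The Worker keeps its own sparkHome field but now hands the job-supplied execSparkHome_ to the ExecutorRunner, trusting that the path exists on this machine. A hypothetical guard, not in the patch (checkSparkHome is an invented helper), could fall back to the Worker's installation when it does not:

    import java.io.File

    // Hypothetical validation: use the job's sparkHome if it exists on this
    // Worker's filesystem, otherwise fall back to the Worker's own.
    def checkSparkHome(execSparkHome: File, workerSparkHome: File): File =
      if (execSparkHome.isDirectory) execSparkHome else workerSparkHome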
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index e2301347e5..0dcc2efaca 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -4,6 +4,7 @@ import spark.{Utils, Logging, SparkContext}
import spark.deploy.client.{Client, ClientListener}
import spark.deploy.{Command, JobDescription}
import scala.collection.mutable.HashMap
+import java.io.File
private[spark] class SparkDeploySchedulerBackend(
scheduler: ClusterScheduler,
@@ -39,7 +40,7 @@ private[spark] class SparkDeploySchedulerBackend(
StandaloneSchedulerBackend.ACTOR_NAME)
val args = Seq(masterUrl, "{{SLAVEID}}", "{{HOSTNAME}}", "{{CORES}}")
val command = Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
- val jobDesc = new JobDescription(jobName, maxCores, executorMemory, command)
+ val jobDesc = new JobDescription(jobName, maxCores, executorMemory, command, new File(sc.sparkHome))
client = new Client(sc.env.actorSystem, master, jobDesc, this)
client.start()
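
On the driver side, the per-job sparkHome now comes from sc.sparkHome, i.e. whatever Spark home the SparkContext was constructed with. A sketch of a driver exercising the new behaviour, assuming the contemporary constructor that takes (master, jobName, sparkHome, jars) and using placeholder URLs and paths:

    import spark.SparkContext

    // Hypothetical driver setup: "/opt/spark-exec" is what the backend
    // forwards (as new File(sc.sparkHome)) to every executor of this job.
    val sc = new SparkContext(
      "spark://master-host:7077",  // standalone master URL (placeholder)
      "MyJob",
      "/opt/spark-exec",           // Spark home the executors will run from
      Seq())                       // job jars

Note that new File(sc.sparkHome) throws a NullPointerException when the path is null, so a job submitted through this backend now effectively has to supply a sparkHome.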