diff options
Diffstat (limited to 'core/src/main/scala/spark/executor')
3 files changed, 51 insertions, 50 deletions
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 2552958d27..bd21ba719a 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -30,7 +30,7 @@ private[spark] class Executor extends Logging { initLogging() - def initialize(slaveHostname: String, properties: Seq[(String, String)]) { + def initialize(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) { // Make sure the local hostname we report matches the cluster scheduler's name for this host Utils.setCustomHostname(slaveHostname) @@ -64,7 +64,7 @@ private[spark] class Executor extends Logging { ) // Initialize Spark environment (using system properties read above) - env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false) + env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false) SparkEnv.set(env) // Start worker thread pool @@ -159,22 +159,24 @@ private[spark] class Executor extends Logging { * SparkContext. Also adds any new JARs we fetched to the class loader. */ private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) { - // Fetch missing dependencies - for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) { - logInfo("Fetching " + name + " with timestamp " + timestamp) - Utils.fetchFile(name, new File(".")) - currentFiles(name) = timestamp - } - for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) { - logInfo("Fetching " + name + " with timestamp " + timestamp) - Utils.fetchFile(name, new File(".")) - currentJars(name) = timestamp - // Add it to our class loader - val localName = name.split("/").last - val url = new File(".", localName).toURI.toURL - if (!urlClassLoader.getURLs.contains(url)) { - logInfo("Adding " + url + " to class loader") - urlClassLoader.addURL(url) + synchronized { + // Fetch missing dependencies + for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) { + logInfo("Fetching " + name + " with timestamp " + timestamp) + Utils.fetchFile(name, new File(SparkFiles.getRootDirectory)) + currentFiles(name) = timestamp + } + for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) { + logInfo("Fetching " + name + " with timestamp " + timestamp) + Utils.fetchFile(name, new File(SparkFiles.getRootDirectory)) + currentJars(name) = timestamp + // Add it to our class loader + val localName = name.split("/").last + val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL + if (!urlClassLoader.getURLs.contains(url)) { + logInfo("Adding " + url + " to class loader") + urlClassLoader.addURL(url) + } } } } diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala index eeab3959c6..818d6d1dda 100644 --- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala @@ -29,9 +29,14 @@ private[spark] class MesosExecutorBackend(executor: Executor) executorInfo: ExecutorInfo, frameworkInfo: FrameworkInfo, slaveInfo: SlaveInfo) { + logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue) this.driver = driver val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) - executor.initialize(slaveInfo.getHostname, properties) + executor.initialize( + executorInfo.getExecutorId.getValue, + slaveInfo.getHostname, + properties + ) } override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) { diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala index 915f71ba9f..224c126fdd 100644 --- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -4,78 +4,72 @@ import java.nio.ByteBuffer import spark.Logging import spark.TaskState.TaskState import spark.util.AkkaUtils -import akka.actor.{ActorRef, Actor, Props} +import akka.actor.{ActorRef, Actor, Props, Terminated} +import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue} -import akka.remote.RemoteClientLifeCycleEvent import spark.scheduler.cluster._ -import spark.scheduler.cluster.RegisteredSlave +import spark.scheduler.cluster.RegisteredExecutor import spark.scheduler.cluster.LaunchTask -import spark.scheduler.cluster.RegisterSlaveFailed -import spark.scheduler.cluster.RegisterSlave - +import spark.scheduler.cluster.RegisterExecutorFailed +import spark.scheduler.cluster.RegisterExecutor private[spark] class StandaloneExecutorBackend( executor: Executor, - masterUrl: String, - slaveId: String, + driverUrl: String, + executorId: String, hostname: String, cores: Int) extends Actor with ExecutorBackend with Logging { - val threadPool = new ThreadPoolExecutor( - 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable]) - - var master: ActorRef = null + var driver: ActorRef = null override def preStart() { - try { - logInfo("Connecting to master: " + masterUrl) - master = context.actorFor(masterUrl) - master ! RegisterSlave(slaveId, hostname, cores) - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(master) // Doesn't work with remote actors, but useful for testing - } catch { - case e: Exception => - logError("Failed to connect to master", e) - System.exit(1) - } + logInfo("Connecting to driver: " + driverUrl) + driver = context.actorFor(driverUrl) + driver ! RegisterExecutor(executorId, hostname, cores) + context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) + context.watch(driver) // Doesn't work with remote actors, but useful for testing } override def receive = { - case RegisteredSlave(sparkProperties) => - logInfo("Successfully registered with master") - executor.initialize(hostname, sparkProperties) + case RegisteredExecutor(sparkProperties) => + logInfo("Successfully registered with driver") + executor.initialize(executorId, hostname, sparkProperties) - case RegisterSlaveFailed(message) => + case RegisterExecutorFailed(message) => logError("Slave registration failed: " + message) System.exit(1) case LaunchTask(taskDesc) => logInfo("Got assigned task " + taskDesc.taskId) executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask) + + case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => + logError("Driver terminated or disconnected! Shutting down.") + System.exit(1) } override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { - master ! StatusUpdate(slaveId, taskId, state, data) + driver ! StatusUpdate(executorId, taskId, state, data) } } private[spark] object StandaloneExecutorBackend { - def run(masterUrl: String, slaveId: String, hostname: String, cores: Int) { + def run(driverUrl: String, executorId: String, hostname: String, cores: Int) { // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor // before getting started with all our system properties, etc val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0) val actor = actorSystem.actorOf( - Props(new StandaloneExecutorBackend(new Executor, masterUrl, slaveId, hostname, cores)), + Props(new StandaloneExecutorBackend(new Executor, driverUrl, executorId, hostname, cores)), name = "Executor") actorSystem.awaitTermination() } def main(args: Array[String]) { if (args.length != 4) { - System.err.println("Usage: StandaloneExecutorBackend <master> <slaveId> <hostname> <cores>") + System.err.println("Usage: StandaloneExecutorBackend <driverUrl> <executorId> <hostname> <cores>") System.exit(1) } run(args(0), args(1), args(2), args(3).toInt) |