diff options
author | Stephen Haberman <stephen@exigencecorp.com> | 2013-02-28 12:38:09 -0600 |
---|---|---|
committer | Stephen Haberman <stephen@exigencecorp.com> | 2013-02-28 12:38:09 -0600 |
commit | 6415c2bb6046b080a040ca9e3f3015079712cb5e (patch) | |
tree | 6efb870e601b1f537d6ceb8f525bc95d13864911 /core | |
parent | 80eecd2cb14fa0c8017693241fa903bd09f46597 (diff) | |
download | spark-6415c2bb6046b080a040ca9e3f3015079712cb5e.tar.gz spark-6415c2bb6046b080a040ca9e3f3015079712cb5e.tar.bz2 spark-6415c2bb6046b080a040ca9e3f3015079712cb5e.zip |
Don't create the Executor until we have everything it needs.
Diffstat (limited to 'core')
3 files changed, 57 insertions, 53 deletions
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 663dec2615..b1d1d30283 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -16,11 +16,8 @@ import java.nio.ByteBuffer /** * The Mesos executor for Spark. */ -private[spark] class Executor extends Logging { - @volatile private var urlClassLoader : ExecutorURLClassLoader = null - @volatile private var threadPool: ExecutorService = null - @volatile private var env: SparkEnv = null - +private[spark] class Executor(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) extends Logging { + // Application dependencies (added through SparkContext) that we've fetched so far on this node. // Each map holds the master's timestamp for the version of that file or JAR we got. private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]() @@ -30,52 +27,50 @@ private[spark] class Executor extends Logging { initLogging() - def initialize(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) { - // Make sure the local hostname we report matches the cluster scheduler's name for this host - Utils.setCustomHostname(slaveHostname) + // Make sure the local hostname we report matches the cluster scheduler's name for this host + Utils.setCustomHostname(slaveHostname) - // Set spark.* system properties from executor arg - for ((key, value) <- properties) { - System.setProperty(key, value) - } + // Set spark.* system properties from executor arg + for ((key, value) <- properties) { + System.setProperty(key, value) + } + + // Create our ClassLoader and set it on this thread + private val urlClassLoader = createClassLoader() + Thread.currentThread.setContextClassLoader(urlClassLoader) - // Create our ClassLoader and set it on this thread - urlClassLoader = createClassLoader() - Thread.currentThread.setContextClassLoader(urlClassLoader) - - // Make any thread terminations due to uncaught exceptions kill the entire - // executor process to avoid surprising stalls. - Thread.setDefaultUncaughtExceptionHandler( - new Thread.UncaughtExceptionHandler { - override def uncaughtException(thread: Thread, exception: Throwable) { - try { - logError("Uncaught exception in thread " + thread, exception) - - // We may have been called from a shutdown hook. If so, we must not call System.exit(). - // (If we do, we will deadlock.) - if (!Utils.inShutdown()) { - if (exception.isInstanceOf[OutOfMemoryError]) { - System.exit(ExecutorExitCode.OOM) - } else { - System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION) - } + // Make any thread terminations due to uncaught exceptions kill the entire + // executor process to avoid surprising stalls. + Thread.setDefaultUncaughtExceptionHandler( + new Thread.UncaughtExceptionHandler { + override def uncaughtException(thread: Thread, exception: Throwable) { + try { + logError("Uncaught exception in thread " + thread, exception) + + // We may have been called from a shutdown hook. If so, we must not call System.exit(). + // (If we do, we will deadlock.) + if (!Utils.inShutdown()) { + if (exception.isInstanceOf[OutOfMemoryError]) { + System.exit(ExecutorExitCode.OOM) + } else { + System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION) } - } catch { - case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM) - case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE) } + } catch { + case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM) + case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE) } } - ) + } + ) - // Initialize Spark environment (using system properties read above) - env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false) - SparkEnv.set(env) + // Initialize Spark environment (using system properties read above) + val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false) + SparkEnv.set(env) - // Start worker thread pool - threadPool = new ThreadPoolExecutor( - 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable]) - } + // Start worker thread pool + val threadPool = new ThreadPoolExecutor( + 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable]) def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) { threadPool.execute(new TaskRunner(context, taskId, serializedTask)) diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala index 818d6d1dda..10f3531df0 100644 --- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala @@ -8,11 +8,12 @@ import com.google.protobuf.ByteString import spark.{Utils, Logging} import spark.TaskState -private[spark] class MesosExecutorBackend(executor: Executor) +private[spark] class MesosExecutorBackend extends MesosExecutor with ExecutorBackend with Logging { + var executor: Executor = null var driver: ExecutorDriver = null override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { @@ -32,16 +33,19 @@ private[spark] class MesosExecutorBackend(executor: Executor) logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue) this.driver = driver val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) - executor.initialize( + executor = new Executor( executorInfo.getExecutorId.getValue, slaveInfo.getHostname, - properties - ) + properties) } override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) { val taskId = taskInfo.getTaskId.getValue.toLong - executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer) + if (executor == null) { + logError("Received launchTask but executor was null") + } else { + executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer) + } } override def error(d: ExecutorDriver, message: String) { @@ -68,7 +72,7 @@ private[spark] object MesosExecutorBackend { def main(args: Array[String]) { MesosNativeLibrary.load() // Create a new Executor and start it running - val runner = new MesosExecutorBackend(new Executor) + val runner = new MesosExecutorBackend() new MesosExecutorDriver(runner).run() } } diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala index 9a82c3054c..1047f71c6a 100644 --- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -14,7 +14,6 @@ import spark.scheduler.cluster.RegisterExecutorFailed import spark.scheduler.cluster.RegisterExecutor private[spark] class StandaloneExecutorBackend( - executor: Executor, driverUrl: String, executorId: String, hostname: String, @@ -23,6 +22,7 @@ private[spark] class StandaloneExecutorBackend( with ExecutorBackend with Logging { + var executor: Executor = null var driver: ActorRef = null override def preStart() { @@ -36,7 +36,7 @@ private[spark] class StandaloneExecutorBackend( override def receive = { case RegisteredExecutor(sparkProperties) => logInfo("Successfully registered with driver") - executor.initialize(executorId, hostname, sparkProperties) + executor = new Executor(executorId, hostname, sparkProperties) case RegisterExecutorFailed(message) => logError("Slave registration failed: " + message) @@ -44,7 +44,12 @@ private[spark] class StandaloneExecutorBackend( case LaunchTask(taskDesc) => logInfo("Got assigned task " + taskDesc.taskId) - executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask) + if (executor == null) { + logError("Received launchTask but executor was null") + System.exit(1) + } else { + executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask) + } case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => logError("Driver terminated or disconnected! Shutting down.") @@ -62,7 +67,7 @@ private[spark] object StandaloneExecutorBackend { // before getting started with all our system properties, etc val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0) val actor = actorSystem.actorOf( - Props(new StandaloneExecutorBackend(new Executor, driverUrl, executorId, hostname, cores)), + Props(new StandaloneExecutorBackend(driverUrl, executorId, hostname, cores)), name = "Executor") actorSystem.awaitTermination() } |