diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-03-23 07:19:34 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-03-23 07:19:34 -0700 |
commit | b8949cab889da4ba0f613a17b2eef52a32476410 (patch) | |
tree | 2399d439c78a482d8b71c4dbdc50822106713481 /core | |
parent | fd53f2fc7be5faa5d7b1aa58226f7f74f44247d2 (diff) | |
parent | 4f4215311a4bef65eb705798a0748d270371bee5 (diff) | |
download | spark-b8949cab889da4ba0f613a17b2eef52a32476410.tar.gz spark-b8949cab889da4ba0f613a17b2eef52a32476410.tar.bz2 spark-b8949cab889da4ba0f613a17b2eef52a32476410.zip |
Merge pull request #505 from stephenh/volatile
Make Executor fields volatile since they're read from the thread pool.
Diffstat (limited to 'core')
3 files changed, 60 insertions, 56 deletions
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 4474ef4593..3e7407b58d 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -16,66 +16,61 @@ import java.nio.ByteBuffer /** * The Mesos executor for Spark. */ -private[spark] class Executor extends Logging { - var urlClassLoader : ExecutorURLClassLoader = null - var threadPool: ExecutorService = null - var env: SparkEnv = null - +private[spark] class Executor(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) extends Logging { + // Application dependencies (added through SparkContext) that we've fetched so far on this node. // Each map holds the master's timestamp for the version of that file or JAR we got. - val currentFiles: HashMap[String, Long] = new HashMap[String, Long]() - val currentJars: HashMap[String, Long] = new HashMap[String, Long]() + private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]() + private val currentJars: HashMap[String, Long] = new HashMap[String, Long]() - val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0)) + private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0)) initLogging() - def initialize(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) { - // Make sure the local hostname we report matches the cluster scheduler's name for this host - Utils.setCustomHostname(slaveHostname) + // Make sure the local hostname we report matches the cluster scheduler's name for this host + Utils.setCustomHostname(slaveHostname) - // Set spark.* system properties from executor arg - for ((key, value) <- properties) { - System.setProperty(key, value) - } + // Set spark.* system properties from executor arg + for ((key, value) <- properties) { + System.setProperty(key, value) + } + + // Create our ClassLoader and set it on this thread + private val urlClassLoader = createClassLoader() + Thread.currentThread.setContextClassLoader(urlClassLoader) - // Create our ClassLoader and set it on this thread - urlClassLoader = createClassLoader() - Thread.currentThread.setContextClassLoader(urlClassLoader) - - // Make any thread terminations due to uncaught exceptions kill the entire - // executor process to avoid surprising stalls. - Thread.setDefaultUncaughtExceptionHandler( - new Thread.UncaughtExceptionHandler { - override def uncaughtException(thread: Thread, exception: Throwable) { - try { - logError("Uncaught exception in thread " + thread, exception) - - // We may have been called from a shutdown hook. If so, we must not call System.exit(). - // (If we do, we will deadlock.) - if (!Utils.inShutdown()) { - if (exception.isInstanceOf[OutOfMemoryError]) { - System.exit(ExecutorExitCode.OOM) - } else { - System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION) - } + // Make any thread terminations due to uncaught exceptions kill the entire + // executor process to avoid surprising stalls. + Thread.setDefaultUncaughtExceptionHandler( + new Thread.UncaughtExceptionHandler { + override def uncaughtException(thread: Thread, exception: Throwable) { + try { + logError("Uncaught exception in thread " + thread, exception) + + // We may have been called from a shutdown hook. If so, we must not call System.exit(). + // (If we do, we will deadlock.) + if (!Utils.inShutdown()) { + if (exception.isInstanceOf[OutOfMemoryError]) { + System.exit(ExecutorExitCode.OOM) + } else { + System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION) } - } catch { - case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM) - case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE) } + } catch { + case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM) + case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE) } } - ) + } + ) - // Initialize Spark environment (using system properties read above) - env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false) - SparkEnv.set(env) + // Initialize Spark environment (using system properties read above) + val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false) + SparkEnv.set(env) - // Start worker thread pool - threadPool = new ThreadPoolExecutor( - 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable]) - } + // Start worker thread pool + val threadPool = new ThreadPoolExecutor( + 1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable]) def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) { threadPool.execute(new TaskRunner(context, taskId, serializedTask)) diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala index 818d6d1dda..10f3531df0 100644 --- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala @@ -8,11 +8,12 @@ import com.google.protobuf.ByteString import spark.{Utils, Logging} import spark.TaskState -private[spark] class MesosExecutorBackend(executor: Executor) +private[spark] class MesosExecutorBackend extends MesosExecutor with ExecutorBackend with Logging { + var executor: Executor = null var driver: ExecutorDriver = null override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { @@ -32,16 +33,19 @@ private[spark] class MesosExecutorBackend(executor: Executor) logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue) this.driver = driver val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) - executor.initialize( + executor = new Executor( executorInfo.getExecutorId.getValue, slaveInfo.getHostname, - properties - ) + properties) } override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) { val taskId = taskInfo.getTaskId.getValue.toLong - executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer) + if (executor == null) { + logError("Received launchTask but executor was null") + } else { + executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer) + } } override def error(d: ExecutorDriver, message: String) { @@ -68,7 +72,7 @@ private[spark] object MesosExecutorBackend { def main(args: Array[String]) { MesosNativeLibrary.load() // Create a new Executor and start it running - val runner = new MesosExecutorBackend(new Executor) + val runner = new MesosExecutorBackend() new MesosExecutorDriver(runner).run() } } diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala index 9a82c3054c..1047f71c6a 100644 --- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -14,7 +14,6 @@ import spark.scheduler.cluster.RegisterExecutorFailed import spark.scheduler.cluster.RegisterExecutor private[spark] class StandaloneExecutorBackend( - executor: Executor, driverUrl: String, executorId: String, hostname: String, @@ -23,6 +22,7 @@ private[spark] class StandaloneExecutorBackend( with ExecutorBackend with Logging { + var executor: Executor = null var driver: ActorRef = null override def preStart() { @@ -36,7 +36,7 @@ private[spark] class StandaloneExecutorBackend( override def receive = { case RegisteredExecutor(sparkProperties) => logInfo("Successfully registered with driver") - executor.initialize(executorId, hostname, sparkProperties) + executor = new Executor(executorId, hostname, sparkProperties) case RegisterExecutorFailed(message) => logError("Slave registration failed: " + message) @@ -44,7 +44,12 @@ private[spark] class StandaloneExecutorBackend( case LaunchTask(taskDesc) => logInfo("Got assigned task " + taskDesc.taskId) - executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask) + if (executor == null) { + logError("Received launchTask but executor was null") + System.exit(1) + } else { + executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask) + } case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => logError("Driver terminated or disconnected! Shutting down.") @@ -62,7 +67,7 @@ private[spark] object StandaloneExecutorBackend { // before getting started with all our system properties, etc val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0) val actor = actorSystem.actorOf( - Props(new StandaloneExecutorBackend(new Executor, driverUrl, executorId, hostname, cores)), + Props(new StandaloneExecutorBackend(driverUrl, executorId, hostname, cores)), name = "Executor") actorSystem.awaitTermination() } |