diff options
author | Andrew Or <andrew@databricks.com> | 2015-07-14 12:47:11 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-07-14 12:47:11 -0700 |
commit | 8fb3a65cbb714120d612e58ef9d12b0521a83260 (patch) | |
tree | 9e5163879d14f661b8320c59b8f3d17ef1965bb0 /core | |
parent | c4e98ff066cc6f0839d15140eb471d74a0d83e91 (diff) | |
download | spark-8fb3a65cbb714120d612e58ef9d12b0521a83260.tar.gz spark-8fb3a65cbb714120d612e58ef9d12b0521a83260.tar.bz2 spark-8fb3a65cbb714120d612e58ef9d12b0521a83260.zip |
[SPARK-8911] Fix local mode endless heartbeats
As of #7173 we expect executors to properly register with the driver before responding to their heartbeats. This behavior is not matched in local mode. This patch adds the missing event that needs to be posted.
Author: Andrew Or <andrew@databricks.com>
Closes #7382 from andrewor14/fix-local-heartbeat and squashes the following commits:
1258bdf [Andrew Or] Post ExecutorAdded event to local executor
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala | 20 |
1 files changed, 13 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 776e5d330e..4d48fcfea4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -25,7 +25,8 @@ import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.{Executor, ExecutorBackend} import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} -import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer} +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.ExecutorInfo private case class ReviveOffers() @@ -50,8 +51,8 @@ private[spark] class LocalEndpoint( private var freeCores = totalCores - private val localExecutorId = SparkContext.DRIVER_IDENTIFIER - private val localExecutorHostname = "localhost" + val localExecutorId = SparkContext.DRIVER_IDENTIFIER + val localExecutorHostname = "localhost" private val executor = new Executor( localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true) @@ -99,8 +100,9 @@ private[spark] class LocalBackend( extends SchedulerBackend with ExecutorBackend with Logging { private val appId = "local-" + System.currentTimeMillis - var localEndpoint: RpcEndpointRef = null + private var localEndpoint: RpcEndpointRef = null private val userClassPath = getUserClasspath(conf) + private val listenerBus = scheduler.sc.listenerBus /** * Returns a list of URLs representing the user classpath. @@ -113,9 +115,13 @@ private[spark] class LocalBackend( } override def start() { - localEndpoint = SparkEnv.get.rpcEnv.setupEndpoint( - "LocalBackendEndpoint", - new LocalEndpoint(SparkEnv.get.rpcEnv, userClassPath, scheduler, this, totalCores)) + val rpcEnv = SparkEnv.get.rpcEnv + val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores) + localEndpoint = rpcEnv.setupEndpoint("LocalBackendEndpoint", executorEndpoint) + listenerBus.post(SparkListenerExecutorAdded( + System.currentTimeMillis, + executorEndpoint.localExecutorId, + new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty))) } override def stop() { |