about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-07-14 12:47:11 -0700
committerAndrew Or <andrew@databricks.com>2015-07-14 12:47:11 -0700
commit8fb3a65cbb714120d612e58ef9d12b0521a83260 (patch)
tree9e5163879d14f661b8320c59b8f3d17ef1965bb0 /core
parentc4e98ff066cc6f0839d15140eb471d74a0d83e91 (diff)
downloadspark-8fb3a65cbb714120d612e58ef9d12b0521a83260.tar.gz
spark-8fb3a65cbb714120d612e58ef9d12b0521a83260.tar.bz2
spark-8fb3a65cbb714120d612e58ef9d12b0521a83260.zip
[SPARK-8911] Fix local mode endless heartbeats
As of #7173 we expect executors to properly register with the driver before responding to their heartbeats. This behavior is not matched in local mode. This patch adds the missing event that needs to be posted. Author: Andrew Or <andrew@databricks.com> Closes #7382 from andrewor14/fix-local-heartbeat and squashes the following commits: 1258bdf [Andrew Or] Post ExecutorAdded event to local executor
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala  20
1 file changed, 13 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 776e5d330e..4d48fcfea4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -25,7 +25,8 @@ import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
-import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
private case class ReviveOffers()
@@ -50,8 +51,8 @@ private[spark] class LocalEndpoint(
private var freeCores = totalCores
- private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
- private val localExecutorHostname = "localhost"
+ val localExecutorId = SparkContext.DRIVER_IDENTIFIER
+ val localExecutorHostname = "localhost"
private val executor = new Executor(
localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)
@@ -99,8 +100,9 @@ private[spark] class LocalBackend(
extends SchedulerBackend with ExecutorBackend with Logging {
private val appId = "local-" + System.currentTimeMillis
- var localEndpoint: RpcEndpointRef = null
+ private var localEndpoint: RpcEndpointRef = null
private val userClassPath = getUserClasspath(conf)
+ private val listenerBus = scheduler.sc.listenerBus
/**
* Returns a list of URLs representing the user classpath.
@@ -113,9 +115,13 @@ private[spark] class LocalBackend(
}
override def start() {
- localEndpoint = SparkEnv.get.rpcEnv.setupEndpoint(
- "LocalBackendEndpoint",
- new LocalEndpoint(SparkEnv.get.rpcEnv, userClassPath, scheduler, this, totalCores))
+ val rpcEnv = SparkEnv.get.rpcEnv
+ val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
+ localEndpoint = rpcEnv.setupEndpoint("LocalBackendEndpoint", executorEndpoint)
+ listenerBus.post(SparkListenerExecutorAdded(
+ System.currentTimeMillis,
+ executorEndpoint.localExecutorId,
+ new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty)))
}
override def stop() {