about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala20
1 file changed, 13 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 776e5d330e..4d48fcfea4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -25,7 +25,8 @@ import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
-import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
private case class ReviveOffers()
@@ -50,8 +51,8 @@ private[spark] class LocalEndpoint(
private var freeCores = totalCores
- private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
- private val localExecutorHostname = "localhost"
+ val localExecutorId = SparkContext.DRIVER_IDENTIFIER
+ val localExecutorHostname = "localhost"
private val executor = new Executor(
localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)
@@ -99,8 +100,9 @@ private[spark] class LocalBackend(
extends SchedulerBackend with ExecutorBackend with Logging {
private val appId = "local-" + System.currentTimeMillis
- var localEndpoint: RpcEndpointRef = null
+ private var localEndpoint: RpcEndpointRef = null
private val userClassPath = getUserClasspath(conf)
+ private val listenerBus = scheduler.sc.listenerBus
/**
* Returns a list of URLs representing the user classpath.
@@ -113,9 +115,13 @@ private[spark] class LocalBackend(
}
override def start() {
- localEndpoint = SparkEnv.get.rpcEnv.setupEndpoint(
- "LocalBackendEndpoint",
- new LocalEndpoint(SparkEnv.get.rpcEnv, userClassPath, scheduler, this, totalCores))
+ val rpcEnv = SparkEnv.get.rpcEnv
+ val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
+ localEndpoint = rpcEnv.setupEndpoint("LocalBackendEndpoint", executorEndpoint)
+ listenerBus.post(SparkListenerExecutorAdded(
+ System.currentTimeMillis,
+ executorEndpoint.localExecutorId,
+ new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty)))
}
override def stop() {