From 6072fcc14ee1a4eba793e725fcb2cb2ffebd5b60 Mon Sep 17 00:00:00 2001 From: lianhuiwang Date: Fri, 6 Feb 2015 11:09:37 -0800 Subject: [SPARK-5593][Core]Replace BlockManagerListener with ExecutorListener in ExecutorAllocationListener More strictly, in ExecutorAllocationListener, we need to replace onBlockManagerAdded, onBlockManagerRemoved with onExecutorAdded,onExecutorRemoved. because at some time, onExecutorAdded and onExecutorRemoved are more accurate to express these meanings. example at SPARK-5529, BlockManager has been removed,but executor is existed. andrewor14 sryza Author: lianhuiwang Closes #4369 from lianhuiwang/SPARK-5593 and squashes the following commits: 333367c [lianhuiwang] Replace BlockManagerListener with ExecutorListener in ExecutorAllocationListener --- .../apache/spark/ExecutorAllocationManager.scala | 9 +++--- .../spark/ExecutorAllocationManagerSuite.scala | 32 ++++++++++------------ 2 files changed, 19 insertions(+), 22 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 8b38366e03..02d54bf3b5 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -486,8 +486,8 @@ private[spark] class ExecutorAllocationManager( } } - override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { - val executorId = blockManagerAdded.blockManagerId.executorId + override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { + val executorId = executorAdded.executorId if (executorId != SparkContext.DRIVER_IDENTIFIER) { // This guards against the race condition in which the `SparkListenerTaskStart` // event is posted before the `SparkListenerBlockManagerAdded` event, which is @@ -498,9 +498,8 @@ private[spark] class ExecutorAllocationManager( } } - override def onBlockManagerRemoved( - blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { - allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId) + override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { + allocationManager.onExecutorRemoved(executorRemoved.executorId) } /** diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 57081ddd95..c2869628af 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -22,7 +22,7 @@ import scala.collection.mutable import org.scalatest.{FunSuite, PrivateMethodTester} import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ -import org.apache.spark.storage.BlockManagerId +import org.apache.spark.scheduler.cluster.ExecutorInfo /** * Test add and remove behavior of ExecutorAllocationManager. @@ -144,8 +144,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { // Verify that running a task reduces the cap sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3))) - sc.listenerBus.postToAll(SparkListenerBlockManagerAdded( - 0L, BlockManagerId("executor-1", "host1", 1), 100L)) + sc.listenerBus.postToAll(SparkListenerExecutorAdded( + 0L, "executor-1", new ExecutorInfo("host1", 1))) sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) assert(numExecutorsPending(manager) === 4) assert(addExecutors(manager) === 1) @@ -578,30 +578,28 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { assert(removeTimes(manager).isEmpty) // New executors have registered - sc.listenerBus.postToAll(SparkListenerBlockManagerAdded( - 0L, BlockManagerId("executor-1", "host1", 1), 100L)) + sc.listenerBus.postToAll(SparkListenerExecutorAdded( + 0L, "executor-1", new ExecutorInfo("host1", 1))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 1) assert(removeTimes(manager).contains("executor-1")) - sc.listenerBus.postToAll(SparkListenerBlockManagerAdded( - 0L, BlockManagerId("executor-2", "host2", 1), 100L)) + sc.listenerBus.postToAll(SparkListenerExecutorAdded( + 0L, "executor-2", new ExecutorInfo("host2", 1))) assert(executorIds(manager).size === 2) assert(executorIds(manager).contains("executor-2")) assert(removeTimes(manager).size === 2) assert(removeTimes(manager).contains("executor-2")) // Existing executors have disconnected - sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved( - 0L, BlockManagerId("executor-1", "host1", 1))) + sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-1", "")) assert(executorIds(manager).size === 1) assert(!executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 1) assert(!removeTimes(manager).contains("executor-1")) // Unknown executor has disconnected - sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved( - 0L, BlockManagerId("executor-3", "host3", 1))) + sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-3", "")) assert(executorIds(manager).size === 1) assert(removeTimes(manager).size === 1) } @@ -613,8 +611,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { assert(removeTimes(manager).isEmpty) sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerBlockManagerAdded( - 0L, BlockManagerId("executor-1", "host1", 1), 100L)) + sc.listenerBus.postToAll(SparkListenerExecutorAdded( + 0L, "executor-1", new ExecutorInfo("host1", 1))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 0) @@ -625,16 +623,16 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { val manager = sc.executorAllocationManager.get assert(executorIds(manager).isEmpty) assert(removeTimes(manager).isEmpty) - sc.listenerBus.postToAll(SparkListenerBlockManagerAdded( - 0L, BlockManagerId("executor-1", "host1", 1), 100L)) + sc.listenerBus.postToAll(SparkListenerExecutorAdded( + 0L, "executor-1", new ExecutorInfo("host1", 1))) sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 0) - sc.listenerBus.postToAll(SparkListenerBlockManagerAdded( - 0L, BlockManagerId("executor-2", "host1", 1), 100L)) + sc.listenerBus.postToAll(SparkListenerExecutorAdded( + 0L, "executor-2", new ExecutorInfo("host1", 1))) assert(executorIds(manager).size === 2) assert(executorIds(manager).contains("executor-2")) assert(removeTimes(manager).size === 1) -- cgit v1.2.3