aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala9
-rw-r--r--core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala32
2 files changed, 19 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 8b38366e03..02d54bf3b5 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -486,8 +486,8 @@ private[spark] class ExecutorAllocationManager(
}
}
- override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
- val executorId = blockManagerAdded.blockManagerId.executorId
+ override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
+ val executorId = executorAdded.executorId
if (executorId != SparkContext.DRIVER_IDENTIFIER) {
// This guards against the race condition in which the `SparkListenerTaskStart`
// event is posted before the `SparkListenerBlockManagerAdded` event, which is
@@ -498,9 +498,8 @@ private[spark] class ExecutorAllocationManager(
}
}
- override def onBlockManagerRemoved(
- blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
- allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
+ override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
+ allocationManager.onExecutorRemoved(executorRemoved.executorId)
}
/**
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 57081ddd95..c2869628af 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable
import org.scalatest.{FunSuite, PrivateMethodTester}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
-import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.scheduler.cluster.ExecutorInfo
/**
* Test add and remove behavior of ExecutorAllocationManager.
@@ -144,8 +144,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
// Verify that running a task reduces the cap
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3)))
- sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
- 0L, BlockManagerId("executor-1", "host1", 1), 100L))
+ sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+ 0L, "executor-1", new ExecutorInfo("host1", 1)))
sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
assert(numExecutorsPending(manager) === 4)
assert(addExecutors(manager) === 1)
@@ -578,30 +578,28 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).isEmpty)
// New executors have registered
- sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
- 0L, BlockManagerId("executor-1", "host1", 1), 100L))
+ sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+ 0L, "executor-1", new ExecutorInfo("host1", 1)))
assert(executorIds(manager).size === 1)
assert(executorIds(manager).contains("executor-1"))
assert(removeTimes(manager).size === 1)
assert(removeTimes(manager).contains("executor-1"))
- sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
- 0L, BlockManagerId("executor-2", "host2", 1), 100L))
+ sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+ 0L, "executor-2", new ExecutorInfo("host2", 1)))
assert(executorIds(manager).size === 2)
assert(executorIds(manager).contains("executor-2"))
assert(removeTimes(manager).size === 2)
assert(removeTimes(manager).contains("executor-2"))
// Existing executors have disconnected
- sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved(
- 0L, BlockManagerId("executor-1", "host1", 1)))
+ sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-1", ""))
assert(executorIds(manager).size === 1)
assert(!executorIds(manager).contains("executor-1"))
assert(removeTimes(manager).size === 1)
assert(!removeTimes(manager).contains("executor-1"))
// Unknown executor has disconnected
- sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved(
- 0L, BlockManagerId("executor-3", "host3", 1)))
+ sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-3", ""))
assert(executorIds(manager).size === 1)
assert(removeTimes(manager).size === 1)
}
@@ -613,8 +611,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).isEmpty)
sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
- sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
- 0L, BlockManagerId("executor-1", "host1", 1), 100L))
+ sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+ 0L, "executor-1", new ExecutorInfo("host1", 1)))
assert(executorIds(manager).size === 1)
assert(executorIds(manager).contains("executor-1"))
assert(removeTimes(manager).size === 0)
@@ -625,16 +623,16 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
val manager = sc.executorAllocationManager.get
assert(executorIds(manager).isEmpty)
assert(removeTimes(manager).isEmpty)
- sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
- 0L, BlockManagerId("executor-1", "host1", 1), 100L))
+ sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+ 0L, "executor-1", new ExecutorInfo("host1", 1)))
sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
assert(executorIds(manager).size === 1)
assert(executorIds(manager).contains("executor-1"))
assert(removeTimes(manager).size === 0)
- sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
- 0L, BlockManagerId("executor-2", "host1", 1), 100L))
+ sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+ 0L, "executor-2", new ExecutorInfo("host1", 1)))
assert(executorIds(manager).size === 2)
assert(executorIds(manager).contains("executor-2"))
assert(removeTimes(manager).size === 1)