aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorlianhuiwang <lianhuiwang09@gmail.com>2015-02-06 11:09:37 -0800
committerAndrew Or <andrew@databricks.com>2015-02-06 11:09:37 -0800
commit6072fcc14ee1a4eba793e725fcb2cb2ffebd5b60 (patch)
tree0d4fb387262a25b1c17381c65bfc63b803bc392d /core
parent9792bec596113a6f5f4534772b7539255403b082 (diff)
downloadspark-6072fcc14ee1a4eba793e725fcb2cb2ffebd5b60.tar.gz
spark-6072fcc14ee1a4eba793e725fcb2cb2ffebd5b60.tar.bz2
spark-6072fcc14ee1a4eba793e725fcb2cb2ffebd5b60.zip
[SPARK-5593][Core]Replace BlockManagerListener with ExecutorListener in ExecutorAllocationListener
More strictly, in ExecutorAllocationListener, we need to replace onBlockManagerAdded, onBlockManagerRemoved with onExecutorAdded,onExecutorRemoved. because at some time, onExecutorAdded and onExecutorRemoved are more accurate to express these meanings. example at SPARK-5529, BlockManager has been removed,but executor is existed. andrewor14 sryza Author: lianhuiwang <lianhuiwang09@gmail.com> Closes #4369 from lianhuiwang/SPARK-5593 and squashes the following commits: 333367c [lianhuiwang] Replace BlockManagerListener with ExecutorListener in ExecutorAllocationListener
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala9
-rw-r--r--core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala32
2 files changed, 19 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 8b38366e03..02d54bf3b5 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -486,8 +486,8 @@ private[spark] class ExecutorAllocationManager(
}
}
- override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
- val executorId = blockManagerAdded.blockManagerId.executorId
+ override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
+ val executorId = executorAdded.executorId
if (executorId != SparkContext.DRIVER_IDENTIFIER) {
// This guards against the race condition in which the `SparkListenerTaskStart`
// event is posted before the `SparkListenerBlockManagerAdded` event, which is
@@ -498,9 +498,8 @@ private[spark] class ExecutorAllocationManager(
}
}
- override def onBlockManagerRemoved(
- blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
- allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
+ override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
+ allocationManager.onExecutorRemoved(executorRemoved.executorId)
}
/**
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 57081ddd95..c2869628af 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable
import org.scalatest.{FunSuite, PrivateMethodTester}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
-import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.scheduler.cluster.ExecutorInfo
/**
* Test add and remove behavior of ExecutorAllocationManager.
@@ -144,8 +144,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
// Verify that running a task reduces the cap
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3)))
- sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
- 0L, BlockManagerId("executor-1", "host1", 1), 100L))
+ sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+ 0L, "executor-1", new ExecutorInfo("host1", 1)))
sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
assert(numExecutorsPending(manager) === 4)
assert(addExecutors(manager) === 1)
@@ -578,30 +578,28 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).isEmpty)
// New executors have registered
- sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
- 0L, BlockManagerId("executor-1", "host1", 1), 100L))
+ sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+ 0L, "executor-1", new ExecutorInfo("host1", 1)))
assert(executorIds(manager).size === 1)
assert(executorIds(manager).contains("executor-1"))
assert(removeTimes(manager).size === 1)
assert(removeTimes(manager).contains("executor-1"))
- sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
- 0L, BlockManagerId("executor-2", "host2", 1), 100L))
+ sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+ 0L, "executor-2", new ExecutorInfo("host2", 1)))
assert(executorIds(manager).size === 2)
assert(executorIds(manager).contains("executor-2"))
assert(removeTimes(manager).size === 2)
assert(removeTimes(manager).contains("executor-2"))
// Existing executors have disconnected
- sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved(
- 0L, BlockManagerId("executor-1", "host1", 1)))
+ sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-1", ""))
assert(executorIds(manager).size === 1)
assert(!executorIds(manager).contains("executor-1"))
assert(removeTimes(manager).size === 1)
assert(!removeTimes(manager).contains("executor-1"))
// Unknown executor has disconnected
- sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved(
- 0L, BlockManagerId("executor-3", "host3", 1)))
+ sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-3", ""))
assert(executorIds(manager).size === 1)
assert(removeTimes(manager).size === 1)
}
@@ -613,8 +611,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).isEmpty)
sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
- sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
- 0L, BlockManagerId("executor-1", "host1", 1), 100L))
+ sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+ 0L, "executor-1", new ExecutorInfo("host1", 1)))
assert(executorIds(manager).size === 1)
assert(executorIds(manager).contains("executor-1"))
assert(removeTimes(manager).size === 0)
@@ -625,16 +623,16 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
val manager = sc.executorAllocationManager.get
assert(executorIds(manager).isEmpty)
assert(removeTimes(manager).isEmpty)
- sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
- 0L, BlockManagerId("executor-1", "host1", 1), 100L))
+ sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+ 0L, "executor-1", new ExecutorInfo("host1", 1)))
sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
assert(executorIds(manager).size === 1)
assert(executorIds(manager).contains("executor-1"))
assert(removeTimes(manager).size === 0)
- sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
- 0L, BlockManagerId("executor-2", "host1", 1), 100L))
+ sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+ 0L, "executor-2", new ExecutorInfo("host1", 1)))
assert(executorIds(manager).size === 2)
assert(executorIds(manager).contains("executor-2"))
assert(removeTimes(manager).size === 1)