aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKousuke Saruta <sarutak@oss.nttdata.co.jp>2015-01-28 11:02:51 -0800
committerJosh Rosen <joshrosen@databricks.com>2015-01-28 11:02:51 -0800
commit0b35fcd7f01044e86669bac93e9663277c86365b (patch)
tree1d943041309338f49d2ef3c6602e42ae7da11681 /core
parenteeb53bf90e93b298eff48387d2e9ad699b52d001 (diff)
downloadspark-0b35fcd7f01044e86669bac93e9663277c86365b.tar.gz
spark-0b35fcd7f01044e86669bac93e9663277c86365b.tar.bz2
spark-0b35fcd7f01044e86669bac93e9663277c86365b.zip
[SPARK-5291][CORE] Add timestamp and reason why an executor is removed to SparkListenerExecutorAdded and SparkListenerExecutorRemoved
Recently `SparkListenerExecutorAdded` and `SparkListenerExecutorRemoved` are added. I think it's useful if they have timestamp and the reason why an executor is removed. Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp> Closes #4082 from sarutak/SPARK-5291 and squashes the following commits: a026ff2 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5291 979dfe1 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5291 cf9f9080 [Kousuke Saruta] Fixed test case 1f2a89b [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5291 243f2a60 [Kousuke Saruta] Modified MesosSchedulerBackendSuite a527c35 [Kousuke Saruta] Added timestamp to SparkListenerExecutorAdded
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala12
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala16
6 files changed, 33 insertions, 19 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index e5d1eb767e..8f5ceaa5de 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -91,11 +91,11 @@ case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockMan
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerExecutorAdded(executorId: String, executorInfo: ExecutorInfo)
+case class SparkListenerExecutorAdded(time: Long, executorId: String, executorInfo: ExecutorInfo)
extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerExecutorRemoved(executorId: String)
+case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason: String)
extends SparkListenerEvent
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 5786d36746..103a5c053c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -108,7 +108,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
- listenerBus.post(SparkListenerExecutorAdded(executorId, data))
+ listenerBus.post(
+ SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
makeOffers()
}
@@ -216,7 +217,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
totalCoreCount.addAndGet(-executorInfo.totalCores)
totalRegisteredExecutors.addAndGet(-1)
scheduler.executorLost(executorId, SlaveLost(reason))
- listenerBus.post(SparkListenerExecutorRemoved(executorId))
+ listenerBus.post(
+ SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason))
case None => logError(s"Asked to remove non-existent executor $executorId")
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 79c9051e88..c3c546be6d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -269,7 +269,7 @@ private[spark] class MesosSchedulerBackend(
mesosTasks.foreach { case (slaveId, tasks) =>
slaveIdToWorkerOffer.get(slaveId).foreach(o =>
- listenerBus.post(SparkListenerExecutorAdded(slaveId,
+ listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
new ExecutorInfo(o.host, o.cores)))
)
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
@@ -327,7 +327,7 @@ private[spark] class MesosSchedulerBackend(
synchronized {
if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
// We lost the executor on this slave, so remember that it's gone
- removeExecutor(taskIdToSlaveId(tid))
+ removeExecutor(taskIdToSlaveId(tid), "Lost executor")
}
if (isFinished(status.getState)) {
taskIdToSlaveId.remove(tid)
@@ -359,9 +359,9 @@ private[spark] class MesosSchedulerBackend(
/**
* Remove executor associated with slaveId in a thread safe manner.
*/
- private def removeExecutor(slaveId: String) = {
+ private def removeExecutor(slaveId: String, reason: String) = {
synchronized {
- listenerBus.post(SparkListenerExecutorRemoved(slaveId))
+ listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason))
slaveIdsWithExecutors -= slaveId
}
}
@@ -369,7 +369,7 @@ private[spark] class MesosSchedulerBackend(
private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
inClassLoader() {
logInfo("Mesos slave lost: " + slaveId.getValue)
- removeExecutor(slaveId.getValue)
+ removeExecutor(slaveId.getValue, reason.toString)
scheduler.executorLost(slaveId.getValue, reason)
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index f896b5072e..b5f736dc41 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -204,13 +204,16 @@ private[spark] object JsonProtocol {
def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = {
("Event" -> Utils.getFormattedClassName(executorAdded)) ~
+ ("Timestamp" -> executorAdded.time) ~
("Executor ID" -> executorAdded.executorId) ~
("Executor Info" -> executorInfoToJson(executorAdded.executorInfo))
}
def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): JValue = {
("Event" -> Utils.getFormattedClassName(executorRemoved)) ~
- ("Executor ID" -> executorRemoved.executorId)
+ ("Timestamp" -> executorRemoved.time) ~
+ ("Executor ID" -> executorRemoved.executorId) ~
+ ("Removed Reason" -> executorRemoved.reason)
}
/** ------------------------------------------------------------------- *
@@ -554,14 +557,17 @@ private[spark] object JsonProtocol {
}
def executorAddedFromJson(json: JValue): SparkListenerExecutorAdded = {
+ val time = (json \ "Timestamp").extract[Long]
val executorId = (json \ "Executor ID").extract[String]
val executorInfo = executorInfoFromJson(json \ "Executor Info")
- SparkListenerExecutorAdded(executorId, executorInfo)
+ SparkListenerExecutorAdded(time, executorId, executorInfo)
}
def executorRemovedFromJson(json: JValue): SparkListenerExecutorRemoved = {
+ val time = (json \ "Timestamp").extract[Long]
val executorId = (json \ "Executor ID").extract[String]
- SparkListenerExecutorRemoved(executorId)
+ val reason = (json \ "Removed Reason").extract[String]
+ SparkListenerExecutorRemoved(time, executorId, reason)
}
/** --------------------------------------------------------------------- *
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
index 073814c127..f2ff98eb72 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
@@ -43,7 +43,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
conf.set("spark.mesos.executor.home" , "/mesos-home")
val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
- listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2)))
+ listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2)))
EasyMock.replay(listenerBus)
val sc = EasyMock.createMock(classOf[SparkContext])
@@ -88,7 +88,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
- listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2)))
+ listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2)))
EasyMock.replay(listenerBus)
val sc = EasyMock.createMock(classOf[SparkContext])
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 0357fc6ce2..6577ebaa2e 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -37,6 +37,9 @@ class JsonProtocolSuite extends FunSuite {
val jobSubmissionTime = 1421191042750L
val jobCompletionTime = 1421191296660L
+ val executorAddedTime = 1421458410000L
+ val executorRemovedTime = 1421458922000L
+
test("SparkListenerEvent") {
val stageSubmitted =
SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
@@ -73,9 +76,9 @@ class JsonProtocolSuite extends FunSuite {
val unpersistRdd = SparkListenerUnpersistRDD(12345)
val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
val applicationEnd = SparkListenerApplicationEnd(42L)
- val executorAdded = SparkListenerExecutorAdded("exec1",
+ val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
new ExecutorInfo("Hostee.awesome.com", 11))
- val executorRemoved = SparkListenerExecutorRemoved("exec2")
+ val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
testEvent(stageSubmitted, stageSubmittedJsonString)
testEvent(stageCompleted, stageCompletedJsonString)
@@ -1453,9 +1456,10 @@ class JsonProtocolSuite extends FunSuite {
"""
private val executorAddedJsonString =
- """
+ s"""
|{
| "Event": "SparkListenerExecutorAdded",
+ | "Timestamp": ${executorAddedTime},
| "Executor ID": "exec1",
| "Executor Info": {
| "Host": "Hostee.awesome.com",
@@ -1465,10 +1469,12 @@ class JsonProtocolSuite extends FunSuite {
"""
private val executorRemovedJsonString =
- """
+ s"""
|{
| "Event": "SparkListenerExecutorRemoved",
- | "Executor ID": "exec2"
+ | "Timestamp": ${executorRemovedTime},
+ | "Executor ID": "exec2",
+ | "Removed Reason": "test reason"
|}
"""
}