diff options
author | Kousuke Saruta <sarutak@oss.nttdata.co.jp> | 2015-01-28 11:02:51 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-01-28 11:02:51 -0800 |
commit | 0b35fcd7f01044e86669bac93e9663277c86365b (patch) | |
tree | 1d943041309338f49d2ef3c6602e42ae7da11681 /core | |
parent | eeb53bf90e93b298eff48387d2e9ad699b52d001 (diff) | |
download | spark-0b35fcd7f01044e86669bac93e9663277c86365b.tar.gz spark-0b35fcd7f01044e86669bac93e9663277c86365b.tar.bz2 spark-0b35fcd7f01044e86669bac93e9663277c86365b.zip |
[SPARK-5291][CORE] Add timestamp and reason why an executor is removed to SparkListenerExecutorAdded and SparkListenerExecutorRemoved
Recently `SparkListenerExecutorAdded` and `SparkListenerExecutorRemoved` are added.
I think it's useful if they have timestamp and the reason why an executor is removed.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes #4082 from sarutak/SPARK-5291 and squashes the following commits:
a026ff2 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5291
979dfe1 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5291
cf9f9080 [Kousuke Saruta] Fixed test case
1f2a89b [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5291
243f2a60 [Kousuke Saruta] Modified MesosSchedulerBackendSuite
a527c35 [Kousuke Saruta] Added timestamp to SparkListenerExecutorAdded
Diffstat (limited to 'core')
6 files changed, 33 insertions, 19 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index e5d1eb767e..8f5ceaa5de 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -91,11 +91,11 @@ case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockMan case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent @DeveloperApi -case class SparkListenerExecutorAdded(executorId: String, executorInfo: ExecutorInfo) +case class SparkListenerExecutorAdded(time: Long, executorId: String, executorInfo: ExecutorInfo) extends SparkListenerEvent @DeveloperApi -case class SparkListenerExecutorRemoved(executorId: String) +case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason: String) extends SparkListenerEvent /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 5786d36746..103a5c053c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -108,7 +108,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste logDebug(s"Decremented number of pending executors ($numPendingExecutors left)") } } - listenerBus.post(SparkListenerExecutorAdded(executorId, data)) + listenerBus.post( + SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data)) makeOffers() } @@ -216,7 +217,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste totalCoreCount.addAndGet(-executorInfo.totalCores) totalRegisteredExecutors.addAndGet(-1) scheduler.executorLost(executorId, SlaveLost(reason)) - listenerBus.post(SparkListenerExecutorRemoved(executorId)) + listenerBus.post( + SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason)) case None => logError(s"Asked to remove non-existent executor $executorId") } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 79c9051e88..c3c546be6d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -269,7 +269,7 @@ private[spark] class MesosSchedulerBackend( mesosTasks.foreach { case (slaveId, tasks) => slaveIdToWorkerOffer.get(slaveId).foreach(o => - listenerBus.post(SparkListenerExecutorAdded(slaveId, + listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId, new ExecutorInfo(o.host, o.cores))) ) d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) @@ -327,7 +327,7 @@ private[spark] class MesosSchedulerBackend( synchronized { if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) { // We lost the executor on this slave, so remember that it's gone - removeExecutor(taskIdToSlaveId(tid)) + removeExecutor(taskIdToSlaveId(tid), "Lost executor") } if (isFinished(status.getState)) { taskIdToSlaveId.remove(tid) @@ -359,9 +359,9 @@ private[spark] class MesosSchedulerBackend( /** * Remove executor associated with slaveId in a thread safe manner. */ - private def removeExecutor(slaveId: String) = { + private def removeExecutor(slaveId: String, reason: String) = { synchronized { - listenerBus.post(SparkListenerExecutorRemoved(slaveId)) + listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason)) slaveIdsWithExecutors -= slaveId } } @@ -369,7 +369,7 @@ private[spark] class MesosSchedulerBackend( private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) { inClassLoader() { logInfo("Mesos slave lost: " + slaveId.getValue) - removeExecutor(slaveId.getValue) + removeExecutor(slaveId.getValue, reason.toString) scheduler.executorLost(slaveId.getValue, reason) } } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index f896b5072e..b5f736dc41 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -204,13 +204,16 @@ private[spark] object JsonProtocol { def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = { ("Event" -> Utils.getFormattedClassName(executorAdded)) ~ + ("Timestamp" -> executorAdded.time) ~ ("Executor ID" -> executorAdded.executorId) ~ ("Executor Info" -> executorInfoToJson(executorAdded.executorInfo)) } def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): JValue = { ("Event" -> Utils.getFormattedClassName(executorRemoved)) ~ - ("Executor ID" -> executorRemoved.executorId) + ("Timestamp" -> executorRemoved.time) ~ + ("Executor ID" -> executorRemoved.executorId) ~ + ("Removed Reason" -> executorRemoved.reason) } /** ------------------------------------------------------------------- * @@ -554,14 +557,17 @@ private[spark] object JsonProtocol { } def executorAddedFromJson(json: JValue): SparkListenerExecutorAdded = { + val time = (json \ "Timestamp").extract[Long] val executorId = (json \ "Executor ID").extract[String] val executorInfo = executorInfoFromJson(json \ "Executor Info") - SparkListenerExecutorAdded(executorId, executorInfo) + SparkListenerExecutorAdded(time, executorId, executorInfo) } def executorRemovedFromJson(json: JValue): SparkListenerExecutorRemoved = { + val time = (json \ "Timestamp").extract[Long] val executorId = (json \ "Executor ID").extract[String] - SparkListenerExecutorRemoved(executorId) + val reason = (json \ "Removed Reason").extract[String] + SparkListenerExecutorRemoved(time, executorId, reason) } /** --------------------------------------------------------------------- * diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala index 073814c127..f2ff98eb72 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala @@ -43,7 +43,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea conf.set("spark.mesos.executor.home" , "/mesos-home") val listenerBus = EasyMock.createMock(classOf[LiveListenerBus]) - listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2))) + listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2))) EasyMock.replay(listenerBus) val sc = EasyMock.createMock(classOf[SparkContext]) @@ -88,7 +88,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) val listenerBus = EasyMock.createMock(classOf[LiveListenerBus]) - listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2))) + listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2))) EasyMock.replay(listenerBus) val sc = EasyMock.createMock(classOf[SparkContext]) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 0357fc6ce2..6577ebaa2e 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -37,6 +37,9 @@ class JsonProtocolSuite extends FunSuite { val jobSubmissionTime = 1421191042750L val jobCompletionTime = 1421191296660L + val executorAddedTime = 1421458410000L + val executorRemovedTime = 1421458922000L + test("SparkListenerEvent") { val stageSubmitted = SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties) @@ -73,9 +76,9 @@ class JsonProtocolSuite extends FunSuite { val unpersistRdd = SparkListenerUnpersistRDD(12345) val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield") val applicationEnd = SparkListenerApplicationEnd(42L) - val executorAdded = SparkListenerExecutorAdded("exec1", + val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1", new ExecutorInfo("Hostee.awesome.com", 11)) - val executorRemoved = SparkListenerExecutorRemoved("exec2") + val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason") testEvent(stageSubmitted, stageSubmittedJsonString) testEvent(stageCompleted, stageCompletedJsonString) @@ -1453,9 +1456,10 @@ class JsonProtocolSuite extends FunSuite { """ private val executorAddedJsonString = - """ + s""" |{ | "Event": "SparkListenerExecutorAdded", + | "Timestamp": ${executorAddedTime}, | "Executor ID": "exec1", | "Executor Info": { | "Host": "Hostee.awesome.com", @@ -1465,10 +1469,12 @@ class JsonProtocolSuite extends FunSuite { """ private val executorRemovedJsonString = - """ + s""" |{ | "Event": "SparkListenerExecutorRemoved", - | "Executor ID": "exec2" + | "Timestamp": ${executorRemovedTime}, + | "Executor ID": "exec2", + | "Removed Reason": "test reason" |} """ } |