aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala12
3 files changed, 14 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 86ca8445a1..f33c2e065a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -67,11 +67,11 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S
extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerBlockManagerAdded(blockManagerId: BlockManagerId, maxMem: Long)
+case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long)
extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
+case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockManagerId)
extends SparkListenerEvent
@DeveloperApi
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 3ab07703b6..1a6c7cb24f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -203,7 +203,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
blockLocations.remove(blockId)
}
}
- listenerBus.post(SparkListenerBlockManagerRemoved(blockManagerId))
+ listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
}
private def expireDeadHosts() {
@@ -325,6 +325,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}
private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+ val time = System.currentTimeMillis()
if (!blockManagerInfo.contains(id)) {
blockManagerIdByExecutor.get(id.executorId) match {
case Some(manager) =>
@@ -340,9 +341,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
id.hostPort, Utils.bytesToString(maxMemSize)))
blockManagerInfo(id) =
- new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor)
+ new BlockManagerInfo(id, time, maxMemSize, slaveActor)
}
- listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize))
+ listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
}
private def updateBlockInfo(
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a7543454ec..1fc536b096 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -152,13 +152,15 @@ private[spark] object JsonProtocol {
val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId)
("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~
("Block Manager ID" -> blockManagerId) ~
- ("Maximum Memory" -> blockManagerAdded.maxMem)
+ ("Maximum Memory" -> blockManagerAdded.maxMem) ~
+ ("Timestamp" -> blockManagerAdded.time)
}
def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = {
val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId)
("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~
- ("Block Manager ID" -> blockManagerId)
+ ("Block Manager ID" -> blockManagerId) ~
+ ("Timestamp" -> blockManagerRemoved.time)
}
def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = {
@@ -466,12 +468,14 @@ private[spark] object JsonProtocol {
def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = {
val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
val maxMem = (json \ "Maximum Memory").extract[Long]
- SparkListenerBlockManagerAdded(blockManagerId, maxMem)
+ val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
+ SparkListenerBlockManagerAdded(time, blockManagerId, maxMem)
}
def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = {
val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
- SparkListenerBlockManagerRemoved(blockManagerId)
+ val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
+ SparkListenerBlockManagerRemoved(time, blockManagerId)
}
def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = {