about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2014-09-03 14:47:11 -0700
committerAndrew Or <andrewor14@gmail.com>2014-09-03 14:47:11 -0700
commitccc69e26ec2fadd90886990b90a5a600efd08aba (patch)
tree5400262391d91ae2d09f600913ca19932c2164dd /core
parente5d376801d57dffb0791980a1786a0a9b45bc491 (diff)
downloadspark-ccc69e26ec2fadd90886990b90a5a600efd08aba.tar.gz
spark-ccc69e26ec2fadd90886990b90a5a600efd08aba.tar.bz2
spark-ccc69e26ec2fadd90886990b90a5a600efd08aba.zip
[SPARK-2845] Add timestamps to block manager events.
These are not used by the UI but are useful when analysing the logs from a spark job. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #654 from vanzin/bm-event-tstamp and squashes the following commits: d5d6e66 [Marcelo Vanzin] Fix tests. ec06218 [Marcelo Vanzin] Review feedback. f134dbc [Marcelo Vanzin] Merge branch 'master' into bm-event-tstamp b495b7c [Marcelo Vanzin] Merge branch 'master' into bm-event-tstamp 7d2fe9e [Marcelo Vanzin] Review feedback. d6f381c [Marcelo Vanzin] Update tests added after patch was created. 45e3bf8 [Marcelo Vanzin] Fix unit test after merge. b37a10f [Marcelo Vanzin] Use === in test assertions. ef72824 [Marcelo Vanzin] Handle backwards compatibility with 1.0.0. aca1151 [Marcelo Vanzin] Fix unit test to check new fields. efdda8e [Marcelo Vanzin] Add timestamps to block manager events.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala | 4
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala | 7
-rw-r--r-- core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 12
-rw-r--r-- core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala | 18
-rw-r--r-- core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala | 4
-rw-r--r-- core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 37
6 files changed, 58 insertions, 24 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 86ca8445a1..f33c2e065a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -67,11 +67,11 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S
extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerBlockManagerAdded(blockManagerId: BlockManagerId, maxMem: Long)
+case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long)
extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
+case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockManagerId)
extends SparkListenerEvent
@DeveloperApi
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 3ab07703b6..1a6c7cb24f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -203,7 +203,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
blockLocations.remove(blockId)
}
}
- listenerBus.post(SparkListenerBlockManagerRemoved(blockManagerId))
+ listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
}
private def expireDeadHosts() {
@@ -325,6 +325,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}
private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+ val time = System.currentTimeMillis()
if (!blockManagerInfo.contains(id)) {
blockManagerIdByExecutor.get(id.executorId) match {
case Some(manager) =>
@@ -340,9 +341,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
id.hostPort, Utils.bytesToString(maxMemSize)))
blockManagerInfo(id) =
- new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor)
+ new BlockManagerInfo(id, time, maxMemSize, slaveActor)
}
- listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize))
+ listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
}
private def updateBlockInfo(
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a7543454ec..1fc536b096 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -152,13 +152,15 @@ private[spark] object JsonProtocol {
val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId)
("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~
("Block Manager ID" -> blockManagerId) ~
- ("Maximum Memory" -> blockManagerAdded.maxMem)
+ ("Maximum Memory" -> blockManagerAdded.maxMem) ~
+ ("Timestamp" -> blockManagerAdded.time)
}
def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = {
val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId)
("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~
- ("Block Manager ID" -> blockManagerId)
+ ("Block Manager ID" -> blockManagerId) ~
+ ("Timestamp" -> blockManagerRemoved.time)
}
def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = {
@@ -466,12 +468,14 @@ private[spark] object JsonProtocol {
def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = {
val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
val maxMem = (json \ "Maximum Memory").extract[Long]
- SparkListenerBlockManagerAdded(blockManagerId, maxMem)
+ val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
+ SparkListenerBlockManagerAdded(time, blockManagerId, maxMem)
}
def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = {
val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
- SparkListenerBlockManagerRemoved(blockManagerId)
+ val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
+ SparkListenerBlockManagerRemoved(time, blockManagerId)
}
def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = {
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 4e022a69c8..3a45875391 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -36,13 +36,13 @@ class StorageStatusListenerSuite extends FunSuite {
// Block manager add
assert(listener.executorIdToStorageStatus.size === 0)
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
assert(listener.executorIdToStorageStatus.size === 1)
assert(listener.executorIdToStorageStatus.get("big").isDefined)
assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1)
assert(listener.executorIdToStorageStatus("big").maxMem === 1000L)
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
assert(listener.executorIdToStorageStatus.size === 2)
assert(listener.executorIdToStorageStatus.get("fat").isDefined)
assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2)
@@ -50,11 +50,11 @@ class StorageStatusListenerSuite extends FunSuite {
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
// Block manager remove
- listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1))
+ listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm1))
assert(listener.executorIdToStorageStatus.size === 1)
assert(!listener.executorIdToStorageStatus.get("big").isDefined)
assert(listener.executorIdToStorageStatus.get("fat").isDefined)
- listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm2))
+ listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm2))
assert(listener.executorIdToStorageStatus.size === 0)
assert(!listener.executorIdToStorageStatus.get("big").isDefined)
assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
@@ -62,8 +62,8 @@ class StorageStatusListenerSuite extends FunSuite {
test("task end without updated blocks") {
val listener = new StorageStatusListener
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
val taskMetrics = new TaskMetrics
// Task end with no updated blocks
@@ -79,8 +79,8 @@ class StorageStatusListenerSuite extends FunSuite {
test("task end with updated blocks") {
val listener = new StorageStatusListener
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
val taskMetrics1 = new TaskMetrics
val taskMetrics2 = new TaskMetrics
val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
@@ -128,7 +128,7 @@ class StorageStatusListenerSuite extends FunSuite {
test("unpersist RDD") {
val listener = new StorageStatusListener
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
val taskMetrics1 = new TaskMetrics
val taskMetrics2 = new TaskMetrics
val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index d9e9c70a8a..e1bc1379b5 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -108,7 +108,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
val myRddInfo1 = rddInfo1
val myRddInfo2 = rddInfo2
val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
- bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L))
+ bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
assert(storageListener._rddInfoMap.size === 3)
assert(storageListener.rddInfoList.size === 0) // not cached
@@ -175,7 +175,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L, 0L))
taskMetrics0.updatedBlocks = Some(Seq(block0))
taskMetrics1.updatedBlocks = Some(Seq(block1))
- bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L))
+ bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
assert(storageListener.rddInfoList.size === 0)
bus.postToAll(SparkListenerTaskEnd(0, 0, "big", Success, taskInfo, taskMetrics0))
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 66a17de9ec..c84bafce37 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -21,6 +21,9 @@ import java.util.Properties
import scala.collection.Map
+import org.json4s.DefaultFormats
+import org.json4s.JsonDSL._
+import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._
import org.scalatest.FunSuite
@@ -52,9 +55,9 @@ class JsonProtocolSuite extends FunSuite {
"System Properties" -> Seq(("Username", "guest"), ("Password", "guest")),
"Classpath Entries" -> Seq(("Super library", "/tmp/super_library"))
))
- val blockManagerAdded = SparkListenerBlockManagerAdded(
+ val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
BlockManagerId("Stars", "In your multitude...", 300), 500)
- val blockManagerRemoved = SparkListenerBlockManagerRemoved(
+ val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
BlockManagerId("Scarce", "to be counted...", 100))
val unpersistRdd = SparkListenerUnpersistRDD(12345)
val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield")
@@ -151,6 +154,28 @@ class JsonProtocolSuite extends FunSuite {
assert(newMetrics.inputMetrics.isEmpty)
}
+ test("BlockManager events backward compatibility") {
+ // SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property.
+ val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
+ BlockManagerId("Stars", "In your multitude...", 300), 500)
+ val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
+ BlockManagerId("Scarce", "to be counted...", 100))
+
+ val oldBmAdded = JsonProtocol.blockManagerAddedToJson(blockManagerAdded)
+ .removeField({ _._1 == "Timestamp" })
+
+ val deserializedBmAdded = JsonProtocol.blockManagerAddedFromJson(oldBmAdded)
+ assert(SparkListenerBlockManagerAdded(-1L, blockManagerAdded.blockManagerId,
+ blockManagerAdded.maxMem) === deserializedBmAdded)
+
+ val oldBmRemoved = JsonProtocol.blockManagerRemovedToJson(blockManagerRemoved)
+ .removeField({ _._1 == "Timestamp" })
+
+ val deserializedBmRemoved = JsonProtocol.blockManagerRemovedFromJson(oldBmRemoved)
+ assert(SparkListenerBlockManagerRemoved(-1L, blockManagerRemoved.blockManagerId) ===
+ deserializedBmRemoved)
+ }
+
/** -------------------------- *
| Helper test running methods |
@@ -242,8 +267,10 @@ class JsonProtocolSuite extends FunSuite {
assertEquals(e1.environmentDetails, e2.environmentDetails)
case (e1: SparkListenerBlockManagerAdded, e2: SparkListenerBlockManagerAdded) =>
assert(e1.maxMem === e2.maxMem)
+ assert(e1.time === e2.time)
assertEquals(e1.blockManagerId, e2.blockManagerId)
case (e1: SparkListenerBlockManagerRemoved, e2: SparkListenerBlockManagerRemoved) =>
+ assert(e1.time === e2.time)
assertEquals(e1.blockManagerId, e2.blockManagerId)
case (e1: SparkListenerUnpersistRDD, e2: SparkListenerUnpersistRDD) =>
assert(e1.rddId == e2.rddId)
@@ -945,7 +972,8 @@ class JsonProtocolSuite extends FunSuite {
| "Host": "In your multitude...",
| "Port": 300
| },
- | "Maximum Memory": 500
+ | "Maximum Memory": 500,
+ | "Timestamp": 1
|}
"""
@@ -957,7 +985,8 @@ class JsonProtocolSuite extends FunSuite {
| "Executor ID": "Scarce",
| "Host": "to be counted...",
| "Port": 100
- | }
+ | },
+ | "Timestamp": 2
|}
"""