diff options
author | Marcelo Vanzin <vanzin@cloudera.com> | 2014-09-03 14:47:11 -0700 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2014-09-03 14:47:11 -0700 |
commit | ccc69e26ec2fadd90886990b90a5a600efd08aba (patch) | |
tree | 5400262391d91ae2d09f600913ca19932c2164dd /core/src/test/scala | |
parent | e5d376801d57dffb0791980a1786a0a9b45bc491 (diff) | |
download | spark-ccc69e26ec2fadd90886990b90a5a600efd08aba.tar.gz spark-ccc69e26ec2fadd90886990b90a5a600efd08aba.tar.bz2 spark-ccc69e26ec2fadd90886990b90a5a600efd08aba.zip |
[SPARK-2845] Add timestamps to block manager events.
These are not used by the UI but are useful when analysing the
logs from a spark job.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #654 from vanzin/bm-event-tstamp and squashes the following commits:
d5d6e66 [Marcelo Vanzin] Fix tests.
ec06218 [Marcelo Vanzin] Review feedback.
f134dbc [Marcelo Vanzin] Merge branch 'master' into bm-event-tstamp
b495b7c [Marcelo Vanzin] Merge branch 'master' into bm-event-tstamp
7d2fe9e [Marcelo Vanzin] Review feedback.
d6f381c [Marcelo Vanzin] Update tests added after patch was created.
45e3bf8 [Marcelo Vanzin] Fix unit test after merge.
b37a10f [Marcelo Vanzin] Use === in test assertions.
ef72824 [Marcelo Vanzin] Handle backwards compatibility with 1.0.0.
aca1151 [Marcelo Vanzin] Fix unit test to check new fields.
efdda8e [Marcelo Vanzin] Add timestamps to block manager events.
Diffstat (limited to 'core/src/test/scala')
3 files changed, 44 insertions, 15 deletions
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala index 4e022a69c8..3a45875391 100644 --- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala @@ -36,13 +36,13 @@ class StorageStatusListenerSuite extends FunSuite { // Block manager add assert(listener.executorIdToStorageStatus.size === 0) - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) assert(listener.executorIdToStorageStatus.size === 1) assert(listener.executorIdToStorageStatus.get("big").isDefined) assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1) assert(listener.executorIdToStorageStatus("big").maxMem === 1000L) assert(listener.executorIdToStorageStatus("big").numBlocks === 0) - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L)) assert(listener.executorIdToStorageStatus.size === 2) assert(listener.executorIdToStorageStatus.get("fat").isDefined) assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2) @@ -50,11 +50,11 @@ class StorageStatusListenerSuite extends FunSuite { assert(listener.executorIdToStorageStatus("fat").numBlocks === 0) // Block manager remove - listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1)) + listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm1)) assert(listener.executorIdToStorageStatus.size === 1) assert(!listener.executorIdToStorageStatus.get("big").isDefined) assert(listener.executorIdToStorageStatus.get("fat").isDefined) - listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm2)) + listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm2)) assert(listener.executorIdToStorageStatus.size === 0) assert(!listener.executorIdToStorageStatus.get("big").isDefined) assert(!listener.executorIdToStorageStatus.get("fat").isDefined) @@ -62,8 +62,8 @@ class StorageStatusListenerSuite extends FunSuite { test("task end without updated blocks") { val listener = new StorageStatusListener - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L)) val taskMetrics = new TaskMetrics // Task end with no updated blocks @@ -79,8 +79,8 @@ class StorageStatusListenerSuite extends FunSuite { test("task end with updated blocks") { val listener = new StorageStatusListener - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L)) val taskMetrics1 = new TaskMetrics val taskMetrics2 = new TaskMetrics val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L)) @@ -128,7 +128,7 @@ class StorageStatusListenerSuite extends FunSuite { test("unpersist RDD") { val listener = new StorageStatusListener - listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L)) + listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) val taskMetrics1 = new TaskMetrics val taskMetrics2 = new TaskMetrics val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L)) diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index d9e9c70a8a..e1bc1379b5 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -108,7 +108,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { val myRddInfo1 = rddInfo1 val myRddInfo2 = rddInfo2 val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details") - bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L)) + bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) assert(storageListener._rddInfoMap.size === 3) assert(storageListener.rddInfoList.size === 0) // not cached @@ -175,7 +175,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L, 0L)) taskMetrics0.updatedBlocks = Some(Seq(block0)) taskMetrics1.updatedBlocks = Some(Seq(block1)) - bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L)) + bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L)) bus.postToAll(SparkListenerStageSubmitted(stageInfo0)) assert(storageListener.rddInfoList.size === 0) bus.postToAll(SparkListenerTaskEnd(0, 0, "big", Success, taskInfo, taskMetrics0)) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 66a17de9ec..c84bafce37 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -21,6 +21,9 @@ import java.util.Properties import scala.collection.Map +import org.json4s.DefaultFormats +import org.json4s.JsonDSL._ +import org.json4s.JsonAST._ import org.json4s.jackson.JsonMethods._ import org.scalatest.FunSuite @@ -52,9 +55,9 @@ class JsonProtocolSuite extends FunSuite { "System Properties" -> Seq(("Username", "guest"), ("Password", "guest")), "Classpath Entries" -> Seq(("Super library", "/tmp/super_library")) )) - val blockManagerAdded = SparkListenerBlockManagerAdded( + val blockManagerAdded = SparkListenerBlockManagerAdded(1L, BlockManagerId("Stars", "In your multitude...", 300), 500) - val blockManagerRemoved = SparkListenerBlockManagerRemoved( + val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L, BlockManagerId("Scarce", "to be counted...", 100)) val unpersistRdd = SparkListenerUnpersistRDD(12345) val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield") @@ -151,6 +154,28 @@ class JsonProtocolSuite extends FunSuite { assert(newMetrics.inputMetrics.isEmpty) } + test("BlockManager events backward compatibility") { + // SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property. + val blockManagerAdded = SparkListenerBlockManagerAdded(1L, + BlockManagerId("Stars", "In your multitude...", 300), 500) + val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L, + BlockManagerId("Scarce", "to be counted...", 100)) + + val oldBmAdded = JsonProtocol.blockManagerAddedToJson(blockManagerAdded) + .removeField({ _._1 == "Timestamp" }) + + val deserializedBmAdded = JsonProtocol.blockManagerAddedFromJson(oldBmAdded) + assert(SparkListenerBlockManagerAdded(-1L, blockManagerAdded.blockManagerId, + blockManagerAdded.maxMem) === deserializedBmAdded) + + val oldBmRemoved = JsonProtocol.blockManagerRemovedToJson(blockManagerRemoved) + .removeField({ _._1 == "Timestamp" }) + + val deserializedBmRemoved = JsonProtocol.blockManagerRemovedFromJson(oldBmRemoved) + assert(SparkListenerBlockManagerRemoved(-1L, blockManagerRemoved.blockManagerId) === + deserializedBmRemoved) + } + /** -------------------------- * | Helper test running methods | @@ -242,8 +267,10 @@ class JsonProtocolSuite extends FunSuite { assertEquals(e1.environmentDetails, e2.environmentDetails) case (e1: SparkListenerBlockManagerAdded, e2: SparkListenerBlockManagerAdded) => assert(e1.maxMem === e2.maxMem) + assert(e1.time === e2.time) assertEquals(e1.blockManagerId, e2.blockManagerId) case (e1: SparkListenerBlockManagerRemoved, e2: SparkListenerBlockManagerRemoved) => + assert(e1.time === e2.time) assertEquals(e1.blockManagerId, e2.blockManagerId) case (e1: SparkListenerUnpersistRDD, e2: SparkListenerUnpersistRDD) => assert(e1.rddId == e2.rddId) @@ -945,7 +972,8 @@ class JsonProtocolSuite extends FunSuite { | "Host": "In your multitude...", | "Port": 300 | }, - | "Maximum Memory": 500 + | "Maximum Memory": 500, + | "Timestamp": 1 |} """ @@ -957,7 +985,8 @@ class JsonProtocolSuite extends FunSuite { | "Executor ID": "Scarce", | "Host": "to be counted...", | "Port": 100 - | } + | }, + | "Timestamp": 2 |} """ |