Diffstat (limited to 'core/src/test')
4 files changed, 152 insertions, 38 deletions
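The patch below threads a new BlockResult wrapper (carrying InputMetrics) through the test suites: BlockManager.get(...) now returns Option[BlockResult] rather than a bare Option[Iterator[Any]], hence the new .data.size assertions. For orientation, a rough Scala sketch of the shapes these tests exercise, inferred only from the assertions in the diff itself; the real definitions live in core's main sources and may differ:

// Hypothetical sketch, not the production code: just enough structure to
// make the assertions in the diff readable.
object DataReadMethod extends Enumeration {
  val Memory, Disk, Hadoop = Value  // only the values referenced in these tests
}

class InputMetrics(val readMethod: DataReadMethod.Value) {
  var bytesRead: Long = 0L  // the tests compare this against SizeEstimator.estimate(...)
}

// Matches the constructor call in CacheManagerSuite:
// new BlockResult(ArrayBuffer(5, 6, 7).iterator, DataReadMethod.Memory, 12)
class BlockResult(
    val data: Iterator[Any],
    readMethod: DataReadMethod.Value,
    bytes: Long) {
  val inputMetrics = new InputMetrics(readMethod)
  inputMetrics.bytesRead = bytes
}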
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 4f178db40f..7f5d0b061e 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.scalatest.mock.EasyMockSugar
 
+import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
 
@@ -66,7 +67,8 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
 
   test("get cached rdd") {
     expecting {
-      blockManager.get(RDDBlockId(0, 0)).andReturn(Some(ArrayBuffer(5, 6, 7).iterator))
+      val result = new BlockResult(ArrayBuffer(5, 6, 7).iterator, DataReadMethod.Memory, 12)
+      blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result))
     }
 
     whenExecuting(blockManager) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 6df0a08096..71f48e295e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -251,6 +251,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
       taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
         taskMetrics.resultSize should be > (0l)
         if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) {
+          taskMetrics.inputMetrics should not be ('defined)
           taskMetrics.shuffleWriteMetrics should be ('defined)
           taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l)
         }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index d7dbe5164b..23cb6905bf 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -31,11 +31,13 @@ import org.scalatest.concurrent.Timeouts._
 import org.scalatest.Matchers
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
 
+import scala.collection.mutable.ArrayBuffer
 import scala.language.implicitConversions
 import scala.language.postfixOps
 
@@ -415,6 +417,38 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     }
   }
 
+  test("correct BlockResult returned from get() calls") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr,
+      mapOutputTracker)
+    val list1 = List(new Array[Byte](200), new Array[Byte](200))
+    val list1ForSizeEstimate = new ArrayBuffer[Any]
+    list1ForSizeEstimate ++= list1.iterator
+    val list1SizeEstimate = SizeEstimator.estimate(list1ForSizeEstimate)
+    val list2 = List(new Array[Byte](50), new Array[Byte](100), new Array[Byte](150))
+    val list2ForSizeEstimate = new ArrayBuffer[Any]
+    list2ForSizeEstimate ++= list2.iterator
+    val list2SizeEstimate = SizeEstimator.estimate(list2ForSizeEstimate)
+    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.put("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.put("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+    val list1Get = store.get("list1")
+    assert(list1Get.isDefined, "list1 expected to be in store")
+    assert(list1Get.get.data.size === 2)
+    assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate)
+    assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory)
+    val list2MemoryGet = store.get("list2memory")
+    assert(list2MemoryGet.isDefined, "list2memory expected to be in store")
+    assert(list2MemoryGet.get.data.size === 3)
+    assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate)
+    assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory)
+    val list2DiskGet = store.get("list2disk")
+    assert(list2DiskGet.isDefined, "list2disk expected to be in store")
+    assert(list2DiskGet.get.data.size === 3)
+    // We don't know the exact size of the data on disk, but it should certainly be > 0.
+    assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
+    assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)
+  }
+
   test("in-memory LRU storage") {
     store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr,
       mapOutputTracker)
@@ -630,18 +664,18 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.get("list2").isDefined, "list2 was not in store")
-    assert(store.get("list2").get.size == 2)
+    assert(store.get("list2").get.data.size === 2)
     assert(store.get("list3").isDefined, "list3 was not in store")
-    assert(store.get("list3").get.size == 2)
+    assert(store.get("list3").get.data.size === 2)
     assert(store.get("list1") === None, "list1 was in store")
     assert(store.get("list2").isDefined, "list2 was not in store")
-    assert(store.get("list2").get.size == 2)
+    assert(store.get("list2").get.data.size === 2)
     // At this point list2 was gotten last, so LRU will getSingle rid of list3
     store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.get("list1").isDefined, "list1 was not in store")
-    assert(store.get("list1").get.size == 2)
+    assert(store.get("list1").get.data.size === 2)
     assert(store.get("list2").isDefined, "list2 was not in store")
-    assert(store.get("list2").get.size == 2)
+    assert(store.get("list2").get.data.size === 2)
     assert(store.get("list3") === None, "list1 was in store")
   }
 
@@ -656,28 +690,31 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
     store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
     store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+    val listForSizeEstimate = new ArrayBuffer[Any]
+    listForSizeEstimate ++= list1.iterator
+    val listSize = SizeEstimator.estimate(listForSizeEstimate)
     // At this point LRU should not kick in because list3 is only on disk
-    assert(store.get("list1").isDefined, "list2 was not in store")
-    assert(store.get("list1").get.size === 2)
-    assert(store.get("list2").isDefined, "list3 was not in store")
-    assert(store.get("list2").get.size === 2)
-    assert(store.get("list3").isDefined, "list1 was not in store")
-    assert(store.get("list3").get.size === 2)
-    assert(store.get("list1").isDefined, "list2 was not in store")
-    assert(store.get("list1").get.size === 2)
-    assert(store.get("list2").isDefined, "list3 was not in store")
-    assert(store.get("list2").get.size === 2)
-    assert(store.get("list3").isDefined, "list1 was not in store")
-    assert(store.get("list3").get.size === 2)
+    assert(store.get("list1").isDefined, "list1 was not in store")
+    assert(store.get("list1").get.data.size === 2)
+    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.get("list2").get.data.size === 2)
+    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.get("list3").get.data.size === 2)
+    assert(store.get("list1").isDefined, "list1 was not in store")
+    assert(store.get("list1").get.data.size === 2)
+    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.get("list2").get.data.size === 2)
+    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.get("list3").get.data.size === 2)
     // Now let's add in list4, which uses both disk and memory; list1 should drop out
     store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
     assert(store.get("list1") === None, "list1 was in store")
-    assert(store.get("list2").isDefined, "list3 was not in store")
-    assert(store.get("list2").get.size === 2)
-    assert(store.get("list3").isDefined, "list1 was not in store")
-    assert(store.get("list3").get.size === 2)
+    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.get("list2").get.data.size === 2)
+    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.get("list3").get.data.size === 2)
     assert(store.get("list4").isDefined, "list4 was not in store")
-    assert(store.get("list4").get.size === 2)
+    assert(store.get("list4").get.data.size === 2)
   }
 
   test("negative byte values in ByteBufferInputStream") {
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6c49870455..316e14100e 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -39,7 +39,11 @@ class JsonProtocolSuite extends FunSuite {
   val taskGettingResult =
     SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
   val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
-    makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
+    makeTaskInfo(123L, 234, 67, 345L, false),
+    makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false))
+  val taskEndWithHadoopInput = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
+    makeTaskInfo(123L, 234, 67, 345L, false),
+    makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true))
   val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
   val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
   val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
@@ -61,6 +65,7 @@ class JsonProtocolSuite extends FunSuite {
     testEvent(taskStart, taskStartJsonString)
     testEvent(taskGettingResult, taskGettingResultJsonString)
     testEvent(taskEnd, taskEndJsonString)
+    testEvent(taskEndWithHadoopInput, taskEndWithHadoopInputJsonString)
     testEvent(jobStart, jobStartJsonString)
     testEvent(jobEnd, jobEndJsonString)
testEvent(environmentUpdate, environmentUpdateJsonString) @@ -75,7 +80,7 @@ class JsonProtocolSuite extends FunSuite { testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L)) testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L)) testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false)) - testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8)) + testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false)) testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000)) // StorageLevel @@ -118,7 +123,7 @@ class JsonProtocolSuite extends FunSuite { testBlockId(StreamBlockId(1, 2L)) } - test("Backward compatibility") { + test("StageInfo.details backward compatibility") { // StageInfo.details was added after 1.0.0. val info = makeStageInfo(1, 2, 3, 4L, 5L) assert(info.details.nonEmpty) @@ -129,6 +134,16 @@ class JsonProtocolSuite extends FunSuite { assert("" === newInfo.details) } + test("InputMetrics backward compatibility") { + // InputMetrics were added after 1.0.1. + val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true) + assert(metrics.inputMetrics.nonEmpty) + val newJson = JsonProtocol.taskMetricsToJson(metrics) + val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" } + val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) + assert(newMetrics.inputMetrics.isEmpty) + } + /** -------------------------- * | Helper test running methods | @@ -294,6 +309,8 @@ class JsonProtocolSuite extends FunSuite { metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics, assertShuffleReadEquals) assertOptionEquals( metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics, assertShuffleWriteEquals) + assertOptionEquals( + metrics1.inputMetrics, metrics2.inputMetrics, assertInputMetricsEquals) assertOptionEquals(metrics1.updatedBlocks, metrics2.updatedBlocks, assertBlocksEquals) } @@ -311,6 +328,11 @@ class JsonProtocolSuite extends FunSuite { assert(metrics1.shuffleWriteTime === metrics2.shuffleWriteTime) } + private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) { + assert(metrics1.readMethod === metrics2.readMethod) + assert(metrics1.bytesRead === metrics2.bytesRead) + } + private def assertEquals(bm1: BlockManagerId, bm2: BlockManagerId) { assert(bm1.executorId === bm2.executorId) assert(bm1.host === bm2.host) @@ -403,6 +425,10 @@ class JsonProtocolSuite extends FunSuite { assertEquals(w1, w2) } + private def assertInputMetricsEquals(i1: InputMetrics, i2: InputMetrics) { + assertEquals(i1, i2) + } + private def assertTaskMetricsEquals(t1: TaskMetrics, t2: TaskMetrics) { assertEquals(t1, t2) } @@ -460,9 +486,19 @@ class JsonProtocolSuite extends FunSuite { new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative) } - private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = { + /** + * Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is + * set to true) or read data from a shuffle otherwise. 
+ */ + private def makeTaskMetrics( + a: Long, + b: Long, + c: Long, + d: Long, + e: Int, + f: Int, + hasHadoopInput: Boolean) = { val t = new TaskMetrics - val sr = new ShuffleReadMetrics val sw = new ShuffleWriteMetrics t.hostname = "localhost" t.executorDeserializeTime = a @@ -471,15 +507,23 @@ class JsonProtocolSuite extends FunSuite { t.jvmGCTime = d t.resultSerializationTime = a + b t.memoryBytesSpilled = a + c - sr.shuffleFinishTime = b + c - sr.totalBlocksFetched = e + f - sr.remoteBytesRead = b + d - sr.localBlocksFetched = e - sr.fetchWaitTime = a + d - sr.remoteBlocksFetched = f + + if (hasHadoopInput) { + val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) + inputMetrics.bytesRead = d + e + f + t.inputMetrics = Some(inputMetrics) + } else { + val sr = new ShuffleReadMetrics + sr.shuffleFinishTime = b + c + sr.totalBlocksFetched = e + f + sr.remoteBytesRead = b + d + sr.localBlocksFetched = e + sr.fetchWaitTime = a + d + sr.remoteBlocksFetched = f + t.shuffleReadMetrics = Some(sr) + } sw.shuffleBytesWritten = a + b + c sw.shuffleWriteTime = b + c + d - t.shuffleReadMetrics = Some(sr) t.shuffleWriteMetrics = Some(sw) // Make at most 6 blocks t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i => @@ -552,8 +596,9 @@ class JsonProtocolSuite extends FunSuite { | }, | "Shuffle Write Metrics":{ | "Shuffle Bytes Written":1200, - | "Shuffle Write Time":1500}, - | "Updated Blocks":[ + | "Shuffle Write Time":1500 + | }, + | "Updated Blocks":[ | {"Block ID":"rdd_0_0", | "Status":{ | "Storage Level":{ @@ -568,6 +613,35 @@ class JsonProtocolSuite extends FunSuite { |} """.stripMargin + private val taskEndWithHadoopInputJsonString = + """ + |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask", + |"Task End Reason":{"Reason":"Success"}, + |"Task Info":{ + | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor", + | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false, + | "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0 + |}, + |"Task Metrics":{ + | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400, + | "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700, + | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0, + | "Shuffle Write Metrics":{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500}, + | "Input Metrics":{"Data Read Method":"Hadoop","Bytes Read":2100}, + | "Updated Blocks":[ + | {"Block ID":"rdd_0_0", + | "Status":{ + | "Storage Level":{ + | "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false, + | "Replication":2 + | }, + | "Memory Size":0,"Tachyon Size":0,"Disk Size":0 + | } + | } + | ]} + |} + """ + private val jobStartJsonString = """ {"Event":"SparkListenerJobStart","Job ID":10,"Stage IDs":[1,2,3,4],"Properties": |
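The "InputMetrics backward compatibility" test above relies on json4s field removal to simulate events written by an older Spark: strip the new "Input Metrics" field from freshly generated JSON, then check that deserialization still succeeds with inputMetrics empty. A self-contained sketch of that pattern (the JSON values are hypothetical; parse, removeField, and compact(render(...)) are the same json4s calls the suite uses):

import org.json4s._
import org.json4s.jackson.JsonMethods._

object InputMetricsCompatSketch {
  def main(args: Array[String]) {
    // JSON as the new writer would emit it, including the new field.
    val newJson = parse(
      """{"Host Name":"localhost","Input Metrics":{"Data Read Method":"Hadoop","Bytes Read":2100}}""")
    // Drop the field to mimic JSON written before InputMetrics existed.
    val oldJson = newJson.removeField { case (name, _) => name == "Input Metrics" }
    // A reader must tolerate the missing field (inputMetrics deserializes to None).
    assert(compact(render(oldJson)) == """{"Host Name":"localhost"}""")
  }
}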
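Finally, a hypothetical consumer sketch showing why inputMetrics is an Option: per the SparkListenerSuite change, tasks that read shuffle output report no input metrics, while Hadoop and block-store reads populate readMethod and bytesRead. The field names follow the assertions in the suites above; the listener itself is illustrative and not part of this patch.

import org.apache.spark.executor.DataReadMethod
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class InputBytesListener extends SparkListener {
  var hadoopBytesRead = 0L

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    // inputMetrics is None for shuffle-reading tasks, matching the
    // "should not be ('defined)" assertion in SparkListenerSuite.
    for (metrics <- Option(taskEnd.taskMetrics); in <- metrics.inputMetrics) {
      if (in.readMethod == DataReadMethod.Hadoop) {
        hadoopBytesRead += in.bytesRead
      }
    }
  }
}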