Diffstat (limited to 'core/src/test')
 core/src/test/scala/org/apache/spark/CacheManagerSuite.scala            |   4
 core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala |   1
 core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala    |  84
 core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala       | 102
 4 files changed, 153 insertions(+), 38 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 4f178db40f..7f5d0b061e 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.mock.EasyMockSugar
+import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
@@ -66,7 +67,8 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
test("get cached rdd") {
expecting {
- blockManager.get(RDDBlockId(0, 0)).andReturn(Some(ArrayBuffer(5, 6, 7).iterator))
+ val result = new BlockResult(ArrayBuffer(5, 6, 7).iterator, DataReadMethod.Memory, 12)
+ blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result))
}
whenExecuting(blockManager) {
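
For orientation: blockManager.get now returns an Option[BlockResult] rather than a bare iterator, which is why the expectation above wraps the data. Below is a minimal sketch of the types these tests rely on, inferred from the calls and assertions in this diff; the real definitions live in org.apache.spark.executor and org.apache.spark.storage and may carry additional members.

// Read-source enumeration; Memory, Disk and Hadoop are the values exercised
// in these suites. Other values may exist in the real class.
object DataReadMethod extends Enumeration {
  type DataReadMethod = Value
  val Memory, Disk, Hadoop = Value
}

// Per-source input counters; bytesRead is mutated as data is read.
class InputMetrics(val readMethod: DataReadMethod.Value) {
  var bytesRead: Long = 0L
}

// Wrapper returned by BlockManager.get(); the constructor arguments mirror
// `new BlockResult(ArrayBuffer(5, 6, 7).iterator, DataReadMethod.Memory, 12)` above.
class BlockResult(
    val data: Iterator[Any],
    readMethod: DataReadMethod.Value,
    bytes: Long) {
  val inputMetrics = new InputMetrics(readMethod)
  inputMetrics.bytesRead = bytes
}
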
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 6df0a08096..71f48e295e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -251,6 +251,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
taskMetrics.resultSize should be > (0l)
if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) {
+ taskMetrics.inputMetrics should not be ('defined)
taskMetrics.shuffleWriteMetrics should be ('defined)
taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l)
}
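
The assertion added here exercises a new optional field on TaskMetrics. A sketch of that addition, assuming it mirrors the existing optional shuffle-metrics fields seen elsewhere in this diff:

class TaskMetrics {
  // existing counters elided ...
  var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
  var shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None
  // new in this change: populated only for tasks that read input data
  // (e.g. from Hadoop or a cached block), hence "not defined" for the
  // shuffle-writing stages asserted above.
  var inputMetrics: Option[InputMetrics] = None
}
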
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index d7dbe5164b..23cb6905bf 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -31,11 +31,13 @@ import org.scalatest.concurrent.Timeouts._
import org.scalatest.Matchers
import org.scalatest.time.SpanSugar._
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.executor.DataReadMethod
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
+import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions
import scala.language.postfixOps
@@ -415,6 +417,39 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
}
+ test("correct BlockResult returned from get() calls") {
+ store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr,
+ mapOutputTracker)
+ val list1 = List(new Array[Byte](200), new Array[Byte](200))
+ val list1ForSizeEstimate = new ArrayBuffer[Any]
+ list1ForSizeEstimate ++= list1.iterator
+ val list1SizeEstimate = SizeEstimator.estimate(list1ForSizeEstimate)
+ val list2 = List(new Array[Byte](50), new Array[Byte](100), new Array[Byte](150))
+ val list2ForSizeEstimate = new ArrayBuffer[Any]
+ list2ForSizeEstimate ++= list2.iterator
+ val list2SizeEstimate = SizeEstimator.estimate(list2ForSizeEstimate)
+ store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.put("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.put("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+ val list1Get = store.get("list1")
+ assert(list1Get.isDefined, "list1 expected to be in store")
+ assert(list1Get.get.data.size === 2)
+ assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate)
+ assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory)
+ val list2MemoryGet = store.get("list2memory")
+ assert(list2MemoryGet.isDefined, "list2memory expected to be in store")
+ assert(list2MemoryGet.get.data.size === 3)
+ assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate)
+ assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory)
+ val list2DiskGet = store.get("list2disk")
+ assert(list2DiskGet.isDefined, "list2disk expected to be in store")
+ assert(list2DiskGet.get.data.size === 3)
+ System.out.println(list2DiskGet)
+ // We don't know the exact size of the data on disk, but it should certainly be > 0.
+ assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
+ assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)
+ }
+
test("in-memory LRU storage") {
store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
securityMgr, mapOutputTracker)
@@ -630,18 +665,18 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.get("list2").isDefined, "list2 was not in store")
- assert(store.get("list2").get.size == 2)
+ assert(store.get("list2").get.data.size === 2)
assert(store.get("list3").isDefined, "list3 was not in store")
- assert(store.get("list3").get.size == 2)
+ assert(store.get("list3").get.data.size === 2)
assert(store.get("list1") === None, "list1 was in store")
assert(store.get("list2").isDefined, "list2 was not in store")
- assert(store.get("list2").get.size == 2)
+ assert(store.get("list2").get.data.size === 2)
// At this point list2 was accessed last, so LRU will get rid of list3
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.get("list1").isDefined, "list1 was not in store")
- assert(store.get("list1").get.size == 2)
+ assert(store.get("list1").get.data.size === 2)
assert(store.get("list2").isDefined, "list2 was not in store")
- assert(store.get("list2").get.size == 2)
+ assert(store.get("list2").get.data.size === 2)
assert(store.get("list3") === None, "list1 was in store")
}
@@ -656,28 +691,31 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+ val listForSizeEstimate = new ArrayBuffer[Any]
+ listForSizeEstimate ++= list1.iterator
+ val listSize = SizeEstimator.estimate(listForSizeEstimate)
// At this point LRU should not kick in because list3 is only on disk
- assert(store.get("list1").isDefined, "list2 was not in store")
- assert(store.get("list1").get.size === 2)
- assert(store.get("list2").isDefined, "list3 was not in store")
- assert(store.get("list2").get.size === 2)
- assert(store.get("list3").isDefined, "list1 was not in store")
- assert(store.get("list3").get.size === 2)
- assert(store.get("list1").isDefined, "list2 was not in store")
- assert(store.get("list1").get.size === 2)
- assert(store.get("list2").isDefined, "list3 was not in store")
- assert(store.get("list2").get.size === 2)
- assert(store.get("list3").isDefined, "list1 was not in store")
- assert(store.get("list3").get.size === 2)
+ assert(store.get("list1").isDefined, "list1 was not in store")
+ assert(store.get("list1").get.data.size === 2)
+ assert(store.get("list2").isDefined, "list2 was not in store")
+ assert(store.get("list2").get.data.size === 2)
+ assert(store.get("list3").isDefined, "list3 was not in store")
+ assert(store.get("list3").get.data.size === 2)
+ assert(store.get("list1").isDefined, "list1 was not in store")
+ assert(store.get("list1").get.data.size === 2)
+ assert(store.get("list2").isDefined, "list2 was not in store")
+ assert(store.get("list2").get.data.size === 2)
+ assert(store.get("list3").isDefined, "list3 was not in store")
+ assert(store.get("list3").get.data.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
assert(store.get("list1") === None, "list1 was in store")
- assert(store.get("list2").isDefined, "list3 was not in store")
- assert(store.get("list2").get.size === 2)
- assert(store.get("list3").isDefined, "list1 was not in store")
- assert(store.get("list3").get.size === 2)
+ assert(store.get("list2").isDefined, "list2 was not in store")
+ assert(store.get("list2").get.data.size === 2)
+ assert(store.get("list3").isDefined, "list3 was not in store")
+ assert(store.get("list3").get.data.size === 2)
assert(store.get("list4").isDefined, "list4 was not in store")
- assert(store.get("list4").get.size === 2)
+ assert(store.get("list4").get.data.size === 2)
}
test("negative byte values in ByteBufferInputStream") {
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6c49870455..316e14100e 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -39,7 +39,11 @@ class JsonProtocolSuite extends FunSuite {
val taskGettingResult =
SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
- makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
+ makeTaskInfo(123L, 234, 67, 345L, false),
+ makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false))
+ val taskEndWithHadoopInput = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
+ makeTaskInfo(123L, 234, 67, 345L, false),
+ makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true))
val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
@@ -61,6 +65,7 @@ class JsonProtocolSuite extends FunSuite {
testEvent(taskStart, taskStartJsonString)
testEvent(taskGettingResult, taskGettingResultJsonString)
testEvent(taskEnd, taskEndJsonString)
+ testEvent(taskEndWithHadoopInput, taskEndWithHadoopInputJsonString)
testEvent(jobStart, jobStartJsonString)
testEvent(jobEnd, jobEndJsonString)
testEvent(environmentUpdate, environmentUpdateJsonString)
@@ -75,7 +80,7 @@ class JsonProtocolSuite extends FunSuite {
testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
- testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8))
+ testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false))
testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000))
// StorageLevel
@@ -118,7 +123,7 @@ class JsonProtocolSuite extends FunSuite {
testBlockId(StreamBlockId(1, 2L))
}
- test("Backward compatibility") {
+ test("StageInfo.details backward compatibility") {
// StageInfo.details was added after 1.0.0.
val info = makeStageInfo(1, 2, 3, 4L, 5L)
assert(info.details.nonEmpty)
@@ -129,6 +134,16 @@ class JsonProtocolSuite extends FunSuite {
assert("" === newInfo.details)
}
+ test("InputMetrics backward compatibility") {
+ // InputMetrics were added after 1.0.1.
+ val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true)
+ assert(metrics.inputMetrics.nonEmpty)
+ val newJson = JsonProtocol.taskMetricsToJson(metrics)
+ val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" }
+ val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+ assert(newMetrics.inputMetrics.isEmpty)
+ }
+
/** -------------------------- *
| Helper test running methods |
@@ -294,6 +309,8 @@ class JsonProtocolSuite extends FunSuite {
metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics, assertShuffleReadEquals)
assertOptionEquals(
metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics, assertShuffleWriteEquals)
+ assertOptionEquals(
+ metrics1.inputMetrics, metrics2.inputMetrics, assertInputMetricsEquals)
assertOptionEquals(metrics1.updatedBlocks, metrics2.updatedBlocks, assertBlocksEquals)
}
@@ -311,6 +328,11 @@ class JsonProtocolSuite extends FunSuite {
assert(metrics1.shuffleWriteTime === metrics2.shuffleWriteTime)
}
+ private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) {
+ assert(metrics1.readMethod === metrics2.readMethod)
+ assert(metrics1.bytesRead === metrics2.bytesRead)
+ }
+
private def assertEquals(bm1: BlockManagerId, bm2: BlockManagerId) {
assert(bm1.executorId === bm2.executorId)
assert(bm1.host === bm2.host)
@@ -403,6 +425,10 @@ class JsonProtocolSuite extends FunSuite {
assertEquals(w1, w2)
}
+ private def assertInputMetricsEquals(i1: InputMetrics, i2: InputMetrics) {
+ assertEquals(i1, i2)
+ }
+
private def assertTaskMetricsEquals(t1: TaskMetrics, t2: TaskMetrics) {
assertEquals(t1, t2)
}
@@ -460,9 +486,19 @@ class JsonProtocolSuite extends FunSuite {
new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative)
}
- private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = {
+ /**
+ * Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is
+ * set to true) or read data from a shuffle otherwise.
+ */
+ private def makeTaskMetrics(
+ a: Long,
+ b: Long,
+ c: Long,
+ d: Long,
+ e: Int,
+ f: Int,
+ hasHadoopInput: Boolean) = {
val t = new TaskMetrics
- val sr = new ShuffleReadMetrics
val sw = new ShuffleWriteMetrics
t.hostname = "localhost"
t.executorDeserializeTime = a
@@ -471,15 +507,23 @@ class JsonProtocolSuite extends FunSuite {
t.jvmGCTime = d
t.resultSerializationTime = a + b
t.memoryBytesSpilled = a + c
- sr.shuffleFinishTime = b + c
- sr.totalBlocksFetched = e + f
- sr.remoteBytesRead = b + d
- sr.localBlocksFetched = e
- sr.fetchWaitTime = a + d
- sr.remoteBlocksFetched = f
+
+ if (hasHadoopInput) {
+ val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+ inputMetrics.bytesRead = d + e + f
+ t.inputMetrics = Some(inputMetrics)
+ } else {
+ val sr = new ShuffleReadMetrics
+ sr.shuffleFinishTime = b + c
+ sr.totalBlocksFetched = e + f
+ sr.remoteBytesRead = b + d
+ sr.localBlocksFetched = e
+ sr.fetchWaitTime = a + d
+ sr.remoteBlocksFetched = f
+ t.shuffleReadMetrics = Some(sr)
+ }
sw.shuffleBytesWritten = a + b + c
sw.shuffleWriteTime = b + c + d
- t.shuffleReadMetrics = Some(sr)
t.shuffleWriteMetrics = Some(sw)
// Make at most 6 blocks
t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i =>
@@ -552,8 +596,9 @@ class JsonProtocolSuite extends FunSuite {
| },
| "Shuffle Write Metrics":{
| "Shuffle Bytes Written":1200,
- | "Shuffle Write Time":1500},
- | "Updated Blocks":[
+ | "Shuffle Write Time":1500
+ | },
+ | "Updated Blocks":[
| {"Block ID":"rdd_0_0",
| "Status":{
| "Storage Level":{
@@ -568,6 +613,35 @@ class JsonProtocolSuite extends FunSuite {
|}
""".stripMargin
+ private val taskEndWithHadoopInputJsonString =
+ """
+ |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
+ |"Task End Reason":{"Reason":"Success"},
+ |"Task Info":{
+ | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
+ | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
+ | "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
+ |},
+ |"Task Metrics":{
+ | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
+ | "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700,
+ | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
+ | "Shuffle Write Metrics":{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},
+ | "Input Metrics":{"Data Read Method":"Hadoop","Bytes Read":2100},
+ | "Updated Blocks":[
+ | {"Block ID":"rdd_0_0",
+ | "Status":{
+ | "Storage Level":{
+ | "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
+ | "Replication":2
+ | },
+ | "Memory Size":0,"Tachyon Size":0,"Disk Size":0
+ | }
+ | }
+ | ]}
+ |}
+ """
+
private val jobStartJsonString =
"""
{"Event":"SparkListenerJobStart","Job ID":10,"Stage IDs":[1,2,3,4],"Properties":