diff options
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 41 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 18 |
2 files changed, 48 insertions, 11 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 59a618956a..9ff2a48700 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import java.util.Random + import scala.collection.mutable.ArrayBuffer import scala.collection.mutable @@ -83,6 +85,18 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex } } +/** + * A Task implementation that results in a large serialized task. + */ +class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0) { + val randomBuffer = new Array[Byte](TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) + val random = new Random(0) + random.nextBytes(randomBuffer) + + override def runTask(context: TaskContext): Array[Byte] = randomBuffer + override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]() +} + class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL} @@ -434,6 +448,33 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY))) } + test("do not emit warning when serialized task is small") { + sc = new SparkContext("local", "test") + val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) + val taskSet = FakeTask.createTaskSet(1) + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) + + assert(!manager.emittedTaskSizeWarning) + + assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) + + assert(!manager.emittedTaskSizeWarning) + } + + test("emit warning when serialized task is large") { + sc = new SparkContext("local", "test") + val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) + + val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null) + val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) + + assert(!manager.emittedTaskSizeWarning) + + assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) + + assert(manager.emittedTaskSizeWarning) + } + def createTaskResult(id: Int): DirectTaskResult[Int] = { val valueSer = SparkEnv.get.serializer.newInstance() new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 316e14100e..058d314530 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -257,7 +257,6 @@ class JsonProtocolSuite extends FunSuite { assert(info1.numTasks === info2.numTasks) assert(info1.submissionTime === info2.submissionTime) assert(info1.completionTime === info2.completionTime) - assert(info1.emittedTaskSizeWarning === info2.emittedTaskSizeWarning) assert(info1.rddInfos.size === info2.rddInfos.size) (0 until info1.rddInfos.size).foreach { i => assertEquals(info1.rddInfos(i), info2.rddInfos(i)) @@ -294,7 +293,6 @@ class JsonProtocolSuite extends FunSuite { assert(info1.gettingResultTime === info2.gettingResultTime) assert(info1.finishTime === info2.finishTime) assert(info1.failed === info2.failed) - assert(info1.serializedSize === info2.serializedSize) } private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) { @@ -540,9 +538,8 @@ class JsonProtocolSuite extends FunSuite { private val stageSubmittedJsonString = """ {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name": - "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details", - "Emitted Task Size Warning":false},"Properties":{"France":"Paris","Germany":"Berlin", - "Russia":"Moscow","Ukraine":"Kiev"}} + "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details"},"Properties": + {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}} """ private val stageCompletedJsonString = @@ -551,8 +548,7 @@ class JsonProtocolSuite extends FunSuite { "greetings","Number of Tasks":201,"RDD Info":[{"RDD ID":101,"Name":"mayor","Storage Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true, "Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301, - "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details", - "Emitted Task Size Warning":false}} + "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details"}} """ private val taskStartJsonString = @@ -560,7 +556,7 @@ class JsonProtocolSuite extends FunSuite { |{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222, |"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir", |"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0, - |"Failed":false,"Serialized Size":0}} + |"Failed":false}} """.stripMargin private val taskGettingResultJsonString = @@ -568,7 +564,7 @@ class JsonProtocolSuite extends FunSuite { |{"Event":"SparkListenerTaskGettingResult","Task Info": | {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor", | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0, - | "Finish Time":0,"Failed":false,"Serialized Size":0 + | "Finish Time":0,"Failed":false | } |} """.stripMargin @@ -580,7 +576,7 @@ class JsonProtocolSuite extends FunSuite { |"Task Info":{ | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor", | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false, - | "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0 + | "Getting Result Time":0,"Finish Time":0,"Failed":false |}, |"Task Metrics":{ | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400, @@ -620,7 +616,7 @@ class JsonProtocolSuite extends FunSuite { |"Task Info":{ | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor", | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false, - | "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0 + | "Getting Result Time":0,"Finish Time":0,"Failed":false |}, |"Task Metrics":{ | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400, |