path: root/core/src/test
author    Kay Ousterhout <kayousterhout@gmail.com>    2014-07-01 01:56:51 -0700
committer Reynold Xin <rxin@apache.org>    2014-07-01 01:56:51 -0700
commit    05c3d90e3527114d3abc08c7cc87f6efef96ebdc (patch)
tree      89759c6eae35d767ade0f75c7a363b112bb49aa7 /core/src/test
parent    3319a3e3c604f187ff8176597b269af04ca5c1c5 (diff)
[SPARK-2185] Emit warning when task size exceeds a threshold.
This functionality was added in an earlier commit but shortly after was removed due to a bad git merge (totally my fault).

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #1149 from kayousterhout/warning_bug and squashes the following commits:

3f1bb00 [Kay Ousterhout] Fixed Json tests
462a664 [Kay Ousterhout] Removed task set name from warning message
e89b2f6 [Kay Ousterhout] Fixed Json tests.
7af424c [Kay Ousterhout] Emit warning when task size exceeds a threshold.
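For context on what the tests below exercise: the warning fires when a task's serialized size crosses a fixed threshold, and a per-TaskSetManager flag ensures it is emitted at most once per task set. A minimal, self-contained sketch of that check follows; only the names TASK_SIZE_TO_WARN_KB and emittedTaskSizeWarning come from the tests, while the 100 KB threshold value and the plain println standing in for Spark's logging are assumptions.

import java.nio.ByteBuffer

// Minimal sketch of the warning check, not Spark's actual implementation.
// TASK_SIZE_TO_WARN_KB and emittedTaskSizeWarning are modeled on the names
// used in the tests below; the threshold value and message text are assumptions.
object TaskSizeWarningSketch {
  val TASK_SIZE_TO_WARN_KB = 100 // assumed threshold, in kilobytes

  var emittedTaskSizeWarning = false

  def checkTaskSize(serializedTask: ByteBuffer): Unit = {
    if (!emittedTaskSizeWarning && serializedTask.limit > TASK_SIZE_TO_WARN_KB * 1024) {
      // Flip the flag so the warning fires at most once per task set,
      // which is exactly what the two new tests assert on.
      emittedTaskSizeWarning = true
      println(s"WARN: serialized task is ${serializedTask.limit / 1024} KB; " +
        s"tasks larger than $TASK_SIZE_TO_WARN_KB KB may slow down scheduling.")
    }
  }
}

In the large-task test below, LargeTask carries a random buffer of TASK_SIZE_TO_WARN_KB kilobytes, so the serialized task (buffer plus serialization overhead) lands just over the threshold and the first resourceOffer flips the flag.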
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala | 41
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 18
2 files changed, 48 insertions, 11 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 59a618956a..9ff2a48700 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.scheduler
+import java.util.Random
+
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable
@@ -83,6 +85,18 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
}
}
+/**
+ * A Task implementation that results in a large serialized task.
+ */
+class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0) {
+ val randomBuffer = new Array[Byte](TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024)
+ val random = new Random(0)
+ random.nextBytes(randomBuffer)
+
+ override def runTask(context: TaskContext): Array[Byte] = randomBuffer
+ override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]()
+}
+
class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
@@ -434,6 +448,33 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
}
+ test("do not emit warning when serialized task is small") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+ val taskSet = FakeTask.createTaskSet(1)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+
+ assert(!manager.emittedTaskSizeWarning)
+
+ assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
+
+ assert(!manager.emittedTaskSizeWarning)
+ }
+
+ test("emit warning when serialized task is large") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+
+ val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null)
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+
+ assert(!manager.emittedTaskSizeWarning)
+
+ assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
+
+ assert(manager.emittedTaskSizeWarning)
+ }
+
def createTaskResult(id: Int): DirectTaskResult[Int] = {
val valueSer = SparkEnv.get.serializer.newInstance()
new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 316e14100e..058d314530 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -257,7 +257,6 @@ class JsonProtocolSuite extends FunSuite {
assert(info1.numTasks === info2.numTasks)
assert(info1.submissionTime === info2.submissionTime)
assert(info1.completionTime === info2.completionTime)
- assert(info1.emittedTaskSizeWarning === info2.emittedTaskSizeWarning)
assert(info1.rddInfos.size === info2.rddInfos.size)
(0 until info1.rddInfos.size).foreach { i =>
assertEquals(info1.rddInfos(i), info2.rddInfos(i))
@@ -294,7 +293,6 @@ class JsonProtocolSuite extends FunSuite {
assert(info1.gettingResultTime === info2.gettingResultTime)
assert(info1.finishTime === info2.finishTime)
assert(info1.failed === info2.failed)
- assert(info1.serializedSize === info2.serializedSize)
}
private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
@@ -540,9 +538,8 @@ class JsonProtocolSuite extends FunSuite {
private val stageSubmittedJsonString =
"""
{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name":
- "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details",
- "Emitted Task Size Warning":false},"Properties":{"France":"Paris","Germany":"Berlin",
- "Russia":"Moscow","Ukraine":"Kiev"}}
+ "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details"},"Properties":
+ {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}}
"""
private val stageCompletedJsonString =
@@ -551,8 +548,7 @@ class JsonProtocolSuite extends FunSuite {
"greetings","Number of Tasks":201,"RDD Info":[{"RDD ID":101,"Name":"mayor","Storage
Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true,
"Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301,
- "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details",
- "Emitted Task Size Warning":false}}
+ "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details"}}
"""
private val taskStartJsonString =
@@ -560,7 +556,7 @@ class JsonProtocolSuite extends FunSuite {
|{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
|"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
|"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,
- |"Failed":false,"Serialized Size":0}}
+ |"Failed":false}}
""".stripMargin
private val taskGettingResultJsonString =
@@ -568,7 +564,7 @@ class JsonProtocolSuite extends FunSuite {
|{"Event":"SparkListenerTaskGettingResult","Task Info":
| {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor",
| "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0,
- | "Finish Time":0,"Failed":false,"Serialized Size":0
+ | "Finish Time":0,"Failed":false
| }
|}
""".stripMargin
@@ -580,7 +576,7 @@ class JsonProtocolSuite extends FunSuite {
|"Task Info":{
| "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
| "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
- | "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
+ | "Getting Result Time":0,"Finish Time":0,"Failed":false
|},
|"Task Metrics":{
| "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
@@ -620,7 +616,7 @@ class JsonProtocolSuite extends FunSuite {
|"Task Info":{
| "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
| "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
- | "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
+ | "Getting Result Time":0,"Finish Time":0,"Failed":false
|},
|"Task Metrics":{
| "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,