author     Kay Ousterhout <kayousterhout@gmail.com>    2014-07-01 01:56:51 -0700
committer  Reynold Xin <rxin@apache.org>               2014-07-01 01:56:51 -0700
commit     05c3d90e3527114d3abc08c7cc87f6efef96ebdc (patch)
tree       89759c6eae35d767ade0f75c7a363b112bb49aa7
parent     3319a3e3c604f187ff8176597b269af04ca5c1c5 (diff)
[SPARK-2185] Emit warning when task size exceeds a threshold.
This functionality was added in an earlier commit but was removed shortly
afterward by a bad git merge (totally my fault).
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes #1149 from kayousterhout/warning_bug and squashes the following commits:
3f1bb00 [Kay Ousterhout] Fixed Json tests
462a664 [Kay Ousterhout] Removed task set name from warning message
e89b2f6 [Kay Ousterhout] Fixed Json tests.
7af424c [Kay Ousterhout] Emit warning when task size exceeds a threshold.
7 files changed, 65 insertions, 36 deletions
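With this patch the check lives in TaskSetManager.resourceOffer, keyed off the exact
byte count of the serialized task (serializedTask.limit), and fires at most once per
task set; the threshold is TaskSetManager.TASK_SIZE_TO_WARN_KB = 100. For context, here
is a sketch of the kind of driver program that trips the warning. This example is
illustrative only and is not part of the commit:

// Illustrative driver program (not part of this commit). The map closure
// captures `lookup`, roughly 400 KB of driver-side data, so every serialized
// task exceeds TASK_SIZE_TO_WARN_KB (100 KB) and a warning like the following
// should appear once in the driver log:
//   Stage 0 contains a task of very large size (... KB). The maximum
//   recommended task size is 100 KB.
import org.apache.spark.{SparkConf, SparkContext}

object LargeClosureExample {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("large-closure"))
    val lookup = Array.fill(100000)(scala.util.Random.nextInt())  // ~400 KB, captured below
    val sum = sc.parallelize(1 to 1000).map(i => lookup(i % lookup.length)).reduce(_ + _)
    println(sum)
    sc.stop()
  }
}

The usual remedy is to ship large read-only data with sc.broadcast(lookup), so it
travels once per executor instead of inside every serialized task.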
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 813a9abfaf..81c136d970 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -623,16 +623,6 @@ class DAGScheduler(
   }
 
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
-    for (stage <- stageIdToStage.get(task.stageId); stageInfo <- stageToInfos.get(stage)) {
-      if (taskInfo.serializedSize > DAGScheduler.TASK_SIZE_TO_WARN * 1024 &&
-          !stageInfo.emittedTaskSizeWarning) {
-        stageInfo.emittedTaskSizeWarning = true
-        logWarning(("Stage %d (%s) contains a task of very large " +
-          "size (%d KB). The maximum recommended task size is %d KB.").format(
-          task.stageId, stageInfo.name, taskInfo.serializedSize / 1024,
-          DAGScheduler.TASK_SIZE_TO_WARN))
-      }
-    }
     listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
     submitWaitingStages()
   }
@@ -1254,7 +1244,4 @@ private[spark] object DAGScheduler {
   // The time, in millis, to wake up between polls of the completion queue in order to potentially
   // resubmit failed stages
   val POLL_TIMEOUT = 10L
-
-  // Warns the user if a stage contains a task with size greater than this value (in KB)
-  val TASK_SIZE_TO_WARN = 100
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 7644e3f351..480891550e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -38,8 +38,6 @@ class StageInfo(
   /** If the stage failed, the reason why. */
   var failureReason: Option[String] = None
 
-  var emittedTaskSizeWarning = false
-
   def stageFailed(reason: String) {
     failureReason = Some(reason)
     completionTime = Some(System.currentTimeMillis)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 6aecdfe8e6..29de0453ac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -49,8 +49,6 @@ class TaskInfo(
 
   var failed = false
 
-  var serializedSize: Int = 0
-
   private[spark] def markGettingResult(time: Long = System.currentTimeMillis) {
     gettingResultTime = time
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 83ff6b8550..059cc9085a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -166,6 +166,8 @@ private[spark] class TaskSetManager(
 
   override def schedulingMode = SchedulingMode.NONE
 
+  var emittedTaskSizeWarning = false
+
   /**
    * Add a task to all the pending-task lists that it should be on. If readding is set, we are
    * re-adding the task so only include it in each list if it's not already there.
@@ -418,6 +420,13 @@ private[spark] class TaskSetManager(
           // we assume the task can be serialized without exceptions.
           val serializedTask = Task.serializeWithDependencies(
             task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+          if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
+              !emittedTaskSizeWarning) {
+            emittedTaskSizeWarning = true
+            logWarning(s"Stage ${task.stageId} contains a task of very large size " +
+              s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
+              s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
+          }
           val timeTaken = clock.getTime() - startTime
           addRunningTask(taskId)
           logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
@@ -764,3 +773,9 @@ private[spark] class TaskSetManager(
     localityWaits = myLocalityLevels.map(getLocalityWait)
   }
 }
+
+private[spark] object TaskSetManager {
+  // The user will be warned if any stages contain a task that has a serialized size greater than
+  // this.
+  val TASK_SIZE_TO_WARN_KB = 100
+}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 26c9c9d603..47eb44b530 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -190,8 +190,7 @@ private[spark] object JsonProtocol {
     ("Details" -> stageInfo.details) ~
     ("Submission Time" -> submissionTime) ~
     ("Completion Time" -> completionTime) ~
-    ("Failure Reason" -> failureReason) ~
-    ("Emitted Task Size Warning" -> stageInfo.emittedTaskSizeWarning)
+    ("Failure Reason" -> failureReason)
   }
 
   def taskInfoToJson(taskInfo: TaskInfo): JValue = {
@@ -205,8 +204,7 @@ private[spark] object JsonProtocol {
     ("Speculative" -> taskInfo.speculative) ~
     ("Getting Result Time" -> taskInfo.gettingResultTime) ~
     ("Finish Time" -> taskInfo.finishTime) ~
-    ("Failed" -> taskInfo.failed) ~
-    ("Serialized Size" -> taskInfo.serializedSize)
+    ("Failed" -> taskInfo.failed)
   }
 
   def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
@@ -487,13 +485,11 @@ private[spark] object JsonProtocol {
     val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
     val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
-    val emittedTaskSizeWarning = (json \ "Emitted Task Size Warning").extract[Boolean]
 
     val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos, details)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason
-    stageInfo.emittedTaskSizeWarning = emittedTaskSizeWarning
     stageInfo
   }
 
@@ -509,14 +505,12 @@ private[spark] object JsonProtocol {
     val gettingResultTime = (json \ "Getting Result Time").extract[Long]
     val finishTime = (json \ "Finish Time").extract[Long]
     val failed = (json \ "Failed").extract[Boolean]
-    val serializedSize = (json \ "Serialized Size").extract[Int]
 
     val taskInfo =
       new TaskInfo(taskId, index, attempt, launchTime, executorId, host, taskLocality, speculative)
     taskInfo.gettingResultTime = gettingResultTime
     taskInfo.finishTime = finishTime
     taskInfo.failed = failed
-    taskInfo.serializedSize = serializedSize
     taskInfo
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 59a618956a..9ff2a48700 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import java.util.Random
+
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable
@@ -83,6 +85,18 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
   }
 }
 
+/**
+ * A Task implementation that results in a large serialized task.
+ */
+class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0) {
+  val randomBuffer = new Array[Byte](TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024)
+  val random = new Random(0)
+  random.nextBytes(randomBuffer)
+
+  override def runTask(context: TaskContext): Array[Byte] = randomBuffer
+  override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]()
+}
+
 class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
   import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
@@ -434,6 +448,33 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
   }
 
+  test("do not emit warning when serialized task is small") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    val taskSet = FakeTask.createTaskSet(1)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+
+    assert(!manager.emittedTaskSizeWarning)
+
+    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
+
+    assert(!manager.emittedTaskSizeWarning)
+  }
+
+  test("emit warning when serialized task is large") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+
+    val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+
+    assert(!manager.emittedTaskSizeWarning)
+
+    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
+
+    assert(manager.emittedTaskSizeWarning)
+  }
+
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
     val valueSer = SparkEnv.get.serializer.newInstance()
     new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 316e14100e..058d314530 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -257,7 +257,6 @@ class JsonProtocolSuite extends FunSuite {
     assert(info1.numTasks === info2.numTasks)
     assert(info1.submissionTime === info2.submissionTime)
     assert(info1.completionTime === info2.completionTime)
-    assert(info1.emittedTaskSizeWarning === info2.emittedTaskSizeWarning)
     assert(info1.rddInfos.size === info2.rddInfos.size)
     (0 until info1.rddInfos.size).foreach { i =>
       assertEquals(info1.rddInfos(i), info2.rddInfos(i))
@@ -294,7 +293,6 @@ class JsonProtocolSuite extends FunSuite {
     assert(info1.gettingResultTime === info2.gettingResultTime)
     assert(info1.finishTime === info2.finishTime)
     assert(info1.failed === info2.failed)
-    assert(info1.serializedSize === info2.serializedSize)
   }
 
   private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
@@ -540,9 +538,8 @@ class JsonProtocolSuite extends FunSuite {
   private val stageSubmittedJsonString =
     """
      {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name":
-      "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details",
-      "Emitted Task Size Warning":false},"Properties":{"France":"Paris","Germany":"Berlin",
-      "Russia":"Moscow","Ukraine":"Kiev"}}
+      "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details"},"Properties":
+      {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}}
    """
 
   private val stageCompletedJsonString =
     """
@@ -551,8 +548,7 @@ class JsonProtocolSuite extends FunSuite {
      "greetings","Number of Tasks":201,"RDD Info":[{"RDD ID":101,"Name":"mayor","Storage
      Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true,
      "Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301,
-      "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details",
-      "Emitted Task Size Warning":false}}
+      "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details"}}
    """
 
   private val taskStartJsonString =
@@ -560,7 +556,7 @@ class JsonProtocolSuite extends FunSuite {
     |{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
     |"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
     |"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,
-    |"Failed":false,"Serialized Size":0}}
+    |"Failed":false}}
   """.stripMargin
 
   private val taskGettingResultJsonString =
    """
@@ -568,7 +564,7 @@ class JsonProtocolSuite extends FunSuite {
     |{"Event":"SparkListenerTaskGettingResult","Task Info":
     |  {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor",
     |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0,
-    |  "Finish Time":0,"Failed":false,"Serialized Size":0
+    |  "Finish Time":0,"Failed":false
     |  }
     |}
   """.stripMargin
@@ -580,7 +576,7 @@ class JsonProtocolSuite extends FunSuite {
     |"Task Info":{
     |  "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
     |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
-    |  "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
+    |  "Getting Result Time":0,"Finish Time":0,"Failed":false
     |},
     |"Task Metrics":{
     |  "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
@@ -620,7 +616,7 @@ class JsonProtocolSuite extends FunSuite {
     |"Task Info":{
     |  "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
     |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
-    |  "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
+    |  "Getting Result Time":0,"Finish Time":0,"Failed":false
     |},
     |"Task Metrics":{
     |  "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,