author     Josh Rosen <joshrosen@eecs.berkeley.edu>   2013-05-16 01:52:40 -0700
committer  Josh Rosen <joshrosen@eecs.berkeley.edu>   2013-05-16 01:57:57 -0700
commit     b8e46b6074e5ecc1ae4ed22ea32983597c14b683 (patch)
tree       254ffeaf4f4f949076b626d1ac43a5b59075e4eb /core
parent     440719109e10ea1cc6149a8f61d42ea7cc443352 (diff)
Abort job if result exceeds Akka frame size; add test.
Diffstat (limited to 'core')
-rw-r--r--   core/src/main/scala/spark/TaskEndReason.scala                       2
-rw-r--r--   core/src/main/scala/spark/executor/Executor.scala                   3
-rw-r--r--   core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala    6
-rw-r--r--   core/src/test/scala/spark/DistributedSuite.scala                   13
4 files changed, 23 insertions, 1 deletion
diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala
index 420c54bc9a..c5da453562 100644
--- a/core/src/main/scala/spark/TaskEndReason.scala
+++ b/core/src/main/scala/spark/TaskEndReason.scala
@@ -20,3 +20,5 @@ case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, re
private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason
private[spark] case class OtherFailure(message: String) extends TaskEndReason
+
+private[spark] case class TaskResultTooBigFailure() extends TaskEndReason
\ No newline at end of file
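For orientation, TaskEndReason is a small case-class hierarchy that the scheduler pattern-matches on to decide how to handle a completed or failed task; the self-contained toy below (hypothetical names, not Spark's classes) sketches the dispatch mechanism this commit extends with a "result too big" case.

// Toy sketch of reporting task outcomes as a case-class hierarchy and
// dispatching on them; the names here are illustrative only.
sealed trait ToyEndReason
case object ToySuccess extends ToyEndReason
case class ToyResultTooBig(taskId: Long) extends ToyEndReason
case class ToyException(taskId: Long, error: Throwable) extends ToyEndReason

def decide(reason: ToyEndReason): String = reason match {
  case ToySuccess             => "mark task finished"
  case ToyResultTooBig(tid)   => "abort job: task %s result exceeded frame size".format(tid)
  case ToyException(tid, err) => "log %s and consider retrying task %s".format(err, tid)
}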
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 718f0ff5bc..9ec4eb6e88 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -115,7 +115,8 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
val serializedResult = ser.serialize(result)
logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
if (serializedResult.limit >= (akkaFrameSize - 1024)) {
- throw new SparkException("Result for " + taskId + " exceeded Akka frame size")
+ context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(TaskResultTooBigFailure()))
+ return
}
context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
logInfo("Finished task ID " + taskId)
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 27e713e2c4..df7f0eafff 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -492,6 +492,12 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
sched.taskSetFinished(this)
return
+ case taskResultTooBig: TaskResultTooBigFailure =>
+ logInfo("Loss was due to task %s result exceeding Akka frame size;" +
+ "aborting job".format(tid))
+ abort("Task %s result exceeded Akka frame size".format(tid))
+ return
+
case ef: ExceptionFailure =>
val key = ef.exception.toString
val now = System.currentTimeMillis
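A note on the design choice above: an ExceptionFailure may be transient, so the scheduler can reasonably consider resubmitting the task, but an oversized result is deterministic and retrying could never succeed; that is why this new branch aborts the whole task set immediately and surfaces a clear error to the driver instead of entering a retry loop.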
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 4df3bb5b67..9f58999cbe 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -277,6 +277,19 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
}
}
}
+
+ test("job should fail if TaskResult exceeds Akka frame size") {
+ // We must use local-cluster mode since results are returned differently
+ // when running under LocalScheduler:
+ sc = new SparkContext("local-cluster[1,1,512]", "test")
+ val akkaFrameSize =
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+ val rdd = sc.parallelize(Seq(1)).map{x => new Array[Byte](akkaFrameSize)}
+ val exception = intercept[SparkException] {
+ rdd.reduce((x, y) => x)
+ }
+ exception.getMessage should endWith("result exceeded Akka frame size")
+ }
}
object DistributedSuite {
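If a job genuinely needs to return a result close to this limit, the usual remedy in this era of Spark was to raise the frame size before constructing the SparkContext. A brief sketch follows, assuming the spark.akka.frameSize system property (interpreted in MB), which is not part of this diff.

import spark.SparkContext

// Assumed workaround: raise the Akka frame size (spark.akka.frameSize, in MB)
// before the SparkContext is created; the property name and unit are
// assumptions based on contemporary Spark configuration, not on this commit.
System.setProperty("spark.akka.frameSize", "64")
val sc = new SparkContext("local-cluster[1,1,512]", "large-result-job")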