diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-05-16 01:52:40 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2013-05-16 01:57:57 -0700 |
commit | b8e46b6074e5ecc1ae4ed22ea32983597c14b683 (patch) | |
tree | 254ffeaf4f4f949076b626d1ac43a5b59075e4eb /core | |
parent | 440719109e10ea1cc6149a8f61d42ea7cc443352 (diff) | |
download | spark-b8e46b6074e5ecc1ae4ed22ea32983597c14b683.tar.gz spark-b8e46b6074e5ecc1ae4ed22ea32983597c14b683.tar.bz2 spark-b8e46b6074e5ecc1ae4ed22ea32983597c14b683.zip |
Abort job if result exceeds Akka frame size; add test.
Diffstat (limited to 'core')
4 files changed, 23 insertions, 1 deletion
diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala index 420c54bc9a..c5da453562 100644 --- a/core/src/main/scala/spark/TaskEndReason.scala +++ b/core/src/main/scala/spark/TaskEndReason.scala @@ -20,3 +20,5 @@ case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, re private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason private[spark] case class OtherFailure(message: String) extends TaskEndReason + +private[spark] case class TaskResultTooBigFailure() extends TaskEndReason
\ No newline at end of file diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 718f0ff5bc..9ec4eb6e88 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -115,7 +115,8 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert val serializedResult = ser.serialize(result) logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit) if (serializedResult.limit >= (akkaFrameSize - 1024)) { - throw new SparkException("Result for " + taskId + " exceeded Akka frame size") + context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(TaskResultTooBigFailure())) + return } context.statusUpdate(taskId, TaskState.FINISHED, serializedResult) logInfo("Finished task ID " + taskId) diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index 27e713e2c4..df7f0eafff 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -492,6 +492,12 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe sched.taskSetFinished(this) return + case taskResultTooBig: TaskResultTooBigFailure => + logInfo("Loss was due to task %s result exceeding Akka frame size;" + + "aborting job".format(tid)) + abort("Task %s result exceeded Akka frame size".format(tid)) + return + case ef: ExceptionFailure => val key = ef.exception.toString val now = System.currentTimeMillis diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index 4df3bb5b67..9f58999cbe 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -277,6 +277,19 @@ class DistributedSuite extends FunSuite with ShouldMatchers with 
BeforeAndAfter } } } + + test("job should fail if TaskResult exceeds Akka frame size") { + // We must use local-cluster mode since results are returned differently + // when running under LocalScheduler: + sc = new SparkContext("local-cluster[1,1,512]", "test") + val akkaFrameSize = + sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt + val rdd = sc.parallelize(Seq(1)).map{x => new Array[Byte](akkaFrameSize)} + val exception = intercept[SparkException] { + rdd.reduce((x, y) => x) + } + exception.getMessage should endWith("result exceeded Akka frame size") + } } object DistributedSuite { |