author    Reynold Xin <rxin@cs.berkeley.edu>  2013-05-17 19:58:40 -0700
committer Reynold Xin <rxin@cs.berkeley.edu>  2013-05-17 19:58:40 -0700
commit    5912cc49676821c3f1599f81dcf714b040e04a1a (patch)
tree      74f69d1ff55ed2c7492a2671a20f72247eb991e1 /core
parent    8d78c5f89f25d013c997c03587193f3d87a268b0 (diff)
parent    b8e46b6074e5ecc1ae4ed22ea32983597c14b683 (diff)
Merge pull request #610 from JoshRosen/spark-747
Throw exception if TaskResult exceeds Akka frame size
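
Background: Akka remoting cannot deliver a message larger than its configured frame size, so an oversized serialized TaskResult previously never reached the driver and the job hung instead of failing. This patch makes the executor detect the condition and fail the task explicitly. A minimal sketch of the size condition, with illustrative values (frameSize and the payload below are made up, not Spark defaults):

    import java.nio.ByteBuffer

    // Illustrative only: a serialized result at or above Akka's frame size
    // cannot be delivered as a single remote message.
    object FrameSizeSketch {
      def main(args: Array[String]) {
        val frameSize = 10 * 1024 * 1024                 // assume a 10 MiB limit
        val payload = ByteBuffer.wrap(new Array[Byte](frameSize))
        if (payload.limit >= frameSize - 1024) {         // same headroom as the patch
          println("result of %d bytes exceeds the usable frame size".format(payload.limit))
        }
      }
    }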
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/TaskEndReason.scala                     |  2 ++
-rw-r--r--  core/src/main/scala/spark/executor/Executor.scala                 |  5 +++++
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala  |  6 ++++++
-rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala                  | 13 +++++++++++++
4 files changed, 26 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala
index ca793eb402..8140cba084 100644
--- a/core/src/main/scala/spark/TaskEndReason.scala
+++ b/core/src/main/scala/spark/TaskEndReason.scala
@@ -28,3 +28,5 @@ private[spark] case class ExceptionFailure(
extends TaskEndReason
private[spark] case class OtherFailure(message: String) extends TaskEndReason
+
+private[spark] case class TaskResultTooBigFailure() extends TaskEndReason
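
The new case class slots into the existing TaskEndReason hierarchy, so scheduler-side code can match on it alongside the other failure reasons. A hypothetical consumer, shown only for illustration (the function and the strings are mine, not part of the patch; it assumes the spark.* types above are in scope):

    def describe(reason: TaskEndReason): String = reason match {
      case TaskResultTooBigFailure() => "result exceeded Akka frame size"
      case ef: ExceptionFailure      => "exception: " + ef.description
      case OtherFailure(message)     => message
      case _                         => "other end reason"
    }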
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index da20b84544..890938d48b 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -72,6 +72,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
// Initialize Spark environment (using system properties read above)
val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
SparkEnv.set(env)
+ private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
// Start worker thread pool
val threadPool = new ThreadPoolExecutor(
@@ -113,6 +114,10 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
val result = new TaskResult(value, accumUpdates, task.metrics.getOrElse(null))
val serializedResult = ser.serialize(result)
logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
+ if (serializedResult.limit >= (akkaFrameSize - 1024)) {
+ context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(TaskResultTooBigFailure()))
+ return
+ }
context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
logInfo("Finished task ID " + taskId)
} catch {
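
The 1024-byte headroom in the check above presumably leaves room for the envelope that wraps the serialized result inside the actual status-update message, so a result exactly at the frame size is still rejected. A standalone restatement of the guard (the function name is mine, not Spark's):

    import java.nio.ByteBuffer

    // Same predicate as the executor-side check, in isolation.
    def resultFitsInFrame(serializedResult: ByteBuffer, akkaFrameSize: Long): Boolean =
      serializedResult.limit < akkaFrameSize - 1024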
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index c69f3bdb7f..db5869db63 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -542,6 +542,12 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
sched.taskSetFinished(this)
return
+      case taskResultTooBig: TaskResultTooBigFailure =>
+        logInfo(("Loss was due to task %s result exceeding Akka frame size; " +
+          "aborting job").format(tid))
+        abort("Task %s result exceeded Akka frame size".format(tid))
+        return
+
case ef: ExceptionFailure =>
val key = ef.description
val now = System.currentTimeMillis
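
One subtlety in the logInfo call above: Scala applies a trailing method call before string concatenation, so format must be parenthesised around the whole template rather than attached to the final literal. Illustration:

    val tid = "42"
    "task %s failed; " + "aborting".format(tid)    // => "task %s failed; aborting"
    ("task %s failed; " + "aborting").format(tid)  // => "task 42 failed; aborting"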
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 06a94ed24c..068bb6ca4f 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -301,6 +301,19 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
}
}
}
+
+ test("job should fail if TaskResult exceeds Akka frame size") {
+ // We must use local-cluster mode since results are returned differently
+ // when running under LocalScheduler:
+ sc = new SparkContext("local-cluster[1,1,512]", "test")
+ val akkaFrameSize =
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+ val rdd = sc.parallelize(Seq(1)).map { x => new Array[Byte](akkaFrameSize) }
+ val exception = intercept[SparkException] {
+ rdd.reduce((x, y) => x)
+ }
+ exception.getMessage should endWith("result exceeded Akka frame size")
+ }
}
object DistributedSuite {
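
For jobs that legitimately need to return large results, the limit itself can be raised rather than the job aborted. In Spark of this vintage the frame size was derived from the spark.akka.frameSize system property (in megabytes); a hedged usage sketch, worth verifying against the AkkaUtils of your actual version:

    // Assumes spark.akka.frameSize (MB) feeds akka.remote.netty.message-frame-size,
    // as in Spark's AkkaUtils of this era; verify before relying on it.
    System.setProperty("spark.akka.frameSize", "64")   // allow results up to ~64 MB
    val sc = new SparkContext("local-cluster[1,1,512]", "test")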