aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@eecs.berkeley.edu>2013-05-11 18:27:26 -0700
committerJosh Rosen <joshrosen@eecs.berkeley.edu>2013-05-11 19:17:13 -0700
commit440719109e10ea1cc6149a8f61d42ea7cc443352 (patch)
treea978197cdd2f82c135824100d9566d60ac9cb81d /core
parent63e1999f6057bd397b49efe432ad74c0015a101b (diff)
downloadspark-440719109e10ea1cc6149a8f61d42ea7cc443352.tar.gz
spark-440719109e10ea1cc6149a8f61d42ea7cc443352.tar.bz2
spark-440719109e10ea1cc6149a8f61d42ea7cc443352.zip
Throw exception if task result exceeds Akka frame size.
This partially addresses SPARK-747.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/executor/Executor.scala4
1 files changed, 4 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 344face5e6..718f0ff5bc 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -72,6 +72,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
// Initialize Spark environment (using system properties read above)
val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
SparkEnv.set(env)
+ private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
// Start worker thread pool
val threadPool = new ThreadPoolExecutor(
@@ -113,6 +114,9 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
val result = new TaskResult(value, accumUpdates, task.metrics.getOrElse(null))
val serializedResult = ser.serialize(result)
logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
+ if (serializedResult.limit >= (akkaFrameSize - 1024)) {
+ throw new SparkException("Result for " + taskId + " exceeded Akka frame size")
+ }
context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
logInfo("Finished task ID " + taskId)
} catch {