diff options
-rw-r--r-- | core/src/main/scala/spark/executor/Executor.scala | 4 |
1 files changed, 4 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 344face5e6..718f0ff5bc 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -72,6 +72,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert // Initialize Spark environment (using system properties read above) val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false) SparkEnv.set(env) + private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size") // Start worker thread pool val threadPool = new ThreadPoolExecutor( @@ -113,6 +114,9 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert val result = new TaskResult(value, accumUpdates, task.metrics.getOrElse(null)) val serializedResult = ser.serialize(result) logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit) + if (serializedResult.limit >= (akkaFrameSize - 1024)) { + throw new SparkException("Result for " + taskId + " exceeded Akka frame size") + } context.statusUpdate(taskId, TaskState.FINISHED, serializedResult) logInfo("Finished task ID " + taskId) } catch { |