aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-03-11 15:39:21 -0800
committerDavies Liu <davies.liu@gmail.com>2016-03-11 15:39:21 -0800
commit2ef4c5963bff3574fe17e669d703b25ddd064e5d (patch)
treeaa3eeb5eefec3d663bc6c994d4a9be41fd998223
parent66d9d0edfef986895490bcdeacbc0ca38e091702 (diff)
downloadspark-2ef4c5963bff3574fe17e669d703b25ddd064e5d.tar.gz
spark-2ef4c5963bff3574fe17e669d703b25ddd064e5d.tar.bz2
spark-2ef4c5963bff3574fe17e669d703b25ddd064e5d.zip
[SPARK-13830] prefer block manager than direct result for large result
## What changes were proposed in this pull request? The current RPC can't handle large blocks very well, it's very slow to fetch 100M block (about 1 minute). Once switch to block manager to fetch that, it took about 10 seconds (still could be improved). ## How was this patch tested? existing unit tests. Author: Davies Liu <davies@databricks.com> Closes #11659 from davies/direct_result.
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala9
1 files changed, 6 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index e88d6cd089..07e3c12bc9 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -97,9 +97,11 @@ private[spark] class Executor(
// Set the classloader for serializer
env.serializer.setDefaultClassLoader(replClassLoader)
- // Max RPC message size. If task result is bigger than this, we use the block manager
+ // Max size of direct result. If task result is bigger than this, we use the block manager
// to send the result back.
- private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+ private val maxDirectResultSize = Math.min(
+ conf.getSizeAsBytes("spark.task.maxDirectResultSize", 1L << 20),
+ RpcUtils.maxMessageSizeBytes(conf))
// Limit of bytes for total size of results (default is 1GB)
private val maxResultSize = Utils.getMaxResultSize(conf)
@@ -279,6 +281,7 @@ private[spark] class Executor(
// Note: accumulator updates must be collected after TaskMetrics is updated
val accumUpdates = task.collectAccumulatorUpdates()
+ // TODO: do not serialize value twice
val directResult = new DirectTaskResult(valueBytes, accumUpdates)
val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit
@@ -290,7 +293,7 @@ private[spark] class Executor(
s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
s"dropping it.")
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
- } else if (resultSize >= maxRpcMessageSize) {
+ } else if (resultSize > maxDirectResultSize) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)