diff options
author | Eric Liang <ekl@databricks.com> | 2016-09-08 16:47:18 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2016-09-08 16:47:18 -0700 |
commit | 722afbb2b33037a30d385a15725f2db5365bd375 (patch) | |
tree | 4295f62212e010653b7d5bd82f62149ca5f7a43b | |
parent | 78d5d4dd5ce5a537ed04cd1bf242c9e9ea2c391a (diff) | |
download | spark-722afbb2b33037a30d385a15725f2db5365bd375.tar.gz spark-722afbb2b33037a30d385a15725f2db5365bd375.tar.bz2 spark-722afbb2b33037a30d385a15725f2db5365bd375.zip |
[SPARK-17405] RowBasedKeyValueBatch should use default page size to prevent OOMs
## What changes were proposed in this pull request?
Before this change, we would always allocate 64MB per aggregation task for the first-level hash map storage, even when running in low-memory situations such as local mode. This changes it to use the memory manager default page size, which is automatically reduced from 64MB in these situations.
cc ooq JoshRosen
## How was this patch tested?
Tested manually with `bin/spark-shell --master=local[32]` and verifying that `(1 to math.pow(10, 3).toInt).toDF("n").withColumn("m", 'n % 2).groupBy('m).agg(sum('n)).show` does not crash.
Author: Eric Liang <ekl@databricks.com>
Closes #15016 from ericl/sc-4483.
-rw-r--r-- | sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java | 15 |
1 files changed, 7 insertions, 8 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java index 4899f856c8..551443a112 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/RowBasedKeyValueBatch.java @@ -37,19 +37,18 @@ import org.slf4j.LoggerFactory; * We use `FixedLengthRowBasedKeyValueBatch` if all fields in the key and the value are fixed-length * data types. Otherwise we use `VariableLengthRowBasedKeyValueBatch`. * - * RowBasedKeyValueBatch is backed by a single page / MemoryBlock (defaults to 64MB). If the page - * is full, the aggregate logic should fallback to a second level, larger hash map. We intentionally - * use the single-page design because it simplifies memory address encoding & decoding for each - * key-value pair. Because the maximum capacity for RowBasedKeyValueBatch is only 2^16, it is - * unlikely we need a second page anyway. Filling the page requires an average size for key value - * pairs to be larger than 1024 bytes. + * RowBasedKeyValueBatch is backed by a single page / MemoryBlock (ranges from 1 to 64MB depending + * on the system configuration). If the page is full, the aggregate logic should fallback to a + * second level, larger hash map. We intentionally use the single-page design because it simplifies + * memory address encoding & decoding for each key-value pair. Because the maximum capacity for + * RowBasedKeyValueBatch is only 2^16, it is unlikely we need a second page anyway. Filling the + * page requires an average size for key value pairs to be larger than 1024 bytes. * */ public abstract class RowBasedKeyValueBatch extends MemoryConsumer { protected final Logger logger = LoggerFactory.getLogger(RowBasedKeyValueBatch.class); private static final int DEFAULT_CAPACITY = 1 << 16; - private static final long DEFAULT_PAGE_SIZE = 64 * 1024 * 1024; protected final StructType keySchema; protected final StructType valueSchema; @@ -105,7 +104,7 @@ public abstract class RowBasedKeyValueBatch extends MemoryConsumer { this.keyRow = new UnsafeRow(keySchema.length()); this.valueRow = new UnsafeRow(valueSchema.length()); - if (!acquirePage(DEFAULT_PAGE_SIZE)) { + if (!acquirePage(manager.pageSizeBytes())) { page = null; recordStartOffset = 0; } else { |