aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
author: Josh Rosen <joshrosen@databricks.com> 2015-07-29 16:00:30 -0700
committer: Reynold Xin <rxin@databricks.com> 2015-07-29 16:00:30 -0700
commit: 1b0099fc62d02ff6216a76fbfe17a4ec5b2f3536 (patch)
tree: 900a7c0fb8296e20d5d10914ab63d6a61805a6da /sql/catalyst
parent: b715933fc69a49653abdb2fba0818dfc4f35d358 (diff)
download: spark-1b0099fc62d02ff6216a76fbfe17a4ec5b2f3536.tar.gz
spark-1b0099fc62d02ff6216a76fbfe17a4ec5b2f3536.tar.bz2
spark-1b0099fc62d02ff6216a76fbfe17a4ec5b2f3536.zip
[SPARK-9411] [SQL] Make Tungsten page sizes configurable
We need to make page sizes configurable so we can reduce them in unit tests and increase them in real production workloads. These sizes are now controlled by a new configuration, `spark.buffer.pageSize`. The new default is 64 megabytes.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #7741 from JoshRosen/SPARK-9411 and squashes the following commits:

a43c4db [Josh Rosen] Fix pow
2c0eefc [Josh Rosen] Fix MAXIMUM_PAGE_SIZE_BYTES comment + value
bccfb51 [Josh Rosen] Lower page size to 4MB in TestHive
ba54d4b [Josh Rosen] Make UnsafeExternalSorter's page size configurable
0045aa2 [Josh Rosen] Make UnsafeShuffle's page size configurable
bc734f0 [Josh Rosen] Rename configuration
e614858 [Josh Rosen] Makes BytesToBytesMap page size configurable
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java | 5
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala | 6
2 files changed, 9 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
index 684de6e81d..03f4c3ed8e 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java
@@ -95,6 +95,7 @@ public final class UnsafeFixedWidthAggregationMap {
* @param groupingKeySchema the schema of the grouping key, used for row conversion.
* @param memoryManager the memory manager used to allocate our Unsafe memory structures.
* @param initialCapacity the initial capacity of the map (a sizing hint to avoid re-hashing).
+ * @param pageSizeBytes the data page size, in bytes; limits the maximum record size.
* @param enablePerfMetrics if true, performance metrics will be recorded (has minor perf impact)
*/
public UnsafeFixedWidthAggregationMap(
@@ -103,11 +104,13 @@ public final class UnsafeFixedWidthAggregationMap {
StructType groupingKeySchema,
TaskMemoryManager memoryManager,
int initialCapacity,
+ long pageSizeBytes,
boolean enablePerfMetrics) {
this.aggregationBufferSchema = aggregationBufferSchema;
this.groupingKeyProjection = UnsafeProjection.create(groupingKeySchema);
this.groupingKeySchema = groupingKeySchema;
- this.map = new BytesToBytesMap(memoryManager, initialCapacity, enablePerfMetrics);
+ this.map =
+ new BytesToBytesMap(memoryManager, initialCapacity, pageSizeBytes, enablePerfMetrics);
this.enablePerfMetrics = enablePerfMetrics;
// Initialize the buffer for aggregation value
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
index 48b7dc5745..6a907290f2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
@@ -39,6 +39,7 @@ class UnsafeFixedWidthAggregationMapSuite
private val groupKeySchema = StructType(StructField("product", StringType) :: Nil)
private val aggBufferSchema = StructType(StructField("salePrice", IntegerType) :: Nil)
private def emptyAggregationBuffer: InternalRow = InternalRow(0)
+ private val PAGE_SIZE_BYTES: Long = 1L << 26; // 64 megabytes
private var memoryManager: TaskMemoryManager = null
@@ -69,7 +70,8 @@ class UnsafeFixedWidthAggregationMapSuite
aggBufferSchema,
groupKeySchema,
memoryManager,
- 1024, // initial capacity
+ 1024, // initial capacity,
+ PAGE_SIZE_BYTES,
false // disable perf metrics
)
assert(!map.iterator().hasNext)
@@ -83,6 +85,7 @@ class UnsafeFixedWidthAggregationMapSuite
groupKeySchema,
memoryManager,
1024, // initial capacity
+ PAGE_SIZE_BYTES,
false // disable perf metrics
)
val groupKey = InternalRow(UTF8String.fromString("cats"))
@@ -109,6 +112,7 @@ class UnsafeFixedWidthAggregationMapSuite
groupKeySchema,
memoryManager,
128, // initial capacity
+ PAGE_SIZE_BYTES,
false // disable perf metrics
)
val rand = new Random(42)