diff options
author | Josh Rosen <joshrosen@databricks.com> | 2015-07-29 16:00:30 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-07-29 16:00:30 -0700 |
commit | 1b0099fc62d02ff6216a76fbfe17a4ec5b2f3536 (patch) | |
tree | 900a7c0fb8296e20d5d10914ab63d6a61805a6da /sql | |
parent | b715933fc69a49653abdb2fba0818dfc4f35d358 (diff) | |
download | spark-1b0099fc62d02ff6216a76fbfe17a4ec5b2f3536.tar.gz spark-1b0099fc62d02ff6216a76fbfe17a4ec5b2f3536.tar.bz2 spark-1b0099fc62d02ff6216a76fbfe17a4ec5b2f3536.zip |
[SPARK-9411] [SQL] Make Tungsten page sizes configurable
We need to make page sizes configurable so we can reduce them in unit tests and increase them in real production workloads. These sizes are now controlled by a new configuration, `spark.buffer.pageSize`. The new default is 64 megabytes.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #7741 from JoshRosen/SPARK-9411 and squashes the following commits:
a43c4db [Josh Rosen] Fix pow
2c0eefc [Josh Rosen] Fix MAXIMUM_PAGE_SIZE_BYTES comment + value
bccfb51 [Josh Rosen] Lower page size to 4MB in TestHive
ba54d4b [Josh Rosen] Make UnsafeExternalSorter's page size configurable
0045aa2 [Josh Rosen] Make UnsafeShuffle's page size configurable
bc734f0 [Josh Rosen] Rename configuration
e614858 [Josh Rosen] Makes BytesToBytesMap page size configurable
Diffstat (limited to 'sql')
5 files changed, 19 insertions, 4 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java index 684de6e81d..03f4c3ed8e 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMap.java @@ -95,6 +95,7 @@ public final class UnsafeFixedWidthAggregationMap { * @param groupingKeySchema the schema of the grouping key, used for row conversion. * @param memoryManager the memory manager used to allocate our Unsafe memory structures. * @param initialCapacity the initial capacity of the map (a sizing hint to avoid re-hashing). + * @param pageSizeBytes the data page size, in bytes; limits the maximum record size. * @param enablePerfMetrics if true, performance metrics will be recorded (has minor perf impact) */ public UnsafeFixedWidthAggregationMap( @@ -103,11 +104,13 @@ public final class UnsafeFixedWidthAggregationMap { StructType groupingKeySchema, TaskMemoryManager memoryManager, int initialCapacity, + long pageSizeBytes, boolean enablePerfMetrics) { this.aggregationBufferSchema = aggregationBufferSchema; this.groupingKeyProjection = UnsafeProjection.create(groupingKeySchema); this.groupingKeySchema = groupingKeySchema; - this.map = new BytesToBytesMap(memoryManager, initialCapacity, enablePerfMetrics); + this.map = + new BytesToBytesMap(memoryManager, initialCapacity, pageSizeBytes, enablePerfMetrics); this.enablePerfMetrics = enablePerfMetrics; // Initialize the buffer for aggregation value diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala index 48b7dc5745..6a907290f2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala @@ -39,6 +39,7 @@ class UnsafeFixedWidthAggregationMapSuite private val groupKeySchema = StructType(StructField("product", StringType) :: Nil) private val aggBufferSchema = StructType(StructField("salePrice", IntegerType) :: Nil) private def emptyAggregationBuffer: InternalRow = InternalRow(0) + private val PAGE_SIZE_BYTES: Long = 1L << 26; // 64 megabytes private var memoryManager: TaskMemoryManager = null @@ -69,7 +70,8 @@ class UnsafeFixedWidthAggregationMapSuite aggBufferSchema, groupKeySchema, memoryManager, - 1024, // initial capacity + 1024, // initial capacity, + PAGE_SIZE_BYTES, false // disable perf metrics ) assert(!map.iterator().hasNext) @@ -83,6 +85,7 @@ class UnsafeFixedWidthAggregationMapSuite groupKeySchema, memoryManager, 1024, // initial capacity + PAGE_SIZE_BYTES, false // disable perf metrics ) val groupKey = InternalRow(UTF8String.fromString("cats")) @@ -109,6 +112,7 @@ class UnsafeFixedWidthAggregationMapSuite groupKeySchema, memoryManager, 128, // initial capacity + PAGE_SIZE_BYTES, false // disable perf metrics ) val rand = new Random(42) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala index 1cd1420480..b85aada9d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution -import org.apache.spark.TaskContext +import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -260,12 +260,14 @@ case class GeneratedAggregate( } else if (unsafeEnabled && schemaSupportsUnsafe) { assert(iter.hasNext, "There should be at least one row for this path") log.info("Using Unsafe-based aggregator") + val pageSizeBytes = SparkEnv.get.conf.getSizeAsBytes("spark.buffer.pageSize", "64m") val aggregationMap = new UnsafeFixedWidthAggregationMap( newAggregationBuffer(EmptyRow), aggregationBufferSchema, groupKeySchema, TaskContext.get.taskMemoryManager(), 1024 * 16, // initial capacity + pageSizeBytes, false // disable tracking of performance metrics ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 9c058f1f72..7a50739131 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -21,6 +21,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.nio.ByteOrder import java.util.{HashMap => JavaHashMap} +import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkSqlSerializer @@ -259,7 +260,11 @@ private[joins] final class UnsafeHashedRelation( val nKeys = in.readInt() // This is used in Broadcast, shared by multiple tasks, so we use on-heap memory val memoryManager = new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP)) - binaryMap = new BytesToBytesMap(memoryManager, nKeys * 2) // reduce hash collision + val pageSizeBytes = SparkEnv.get.conf.getSizeAsBytes("spark.buffer.pageSize", "64m") + binaryMap = new BytesToBytesMap( + memoryManager, + nKeys * 2, // reduce hash collision + pageSizeBytes) var i = 0 var keyBuffer = new Array[Byte](1024) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala index 3662a4352f..7bbdef90cd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala @@ -56,6 +56,7 @@ object TestHive .set("spark.sql.test", "") .set("spark.sql.hive.metastore.barrierPrefixes", "org.apache.spark.sql.hive.execution.PairSerDe") + .set("spark.buffer.pageSize", "4m") // SPARK-8910 .set("spark.ui.enabled", "false"))) |