author     Michael Armbrust <michael@databricks.com>    2014-11-03 14:08:27 -0800
committer  Michael Armbrust <michael@databricks.com>    2014-11-03 14:08:40 -0800
commit     51985f78ca5f728f8b9233b703110f541d27b274
tree       462d026698bbc9475f176cfc2dd1c0bacb68a8d7
parent     6104754f711da9eb0c09daf377bcd750d2d23f8a
[SQL] More aggressive defaults
- Turns on compression for in-memory cached data by default.
- Changes the default Parquet compression codec back to gzip (we have seen more OOMs with production workloads due to the way Snappy allocates memory).
- Increases the in-memory columnar batch size to 10,000 rows.
- Increases the broadcast join threshold to 10 MB.
- Uses our Parquet implementation instead of the Hive one by default.
- Caches Parquet metadata by default. (An illustrative setConf sketch for reverting these defaults follows below.)
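The following spark-shell style snippet is illustrative only and is not part of the patch. Assuming an already-running SparkContext `sc`, it shows how a job that preferred the previous behavior could restore the old values through `SQLContext.setConf`, using the configuration keys and old defaults visible in the diff below.

import org.apache.spark.sql.SQLContext

// Illustrative only: revert the defaults introduced by this commit back to
// their previous values. Keys and old values are taken from the diff below;
// `sc` is an assumed, already-constructed SparkContext.
val sqlContext = new SQLContext(sc)

// Parquet: back to snappy compression and no metadata caching.
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
sqlContext.setConf("spark.sql.parquet.cacheMetadata", "false")

// In-memory columnar cache: uncompressed, 1,000-row batches.
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "false")
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "1000")

// Broadcast joins: the old 10000-byte threshold
// (the new default is 10 * 1024 * 1024 = 10485760 bytes).
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "10000")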
Author: Michael Armbrust <michael@databricks.com>
Closes #3064 from marmbrus/fasterDefaults and squashes the following commits:
97ee9f8 [Michael Armbrust] parquet codec docs
e641694 [Michael Armbrust] Remote also
a12866a [Michael Armbrust] Cache metadata.
2d73acc [Michael Armbrust] Update docs defaults.
d63d2d5 [Michael Armbrust] document parquet option
da373f9 [Michael Armbrust] More aggressive defaults
(cherry picked from commit 25bef7e6951301e93004567fc0cef96bf8d1a224)
Signed-off-by: Michael Armbrust <michael@databricks.com>
4 files changed, 22 insertions, 14 deletions
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index d4ade939c3..e399fecbbc 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -582,19 +582,27 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or
 </tr>
 <tr>
   <td><code>spark.sql.parquet.cacheMetadata</code></td>
-  <td>false</td>
+  <td>true</td>
   <td>
     Turns on caching of Parquet schema metadata. Can speed up querying of static data.
   </td>
 </tr>
 <tr>
   <td><code>spark.sql.parquet.compression.codec</code></td>
-  <td>snappy</td>
+  <td>gzip</td>
   <td>
     Sets the compression codec use when writing Parquet files. Acceptable values include:
     uncompressed, snappy, gzip, lzo.
   </td>
 </tr>
+<tr>
+  <td><code>spark.sql.hive.convertMetastoreParquet</code></td>
+  <td>true</td>
+  <td>
+    When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of the built in
+    support.
+  </td>
+</tr>
 </table>

 ## JSON Datasets
@@ -815,7 +823,7 @@ Configuration of in-memory caching can be done using the `setConf` method on SQL
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
   <td><code>spark.sql.inMemoryColumnarStorage.compressed</code></td>
-  <td>false</td>
+  <td>true</td>
   <td>
     When set to true Spark SQL will automatically select a compression codec for each column based
     on statistics of the data.
@@ -823,7 +831,7 @@ Configuration of in-memory caching can be done using the `setConf` method on SQL
 </tr>
 <tr>
   <td><code>spark.sql.inMemoryColumnarStorage.batchSize</code></td>
-  <td>1000</td>
+  <td>10000</td>
   <td>
     Controls the size of batches for columnar caching.  Larger batch sizes can improve memory utilization
     and compression, but risk OOMs when caching data.
@@ -841,7 +849,7 @@ that these options will be deprecated in future release as more optimizations ar
 <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
 <tr>
   <td><code>spark.sql.autoBroadcastJoinThreshold</code></td>
-  <td>10000</td>
+  <td>10485760 (10 MB)</td>
   <td>
     Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when
     performing a join.  By setting this value to -1 broadcasting can be disabled.  Note that currently
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 07e6e2eccd..279495aa64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -79,13 +79,13 @@ private[sql] trait SQLConf {
   private[spark] def dialect: String = getConf(DIALECT, "sql")

   /** When true tables cached using the in-memory columnar caching will be compressed. */
-  private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "false").toBoolean
+  private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "true").toBoolean

   /** The compression codec for writing to a Parquetfile */
-  private[spark] def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION, "snappy")
+  private[spark] def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION, "gzip")

   /** The number of rows that will be */
-  private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "1000").toInt
+  private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "10000").toInt

   /** Number of partitions to use for shuffle operators. */
   private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt
@@ -106,10 +106,10 @@ private[sql] trait SQLConf {
    * a broadcast value during the physical executions of join operations.  Setting this to -1
    * effectively disables auto conversion.
    *
-   * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is also 10000.
+   * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is 10000.
    */
   private[spark] def autoBroadcastJoinThreshold: Int =
-    getConf(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt
+    getConf(AUTO_BROADCASTJOIN_THRESHOLD, (10 * 1024 * 1024).toString).toInt

   /**
    * The default size in bytes to assign to a logical operator's estimation statistics.  By default,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 9664c565a0..d00860a8bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -123,7 +123,7 @@ case class ParquetTableScan(
     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
     conf.set(
       SQLConf.PARQUET_CACHE_METADATA,
-      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "false"))
+      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true"))

     val baseRDD =
       new org.apache.spark.rdd.NewHadoopRDD(
@@ -394,7 +394,7 @@ private[parquet] class FilteringParquetRowInputFormat

     if (footers eq null) {
       val conf = ContextUtil.getConfiguration(jobContext)
-      val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)
+      val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
       val statuses = listStatus(jobContext)
       fileStatuses = statuses.map(file => file.getPath -> file).toMap
       if (statuses.isEmpty) {
@@ -493,7 +493,7 @@ private[parquet] class FilteringParquetRowInputFormat
     import parquet.filter2.compat.FilterCompat.Filter;
     import parquet.filter2.compat.RowGroupFilter;

-    val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)
+    val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)

     val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
     val filter: Filter = ParquetInputFormat.getFilter(configuration)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index f025169ad5..e88afaaf00 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -90,7 +90,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * SerDe.
    */
   private[spark] def convertMetastoreParquet: Boolean =
-    getConf("spark.sql.hive.convertMetastoreParquet", "false") == "true"
+    getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true"

   override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution { val logical = plan }
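Because spark.sql.hive.convertMetastoreParquet now defaults to true, metastore Parquet tables go through Spark SQL's built-in Parquet support unless the flag is turned off. The sketch below is illustrative rather than part of the patch; it assumes a spark-shell session with a SparkContext `sc`, and `some_parquet_table` is a hypothetical Parquet-backed metastore table.

import org.apache.spark.sql.hive.HiveContext

// Illustrative only: `sc` is an assumed SparkContext and `some_parquet_table`
// is a hypothetical Parquet-backed metastore table.
val hiveContext = new HiveContext(sc)

// Opt back into the Hive SerDe path for Parquet tables, as documented in the
// configuration row added to the programming guide above.
hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")

hiveContext.sql("SELECT COUNT(*) FROM some_parquet_table").collect()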