path: root/sql
author    Michael Armbrust <michael@databricks.com>  2014-11-03 14:08:27 -0800
committer Michael Armbrust <michael@databricks.com>  2014-11-03 14:08:40 -0800
commit    51985f78ca5f728f8b9233b703110f541d27b274 (patch)
tree      462d026698bbc9475f176cfc2dd1c0bacb68a8d7 /sql
parent    6104754f711da9eb0c09daf377bcd750d2d23f8a (diff)
[SQL] More aggressive defaults
- Turns on compression for in-memory cached data by default
- Changes the default parquet compression format back to gzip (we have seen more OOMs with production workloads due to the way Snappy allocates memory)
- Ups the batch size to 10,000 rows
- Increases the broadcast threshold to 10mb.
- Uses our parquet implementation instead of the hive one by default.
- Cache parquet metadata by default.

Author: Michael Armbrust <michael@databricks.com>

Closes #3064 from marmbrus/fasterDefaults and squashes the following commits:

97ee9f8 [Michael Armbrust] parquet codec docs
e641694 [Michael Armbrust] Remote also
a12866a [Michael Armbrust] Cache metadata.
2d73acc [Michael Armbrust] Update docs defaults.
d63d2d5 [Michael Armbrust] document parquet option
da373f9 [Michael Armbrust] More aggressive defaults

(cherry picked from commit 25bef7e6951301e93004567fc0cef96bf8d1a224)
Signed-off-by: Michael Armbrust <michael@databricks.com>
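For reference, a minimal sketch (assuming a Spark 1.1/1.2-era SQLContext and the usual configuration keys behind the constants touched in SQLConf.scala) of how a user could opt back into the previous defaults after this change:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext(new SparkConf().setAppName("defaults-sketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // In-memory columnar compression is now "true" by default; opt back out if needed.
    sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "false")

    // Parquet write codec: the default moved from snappy back to gzip.
    sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

    // Columnar cache batch size: the default was raised from 1,000 to 10,000 rows.
    sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "1000")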
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                        | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala |  6
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala               |  2
3 files changed, 9 insertions, 9 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 07e6e2eccd..279495aa64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -79,13 +79,13 @@ private[sql] trait SQLConf {
private[spark] def dialect: String = getConf(DIALECT, "sql")
/** When true tables cached using the in-memory columnar caching will be compressed. */
- private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "false").toBoolean
+ private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "true").toBoolean
/** The compression codec for writing to a Parquetfile */
- private[spark] def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION, "snappy")
+ private[spark] def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION, "gzip")
/** The number of rows that will be */
- private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "1000").toInt
+ private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "10000").toInt
/** Number of partitions to use for shuffle operators. */
private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt
@@ -106,10 +106,10 @@ private[sql] trait SQLConf {
* a broadcast value during the physical executions of join operations. Setting this to -1
* effectively disables auto conversion.
*
- * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is also 10000.
+ * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is 10000.
*/
private[spark] def autoBroadcastJoinThreshold: Int =
- getConf(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt
+ getConf(AUTO_BROADCASTJOIN_THRESHOLD, (10 * 1024 * 1024).toString).toInt
/**
* The default size in bytes to assign to a logical operator's estimation statistics. By default,
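A note on units for the hunk above: the broadcast threshold is a byte count, so the new default is 10 * 1024 * 1024 = 10485760 bytes (10 MB), up from 10000 bytes. A hedged sketch, reusing the sqlContext from the earlier example:

    // The threshold is compared against a table's estimated size in bytes.
    val tenMb = 10 * 1024 * 1024                     // 10485760
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", tenMb.toString)

    // Setting it to -1 disables automatic broadcast-join conversion entirely.
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")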
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 9664c565a0..d00860a8bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -123,7 +123,7 @@ case class ParquetTableScan(
// Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
conf.set(
SQLConf.PARQUET_CACHE_METADATA,
- sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "false"))
+ sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true"))
val baseRDD =
new org.apache.spark.rdd.NewHadoopRDD(
@@ -394,7 +394,7 @@ private[parquet] class FilteringParquetRowInputFormat
if (footers eq null) {
val conf = ContextUtil.getConfiguration(jobContext)
- val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)
+ val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
val statuses = listStatus(jobContext)
fileStatuses = statuses.map(file => file.getPath -> file).toMap
if (statuses.isEmpty) {
@@ -493,7 +493,7 @@ private[parquet] class FilteringParquetRowInputFormat
import parquet.filter2.compat.FilterCompat.Filter;
import parquet.filter2.compat.RowGroupFilter;
- val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, false)
+ val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
val filter: Filter = ParquetInputFormat.getFilter(configuration)
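The hunks above make Parquet footer and filesystem metadata caching the default. A small sketch (same assumed sqlContext as in the first example) of turning it back off, for instance when the underlying files are rewritten between queries:

    // spark.sql.parquet.cacheMetadata now defaults to "true"; disable it if
    // Parquet footers change while the context is alive.
    sqlContext.setConf("spark.sql.parquet.cacheMetadata", "false")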
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index f025169ad5..e88afaaf00 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -90,7 +90,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
* SerDe.
*/
private[spark] def convertMetastoreParquet: Boolean =
- getConf("spark.sql.hive.convertMetastoreParquet", "false") == "true"
+ getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true"
override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }
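With convertMetastoreParquet now defaulting to true, metastore Parquet tables are read with Spark SQL's native Parquet support instead of the Hive SerDe. A minimal sketch (assuming the same Spark 1.1/1.2-era API and the sc from the first example) of falling back to the Hive reader:

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)  // sc from the first sketch above
    hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")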