author     Reynold Xin <rxin@databricks.com>    2016-04-08 23:52:04 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-04-08 23:52:04 -0700
commit     2f0b882e5c8787b09bedcc8208e6dcc5662dbbab (patch)
tree       aa17c6aa99fdbe772e51cdb40095a2cff492f754 /sql/core
parent     d7af736b2cf6c392b87e7b45c2d2219ef06979eb (diff)
[SPARK-14482][SQL] Change default Parquet codec from gzip to snappy
## What changes were proposed in this pull request?

Based on our tests, gzip decompression is very slow (< 100MB/s), making queries decompression bound. Snappy can decompress at ~500MB/s on a single core.

This patch changes the default compression codec for Parquet output from gzip to snappy, and also introduces a ParquetOptions class to be more consistent with other data sources (e.g. CSV, JSON).

## How was this patch tested?

Should be covered by existing unit tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #12256 from rxin/SPARK-14482.
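As a usage note (not part of the commit), the new default only applies when no codec is specified; either of the following still overrides it. This is a minimal sketch assuming a DataFrame `df` and a `sqlContext` are already in scope, with placeholder output paths:

```scala
// Per-write override via the data source option that ParquetOptions validates;
// accepted short names are "none", "uncompressed", "snappy", "gzip", "lzo".
df.write
  .option("compression", "gzip")
  .parquet("/tmp/events_gzip")

// Session-wide override of the SQLConf default (now "snappy").
sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")
df.write.parquet("/tmp/events_default_codec")
```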
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala           3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala  59
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala 34
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                               2
4 files changed, 65 insertions(+), 33 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 95de02cf5c..7b9d3b605a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -22,8 +22,7 @@ import java.nio.charset.StandardCharsets
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
-private[sql] class CSVOptions(
- @transient private val parameters: Map[String, String])
+private[sql] class CSVOptions(@transient private val parameters: Map[String, String])
extends Logging with Serializable {
private def getChar(paramName: String, default: Char): Char = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
new file mode 100644
index 0000000000..00352f23ae
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Options for the Parquet data source.
+ */
+class ParquetOptions(
+ @transient private val parameters: Map[String, String],
+ @transient private val sqlConf: SQLConf)
+ extends Logging with Serializable {
+
+ import ParquetOptions._
+
+ /**
+ * Compression codec to use. By default use the value specified in SQLConf.
+ * Acceptable values are defined in [[shortParquetCompressionCodecNames]].
+ */
+ val compressionCodec: String = {
+ val codecName = parameters.getOrElse("compression", sqlConf.parquetCompressionCodec).toLowerCase
+ if (!shortParquetCompressionCodecNames.contains(codecName)) {
+ val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase)
+ throw new IllegalArgumentException(s"Codec [$codecName] " +
+ s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
+ }
+ shortParquetCompressionCodecNames(codecName).name()
+ }
+}
+
+
+object ParquetOptions {
+ // The parquet compression short names
+ private val shortParquetCompressionCodecNames = Map(
+ "none" -> CompressionCodecName.UNCOMPRESSED,
+ "uncompressed" -> CompressionCodecName.UNCOMPRESSED,
+ "snappy" -> CompressionCodecName.SNAPPY,
+ "gzip" -> CompressionCodecName.GZIP,
+ "lzo" -> CompressionCodecName.LZO)
+}
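Editorial illustration, not part of the patch: the resolution order implemented above, mirrored in a self-contained sketch. `ParquetCodecSketch`, `resolveCodec`, `shortNames`, and `sessionDefault` are hypothetical stand-ins for SQLConf and CompressionCodecName:

```scala
// Standalone sketch: the explicit "compression" option wins over the session
// default, and the result is validated against the same short-name table.
object ParquetCodecSketch {
  private val shortNames = Set("none", "uncompressed", "snappy", "gzip", "lzo")

  def resolveCodec(options: Map[String, String], sessionDefault: String): String = {
    val codecName = options.getOrElse("compression", sessionDefault).toLowerCase
    require(shortNames.contains(codecName),
      s"Codec [$codecName] is not available. Available codecs are ${shortNames.mkString(", ")}.")
    codecName
  }

  def main(args: Array[String]): Unit = {
    // Explicit option takes precedence over the (new) "snappy" default.
    println(resolveCodec(Map("compression" -> "gzip"), sessionDefault = "snappy")) // gzip
    // No option given: fall back to the SQLConf default.
    println(resolveCodec(Map.empty, sessionDefault = "snappy"))                    // snappy
  }
}
```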
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 5ad95e4b9e..ca6803b737 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -74,6 +74,8 @@ private[sql] class DefaultSource
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
+ val parquetOptions = new ParquetOptions(options, sqlContext.sessionState.conf)
+
val conf = ContextUtil.getConfiguration(job)
val committerClass =
@@ -84,24 +86,11 @@ private[sql] class DefaultSource
if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
logInfo("Using default output committer for Parquet: " +
- classOf[ParquetOutputCommitter].getCanonicalName)
+ classOf[ParquetOutputCommitter].getCanonicalName)
} else {
logInfo("Using user defined output committer for Parquet: " + committerClass.getCanonicalName)
}
- val compressionCodec: Option[String] = options
- .get("compression")
- .map { codecName =>
- // Validate if given compression codec is supported or not.
- val shortParquetCompressionCodecNames = ParquetRelation.shortParquetCompressionCodecNames
- if (!shortParquetCompressionCodecNames.contains(codecName.toLowerCase)) {
- val availableCodecs = shortParquetCompressionCodecNames.keys.map(_.toLowerCase)
- throw new IllegalArgumentException(s"Codec [$codecName] " +
- s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
- }
- codecName.toLowerCase
- }
-
conf.setClass(
SQLConf.OUTPUT_COMMITTER_CLASS.key,
committerClass,
@@ -136,14 +125,7 @@ private[sql] class DefaultSource
sqlContext.conf.writeLegacyParquetFormat.toString)
// Sets compression scheme
- conf.set(
- ParquetOutputFormat.COMPRESSION,
- ParquetRelation
- .shortParquetCompressionCodecNames
- .getOrElse(
- compressionCodec
- .getOrElse(sqlContext.conf.parquetCompressionCodec.toLowerCase),
- CompressionCodecName.UNCOMPRESSED).name())
+ conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodec)
new OutputWriterFactory {
override def newInstance(
@@ -917,12 +899,4 @@ private[sql] object ParquetRelation extends Logging {
// should be removed after this issue is fixed.
}
}
-
- // The parquet compression short names
- val shortParquetCompressionCodecNames = Map(
- "none" -> CompressionCodecName.UNCOMPRESSED,
- "uncompressed" -> CompressionCodecName.UNCOMPRESSED,
- "snappy" -> CompressionCodecName.SNAPPY,
- "gzip" -> CompressionCodecName.GZIP,
- "lzo" -> CompressionCodecName.LZO)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index dc6ba1bcfb..b58f960897 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -193,7 +193,7 @@ object SQLConf {
.stringConf
.transform(_.toLowerCase())
.checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
- .createWithDefault("gzip")
+ .createWithDefault("snappy")
val PARQUET_FILTER_PUSHDOWN_ENABLED = SQLConfigBuilder("spark.sql.parquet.filterPushdown")
.doc("Enables Parquet filter push-down optimization when set to true.")