Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                    |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala    | 14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala  | 94
3 files changed, 107 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 5cc41a83cc..f0df19112a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -33,6 +33,7 @@ private[spark] object SQLConf {
val DIALECT = "spark.sql.dialect"
val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
+ val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
// This is only used for the thriftserver
val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
@@ -78,6 +79,9 @@ trait SQLConf {
/** When true tables cached using the in-memory columnar caching will be compressed. */
private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "false").toBoolean
+ /** The compression codec for writing to a Parquet file */
+ private[spark] def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION, "snappy")
+
/** The number of rows that will be batched together when caching in-memory columnar data. */
private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "1000").toInt
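
A usage sketch of the new setting (not part of this patch): the codec is read from SQLConf at write time, so a caller sets it on the SQLContext before saving. The names `sc` (an existing SparkContext) and the output path are assumptions for illustration.

    // Hypothetical usage sketch for the new spark.sql.parquet.compression.codec setting.
    import org.apache.spark.sql.SQLContext

    case class Record(key: Int, value: String)

    val sqlContext = new SQLContext(sc)   // `sc` is an existing SparkContext (assumed)
    import sqlContext.createSchemaRDD     // Spark 1.x implicit: RDD[case class] -> SchemaRDD

    // Pick gzip instead of the snappy default introduced above.
    sqlContext.setConf("spark.sql.parquet.compression.codec", "gzip")

    sc.parallelize(1 to 100)
      .map(i => Record(i, s"val_$i"))
      .saveAsParquetFile("/tmp/parquet-gzip-example")   // hypothetical output path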
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 1713ae6fb5..5ae768293a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -100,8 +100,13 @@ private[sql] object ParquetRelation {
// The compression type
type CompressionType = parquet.hadoop.metadata.CompressionCodecName
- // The default compression
- val defaultCompression = CompressionCodecName.GZIP
+ // The parquet compression short names
+ val shortParquetCompressionCodecNames = Map(
+ "NONE" -> CompressionCodecName.UNCOMPRESSED,
+ "UNCOMPRESSED" -> CompressionCodecName.UNCOMPRESSED,
+ "SNAPPY" -> CompressionCodecName.SNAPPY,
+ "GZIP" -> CompressionCodecName.GZIP,
+ "LZO" -> CompressionCodecName.LZO)
/**
* Creates a new ParquetRelation and underlying Parquet file for the given LogicalPlan. Note that
@@ -141,9 +146,8 @@ private[sql] object ParquetRelation {
conf: Configuration,
sqlContext: SQLContext): ParquetRelation = {
val path = checkPath(pathString, allowExisting, conf)
- if (conf.get(ParquetOutputFormat.COMPRESSION) == null) {
- conf.set(ParquetOutputFormat.COMPRESSION, ParquetRelation.defaultCompression.name())
- }
+ conf.set(ParquetOutputFormat.COMPRESSION, shortParquetCompressionCodecNames.getOrElse(
+ sqlContext.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED).name())
ParquetRelation.enableLogForwarding()
ParquetTypesConverter.writeMetaData(attributes, path, conf)
new ParquetRelation(path.toString, Some(conf), sqlContext) {
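
The lookup above maps user-facing short names to parquet-mr codec constants, falling back to UNCOMPRESSED for anything unrecognized. A self-contained sketch of that resolution logic (assuming parquet-mr 1.x on the classpath; `resolve` is an illustrative name, not part of the patch):

    // Sketch of the short-name resolution performed in the relation-creation code above.
    import parquet.hadoop.metadata.CompressionCodecName

    val shortParquetCompressionCodecNames = Map(
      "NONE" -> CompressionCodecName.UNCOMPRESSED,
      "UNCOMPRESSED" -> CompressionCodecName.UNCOMPRESSED,
      "SNAPPY" -> CompressionCodecName.SNAPPY,
      "GZIP" -> CompressionCodecName.GZIP,
      "LZO" -> CompressionCodecName.LZO)

    def resolve(name: String): CompressionCodecName =
      shortParquetCompressionCodecNames.getOrElse(name.toUpperCase, CompressionCodecName.UNCOMPRESSED)

    assert(resolve("gzip") == CompressionCodecName.GZIP)
    assert(resolve("bogus") == CompressionCodecName.UNCOMPRESSED)  // unknown names fall back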
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 172dcd6aa0..28f43b3683 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -186,6 +186,100 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll
TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, oldIsParquetBinaryAsString.toString)
}
+ test("Compression options for writing to a Parquetfile") {
+ val defaultParquetCompressionCodec = TestSQLContext.parquetCompressionCodec
+ import scala.collection.JavaConversions._
+
+ val file = getTempFilePath("parquet")
+ val path = file.toString
+ val rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
+ .map(i => TestRDDEntry(i, s"val_$i"))
+
+ // test default compression codec
+ rdd.saveAsParquetFile(path)
+ var actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+ .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+ assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+ parquetFile(path).registerTempTable("tmp")
+ checkAnswer(
+ sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+ (5, "val_5") ::
+ (7, "val_7") :: Nil)
+
+ Utils.deleteRecursively(file)
+
+ // test uncompressed parquet file with property value "UNCOMPRESSED"
+ TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "UNCOMPRESSED")
+
+ rdd.saveAsParquetFile(path)
+ actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+ .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+ assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+ parquetFile(path).registerTempTable("tmp")
+ checkAnswer(
+ sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+ (5, "val_5") ::
+ (7, "val_7") :: Nil)
+
+ Utils.deleteRecursively(file)
+
+ // test uncompressed parquet file with property value "none"
+ TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "none")
+
+ rdd.saveAsParquetFile(path)
+ actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+ .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+ assert(actualCodec === "UNCOMPRESSED" :: Nil)
+
+ parquetFile(path).registerTempTable("tmp")
+ checkAnswer(
+ sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+ (5, "val_5") ::
+ (7, "val_7") :: Nil)
+
+ Utils.deleteRecursively(file)
+
+ // test gzip compression codec
+ TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "gzip")
+
+ rdd.saveAsParquetFile(path)
+ actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+ .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+ assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+ parquetFile(path).registerTempTable("tmp")
+ checkAnswer(
+ sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+ (5, "val_5") ::
+ (7, "val_7") :: Nil)
+
+ Utils.deleteRecursively(file)
+
+ // test snappy compression codec
+ TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, "snappy")
+
+ rdd.saveAsParquetFile(path)
+ actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
+ .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
+ assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+
+ parquetFile(path).registerTempTable("tmp")
+ checkAnswer(
+ sql("SELECT key, value FROM tmp WHERE value = 'val_5' OR value = 'val_7'"),
+ (5, "val_5") ::
+ (7, "val_7") :: Nil)
+
+ Utils.deleteRecursively(file)
+
+ // TODO: Lzo requires additional external setup steps so leave it out for now
+ // ref.: https://github.com/Parquet/parquet-mr/blob/parquet-1.5.0/parquet-hadoop/src/test/java/parquet/hadoop/example/TestInputOutputFormat.java#L169
+
+ // Set it back.
+ TestSQLContext.setConf(SQLConf.PARQUET_COMPRESSION, defaultParquetCompressionCodec)
+ }
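
The test above switches codecs through TestSQLContext.setConf; since this is an ordinary SQLConf key, the same change should also be possible per-session through a SQL SET statement. A hedged sketch, reusing the `rdd` and `path` names from the test:

    // Hedged sketch: switching the codec via the SET command instead of setConf.
    sql("SET spark.sql.parquet.compression.codec=gzip")
    rdd.saveAsParquetFile(path)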
+
test("Read/Write All Types with non-primitive type") {
val tempDir = getTempFilePath("parquetTest").getCanonicalPath
val range = (0 to 255)