author     hyukjinkwon <gurwls223@gmail.com>    2016-01-19 20:45:52 -0800
committer  Reynold Xin <rxin@databricks.com>    2016-01-19 20:45:52 -0800
commit     6844d36aea91e9a7114f477a1cf3cdb9a882926a (patch)
tree       395a6cc27d2d8222244960324c6d9b9b6b32e00e
parent     488bbb216c82306e82b8963a331d48d484e8eadd (diff)
[SPARK-12871][SQL] Support specifying the compression codec as an option.

https://issues.apache.org/jira/browse/SPARK-12871

This PR adds an option for specifying a compression codec when writing CSV output. It adds the option `compression`, with `codec` as an alias, as filed in [SPARK-12668](https://issues.apache.org/jira/browse/SPARK-12668). Note that I did not add configurations for Hadoop 1.x, as this `CsvRelation` uses the Hadoop 2.x API and Hadoop 1.x support is likely to be dropped.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #10805 from HyukjinKwon/SPARK-12420.
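For illustration, a minimal sketch of the new option from the DataFrame writer API (mirroring the test added below; `df` and the output path are assumed, not part of the patch):

    // Write CSV output compressed with gzip; "compression" is the primary
    // option name and "codec" is accepted as an alias.
    df.write
      .format("csv")
      .option("header", "true")
      .option("compression", "gzip")
      .save("/tmp/cars-csv")

Short names are matched case-insensitively (the test below deliberately uses "gZiP"), and a fully qualified codec class name such as org.apache.hadoop.io.compress.GzipCodec is accepted as well.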
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala | 36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala   | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala      | 26
3 files changed, 70 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
index 127c9728da..676a3d3bca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
@@ -19,7 +19,10 @@ package org.apache.spark.sql.execution.datasources.csv
import java.nio.charset.Charset
+import org.apache.hadoop.io.compress._
+
import org.apache.spark.Logging
+import org.apache.spark.util.Utils
private[sql] case class CSVParameters(@transient parameters: Map[String, String]) extends Logging {
@@ -35,7 +38,7 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String]
private def getBool(paramName: String, default: Boolean = false): Boolean = {
val param = parameters.getOrElse(paramName, default.toString)
- if (param.toLowerCase() == "true") {
+ if (param.toLowerCase == "true") {
true
} else if (param.toLowerCase == "false") {
false
@@ -73,6 +76,11 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String]
val nullValue = parameters.getOrElse("nullValue", "")
+ val compressionCodec: Option[String] = {
+ val name = parameters.get("compression").orElse(parameters.get("codec"))
+ name.map(CSVCompressionCodecs.getCodecClassName)
+ }
+
val maxColumns = 20480
val maxCharsPerColumn = 100000
@@ -85,7 +93,6 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String]
}
private[csv] object ParseModes {
-
val PERMISSIVE_MODE = "PERMISSIVE"
val DROP_MALFORMED_MODE = "DROPMALFORMED"
val FAIL_FAST_MODE = "FAILFAST"
@@ -107,3 +114,28 @@ private[csv] object ParseModes {
true // We default to permissive if the mode string is not valid
}
}
+
+private[csv] object CSVCompressionCodecs {
+ private val shortCompressionCodecNames = Map(
+ "bzip2" -> classOf[BZip2Codec].getName,
+ "gzip" -> classOf[GzipCodec].getName,
+ "lz4" -> classOf[Lz4Codec].getName,
+ "snappy" -> classOf[SnappyCodec].getName)
+
+ /**
+ * Return the fully qualified class name for the given codec name.
+ * If it is already a class name, just return it.
+ */
+ def getCodecClassName(name: String): String = {
+ val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
+ try {
+ // Validate the codec name
+ Utils.classForName(codecName)
+ codecName
+ } catch {
+ case e: ClassNotFoundException =>
+ throw new IllegalArgumentException(s"Codec [$codecName] " +
+ s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
+ }
+ }
+}
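As a sketch of the resolution behavior implemented above: short names map to the bundled Hadoop codec classes, already-qualified class names pass through after validation, and unknown names fail fast. The object is private[csv], so these calls only compile from within that package, and "lzo" here is just a hypothetical unknown name used to illustrate the failure case:

    CSVCompressionCodecs.getCodecClassName("snappy")
    // => "org.apache.hadoop.io.compress.SnappyCodec"
    CSVCompressionCodecs.getCodecClassName("org.apache.hadoop.io.compress.GzipCodec")
    // => returned unchanged once the class is found on the classpath
    CSVCompressionCodecs.getCodecClassName("lzo")
    // => throws IllegalArgumentException: Codec [lzo] is not available. ...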
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 53818853ff..1502501c3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal
import com.google.common.base.Objects
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
+import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.hadoop.mapreduce.RecordWriter
@@ -99,6 +100,15 @@ private[csv] class CSVRelation(
}
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
+ val conf = job.getConfiguration
+ params.compressionCodec.foreach { codec =>
+ conf.set("mapreduce.output.fileoutputformat.compress", "true")
+ conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
+ conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
+ conf.set("mapreduce.map.output.compress", "true")
+ conf.set("mapreduce.map.output.compress.codec", codec)
+ }
+
new CSVOutputWriterFactory(params)
}
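Note that no corresponding read-side option is needed: Hadoop's TextInputFormat decompresses input transparently based on file extension, which is why the round-trip in the test below simply reloads the .gz output. A minimal sketch, reusing the test's `sqlContext` and `csvDir`:

    // Reading back needs no compression option; .gz is detected by extension.
    val carsCopy = sqlContext.read
      .format("csv")
      .option("header", "true")
      .load(csvDir)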
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 071b5ef56d..a79566b1f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -349,4 +349,30 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
assert(results(0).toSeq === Array(2012, "Tesla", "S", "null", "null"))
assert(results(2).toSeq === Array(null, "Chevy", "Volt", null, null))
}
+
+ test("save csv with compression codec option") {
+ withTempDir { dir =>
+ val csvDir = new File(dir, "csv").getCanonicalPath
+ val cars = sqlContext.read
+ .format("csv")
+ .option("header", "true")
+ .load(testFile(carsFile))
+
+ cars.coalesce(1).write
+ .format("csv")
+ .option("header", "true")
+ .option("compression", "gZiP")
+ .save(csvDir)
+
+ val compressedFiles = new File(csvDir).listFiles()
+ assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+
+ val carsCopy = sqlContext.read
+ .format("csv")
+ .option("header", "true")
+ .load(csvDir)
+
+ verifyCars(carsCopy, withHeader = true)
+ }
+ }
}