author    hyukjinkwon <gurwls223@gmail.com>    2016-01-22 23:53:12 -0800
committer Reynold Xin <rxin@databricks.com>    2016-01-22 23:53:12 -0800
commit    5af5a02160b42115579003b749c4d1831bf9d48e (patch)
tree      5a74acd4542a69b6287b254a63b3f0433f69bb51 /sql/core
parent    ea5c38fe75e4dfa60e61c5b4f20b742b67cb49b2 (diff)
[SPARK-12872][SQL] Support to specify the option for compression codec for JSON datasource
https://issues.apache.org/jira/browse/SPARK-12872

This PR lets the JSON datasource compress its output via an option, instead of requiring Hadoop configurations to be set manually. Codecs are resolved by short name, following the same approach as https://github.com/apache/spark/pull/10805. Since `CSVCompressionCodecs` can be shared with other datasources, it has been extracted into a shared class, `CompressionCodecs`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #10858 from HyukjinKwon/SPARK-12872.
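For context, a minimal usage sketch of the new option (the input/output paths are hypothetical; the option name and short codec names come from the code below):

    // Write JSON output compressed with gzip via the new "compression" option.
    val df = sqlContext.read.json("/tmp/people.json")   // hypothetical input path
    df.write
      .format("json")
      .option("compression", "gzip")                    // short name resolved by CompressionCodecs
      .save("/tmp/people-json-gz")                      // part files gain a .gz suffix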
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala | 47
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala | 28
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala  | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala    | 28
5 files changed, 96 insertions(+), 29 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
new file mode 100644
index 0000000000..e683a95ed2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.io.compress.{BZip2Codec, GzipCodec, Lz4Codec, SnappyCodec}
+
+import org.apache.spark.util.Utils
+
+private[datasources] object CompressionCodecs {
+ private val shortCompressionCodecNames = Map(
+ "bzip2" -> classOf[BZip2Codec].getName,
+ "gzip" -> classOf[GzipCodec].getName,
+ "lz4" -> classOf[Lz4Codec].getName,
+ "snappy" -> classOf[SnappyCodec].getName)
+
+ /**
+ * Return the full version of the given codec class.
+ * If it is already a class name, just return it.
+ */
+ def getCodecClassName(name: String): String = {
+ val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
+ try {
+ // Validate the codec name
+ Utils.classForName(codecName)
+ codecName
+ } catch {
+ case e: ClassNotFoundException =>
+ throw new IllegalArgumentException(s"Codec [$codecName] " +
+ s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
+ }
+ }
+}
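As a quick illustration (not part of the diff, and ignoring the private[datasources] visibility), the helper maps short names to fully-qualified codec class names and validates anything else:

    CompressionCodecs.getCodecClassName("gzip")
    // -> "org.apache.hadoop.io.compress.GzipCodec"
    CompressionCodecs.getCodecClassName("org.apache.hadoop.io.compress.BZip2Codec")
    // -> returned unchanged once the class is confirmed loadable
    CompressionCodecs.getCodecClassName("nope")
    // -> IllegalArgumentException: Codec [nope] is not available. Known codecs are bzip2, gzip, lz4, snappy.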
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
index 676a3d3bca..0278675aa6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParameters.scala
@@ -22,6 +22,7 @@ import java.nio.charset.Charset
import org.apache.hadoop.io.compress._
import org.apache.spark.Logging
+import org.apache.spark.sql.execution.datasources.CompressionCodecs
import org.apache.spark.util.Utils
private[sql] case class CSVParameters(@transient parameters: Map[String, String]) extends Logging {
@@ -78,7 +79,7 @@ private[sql] case class CSVParameters(@transient parameters: Map[String, String]
val compressionCodec: Option[String] = {
val name = parameters.get("compression").orElse(parameters.get("codec"))
- name.map(CSVCompressionCodecs.getCodecClassName)
+ name.map(CompressionCodecs.getCodecClassName)
}
val maxColumns = 20480
@@ -114,28 +115,3 @@ private[csv] object ParseModes {
true // We default to permissive if the mode string is not valid
}
}
-
-private[csv] object CSVCompressionCodecs {
- private val shortCompressionCodecNames = Map(
- "bzip2" -> classOf[BZip2Codec].getName,
- "gzip" -> classOf[GzipCodec].getName,
- "lz4" -> classOf[Lz4Codec].getName,
- "snappy" -> classOf[SnappyCodec].getName)
-
- /**
- * Return the full version of the given codec class.
- * If it is already a class name, just return it.
- */
- def getCodecClassName(name: String): String = {
- val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
- try {
- // Validate the codec name
- Utils.classForName(codecName)
- codecName
- } catch {
- case e: ClassNotFoundException =>
- throw new IllegalArgumentException(s"Codec [$codecName] " +
- s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
- }
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index aee9cf2bdb..e74a76c532 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.json
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import org.apache.spark.sql.execution.datasources.CompressionCodecs
+
/**
* Options for the JSON data source.
*
@@ -32,7 +34,8 @@ case class JSONOptions(
allowSingleQuotes: Boolean = true,
allowNumericLeadingZeros: Boolean = false,
allowNonNumericNumbers: Boolean = false,
- allowBackslashEscapingAnyCharacter: Boolean = false) {
+ allowBackslashEscapingAnyCharacter: Boolean = false,
+ compressionCodec: Option[String] = None) {
/** Sets config options on a Jackson [[JsonFactory]]. */
def setJacksonOptions(factory: JsonFactory): Unit = {
@@ -46,7 +49,6 @@ case class JSONOptions(
}
}
-
object JSONOptions {
def createFromConfigMap(parameters: Map[String, String]): JSONOptions = JSONOptions(
samplingRatio =
@@ -64,6 +66,10 @@ object JSONOptions {
allowNonNumericNumbers =
parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true),
allowBackslashEscapingAnyCharacter =
- parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
+ parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false),
+ compressionCodec = {
+ val name = parameters.get("compression").orElse(parameters.get("codec"))
+ name.map(CompressionCodecs.getCodecClassName)
+ }
)
}
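A small sketch of the resulting behavior (option keys as read above; the class name shown is what CompressionCodecs would resolve):

    // "compression" (or the legacy "codec" key) is picked up from the options map.
    val opts = JSONOptions.createFromConfigMap(Map("compression" -> "snappy"))
    // opts.compressionCodec == Some("org.apache.hadoop.io.compress.SnappyCodec")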
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 31c5620c9a..93727abcc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonFactory
import com.google.common.base.Objects
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
+import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
@@ -162,6 +163,15 @@ private[sql] class JSONRelation(
}
override def prepareJobForWrite(job: Job): BucketedOutputWriterFactory = {
+ val conf = job.getConfiguration
+ options.compressionCodec.foreach { codec =>
+ conf.set("mapreduce.output.fileoutputformat.compress", "true")
+ conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
+ conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
+ conf.set("mapreduce.map.output.compress", "true")
+ conf.set("mapreduce.map.output.compress.codec", codec)
+ }
+
new BucketedOutputWriterFactory {
override def newInstance(
path: String,
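For comparison, a rough sketch of the manual Hadoop-configuration route this option replaces (assuming, as in Spark at the time, that the write job inherits sqlContext.sparkContext.hadoopConfiguration):

    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
    hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
    hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec",
      classOf[org.apache.hadoop.io.compress.GzipCodec].getName)
    df.write.format("json").save("/tmp/out-gz")         // hypothetical path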
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index a3c6a1d7b2..d22fa7905a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1467,6 +1467,34 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
}
+ test("SPARK-12872 Support to specify the option for compression codec") {
+ withTempDir { dir =>
+ // Delete the managed temp dir so saveAsTextFile can create the path itself.
+ dir.delete()
+ val path = dir.getCanonicalPath
+ primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+
+ val jsonDF = sqlContext.read.json(path)
+ val jsonDir = new File(dir, "json").getCanonicalPath
+ jsonDF.coalesce(1).write
+ .format("json")
+ .option("compression", "gZiP")
+ .save(jsonDir)
+
+ val compressedFiles = new File(jsonDir).listFiles()
+ assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+
+ val jsonCopy = sqlContext.read
+ .format("json")
+ .load(jsonDir)
+
+ assert(jsonCopy.count == jsonDF.count)
+ val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+ val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+ checkAnswer(jsonCopySome, jsonDFSome)
+ }
+ }
+
test("Casting long as timestamp") {
withTempTable("jsonTable") {
val schema = (new StructType).add("ts", TimestampType)