path: root/sql/hive
author     hyukjinkwon <gurwls223@gmail.com>    2016-05-13 09:04:37 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-05-13 09:04:37 -0700
commit     3ded5bc4db2badc9ff49554e73421021d854306b (patch)
tree       c8e8bbb95806cddb899c971c333a6088e443fbe9 /sql/hive
parent     10a838967455db80d750ef84a1c6b3088b19fd9f (diff)
download   spark-3ded5bc4db2badc9ff49554e73421021d854306b.tar.gz
           spark-3ded5bc4db2badc9ff49554e73421021d854306b.tar.bz2
           spark-3ded5bc4db2badc9ff49554e73421021d854306b.zip
[SPARK-15267][SQL] Refactor options for JDBC and ORC data sources and change default compression for ORC
## What changes were proposed in this pull request?

Currently, the Parquet, JSON and CSV data sources each have a class for their options (`ParquetOptions`, `JSONOptions` and `CSVOptions`). Gathering a source's options into such a class makes them convenient to manage. The `JDBC`, `Text`, `libsvm` and `ORC` data sources do not have this class yet; it would be nicer if their options followed the same unified format so that options can be added and maintained consistently.

This PR refactors the options in Spark's internal data sources by adding new classes: `OrcOptions`, `TextOptions`, `JDBCOptions` and `LibSVMOptions`. It also changes the default compression codec for ORC from `NONE` to `SNAPPY`.

## How was this patch tested?

Existing tests cover the refactoring, and unit tests in `OrcHadoopFsRelationSuite` cover the change of the default compression codec for ORC.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #13048 from HyukjinKwon/SPARK-15267.
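For context, a minimal usage sketch of the refactored option handling (the output paths and the `spark` session below are illustrative, not part of this patch):

```scala
// Writing ORC with an explicit codec: OrcOptions validates the short name
// ("none", "uncompressed", "snappy", "zlib", "lzo", case-insensitive) and
// maps it onto the "orc.compress" table property.
spark.range(0, 10).write
  .option("compression", "zlib")
  .orc("/tmp/orc-zlib-example")       // illustrative path

// Writing without the option: the default codec is now SNAPPY instead of NONE.
spark.range(0, 10).write
  .orc("/tmp/orc-default-example")    // illustrative path
```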
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala               | 52
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala              | 46
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala | 18
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala            |  8
4 files changed, 81 insertions, 43 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
new file mode 100644
index 0000000000..91cf0dc960
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+/**
+ * Options for the ORC data source.
+ */
+private[orc] class OrcOptions(
+    @transient private val parameters: Map[String, String])
+  extends Serializable {
+
+  import OrcOptions._
+
+  /**
+   * Compression codec to use. By default snappy compression.
+   * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
+   */
+  val compressionCodec: String = {
+    val codecName = parameters.getOrElse("compression", "snappy").toLowerCase
+    if (!shortOrcCompressionCodecNames.contains(codecName)) {
+      val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase)
+      throw new IllegalArgumentException(s"Codec [$codecName] " +
+        s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
+    }
+    shortOrcCompressionCodecNames(codecName)
+  }
+}
+
+private[orc] object OrcOptions {
+  // The ORC compression short names
+  private val shortOrcCompressionCodecNames = Map(
+    "none" -> "NONE",
+    "uncompressed" -> "NONE",
+    "snappy" -> "SNAPPY",
+    "zlib" -> "ZLIB",
+    "lzo" -> "LZO")
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index fed3150304..6e55137dd7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.io.orc._
-import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties
import org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, StructObjectInspector}
import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
import org.apache.hadoop.io.{NullWritable, Writable}
@@ -37,7 +36,6 @@ import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
import org.apache.spark.sql.sources.{Filter, _}
@@ -66,28 +64,12 @@ private[sql] class DefaultSource
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val compressionCodec: Option[String] = options
-      .get("compression")
-      .map { codecName =>
-        // Validate if given compression codec is supported or not.
-        val shortOrcCompressionCodecNames = OrcRelation.shortOrcCompressionCodecNames
-        if (!shortOrcCompressionCodecNames.contains(codecName.toLowerCase)) {
-          val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase)
-          throw new IllegalArgumentException(s"Codec [$codecName] " +
-            s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
-        }
-        codecName.toLowerCase
-      }
+    val orcOptions = new OrcOptions(options)

-    compressionCodec.foreach { codecName =>
-      job.getConfiguration.set(
-        OrcTableProperties.COMPRESSION.getPropName,
-        OrcRelation
-          .shortOrcCompressionCodecNames
-          .getOrElse(codecName, CompressionKind.NONE).name())
-    }
+    val configuration = job.getConfiguration

-    job.getConfiguration match {
+    configuration.set(OrcRelation.ORC_COMPRESSION, orcOptions.compressionCodec)
+    configuration match {
       case conf: JobConf =>
         conf.setOutputFormat(classOf[OrcOutputFormat])
       case conf =>
@@ -205,7 +187,7 @@ private[orc] class OrcOutputWriter(
val partition = taskAttemptId.getTaskID.getId
val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
val compressionExtension = {
- val name = conf.get(OrcTableProperties.COMPRESSION.getPropName)
+ val name = conf.get(OrcRelation.ORC_COMPRESSION)
OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "")
}
// It has the `.orc` extension at the end because (de)compression tools
@@ -329,21 +311,15 @@ private[orc] object OrcTableScan {
}
 private[orc] object OrcRelation extends HiveInspectors {
-  // The ORC compression short names
-  val shortOrcCompressionCodecNames = Map(
-    "none" -> CompressionKind.NONE,
-    "uncompressed" -> CompressionKind.NONE,
-    "snappy" -> CompressionKind.SNAPPY,
-    "zlib" -> CompressionKind.ZLIB,
-    "lzo" -> CompressionKind.LZO)
+  // The references of Hive's classes will be minimized.
+  val ORC_COMPRESSION = "orc.compress"

   // The extensions for ORC compression codecs
   val extensionsForCompressionCodecNames = Map(
-    CompressionKind.NONE.name -> "",
-    CompressionKind.SNAPPY.name -> ".snappy",
-    CompressionKind.ZLIB.name -> ".zlib",
-    CompressionKind.LZO.name -> ".lzo"
-  )
+    "NONE" -> "",
+    "SNAPPY" -> ".snappy",
+    "ZLIB" -> ".zlib",
+    "LZO" -> ".lzo")

def unwrapOrcStructs(
conf: Configuration,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 965680ff0d..0207b4e8c9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.orc
import java.io.File
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.ql.io.orc.{CompressionKind, OrcFile}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.Row
@@ -98,9 +97,10 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
       val fs = FileSystem.getLocal(conf)
       val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc"))
       assert(maybeOrcFile.isDefined)
-      val orcFilePath = new Path(maybeOrcFile.get.toPath.toString)
-      val orcReader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))
-      assert(orcReader.getCompression == CompressionKind.ZLIB)
+      val orcFilePath = maybeOrcFile.get.toPath.toString
+      val expectedCompressionKind =
+        OrcFileOperator.getFileReader(orcFilePath).get.getCompression
+      assert("ZLIB" === expectedCompressionKind.name())

       val copyDf = spark
         .read
@@ -108,4 +108,14 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
       checkAnswer(df, copyDf)
     }
   }
+
+  test("Default compression codec is snappy for ORC compression") {
+    withTempPath { file =>
+      spark.range(0, 10).write
+        .orc(file.getCanonicalPath)
+      val expectedCompressionKind =
+        OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
+      assert("SNAPPY" === expectedCompressionKind.name())
+    }
+  }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 084546f99d..9a0885822b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -171,7 +171,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
test("Compression options for writing to an ORC file (SNAPPY, ZLIB and NONE)") {
withTempPath { file =>
spark.range(0, 10).write
- .option("orc.compress", "ZLIB")
+ .option("compression", "ZLIB")
.orc(file.getCanonicalPath)
val expectedCompressionKind =
OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
@@ -180,7 +180,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
withTempPath { file =>
spark.range(0, 10).write
- .option("orc.compress", "SNAPPY")
+ .option("compression", "SNAPPY")
.orc(file.getCanonicalPath)
val expectedCompressionKind =
OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
@@ -189,7 +189,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
withTempPath { file =>
spark.range(0, 10).write
- .option("orc.compress", "NONE")
+ .option("compression", "NONE")
.orc(file.getCanonicalPath)
val expectedCompressionKind =
OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
@@ -201,7 +201,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
ignore("LZO compression options for writing to an ORC file not supported in Hive 1.2.1") {
withTempPath { file =>
spark.range(0, 10).write
- .option("orc.compress", "LZO")
+ .option("compression", "LZO")
.orc(file.getCanonicalPath)
val expectedCompressionKind =
OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression