author     hyukjinkwon <gurwls223@gmail.com>    2016-05-13 09:04:37 -0700
committer  Reynold Xin <rxin@databricks.com>    2016-05-13 09:04:37 -0700
commit     3ded5bc4db2badc9ff49554e73421021d854306b (patch)
tree       c8e8bbb95806cddb899c971c333a6088e443fbe9
parent     10a838967455db80d750ef84a1c6b3088b19fd9f (diff)
[SPARK-15267][SQL] Refactor options for JDBC and ORC data sources and change default compression for ORC
## What changes were proposed in this pull request?

Currently, the Parquet, JSON and CSV data sources each have a class for their options (`ParquetOptions`, `JSONOptions` and `CSVOptions`). Gathering a source's options into a single class makes them convenient to manage, but the `JDBC`, `Text`, `libsvm` and `ORC` data sources do not have such a class yet. It would be nicer if these options were kept in a unified format so that new options can be added consistently.

This PR refactors the options in Spark's internal data sources by adding new classes: `OrcOptions`, `TextOptions`, `JDBCOptions` and `LibSVMOptions`. It also changes the default compression codec for ORC from `NONE` to `SNAPPY`.

## How was this patch tested?

Existing tests should cover the refactoring; unit tests in `OrcHadoopFsRelationSuite` cover the change of the default compression codec for ORC.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #13048 from HyukjinKwon/SPARK-15267.
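For illustration only (not part of this patch), a minimal sketch of the user-facing effect of the ORC change, assuming a `SparkSession` built with Hive support (the ORC source lives in the hive module at this point) and hypothetical output paths:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("orc-compression-demo")   // hypothetical app name
  .enableHiveSupport()
  .getOrCreate()

// No option given: after this change the writer defaults to SNAPPY
// (previously NONE), so part files end in ".snappy.orc".
spark.range(0, 10).write.orc("/tmp/orc-default")           // hypothetical path

// Explicit codec: the new OrcOptions class validates the name; anything
// outside none/uncompressed/snappy/zlib/lzo throws IllegalArgumentException.
spark.range(0, 10).write
  .option("compression", "zlib")
  .orc("/tmp/orc-zlib")                                     // hypothetical path
```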
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala                      2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala      26
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala        39
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala  7
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala                           52
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala                          46
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala             18
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala                        8
8 files changed, 135 insertions, 63 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 68a855c99f..39bdd1afad 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -32,7 +32,7 @@ import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql.{DataFrame, DataFrameReader, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, JoinedRow}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
index 6ff50a3c61..6609e5dee3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
@@ -30,30 +30,26 @@ class DefaultSource extends RelationProvider with DataSourceRegister {
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
- val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
- val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
- val partitionColumn = parameters.getOrElse("partitionColumn", null)
- val lowerBound = parameters.getOrElse("lowerBound", null)
- val upperBound = parameters.getOrElse("upperBound", null)
- val numPartitions = parameters.getOrElse("numPartitions", null)
-
- if (partitionColumn != null
- && (lowerBound == null || upperBound == null || numPartitions == null)) {
+ val jdbcOptions = new JDBCOptions(parameters)
+ if (jdbcOptions.partitionColumn != null
+ && (jdbcOptions.lowerBound == null
+ || jdbcOptions.upperBound == null
+ || jdbcOptions.numPartitions == null)) {
sys.error("Partitioning incompletely specified")
}
- val partitionInfo = if (partitionColumn == null) {
+ val partitionInfo = if (jdbcOptions.partitionColumn == null) {
null
} else {
JDBCPartitioningInfo(
- partitionColumn,
- lowerBound.toLong,
- upperBound.toLong,
- numPartitions.toInt)
+ jdbcOptions.partitionColumn,
+ jdbcOptions.lowerBound.toLong,
+ jdbcOptions.upperBound.toLong,
+ jdbcOptions.numPartitions.toInt)
}
val parts = JDBCRelation.columnPartition(partitionInfo)
val properties = new Properties() // Additional properties that we will pass to getConnection
parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
- JDBCRelation(url, table, parts, properties)(sqlContext.sparkSession)
+ JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
new file mode 100644
index 0000000000..6c6ec89746
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.jdbc
+
+/**
+ * Options for the JDBC data source.
+ */
+private[jdbc] class JDBCOptions(
+ @transient private val parameters: Map[String, String])
+ extends Serializable {
+
+ // a JDBC URL
+ val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
+ // name of table
+ val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
+ // the column used to partition
+ val partitionColumn = parameters.getOrElse("partitionColumn", null)
+ // the lower bound of partition column
+ val lowerBound = parameters.getOrElse("lowerBound", null)
+ // the upper bound of the partition column
+ val upperBound = parameters.getOrElse("upperBound", null)
+ // the number of partitions
+ val numPartitions = parameters.getOrElse("numPartitions", null)
+}
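For illustration only (not part of this patch), a minimal sketch of the read path these options describe; the connection details below are hypothetical:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("jdbc-options-demo").getOrCreate()

// 'url' and 'dbtable' are required; JDBCOptions fails fast when either is
// missing. The three partitioning options must be supplied together with
// 'partitionColumn', otherwise DefaultSource reports
// "Partitioning incompletely specified".
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://db.example.com/sales")  // hypothetical URL
  .option("dbtable", "public.orders")                       // hypothetical table
  .option("partitionColumn", "order_id")
  .option("lowerBound", "1")
  .option("upperBound", "1000000")
  .option("numPartitions", "8")
  .load()
```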
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 00352f23ae..1ff217cbf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -19,16 +19,15 @@ package org.apache.spark.sql.execution.datasources.parquet
import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.apache.spark.internal.Logging
import org.apache.spark.sql.internal.SQLConf
/**
* Options for the Parquet data source.
*/
-class ParquetOptions(
+private[parquet] class ParquetOptions(
@transient private val parameters: Map[String, String],
@transient private val sqlConf: SQLConf)
- extends Logging with Serializable {
+ extends Serializable {
import ParquetOptions._
@@ -48,7 +47,7 @@ class ParquetOptions(
}
-object ParquetOptions {
+private[parquet] object ParquetOptions {
// The parquet compression short names
private val shortParquetCompressionCodecNames = Map(
"none" -> CompressionCodecName.UNCOMPRESSED,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
new file mode 100644
index 0000000000..91cf0dc960
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+/**
+ * Options for the ORC data source.
+ */
+private[orc] class OrcOptions(
+ @transient private val parameters: Map[String, String])
+ extends Serializable {
+
+ import OrcOptions._
+
+ /**
+ * Compression codec to use. By default snappy compression.
+ * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
+ */
+ val compressionCodec: String = {
+ val codecName = parameters.getOrElse("compression", "snappy").toLowerCase
+ if (!shortOrcCompressionCodecNames.contains(codecName)) {
+ val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase)
+ throw new IllegalArgumentException(s"Codec [$codecName] " +
+ s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
+ }
+ shortOrcCompressionCodecNames(codecName)
+ }
+}
+
+private[orc] object OrcOptions {
+ // The ORC compression short names
+ private val shortOrcCompressionCodecNames = Map(
+ "none" -> "NONE",
+ "uncompressed" -> "NONE",
+ "snappy" -> "SNAPPY",
+ "zlib" -> "ZLIB",
+ "lzo" -> "LZO")
+}
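For illustration only (not part of this patch), a minimal sketch of how the resolved codec name reaches the Hadoop configuration, mirroring what `DefaultSource.prepareWrite` now does in `OrcRelation.scala`; the `Job` setup here is assumed for demonstration:

```scala
import org.apache.hadoop.mapreduce.Job

// Assumed: a fresh Hadoop Job, standing in for the one prepareWrite receives.
val job = Job.getInstance()

// OrcOptions has already normalised the user-facing "compression" option,
// e.g. "zlib" -> "ZLIB", "uncompressed" -> "NONE", default "snappy" -> "SNAPPY".
val compressionCodec = "SNAPPY"

// OrcRelation.ORC_COMPRESSION is the literal "orc.compress"; the ORC writer
// reads it, and OrcOutputWriter appends the matching ".snappy"/".zlib"/".lzo"
// extension to the output file name.
job.getConfiguration.set("orc.compress", compressionCodec)
```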
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index fed3150304..6e55137dd7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.io.orc._
-import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties
import org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, StructObjectInspector}
import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
import org.apache.hadoop.io.{NullWritable, Writable}
@@ -37,7 +36,6 @@ import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
import org.apache.spark.sql.sources.{Filter, _}
@@ -66,28 +64,12 @@ private[sql] class DefaultSource
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
- val compressionCodec: Option[String] = options
- .get("compression")
- .map { codecName =>
- // Validate if given compression codec is supported or not.
- val shortOrcCompressionCodecNames = OrcRelation.shortOrcCompressionCodecNames
- if (!shortOrcCompressionCodecNames.contains(codecName.toLowerCase)) {
- val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase)
- throw new IllegalArgumentException(s"Codec [$codecName] " +
- s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
- }
- codecName.toLowerCase
- }
+ val orcOptions = new OrcOptions(options)
- compressionCodec.foreach { codecName =>
- job.getConfiguration.set(
- OrcTableProperties.COMPRESSION.getPropName,
- OrcRelation
- .shortOrcCompressionCodecNames
- .getOrElse(codecName, CompressionKind.NONE).name())
- }
+ val configuration = job.getConfiguration
- job.getConfiguration match {
+ configuration.set(OrcRelation.ORC_COMPRESSION, orcOptions.compressionCodec)
+ configuration match {
case conf: JobConf =>
conf.setOutputFormat(classOf[OrcOutputFormat])
case conf =>
@@ -205,7 +187,7 @@ private[orc] class OrcOutputWriter(
val partition = taskAttemptId.getTaskID.getId
val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
val compressionExtension = {
- val name = conf.get(OrcTableProperties.COMPRESSION.getPropName)
+ val name = conf.get(OrcRelation.ORC_COMPRESSION)
OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "")
}
// It has the `.orc` extension at the end because (de)compression tools
@@ -329,21 +311,15 @@ private[orc] object OrcTableScan {
}
private[orc] object OrcRelation extends HiveInspectors {
- // The ORC compression short names
- val shortOrcCompressionCodecNames = Map(
- "none" -> CompressionKind.NONE,
- "uncompressed" -> CompressionKind.NONE,
- "snappy" -> CompressionKind.SNAPPY,
- "zlib" -> CompressionKind.ZLIB,
- "lzo" -> CompressionKind.LZO)
+ // The references of Hive's classes will be minimized.
+ val ORC_COMPRESSION = "orc.compress"
// The extensions for ORC compression codecs
val extensionsForCompressionCodecNames = Map(
- CompressionKind.NONE.name -> "",
- CompressionKind.SNAPPY.name -> ".snappy",
- CompressionKind.ZLIB.name -> ".zlib",
- CompressionKind.LZO.name -> ".lzo"
- )
+ "NONE" -> "",
+ "SNAPPY" -> ".snappy",
+ "ZLIB" -> ".zlib",
+ "LZO" -> ".lzo")
def unwrapOrcStructs(
conf: Configuration,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 965680ff0d..0207b4e8c9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.orc
import java.io.File
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.ql.io.orc.{CompressionKind, OrcFile}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.Row
@@ -98,9 +97,10 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
val fs = FileSystem.getLocal(conf)
val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc"))
assert(maybeOrcFile.isDefined)
- val orcFilePath = new Path(maybeOrcFile.get.toPath.toString)
- val orcReader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))
- assert(orcReader.getCompression == CompressionKind.ZLIB)
+ val orcFilePath = maybeOrcFile.get.toPath.toString
+ val expectedCompressionKind =
+ OrcFileOperator.getFileReader(orcFilePath).get.getCompression
+ assert("ZLIB" === expectedCompressionKind.name())
val copyDf = spark
.read
@@ -108,4 +108,14 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
checkAnswer(df, copyDf)
}
}
+
+ test("Default compression codec is snappy for ORC compression") {
+ withTempPath { file =>
+ spark.range(0, 10).write
+ .orc(file.getCanonicalPath)
+ val expectedCompressionKind =
+ OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
+ assert("SNAPPY" === expectedCompressionKind.name())
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 084546f99d..9a0885822b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -171,7 +171,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
test("Compression options for writing to an ORC file (SNAPPY, ZLIB and NONE)") {
withTempPath { file =>
spark.range(0, 10).write
- .option("orc.compress", "ZLIB")
+ .option("compression", "ZLIB")
.orc(file.getCanonicalPath)
val expectedCompressionKind =
OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
@@ -180,7 +180,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
withTempPath { file =>
spark.range(0, 10).write
- .option("orc.compress", "SNAPPY")
+ .option("compression", "SNAPPY")
.orc(file.getCanonicalPath)
val expectedCompressionKind =
OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
@@ -189,7 +189,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
withTempPath { file =>
spark.range(0, 10).write
- .option("orc.compress", "NONE")
+ .option("compression", "NONE")
.orc(file.getCanonicalPath)
val expectedCompressionKind =
OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
@@ -201,7 +201,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
ignore("LZO compression options for writing to an ORC file not supported in Hive 1.2.1") {
withTempPath { file =>
spark.range(0, 10).write
- .option("orc.compress", "LZO")
+ .option("compression", "LZO")
.orc(file.getCanonicalPath)
val expectedCompressionKind =
OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression