Diffstat (limited to 'sql')
7 files changed, 134 insertions, 62 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
index 6ff50a3c61..6609e5dee3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
@@ -30,30 +30,26 @@ class DefaultSource extends RelationProvider with DataSourceRegister {
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
-    val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
-    val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
-    val partitionColumn = parameters.getOrElse("partitionColumn", null)
-    val lowerBound = parameters.getOrElse("lowerBound", null)
-    val upperBound = parameters.getOrElse("upperBound", null)
-    val numPartitions = parameters.getOrElse("numPartitions", null)
-
-    if (partitionColumn != null
-      && (lowerBound == null || upperBound == null || numPartitions == null)) {
+    val jdbcOptions = new JDBCOptions(parameters)
+    if (jdbcOptions.partitionColumn != null
+      && (jdbcOptions.lowerBound == null
+        || jdbcOptions.upperBound == null
+        || jdbcOptions.numPartitions == null)) {
       sys.error("Partitioning incompletely specified")
     }

-    val partitionInfo = if (partitionColumn == null) {
+    val partitionInfo = if (jdbcOptions.partitionColumn == null) {
       null
     } else {
       JDBCPartitioningInfo(
-        partitionColumn,
-        lowerBound.toLong,
-        upperBound.toLong,
-        numPartitions.toInt)
+        jdbcOptions.partitionColumn,
+        jdbcOptions.lowerBound.toLong,
+        jdbcOptions.upperBound.toLong,
+        jdbcOptions.numPartitions.toInt)
     }
     val parts = JDBCRelation.columnPartition(partitionInfo)
     val properties = new Properties() // Additional properties that we will pass to getConnection
     parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
-    JDBCRelation(url, table, parts, properties)(sqlContext.sparkSession)
+    JDBCRelation(jdbcOptions.url, jdbcOptions.table, parts, properties)(sqlContext.sparkSession)
   }
 }
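
For context, the keys consolidated into JDBCOptions above are the same ones users supply through the DataFrameReader API. A minimal usage sketch; the URL, table name, and bound values are placeholders, not part of this patch:

    // Hypothetical values; only the option keys come from JDBCOptions.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost/testdb")
      .option("dbtable", "people")
      .option("partitionColumn", "id")  // if set, the next three are required
      .option("lowerBound", "0")
      .option("upperBound", "10000")
      .option("numPartitions", "4")
      .load()
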
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
new file mode 100644
index 0000000000..6c6ec89746
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.jdbc
+
+/**
+ * Options for the JDBC data source.
+ */
+private[jdbc] class JDBCOptions(
+    @transient private val parameters: Map[String, String])
+  extends Serializable {
+
+  // a JDBC URL
+  val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
+  // name of table
+  val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
+  // the column used to partition
+  val partitionColumn = parameters.getOrElse("partitionColumn", null)
+  // the lower bound of partition column
+  val lowerBound = parameters.getOrElse("lowerBound", null)
+  // the upper bound of the partition column
+  val upperBound = parameters.getOrElse("upperBound", null)
+  // the number of partitions
+  val numPartitions = parameters.getOrElse("numPartitions", null)
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index 00352f23ae..1ff217cbf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -19,16 +19,15 @@ package org.apache.spark.sql.execution.datasources.parquet

 import org.apache.parquet.hadoop.metadata.CompressionCodecName

-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.internal.SQLConf

 /**
  * Options for the Parquet data source.
  */
-class ParquetOptions(
+private[parquet] class ParquetOptions(
     @transient private val parameters: Map[String, String],
     @transient private val sqlConf: SQLConf)
-  extends Logging with Serializable {
+  extends Serializable {

   import ParquetOptions._

@@ -48,7 +47,7 @@ class ParquetOptions(
 }


-object ParquetOptions {
+private[parquet] object ParquetOptions {
   // The parquet compression short names
   private val shortParquetCompressionCodecNames = Map(
     "none" -> CompressionCodecName.UNCOMPRESSED,
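
ParquetOptions keeps its user-facing behavior here; only the visibility narrows and the unused Logging mix-in is dropped. For reference, a sketch of the option it interprets, assuming some DataFrame df and a placeholder output path:

    // "compression" is resolved against shortParquetCompressionCodecNames;
    // per the context above, "none" maps to CompressionCodecName.UNCOMPRESSED.
    df.write
      .option("compression", "none")
      .parquet("/tmp/parquet-out")
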
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
new file mode 100644
index 0000000000..91cf0dc960
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+/**
+ * Options for the ORC data source.
+ */
+private[orc] class OrcOptions(
+    @transient private val parameters: Map[String, String])
+  extends Serializable {
+
+  import OrcOptions._
+
+  /**
+   * Compression codec to use. By default snappy compression.
+   * Acceptable values are defined in [[shortOrcCompressionCodecNames]].
+   */
+  val compressionCodec: String = {
+    val codecName = parameters.getOrElse("compression", "snappy").toLowerCase
+    if (!shortOrcCompressionCodecNames.contains(codecName)) {
+      val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase)
+      throw new IllegalArgumentException(s"Codec [$codecName] " +
+        s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
+    }
+    shortOrcCompressionCodecNames(codecName)
+  }
+}
+
+private[orc] object OrcOptions {
+  // The ORC compression short names
+  private val shortOrcCompressionCodecNames = Map(
+    "none" -> "NONE",
+    "uncompressed" -> "NONE",
+    "snappy" -> "SNAPPY",
+    "zlib" -> "ZLIB",
+    "lzo" -> "LZO")
+}
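
A sketch of how the new class resolves a codec name (illustrative only, since OrcOptions is private[orc]):

    // Lower-cases and validates the user-supplied name, then returns the
    // upper-case codec name that is later written into the Hadoop conf.
    val codec = new OrcOptions(Map("compression" -> "zlib")).compressionCodec
    // codec == "ZLIB"; an unrecognized name such as "gzip" would throw an
    // IllegalArgumentException listing the available codecs.
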
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index fed3150304..6e55137dd7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.io.orc._
-import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties
 import org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, StructObjectInspector}
 import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
 import org.apache.hadoop.io.{NullWritable, Writable}
@@ -37,7 +36,6 @@ import org.apache.spark.rdd.{HadoopRDD, RDD}
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
 import org.apache.spark.sql.sources.{Filter, _}
@@ -66,28 +64,12 @@ private[sql] class DefaultSource
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
-    val compressionCodec: Option[String] = options
-      .get("compression")
-      .map { codecName =>
-        // Validate if given compression codec is supported or not.
-        val shortOrcCompressionCodecNames = OrcRelation.shortOrcCompressionCodecNames
-        if (!shortOrcCompressionCodecNames.contains(codecName.toLowerCase)) {
-          val availableCodecs = shortOrcCompressionCodecNames.keys.map(_.toLowerCase)
-          throw new IllegalArgumentException(s"Codec [$codecName] " +
-            s"is not available. Available codecs are ${availableCodecs.mkString(", ")}.")
-        }
-        codecName.toLowerCase
-      }
+    val orcOptions = new OrcOptions(options)

-    compressionCodec.foreach { codecName =>
-      job.getConfiguration.set(
-        OrcTableProperties.COMPRESSION.getPropName,
-        OrcRelation
-          .shortOrcCompressionCodecNames
-          .getOrElse(codecName, CompressionKind.NONE).name())
-    }
+    val configuration = job.getConfiguration

-    job.getConfiguration match {
+    configuration.set(OrcRelation.ORC_COMPRESSION, orcOptions.compressionCodec)
+    configuration match {
       case conf: JobConf =>
         conf.setOutputFormat(classOf[OrcOutputFormat])
       case conf =>
@@ -205,7 +187,7 @@ private[orc] class OrcOutputWriter(
     val partition = taskAttemptId.getTaskID.getId
     val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
     val compressionExtension = {
-      val name = conf.get(OrcTableProperties.COMPRESSION.getPropName)
+      val name = conf.get(OrcRelation.ORC_COMPRESSION)
       OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "")
     }
     // It has the `.orc` extension at the end because (de)compression tools
@@ -329,21 +311,15 @@ private[orc] object OrcTableScan {
 }

 private[orc] object OrcRelation extends HiveInspectors {
-  // The ORC compression short names
-  val shortOrcCompressionCodecNames = Map(
-    "none" -> CompressionKind.NONE,
-    "uncompressed" -> CompressionKind.NONE,
-    "snappy" -> CompressionKind.SNAPPY,
-    "zlib" -> CompressionKind.ZLIB,
-    "lzo" -> CompressionKind.LZO)
+  // The references of Hive's classes will be minimized.
+  val ORC_COMPRESSION = "orc.compress"

   // The extensions for ORC compression codecs
   val extensionsForCompressionCodecNames = Map(
-    CompressionKind.NONE.name -> "",
-    CompressionKind.SNAPPY.name -> ".snappy",
-    CompressionKind.ZLIB.name -> ".zlib",
-    CompressionKind.LZO.name -> ".lzo"
-  )
+    "NONE" -> "",
+    "SNAPPY" -> ".snappy",
+    "ZLIB" -> ".zlib",
+    "LZO" -> ".lzo")

   def unwrapOrcStructs(
       conf: Configuration,
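
The net effect in prepareWrite is a single configuration assignment; condensed here, with job and options as the method parameters shown above:

    // The validated codec name lands in the Hadoop conf under "orc.compress",
    // replacing the previous dependency on Hive's OrcTableProperties enum.
    val orcOptions = new OrcOptions(options)
    job.getConfiguration.set(OrcRelation.ORC_COMPRESSION, orcOptions.compressionCodec)
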
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 965680ff0d..0207b4e8c9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.orc
 import java.io.File

 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.hive.ql.io.orc.{CompressionKind, OrcFile}

 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.Row
@@ -98,9 +97,10 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
       val fs = FileSystem.getLocal(conf)
       val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc"))
       assert(maybeOrcFile.isDefined)
-      val orcFilePath = new Path(maybeOrcFile.get.toPath.toString)
-      val orcReader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf))
-      assert(orcReader.getCompression == CompressionKind.ZLIB)
+      val orcFilePath = maybeOrcFile.get.toPath.toString
+      val expectedCompressionKind =
+        OrcFileOperator.getFileReader(orcFilePath).get.getCompression
+      assert("ZLIB" === expectedCompressionKind.name())

       val copyDf = spark
         .read
@@ -108,4 +108,14 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
       checkAnswer(df, copyDf)
     }
   }
+
+  test("Default compression codec is snappy for ORC compression") {
+    withTempPath { file =>
+      spark.range(0, 10).write
+        .orc(file.getCanonicalPath)
+      val expectedCompressionKind =
+        OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
+      assert("SNAPPY" === expectedCompressionKind.name())
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 084546f99d..9a0885822b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -171,7 +171,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
   test("Compression options for writing to an ORC file (SNAPPY, ZLIB and NONE)") {
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("orc.compress", "ZLIB")
+        .option("compression", "ZLIB")
         .orc(file.getCanonicalPath)
       val expectedCompressionKind =
         OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
@@ -180,7 +180,7 @@

     withTempPath { file =>
       spark.range(0, 10).write
-        .option("orc.compress", "SNAPPY")
+        .option("compression", "SNAPPY")
         .orc(file.getCanonicalPath)
       val expectedCompressionKind =
         OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
@@ -189,7 +189,7 @@

     withTempPath { file =>
       spark.range(0, 10).write
-        .option("orc.compress", "NONE")
+        .option("compression", "NONE")
         .orc(file.getCanonicalPath)
       val expectedCompressionKind =
         OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
@@ -201,7 +201,7 @@
   ignore("LZO compression options for writing to an ORC file not supported in Hive 1.2.1") {
     withTempPath { file =>
       spark.range(0, 10).write
-        .option("orc.compress", "LZO")
+        .option("compression", "LZO")
         .orc(file.getCanonicalPath)
       val expectedCompressionKind =
         OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
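
Taken together, the test changes exercise the renamed, format-agnostic option key. A user-facing sketch with a placeholder output path:

    // "compression" replaces the Hive-specific "orc.compress" key these
    // tests used before; matching is case-insensitive ("zlib" == "ZLIB").
    spark.range(0, 10).write
      .option("compression", "zlib")
      .orc("/tmp/orc-out")
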