From 361ebc282b2d09dc6dcf21419a53c5c617b1b6bd Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 25 May 2016 23:54:24 -0700 Subject: [SPARK-15543][SQL] Rename DefaultSources to make them more self-describing ## What changes were proposed in this pull request? This patch renames various DefaultSources to make their names more self-describing. The choice of "DefaultSource" was from the days when we did not have a good way to specify short names. They are now named: - LibSVMFileFormat - CSVFileFormat - JdbcRelationProvider - JsonFileFormat - ParquetFileFormat - TextFileFormat Backward compatibility is maintained through aliasing. ## How was this patch tested? Updated relevant test cases too. Author: Reynold Xin Closes #13311 from rxin/SPARK-15543. --- ...org.apache.spark.sql.sources.DataSourceRegister | 2 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 18 +- .../apache/spark/sql/hive/orc/OrcFileFormat.scala | 375 +++++++++++++++++++++ .../apache/spark/sql/hive/orc/OrcRelation.scala | 371 -------------------- .../sql/hive/orc/OrcHadoopFsRelationSuite.scala | 2 +- 5 files changed, 386 insertions(+), 382 deletions(-) create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala delete mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala diff --git a/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister index 4a774fbf1f..32aa13ff25 100644 --- a/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister +++ b/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -1 +1 @@ -org.apache.spark.sql.hive.orc.DefaultSource +org.apache.spark.sql.hive.orc.OrcFileFormat diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 86ab152402..b377a20e39 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -32,8 +32,8 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.execution.command.CreateTableAsSelectLogicalPlan import org.apache.spark.sql.execution.datasources.{Partition => _, _} -import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation} -import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.hive.orc.OrcFileFormat import org.apache.spark.sql.types._ @@ -281,7 +281,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val inferredSchema = defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()) inferredSchema.map { inferred => - ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred) + ParquetFileFormat.mergeMetastoreParquetSchema(metastoreSchema, inferred) }.getOrElse(metastoreSchema) } else { defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get @@ -348,13 +348,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } private def convertToParquetRelation(relation: MetastoreRelation):
LogicalRelation = { - val defaultSource = new ParquetDefaultSource() - val fileFormatClass = classOf[ParquetDefaultSource] + val defaultSource = new ParquetFileFormat() + val fileFormatClass = classOf[ParquetFileFormat] val mergeSchema = sessionState.convertMetastoreParquetWithSchemaMerging val options = Map( - ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString, - ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier( + ParquetFileFormat.MERGE_SCHEMA -> mergeSchema.toString, + ParquetFileFormat.METASTORE_TABLE_NAME -> TableIdentifier( relation.tableName, Some(relation.databaseName) ).unquotedString @@ -400,8 +400,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = { - val defaultSource = new OrcDefaultSource() - val fileFormatClass = classOf[OrcDefaultSource] + val defaultSource = new OrcFileFormat() + val fileFormatClass = classOf[OrcFileFormat] val options = Map[String, String]() convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala new file mode 100644 index 0000000000..f1198179a0 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.orc + +import java.net.URI +import java.util.Properties + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.hadoop.hive.ql.io.orc._ +import org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, StructObjectInspector} +import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils} +import org.apache.hadoop.io.{NullWritable, Writable} +import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter} +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} + +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.{HadoopRDD, RDD} +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.hive.{HiveInspectors, HiveShim} +import org.apache.spark.sql.sources.{Filter, _} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration + +/** + * [[FileFormat]] for reading ORC files. If this is moved or renamed, please update + * [[DataSource]]'s backwardCompatibilityMap. + */ +private[sql] class OrcFileFormat + extends FileFormat with DataSourceRegister with Serializable { + + override def shortName(): String = "orc" + + override def toString: String = "ORC" + + override def inferSchema( + sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = { + OrcFileOperator.readSchema( + files.map(_.getPath.toUri.toString), + Some(sparkSession.sessionState.newHadoopConf()) + ) + } + + override def prepareWrite( + sparkSession: SparkSession, + job: Job, + options: Map[String, String], + dataSchema: StructType): OutputWriterFactory = { + val orcOptions = new OrcOptions(options) + + val configuration = job.getConfiguration + + configuration.set(OrcRelation.ORC_COMPRESSION, orcOptions.compressionCodec) + configuration match { + case conf: JobConf => + conf.setOutputFormat(classOf[OrcOutputFormat]) + case conf => + conf.setClass( + "mapred.output.format.class", + classOf[OrcOutputFormat], + classOf[MapRedOutputFormat[_, _]]) + } + + new OutputWriterFactory { + override def newInstance( + path: String, + bucketId: Option[Int], + dataSchema: StructType, + context: TaskAttemptContext): OutputWriter = { + new OrcOutputWriter(path, bucketId, dataSchema, context) + } + } + } + + override def buildReader( + sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { + if (sparkSession.sessionState.conf.orcFilterPushDown) { + // Sets pushed predicates + OrcFilters.createFilter(requiredSchema, filters.toArray).foreach { f => + hadoopConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo) + hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true) + } + } + + val broadcastedHadoopConf = + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + + (file: PartitionedFile) => { + val conf = broadcastedHadoopConf.value.value + + // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. 
In this + // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file + // using the given physical schema. Instead, we simply return an empty iterator. + val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf)) + if (maybePhysicalSchema.isEmpty) { + Iterator.empty + } else { + val physicalSchema = maybePhysicalSchema.get + OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema) + + val orcRecordReader = { + val job = Job.getInstance(conf) + FileInputFormat.setInputPaths(job, file.filePath) + + val fileSplit = new FileSplit( + new Path(new URI(file.filePath)), file.start, file.length, Array.empty + ) + // Custom OrcRecordReader is used to get + // ObjectInspector during recordReader creation itself and can + // avoid NameNode call in unwrapOrcStructs per file. + // Specifically would be helpful for partitioned datasets. + val orcReader = OrcFile.createReader( + new Path(new URI(file.filePath)), OrcFile.readerOptions(conf)) + new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart, fileSplit.getLength) + } + + // Unwraps `OrcStruct`s to `UnsafeRow`s + OrcRelation.unwrapOrcStructs( + conf, + requiredSchema, + Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]), + new RecordReaderIterator[OrcStruct](orcRecordReader)) + } + } + } +} + +private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration) + extends HiveInspectors { + + def serialize(row: InternalRow): Writable = { + wrapOrcStruct(cachedOrcStruct, structOI, row) + serializer.serialize(cachedOrcStruct, structOI) + } + + private[this] val serializer = { + val table = new Properties() + table.setProperty("columns", dataSchema.fieldNames.mkString(",")) + table.setProperty("columns.types", dataSchema.map(_.dataType.catalogString).mkString(":")) + + val serde = new OrcSerde + serde.initialize(conf, table) + serde + } + + // Object inspector converted from the schema of the relation to be serialized. + private[this] val structOI = { + val typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(dataSchema.catalogString) + OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo]) + .asInstanceOf[SettableStructObjectInspector] + } + + private[this] val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct] + + private[this] def wrapOrcStruct( + struct: OrcStruct, + oi: SettableStructObjectInspector, + row: InternalRow): Unit = { + val fieldRefs = oi.getAllStructFieldRefs + var i = 0 + while (i < fieldRefs.size) { + + oi.setStructFieldData( + struct, + fieldRefs.get(i), + wrap( + row.get(i, dataSchema(i).dataType), + fieldRefs.get(i).getFieldObjectInspector, + dataSchema(i).dataType)) + i += 1 + } + } +} + +private[orc] class OrcOutputWriter( + path: String, + bucketId: Option[Int], + dataSchema: StructType, + context: TaskAttemptContext) + extends OutputWriter { + + private[this] val conf = context.getConfiguration + + private[this] val serializer = new OrcSerializer(dataSchema, conf) + + // `OrcRecordWriter.close()` creates an empty file if no rows are written at all. We use this + // flag to decide whether `OrcRecordWriter.close()` needs to be called. 
+ private var recordWriterInstantiated = false + + private lazy val recordWriter: RecordWriter[NullWritable, Writable] = { + recordWriterInstantiated = true + val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID") + val taskAttemptId = context.getTaskAttemptID + val partition = taskAttemptId.getTaskID.getId + val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") + val compressionExtension = { + val name = conf.get(OrcRelation.ORC_COMPRESSION) + OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "") + } + // It has the `.orc` extension at the end because (de)compression tools + // such as gunzip would not be able to decompress this as the compression + // is not applied on this whole file but on each "stream" in ORC format. + val filename = f"part-r-$partition%05d-$uniqueWriteJobId$bucketString$compressionExtension.orc" + + new OrcOutputFormat().getRecordWriter( + new Path(path, filename).getFileSystem(conf), + conf.asInstanceOf[JobConf], + new Path(path, filename).toString, + Reporter.NULL + ).asInstanceOf[RecordWriter[NullWritable, Writable]] + } + + override def write(row: Row): Unit = + throw new UnsupportedOperationException("call writeInternal") + + override protected[sql] def writeInternal(row: InternalRow): Unit = { + recordWriter.write(NullWritable.get(), serializer.serialize(row)) + } + + override def close(): Unit = { + if (recordWriterInstantiated) { + recordWriter.close(Reporter.NULL) + } + } +} + +private[orc] case class OrcTableScan( + @transient sparkSession: SparkSession, + attributes: Seq[Attribute], + filters: Array[Filter], + @transient inputPaths: Seq[FileStatus]) + extends Logging + with HiveInspectors { + + def execute(): RDD[InternalRow] = { + val job = Job.getInstance(sparkSession.sessionState.newHadoopConf()) + val conf = job.getConfiguration + + // Figure out the actual schema from the ORC source (without partition columns) so that we + // can pick the correct ordinals. Note that this assumes that all files have the same schema. + val orcFormat = new OrcFileFormat + val dataSchema = + orcFormat + .inferSchema(sparkSession, Map.empty, inputPaths) + .getOrElse(sys.error("Failed to read schema from target ORC files.")) + + // Tries to push down filters if ORC filter push-down is enabled + if (sparkSession.sessionState.conf.orcFilterPushDown) { + OrcFilters.createFilter(dataSchema, filters).foreach { f => + conf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo) + conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true) + } + } + + // Sets requested columns + OrcRelation.setRequiredColumns(conf, dataSchema, StructType.fromAttributes(attributes)) + + if (inputPaths.isEmpty) { + // The input paths were probably pruned; return an empty RDD.
+ return sparkSession.sparkContext.emptyRDD[InternalRow] + } + FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*) + + val inputFormatClass = + classOf[OrcInputFormat] + .asInstanceOf[Class[_ <: MapRedInputFormat[NullWritable, Writable]]] + + val rdd = sparkSession.sparkContext.hadoopRDD( + conf.asInstanceOf[JobConf], + inputFormatClass, + classOf[NullWritable], + classOf[Writable] + ).asInstanceOf[HadoopRDD[NullWritable, Writable]] + + val wrappedConf = new SerializableConfiguration(conf) + + rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) => + val writableIterator = iterator.map(_._2) + val maybeStructOI = OrcFileOperator.getObjectInspector(split.getPath.toString, Some(conf)) + OrcRelation.unwrapOrcStructs( + wrappedConf.value, + StructType.fromAttributes(attributes), + maybeStructOI, + writableIterator + ) + } + } +} + +private[orc] object OrcTableScan { + // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public. + private[orc] val SARG_PUSHDOWN = "sarg.pushdown" +} + +private[orc] object OrcRelation extends HiveInspectors { + // The references of Hive's classes will be minimized. + val ORC_COMPRESSION = "orc.compress" + + // The extensions for ORC compression codecs + val extensionsForCompressionCodecNames = Map( + "NONE" -> "", + "SNAPPY" -> ".snappy", + "ZLIB" -> ".zlib", + "LZO" -> ".lzo") + + def unwrapOrcStructs( + conf: Configuration, + dataSchema: StructType, + maybeStructOI: Option[StructObjectInspector], + iterator: Iterator[Writable]): Iterator[InternalRow] = { + val deserializer = new OrcSerde + val mutableRow = new SpecificMutableRow(dataSchema.map(_.dataType)) + val unsafeProjection = UnsafeProjection.create(dataSchema) + + def unwrap(oi: StructObjectInspector): Iterator[InternalRow] = { + val (fieldRefs, fieldOrdinals) = dataSchema.zipWithIndex.map { + case (field, ordinal) => oi.getStructFieldRef(field.name) -> ordinal + }.unzip + + val unwrappers = fieldRefs.map(unwrapperFor) + + iterator.map { value => + val raw = deserializer.deserialize(value) + var i = 0 + while (i < fieldRefs.length) { + val fieldValue = oi.getStructFieldData(raw, fieldRefs(i)) + if (fieldValue == null) { + mutableRow.setNullAt(fieldOrdinals(i)) + } else { + unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i)) + } + i += 1 + } + unsafeProjection(mutableRow) + } + } + + maybeStructOI.map(unwrap).getOrElse(Iterator.empty) + } + + def setRequiredColumns( + conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = { + val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer) + val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip + HiveShim.appendReadColumns(conf, sortedIDs, sortedNames) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala deleted file mode 100644 index 38f50c112a..0000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ /dev/null @@ -1,371 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.orc - -import java.net.URI -import java.util.Properties - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, Path} -import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hadoop.hive.ql.io.orc._ -import org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, StructObjectInspector} -import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils} -import org.apache.hadoop.io.{NullWritable, Writable} -import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter} -import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} - -import org.apache.spark.internal.Logging -import org.apache.spark.rdd.{HadoopRDD, RDD} -import org.apache.spark.sql.{Row, SparkSession} -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.hive.{HiveInspectors, HiveShim} -import org.apache.spark.sql.sources.{Filter, _} -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.SerializableConfiguration - -private[sql] class DefaultSource - extends FileFormat with DataSourceRegister with Serializable { - - override def shortName(): String = "orc" - - override def toString: String = "ORC" - - override def inferSchema( - sparkSession: SparkSession, - options: Map[String, String], - files: Seq[FileStatus]): Option[StructType] = { - OrcFileOperator.readSchema( - files.map(_.getPath.toUri.toString), - Some(sparkSession.sessionState.newHadoopConf()) - ) - } - - override def prepareWrite( - sparkSession: SparkSession, - job: Job, - options: Map[String, String], - dataSchema: StructType): OutputWriterFactory = { - val orcOptions = new OrcOptions(options) - - val configuration = job.getConfiguration - - configuration.set(OrcRelation.ORC_COMPRESSION, orcOptions.compressionCodec) - configuration match { - case conf: JobConf => - conf.setOutputFormat(classOf[OrcOutputFormat]) - case conf => - conf.setClass( - "mapred.output.format.class", - classOf[OrcOutputFormat], - classOf[MapRedOutputFormat[_, _]]) - } - - new OutputWriterFactory { - override def newInstance( - path: String, - bucketId: Option[Int], - dataSchema: StructType, - context: TaskAttemptContext): OutputWriter = { - new OrcOutputWriter(path, bucketId, dataSchema, context) - } - } - } - - override def buildReader( - sparkSession: SparkSession, - dataSchema: StructType, - partitionSchema: StructType, - requiredSchema: StructType, - filters: Seq[Filter], - options: Map[String, String], - hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - if (sparkSession.sessionState.conf.orcFilterPushDown) { - // Sets pushed predicates - OrcFilters.createFilter(requiredSchema, filters.toArray).foreach { f => - hadoopConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo) - hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true) - } - } - - val 
broadcastedHadoopConf = - sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) - - (file: PartitionedFile) => { - val conf = broadcastedHadoopConf.value.value - - // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this - // case, `OrcFileOperator.readSchema` returns `None`, and we can't read the underlying file - // using the given physical schema. Instead, we simply return an empty iterator. - val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf)) - if (maybePhysicalSchema.isEmpty) { - Iterator.empty - } else { - val physicalSchema = maybePhysicalSchema.get - OrcRelation.setRequiredColumns(conf, physicalSchema, requiredSchema) - - val orcRecordReader = { - val job = Job.getInstance(conf) - FileInputFormat.setInputPaths(job, file.filePath) - - val fileSplit = new FileSplit( - new Path(new URI(file.filePath)), file.start, file.length, Array.empty - ) - // Custom OrcRecordReader is used to get - // ObjectInspector during recordReader creation itself and can - // avoid NameNode call in unwrapOrcStructs per file. - // Specifically would be helpful for partitioned datasets. - val orcReader = OrcFile.createReader( - new Path(new URI(file.filePath)), OrcFile.readerOptions(conf)) - new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart, fileSplit.getLength) - } - - // Unwraps `OrcStruct`s to `UnsafeRow`s - OrcRelation.unwrapOrcStructs( - conf, - requiredSchema, - Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]), - new RecordReaderIterator[OrcStruct](orcRecordReader)) - } - } - } -} - -private[orc] class OrcSerializer(dataSchema: StructType, conf: Configuration) - extends HiveInspectors { - - def serialize(row: InternalRow): Writable = { - wrapOrcStruct(cachedOrcStruct, structOI, row) - serializer.serialize(cachedOrcStruct, structOI) - } - - private[this] val serializer = { - val table = new Properties() - table.setProperty("columns", dataSchema.fieldNames.mkString(",")) - table.setProperty("columns.types", dataSchema.map(_.dataType.catalogString).mkString(":")) - - val serde = new OrcSerde - serde.initialize(conf, table) - serde - } - - // Object inspector converted from the schema of the relation to be serialized. - private[this] val structOI = { - val typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(dataSchema.catalogString) - OrcStruct.createObjectInspector(typeInfo.asInstanceOf[StructTypeInfo]) - .asInstanceOf[SettableStructObjectInspector] - } - - private[this] val cachedOrcStruct = structOI.create().asInstanceOf[OrcStruct] - - private[this] def wrapOrcStruct( - struct: OrcStruct, - oi: SettableStructObjectInspector, - row: InternalRow): Unit = { - val fieldRefs = oi.getAllStructFieldRefs - var i = 0 - while (i < fieldRefs.size) { - - oi.setStructFieldData( - struct, - fieldRefs.get(i), - wrap( - row.get(i, dataSchema(i).dataType), - fieldRefs.get(i).getFieldObjectInspector, - dataSchema(i).dataType)) - i += 1 - } - } -} - -private[orc] class OrcOutputWriter( - path: String, - bucketId: Option[Int], - dataSchema: StructType, - context: TaskAttemptContext) - extends OutputWriter { - - private[this] val conf = context.getConfiguration - - private[this] val serializer = new OrcSerializer(dataSchema, conf) - - // `OrcRecordWriter.close()` creates an empty file if no rows are written at all. We use this - // flag to decide whether `OrcRecordWriter.close()` needs to be called. 
- private var recordWriterInstantiated = false - - private lazy val recordWriter: RecordWriter[NullWritable, Writable] = { - recordWriterInstantiated = true - val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID") - val taskAttemptId = context.getTaskAttemptID - val partition = taskAttemptId.getTaskID.getId - val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("") - val compressionExtension = { - val name = conf.get(OrcRelation.ORC_COMPRESSION) - OrcRelation.extensionsForCompressionCodecNames.getOrElse(name, "") - } - // It has the `.orc` extension at the end because (de)compression tools - // such as gunzip would not be able to decompress this as the compression - // is not applied on this whole file but on each "stream" in ORC format. - val filename = f"part-r-$partition%05d-$uniqueWriteJobId$bucketString$compressionExtension.orc" - - new OrcOutputFormat().getRecordWriter( - new Path(path, filename).getFileSystem(conf), - conf.asInstanceOf[JobConf], - new Path(path, filename).toString, - Reporter.NULL - ).asInstanceOf[RecordWriter[NullWritable, Writable]] - } - - override def write(row: Row): Unit = - throw new UnsupportedOperationException("call writeInternal") - - override protected[sql] def writeInternal(row: InternalRow): Unit = { - recordWriter.write(NullWritable.get(), serializer.serialize(row)) - } - - override def close(): Unit = { - if (recordWriterInstantiated) { - recordWriter.close(Reporter.NULL) - } - } -} - -private[orc] case class OrcTableScan( - @transient sparkSession: SparkSession, - attributes: Seq[Attribute], - filters: Array[Filter], - @transient inputPaths: Seq[FileStatus]) - extends Logging - with HiveInspectors { - - def execute(): RDD[InternalRow] = { - val job = Job.getInstance(sparkSession.sessionState.newHadoopConf()) - val conf = job.getConfiguration - - // Figure out the actual schema from the ORC source (without partition columns) so that we - // can pick the correct ordinals. Note that this assumes that all files have the same schema. - val orcFormat = new DefaultSource - val dataSchema = - orcFormat - .inferSchema(sparkSession, Map.empty, inputPaths) - .getOrElse(sys.error("Failed to read schema from target ORC files.")) - - // Tries to push down filters if ORC filter push-down is enabled - if (sparkSession.sessionState.conf.orcFilterPushDown) { - OrcFilters.createFilter(dataSchema, filters).foreach { f => - conf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo) - conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true) - } - } - - // Sets requested columns - OrcRelation.setRequiredColumns(conf, dataSchema, StructType.fromAttributes(attributes)) - - if (inputPaths.isEmpty) { - // the input path probably be pruned, return an empty RDD. 
- return sparkSession.sparkContext.emptyRDD[InternalRow] - } - FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*) - - val inputFormatClass = - classOf[OrcInputFormat] - .asInstanceOf[Class[_ <: MapRedInputFormat[NullWritable, Writable]]] - - val rdd = sparkSession.sparkContext.hadoopRDD( - conf.asInstanceOf[JobConf], - inputFormatClass, - classOf[NullWritable], - classOf[Writable] - ).asInstanceOf[HadoopRDD[NullWritable, Writable]] - - val wrappedConf = new SerializableConfiguration(conf) - - rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) => - val writableIterator = iterator.map(_._2) - val maybeStructOI = OrcFileOperator.getObjectInspector(split.getPath.toString, Some(conf)) - OrcRelation.unwrapOrcStructs( - wrappedConf.value, - StructType.fromAttributes(attributes), - maybeStructOI, - writableIterator - ) - } - } -} - -private[orc] object OrcTableScan { - // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public. - private[orc] val SARG_PUSHDOWN = "sarg.pushdown" -} - -private[orc] object OrcRelation extends HiveInspectors { - // The references of Hive's classes will be minimized. - val ORC_COMPRESSION = "orc.compress" - - // The extensions for ORC compression codecs - val extensionsForCompressionCodecNames = Map( - "NONE" -> "", - "SNAPPY" -> ".snappy", - "ZLIB" -> ".zlib", - "LZO" -> ".lzo") - - def unwrapOrcStructs( - conf: Configuration, - dataSchema: StructType, - maybeStructOI: Option[StructObjectInspector], - iterator: Iterator[Writable]): Iterator[InternalRow] = { - val deserializer = new OrcSerde - val mutableRow = new SpecificMutableRow(dataSchema.map(_.dataType)) - val unsafeProjection = UnsafeProjection.create(dataSchema) - - def unwrap(oi: StructObjectInspector): Iterator[InternalRow] = { - val (fieldRefs, fieldOrdinals) = dataSchema.zipWithIndex.map { - case (field, ordinal) => oi.getStructFieldRef(field.name) -> ordinal - }.unzip - - val unwrappers = fieldRefs.map(unwrapperFor) - - iterator.map { value => - val raw = deserializer.deserialize(value) - var i = 0 - while (i < fieldRefs.length) { - val fieldValue = oi.getStructFieldData(raw, fieldRefs(i)) - if (fieldValue == null) { - mutableRow.setNullAt(fieldOrdinals(i)) - } else { - unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i)) - } - i += 1 - } - unsafeProjection(mutableRow) - } - } - - maybeStructOI.map(unwrap).getOrElse(Iterator.empty) - } - - def setRequiredColumns( - conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = { - val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer) - val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip - HiveShim.appendReadColumns(conf, sortedIDs, sortedNames) - } -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala index 0207b4e8c9..5dfa58f673 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.types._ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { import testImplicits._ - override val dataSourceName: String = classOf[DefaultSource].getCanonicalName + override val dataSourceName: String = classOf[OrcFileFormat].getCanonicalName // ORC does not play well with NullType and UDT. 
override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
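A note for readers of this patch: after the rename, data sources are still looked up either by the short name registered through DataSourceRegister (via the META-INF/services entry updated above) or by fully qualified class name. Below is a minimal usage sketch in Scala showing that the "orc" short name keeps working; the object name OrcShortNameDemo and the /tmp/orc-demo path are illustrative, not part of this patch.

import org.apache.spark.sql.SparkSession

object OrcShortNameDemo {
  def main(args: Array[String]): Unit = {
    // The ORC source in this patch lives in the Hive module, hence Hive support.
    val spark = SparkSession.builder()
      .appName("orc-short-name-demo")
      .enableHiveSupport()
      .getOrCreate()

    // "orc" resolves through DataSourceRegister.shortName(), so the
    // DefaultSource -> OrcFileFormat rename is invisible to callers.
    spark.range(10).toDF("id")
      .write.format("orc").mode("overwrite").save("/tmp/orc-demo")

    spark.read.format("orc").load("/tmp/orc-demo").show()
    spark.stop()
  }
}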
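The commit message states that backward compatibility is maintained through aliasing, and the scaladoc on the new OrcFileFormat points at DataSource's backwardCompatibilityMap. A minimal sketch of that aliasing idea follows; the object DataSourceAliasSketch and the helper lookupProviderClassName are hypothetical names, and the map shows only the two renames visible in this diff rather than the full table in Spark.

object DataSourceAliasSketch {
  // Old fully qualified provider names keep resolving by mapping them to the
  // renamed classes before the provider class is loaded.
  private val backwardCompatibilityMap: Map[String, String] = Map(
    "org.apache.spark.sql.hive.orc.DefaultSource" ->
      "org.apache.spark.sql.hive.orc.OrcFileFormat",
    "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" ->
      "org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat")

  // Resolve a user-supplied provider name to the class name that should be loaded.
  def lookupProviderClassName(provider: String): String =
    backwardCompatibilityMap.getOrElse(provider, provider)
}

For example, lookupProviderClassName("org.apache.spark.sql.hive.orc.DefaultSource") would return "org.apache.spark.sql.hive.orc.OrcFileFormat", which is why existing metastore entries and user code referencing the old class name keep working.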