aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2016-03-26 16:10:35 -0700
committerYin Huai <yhuai@databricks.com>2016-03-26 16:10:35 -0700
commitb547de8a60074ca25c5bec3a24511f8042bdf0ad (patch)
tree4614553f2a85f50736709afe95b4f9d9bec3f3f9 /sql/hive
parent8989d3a39657e817918fb4e5fdab172b68b85df6 (diff)
downloadspark-b547de8a60074ca25c5bec3a24511f8042bdf0ad.tar.gz
spark-b547de8a60074ca25c5bec3a24511f8042bdf0ad.tar.bz2
spark-b547de8a60074ca25c5bec3a24511f8042bdf0ad.zip
[SPARK-14116][SQL] Implements buildReader() for ORC data source
## What changes were proposed in this pull request? This PR implements `FileFormat.buildReader()` for our ORC data source. It also fixed several minor styling issues related to `HadoopFsRelation` planning code path. Note that `OrcNewInputFormat` doesn't rely on `OrcNewSplit` for creating `OrcRecordReader`s, plain `FileSplit` is just fine. That's why we can simply create the record reader with the help of `OrcNewInputFormat` and `FileSplit`. ## How was this patch tested? Existing test cases should do the work Author: Cheng Lian <lian@databricks.com> Closes #11936 from liancheng/spark-14116-build-reader-for-orc.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala190
2 files changed, 128 insertions, 64 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 0195466946..8248a112a0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.hive.HiveMetastoreTypes
import org.apache.spark.sql.types.StructType
@@ -92,7 +91,6 @@ private[orc] object OrcFileOperator extends Logging {
// TODO: Check if the paths coming in are already qualified and simplify.
val origPath = new Path(pathStr)
val fs = origPath.getFileSystem(conf)
- val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
.filterNot(_.isDirectory)
.map(_.getPath)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index b5dc9106e2..7c4a0a0c0f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hive.orc
+import java.net.URI
import java.util.Properties
import org.apache.hadoop.conf.Configuration
@@ -24,12 +25,13 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.io.orc._
import org.apache.hadoop.hive.ql.io.orc.OrcFile.OrcTableProperties
-import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector
+import org.apache.hadoop.hive.serde2.objectinspector.{SettableStructObjectInspector, StructObjectInspector}
import org.apache.hadoop.hive.serde2.typeinfo.{StructTypeInfo, TypeInfoUtils}
import org.apache.hadoop.io.{NullWritable, Writable}
import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
-import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
@@ -37,6 +39,7 @@ import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.{HiveInspectors, HiveMetastoreTypes, HiveShim}
import org.apache.spark.sql.sources.{Filter, _}
@@ -44,7 +47,8 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.collection.BitSet
-private[sql] class DefaultSource extends FileFormat with DataSourceRegister {
+private[sql] class DefaultSource
+ extends FileFormat with DataSourceRegister with Serializable {
override def shortName(): String = "orc"
@@ -55,7 +59,9 @@ private[sql] class DefaultSource extends FileFormat with DataSourceRegister {
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
OrcFileOperator.readSchema(
- files.map(_.getPath.toUri.toString), Some(sqlContext.sparkContext.hadoopConfiguration))
+ files.map(_.getPath.toUri.toString),
+ Some(sqlContext.sparkContext.hadoopConfiguration)
+ )
}
override def prepareWrite(
@@ -80,8 +86,8 @@ private[sql] class DefaultSource extends FileFormat with DataSourceRegister {
job.getConfiguration.set(
OrcTableProperties.COMPRESSION.getPropName,
OrcRelation
- .shortOrcCompressionCodecNames
- .getOrElse(codecName, CompressionKind.NONE).name())
+ .shortOrcCompressionCodecNames
+ .getOrElse(codecName, CompressionKind.NONE).name())
}
job.getConfiguration match {
@@ -117,6 +123,68 @@ private[sql] class DefaultSource extends FileFormat with DataSourceRegister {
val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes
OrcTableScan(sqlContext, output, filters, inputFiles).execute()
}
+
+ override def buildReader(
+ sqlContext: SQLContext,
+ partitionSchema: StructType,
+ dataSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
+ val orcConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+
+ if (sqlContext.conf.orcFilterPushDown) {
+ // Sets pushed predicates
+ OrcFilters.createFilter(filters.toArray).foreach { f =>
+ orcConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
+ orcConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
+ }
+ }
+
+ val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(orcConf))
+
+ (file: PartitionedFile) => {
+ val conf = broadcastedConf.value.value
+
+ // SPARK-8501: Empty ORC files always have an empty schema stored in their footer. In this
+ // case, `OrcFileOperator.readSchema` returns `None`, and we can simply return an empty
+ // iterator.
+ val maybePhysicalSchema = OrcFileOperator.readSchema(Seq(file.filePath), Some(conf))
+ if (maybePhysicalSchema.isEmpty) {
+ Iterator.empty
+ } else {
+ val physicalSchema = maybePhysicalSchema.get
+ OrcRelation.setRequiredColumns(conf, physicalSchema, dataSchema)
+
+ val orcRecordReader = {
+ val job = Job.getInstance(conf)
+ FileInputFormat.setInputPaths(job, file.filePath)
+
+ val inputFormat = new OrcNewInputFormat
+ val fileSplit = new FileSplit(
+ new Path(new URI(file.filePath)), file.start, file.length, Array.empty
+ )
+
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+ val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
+ inputFormat.createRecordReader(fileSplit, hadoopAttemptContext)
+ }
+
+ // Unwraps `OrcStruct`s to `UnsafeRow`s
+ val unsafeRowIterator = OrcRelation.unwrapOrcStructs(
+ file.filePath, conf, dataSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
+ )
+
+ // Appends partition values
+ val fullOutput = dataSchema.toAttributes ++ partitionSchema.toAttributes
+ val joinedRow = new JoinedRow()
+ val appendPartitionColumns = GenerateUnsafeProjection.generate(fullOutput, fullOutput)
+
+ unsafeRowIterator.map { dataRow =>
+ appendPartitionColumns(joinedRow(dataRow, file.partitionValues))
+ }
+ }
+ }
+ }
}
private[orc] class OrcOutputWriter(
@@ -225,55 +293,6 @@ private[orc] case class OrcTableScan(
extends Logging
with HiveInspectors {
- private def addColumnIds(
- dataSchema: StructType,
- output: Seq[Attribute],
- conf: Configuration): Unit = {
- val ids = output.map(a => dataSchema.fieldIndex(a.name): Integer)
- val (sortedIds, sortedNames) = ids.zip(attributes.map(_.name)).sorted.unzip
- HiveShim.appendReadColumns(conf, sortedIds, sortedNames)
- }
-
- // Transform all given raw `Writable`s into `InternalRow`s.
- private def fillObject(
- path: String,
- conf: Configuration,
- iterator: Iterator[Writable],
- nonPartitionKeyAttrs: Seq[Attribute]): Iterator[InternalRow] = {
- val deserializer = new OrcSerde
- val maybeStructOI = OrcFileOperator.getObjectInspector(path, Some(conf))
- val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
- val unsafeProjection = UnsafeProjection.create(StructType.fromAttributes(nonPartitionKeyAttrs))
-
- // SPARK-8501: ORC writes an empty schema ("struct<>") to an ORC file if the file contains zero
- // rows, and thus couldn't give a proper ObjectInspector. In this case we just return an empty
- // partition since we know that this file is empty.
- maybeStructOI.map { soi =>
- val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.zipWithIndex.map {
- case (attr, ordinal) =>
- soi.getStructFieldRef(attr.name) -> ordinal
- }.unzip
- val unwrappers = fieldRefs.map(unwrapperFor)
- // Map each tuple to a row object
- iterator.map { value =>
- val raw = deserializer.deserialize(value)
- var i = 0
- while (i < fieldRefs.length) {
- val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
- if (fieldValue == null) {
- mutableRow.setNullAt(fieldOrdinals(i))
- } else {
- unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
- }
- i += 1
- }
- unsafeProjection(mutableRow)
- }
- }.getOrElse {
- Iterator.empty
- }
- }
-
def execute(): RDD[InternalRow] = {
val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
val conf = job.getConfiguration
@@ -291,10 +310,10 @@ private[orc] case class OrcTableScan(
val orcFormat = new DefaultSource
val dataSchema =
orcFormat
- .inferSchema(sqlContext, Map.empty, inputPaths)
- .getOrElse(sys.error("Failed to read schema from target ORC files."))
+ .inferSchema(sqlContext, Map.empty, inputPaths)
+ .getOrElse(sys.error("Failed to read schema from target ORC files."))
// Sets requested columns
- addColumnIds(dataSchema, attributes, conf)
+ OrcRelation.setRequiredColumns(conf, dataSchema, StructType.fromAttributes(attributes))
if (inputPaths.isEmpty) {
// the input path probably be pruned, return an empty RDD.
@@ -317,7 +336,12 @@ private[orc] case class OrcTableScan(
rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
val writableIterator = iterator.map(_._2)
- fillObject(split.getPath.toString, wrappedConf.value, writableIterator, attributes)
+ OrcRelation.unwrapOrcStructs(
+ split.getPath.toString,
+ wrappedConf.value,
+ StructType.fromAttributes(attributes),
+ writableIterator
+ )
}
}
}
@@ -327,7 +351,7 @@ private[orc] object OrcTableScan {
private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
}
-private[orc] object OrcRelation {
+private[orc] object OrcRelation extends HiveInspectors {
// The ORC compression short names
val shortOrcCompressionCodecNames = Map(
"none" -> CompressionKind.NONE,
@@ -343,5 +367,47 @@ private[orc] object OrcRelation {
CompressionKind.ZLIB.name -> ".zlib",
CompressionKind.LZO.name -> ".lzo"
)
-}
+ def unwrapOrcStructs(
+ filePath: String,
+ conf: Configuration,
+ dataSchema: StructType,
+ iterator: Iterator[Writable]): Iterator[InternalRow] = {
+ val deserializer = new OrcSerde
+ val maybeStructOI = OrcFileOperator.getObjectInspector(filePath, Some(conf))
+ val mutableRow = new SpecificMutableRow(dataSchema.map(_.dataType))
+ val unsafeProjection = UnsafeProjection.create(dataSchema)
+
+ def unwrap(oi: StructObjectInspector): Iterator[InternalRow] = {
+ val (fieldRefs, fieldOrdinals) = dataSchema.zipWithIndex.map {
+ case (field, ordinal) => oi.getStructFieldRef(field.name) -> ordinal
+ }.unzip
+
+ val unwrappers = fieldRefs.map(unwrapperFor)
+
+ iterator.map { value =>
+ val raw = deserializer.deserialize(value)
+ var i = 0
+ while (i < fieldRefs.length) {
+ val fieldValue = oi.getStructFieldData(raw, fieldRefs(i))
+ if (fieldValue == null) {
+ mutableRow.setNullAt(fieldOrdinals(i))
+ } else {
+ unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
+ }
+ i += 1
+ }
+ unsafeProjection(mutableRow)
+ }
+ }
+
+ maybeStructOI.map(unwrap).getOrElse(Iterator.empty)
+ }
+
+ def setRequiredColumns(
+ conf: Configuration, physicalSchema: StructType, requestedSchema: StructType): Unit = {
+ val ids = requestedSchema.map(a => physicalSchema.fieldIndex(a.name): Integer)
+ val (sortedIDs, sortedNames) = ids.zip(requestedSchema.fieldNames).sorted.unzip
+ HiveShim.appendReadColumns(conf, sortedIDs, sortedNames)
+ }
+}