author    Rajesh Balamohan <rbalamohan@apache.org>    2016-04-22 22:51:40 -0700
committer Reynold Xin <rxin@databricks.com>           2016-04-22 22:51:40 -0700
commit    e5226e3007d6645c6d48d3c1b2762566184f3fc7
tree      3fc4e6ea679ee8fb87d7d1aa97da2462199ecc4b
parent    95faa731c15ce2e36373071a405207165818df97
[SPARK-14551][SQL] Reduce number of NameNode calls in OrcRelation
## What changes were proposed in this pull request?

When FileSourceStrategy is used, a record reader is created, which internally incurs a NameNode (NN) call. Later, in OrcRelation.unwrapOrcStructs, the file information is read again to obtain the ObjectInspector, which incurs an additional NN call. It would be good to avoid this extra call, particularly for partitioned datasets.

This patch adds SparkOrcNewRecordReader, which is very similar to OrcNewInputFormat.OrcRecordReader but exposes the ObjectInspector. This removes the need to look the file up again later to build the object inspector, and is especially useful for partitioned tables/datasets.

## How was this patch tested?

Ran TPC-DS queries manually, and verified by running org.apache.spark.sql.hive.orc.OrcSuite, org.apache.spark.sql.hive.orc.OrcQuerySuite, org.apache.spark.sql.hive.orc.OrcPartitionDiscoverySuite, OrcHadoopFsRelationSuite, and org.apache.spark.sql.hive.execution.HiveCompatibilitySuite in FileSourceStrategy mode.

Author: Rajesh Balamohan <rbalamohan@apache.org>

Closes #12319 from rajeshbalamohan/SPARK-14551.
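For orientation, a hedged sketch of the new code path this commit introduces in DefaultSource#buildReader (the helper name `openSplit` is illustrative; the real patch inlines this logic): the ORC `Reader` is opened once, and that single open yields both the record reader and the `ObjectInspector`, so `unwrapOrcStructs` no longer needs to re-open the file against the NameNode.

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.orc.{OrcFile, SparkOrcNewRecordReader}
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector

// Illustrative helper: one file open per split yields both the record reader and the
// ObjectInspector, instead of a second open later inside unwrapOrcStructs.
def openSplit(
    filePath: String,
    start: Long,
    length: Long,
    conf: Configuration): (SparkOrcNewRecordReader, StructObjectInspector) = {
  val orcReader = OrcFile.createReader(new Path(new URI(filePath)), OrcFile.readerOptions(conf))
  val recordReader = new SparkOrcNewRecordReader(orcReader, conf, start, length)
  (recordReader, recordReader.getObjectInspector.asInstanceOf[StructObjectInspector])
}
```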
Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkOrcNewRecordReader.java |  94
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala             |  25
2 files changed, 108 insertions(+), 11 deletions(-)
diff --git a/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkOrcNewRecordReader.java b/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkOrcNewRecordReader.java
new file mode 100644
index 0000000000..f093637d41
--- /dev/null
+++ b/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkOrcNewRecordReader.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * This is based on hive-exec-1.2.1
+ * {@link org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat.OrcRecordReader}.
+ * This class exposes getObjectInspector which can be used for reducing
+ * NameNode calls in OrcRelation.
+ */
+public class SparkOrcNewRecordReader extends
+ org.apache.hadoop.mapreduce.RecordReader<NullWritable, OrcStruct> {
+ private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+ private final int numColumns;
+ OrcStruct value;
+ private float progress = 0.0f;
+ private ObjectInspector objectInspector;
+
+ public SparkOrcNewRecordReader(Reader file, Configuration conf,
+ long offset, long length) throws IOException {
+ List<OrcProto.Type> types = file.getTypes();
+ numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+ value = new OrcStruct(numColumns);
+ this.reader = OrcInputFormat.createReaderFromFile(file, conf, offset,
+ length);
+ this.objectInspector = file.getObjectInspector();
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException,
+ InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public OrcStruct getCurrentValue() throws IOException,
+ InterruptedException {
+ return value;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return progress;
+ }
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (reader.hasNext()) {
+ reader.next(value);
+ progress = reader.getProgress();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public ObjectInspector getObjectInspector() {
+ return objectInspector;
+ }
+}
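Before the second file's diff, a hedged usage sketch of the new class: it follows the standard org.apache.hadoop.mapreduce.RecordReader contract (`nextKeyValue`/`getCurrentValue`/`close`), with `getObjectInspector` as the one extra accessor. The helper `countRows` is hypothetical and not part of the patch.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.ql.io.orc.{OrcStruct, Reader, SparkOrcNewRecordReader}

// Hypothetical helper: count the rows in one ORC split using the new reader.
def countRows(orcReader: Reader, conf: Configuration, start: Long, length: Long): Long = {
  val reader = new SparkOrcNewRecordReader(orcReader, conf, start, length)
  // The inspector is available immediately, without a second lookup of the file.
  val inspector = reader.getObjectInspector
  var n = 0L
  try {
    while (reader.nextKeyValue()) {
      val row: OrcStruct = reader.getCurrentValue
      n += 1
    }
  } finally {
    reader.close()
  }
  n
}
```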
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 21591ec093..b0f32faa5c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -31,7 +31,6 @@ import org.apache.hadoop.io.{NullWritable, Writable}
import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{HadoopRDD, RDD}
@@ -145,20 +144,24 @@ private[sql] class DefaultSource
val job = Job.getInstance(conf)
FileInputFormat.setInputPaths(job, file.filePath)
- val inputFormat = new OrcNewInputFormat
val fileSplit = new FileSplit(
new Path(new URI(file.filePath)), file.start, file.length, Array.empty
)
-
- val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
- val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
- inputFormat.createRecordReader(fileSplit, hadoopAttemptContext)
+ // Custom OrcRecordReader is used to get
+ // ObjectInspector during recordReader creation itself and can
+ // avoid NameNode call in unwrapOrcStructs per file.
+ // Specifically would be helpful for partitioned datasets.
+ val orcReader = OrcFile.createReader(
+ new Path(new URI(file.filePath)), OrcFile.readerOptions(conf))
+ new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart(), fileSplit.getLength())
}
// Unwraps `OrcStruct`s to `UnsafeRow`s
val unsafeRowIterator = OrcRelation.unwrapOrcStructs(
- file.filePath, conf, requiredSchema, new RecordReaderIterator[OrcStruct](orcRecordReader)
- )
+ conf,
+ requiredSchema,
+ Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]),
+ new RecordReaderIterator[OrcStruct](orcRecordReader))
// Appends partition values
val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes
@@ -322,10 +325,11 @@ private[orc] case class OrcTableScan(
rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) =>
val writableIterator = iterator.map(_._2)
+ val maybeStructOI = OrcFileOperator.getObjectInspector(split.getPath.toString, Some(conf))
OrcRelation.unwrapOrcStructs(
- split.getPath.toString,
wrappedConf.value,
StructType.fromAttributes(attributes),
+ maybeStructOI,
writableIterator
)
}
@@ -355,12 +359,11 @@ private[orc] object OrcRelation extends HiveInspectors {
)
def unwrapOrcStructs(
- filePath: String,
conf: Configuration,
dataSchema: StructType,
+ maybeStructOI: Option[StructObjectInspector],
iterator: Iterator[Writable]): Iterator[InternalRow] = {
val deserializer = new OrcSerde
- val maybeStructOI = OrcFileOperator.getObjectInspector(filePath, Some(conf))
val mutableRow = new SpecificMutableRow(dataSchema.map(_.dataType))
val unsafeProjection = UnsafeProjection.create(dataSchema)
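The refactored unwrapOrcStructs now receives the inspector as `maybeStructOI: Option[StructObjectInspector]` instead of re-deriving it from `filePath`. The hunk above is truncated before the consumption logic; below is a hedged sketch of the pattern such an Option parameter enables (the helper `unwrapSketch` and the `unwrapRow` callback are illustrative names, not taken from the patch).

```scala
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
import org.apache.hadoop.io.Writable
import org.apache.spark.sql.catalyst.InternalRow

// Illustrative only: how an Option[StructObjectInspector] argument is typically consumed.
// If no inspector is available (e.g. an empty ORC file), the iterator short-circuits
// without touching the file system again; otherwise one inspector is reused for all rows.
def unwrapSketch(
    maybeStructOI: Option[StructObjectInspector],
    iterator: Iterator[Writable])(
    unwrapRow: (StructObjectInspector, Writable) => InternalRow): Iterator[InternalRow] = {
  maybeStructOI match {
    case Some(oi) => iterator.map(writable => unwrapRow(oi, writable))
    case None     => Iterator.empty
  }
}
```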