From e5226e3007d6645c6d48d3c1b2762566184f3fc7 Mon Sep 17 00:00:00 2001 From: Rajesh Balamohan Date: Fri, 22 Apr 2016 22:51:40 -0700 Subject: [SPARK-14551][SQL] Reduce number of NameNode calls in OrcRelation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What changes were proposed in this pull request? When FileSourceStrategy is used, record reader is created which incurs a NN call internally. Later in OrcRelation.unwrapOrcStructs, it ends ups reading the file information to get the ObjectInspector. This incurs additional NN call. It would be good to avoid this additional NN call (specifically for partitioned datasets). Added OrcRecordReader which is very similar to OrcNewInputFormat.OrcRecordReader with an option of exposing the ObjectInspector. This eliminates the need to look up the file later for generating the object inspector. This would be specifically be useful for partitioned tables/datasets. ## How was this patch tested? Ran tpc-ds queries manually and also verified by running org.apache.spark.sql.hive.orc.OrcSuite,org.apache.spark.sql.hive.orc.OrcQuerySuite,org.apache.spark.sql.hive.orc.OrcPartitionDiscoverySuite,OrcPartitionDiscoverySuite.OrcHadoopFsRelationSuite,org.apache.spark.sql.hive.execution.HiveCompatibilitySuite …SourceStrategy mode Author: Rajesh Balamohan Closes #12319 from rajeshbalamohan/SPARK-14551. --- .../hive/ql/io/orc/SparkOrcNewRecordReader.java | 94 ++++++++++++++++++++++ .../apache/spark/sql/hive/orc/OrcRelation.scala | 25 +++--- 2 files changed, 108 insertions(+), 11 deletions(-) create mode 100644 sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkOrcNewRecordReader.java (limited to 'sql') diff --git a/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkOrcNewRecordReader.java b/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkOrcNewRecordReader.java new file mode 100644 index 0000000000..f093637d41 --- /dev/null +++ b/sql/hive/src/main/java/org/apache/hadoop/hive/ql/io/orc/SparkOrcNewRecordReader.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import java.io.IOException; +import java.util.List; + +/** + * This is based on hive-exec-1.2.1 + * {@link org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat.OrcRecordReader}. + * This class exposes getObjectInspector which can be used for reducing + * NameNode calls in OrcRelation. + */ +public class SparkOrcNewRecordReader extends + org.apache.hadoop.mapreduce.RecordReader { + private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader; + private final int numColumns; + OrcStruct value; + private float progress = 0.0f; + private ObjectInspector objectInspector; + + public SparkOrcNewRecordReader(Reader file, Configuration conf, + long offset, long length) throws IOException { + List types = file.getTypes(); + numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount(); + value = new OrcStruct(numColumns); + this.reader = OrcInputFormat.createReaderFromFile(file, conf, offset, + length); + this.objectInspector = file.getObjectInspector(); + } + + @Override + public void close() throws IOException { + reader.close(); + } + + @Override + public NullWritable getCurrentKey() throws IOException, + InterruptedException { + return NullWritable.get(); + } + + @Override + public OrcStruct getCurrentValue() throws IOException, + InterruptedException { + return value; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return progress; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (reader.hasNext()) { + reader.next(value); + progress = reader.getProgress(); + return true; + } else { + return false; + } + } + + public ObjectInspector getObjectInspector() { + return objectInspector; + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 21591ec093..b0f32faa5c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -31,7 +31,6 @@ import org.apache.hadoop.io.{NullWritable, Writable} import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter} import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark.internal.Logging import org.apache.spark.rdd.{HadoopRDD, RDD} @@ -145,20 +144,24 @@ private[sql] class DefaultSource val job = Job.getInstance(conf) FileInputFormat.setInputPaths(job, file.filePath) - val inputFormat = new OrcNewInputFormat val fileSplit = new FileSplit( new Path(new URI(file.filePath)), file.start, file.length, Array.empty ) - - val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - inputFormat.createRecordReader(fileSplit, hadoopAttemptContext) + // Custom OrcRecordReader is used to get + // ObjectInspector during recordReader creation itself and can + // avoid NameNode call in unwrapOrcStructs per file. + // Specifically would be helpful for partitioned datasets. + val orcReader = OrcFile.createReader( + new Path(new URI(file.filePath)), OrcFile.readerOptions(conf)) + new SparkOrcNewRecordReader(orcReader, conf, fileSplit.getStart(), fileSplit.getLength()) } // Unwraps `OrcStruct`s to `UnsafeRow`s val unsafeRowIterator = OrcRelation.unwrapOrcStructs( - file.filePath, conf, requiredSchema, new RecordReaderIterator[OrcStruct](orcRecordReader) - ) + conf, + requiredSchema, + Some(orcRecordReader.getObjectInspector.asInstanceOf[StructObjectInspector]), + new RecordReaderIterator[OrcStruct](orcRecordReader)) // Appends partition values val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes @@ -322,10 +325,11 @@ private[orc] case class OrcTableScan( rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iterator) => val writableIterator = iterator.map(_._2) + val maybeStructOI = OrcFileOperator.getObjectInspector(split.getPath.toString, Some(conf)) OrcRelation.unwrapOrcStructs( - split.getPath.toString, wrappedConf.value, StructType.fromAttributes(attributes), + maybeStructOI, writableIterator ) } @@ -355,12 +359,11 @@ private[orc] object OrcRelation extends HiveInspectors { ) def unwrapOrcStructs( - filePath: String, conf: Configuration, dataSchema: StructType, + maybeStructOI: Option[StructObjectInspector], iterator: Iterator[Writable]): Iterator[InternalRow] = { val deserializer = new OrcSerde - val maybeStructOI = OrcFileOperator.getObjectInspector(filePath, Some(conf)) val mutableRow = new SpecificMutableRow(dataSchema.map(_.dataType)) val unsafeProjection = UnsafeProjection.create(dataSchema) -- cgit v1.2.3