author | Michael Armbrust <michael@databricks.com> | 2016-03-21 20:16:01 -0700 |
committer | Michael Armbrust <michael@databricks.com> | 2016-03-21 20:16:01 -0700 |
commit | 8014a516d1cbb0f0c7035e2149161aa32fb506f8 (patch) | |
tree | 04e97f30e4e4ef519842589f0421ea21a8479186 /sql/core/src | |
parent | 7299961657b5591a3257b21e40f3047db27f221c (diff) | |
download | spark-8014a516d1cbb0f0c7035e2149161aa32fb506f8.tar.gz spark-8014a516d1cbb0f0c7035e2149161aa32fb506f8.tar.bz2 spark-8014a516d1cbb0f0c7035e2149161aa32fb506f8.zip |
[SPARK-13883][SQL] Parquet Implementation of FileFormat.buildReader
This PR implements the new `buildReader` interface for the Parquet `FileFormat`. A simple implementation of `FileScanRDD` is also included; a minimal sketch of how the two pieces fit together follows the diffstat below.
This code is covered by the many existing Parquet tests.
Author: Michael Armbrust <michael@databricks.com>
Author: Sameer Agarwal <sameer@databricks.com>
Author: Nong Li <nong@databricks.com>
Closes #11709 from marmbrus/parquetReader.
Diffstat (limited to 'sql/core/src')
12 files changed, 365 insertions, 46 deletions
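For orientation before the diff: the read path this commit adds boils down to two pieces, a per-file read function produced by `buildReader` (of type `PartitionedFile => Iterator[InternalRow]`) and a `FileScanRDD` that drives that function over the files in each `FilePartition`. The sketch below is not part of the commit; only the type shapes and constructor arguments are taken from the diff, and the `readEmpty` stand-in reader is hypothetical.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}

object FileScanSketch {
  // Hypothetical stand-in for the function a FileFormat.buildReader implementation returns:
  // given one PartitionedFile, read the byte range [start, start + length) of filePath and
  // emit its rows. This stub emits nothing; a real reader would also append
  // file.partitionValues to every row (or column batch) it produces.
  def readEmpty(file: PartitionedFile): Iterator[InternalRow] = Iterator.empty

  // FileScanRDD turns that per-file function into an RDD: one task per FilePartition,
  // reading the partition's files one after another and chaining their iterators.
  def toyScan(sqlContext: SQLContext, partitions: Seq[FilePartition]): FileScanRDD =
    new FileScanRDD(sqlContext, readEmpty, partitions)
}
```

In the real code path shown in the diff, `FileSourceStrategy` obtains the read function from `files.fileFormat.buildReader(...)` and wraps it as `new FileScanRDD(files.sqlContext, readFile, plannedPartitions)`.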
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index 9db5c4150f..9ac251391b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -29,8 +29,10 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; import org.apache.spark.sql.execution.vectorized.ColumnarBatch; -import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.types.*; /** * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the @@ -52,7 +54,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa private int numBatched = 0; /** - * For each request column, the reader to read this column. + * For each request column, the reader to read this column. This is NULL if this column + * is missing from the file, in which case we populate the attribute with NULL. */ private VectorizedColumnReader[] columnReaders; @@ -67,6 +70,11 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa private long totalCountLoadedSoFar = 0; /** + * For each column, true if the column is missing in the file and we'll instead return NULLs. + */ + private boolean[] missingColumns; + + /** * columnBatch object that is used for batch decoding. This is created on first use and triggers * batched decoding. It is not valid to interleave calls to the batched interface with the row * by row RecordReader APIs. @@ -163,14 +171,53 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa * This object is reused. Calling this enables the vectorized reader. This should be called * before any calls to nextKeyValue/nextBatch. */ - public ColumnarBatch resultBatch() { - return resultBatch(DEFAULT_MEMORY_MODE); - } - public ColumnarBatch resultBatch(MemoryMode memMode) { - if (columnarBatch == null) { - columnarBatch = ColumnarBatch.allocate(sparkSchema, memMode); + // Creates a columnar batch that includes the schema from the data files and the additional + // partition columns appended to the end of the batch. + // For example, if the data contains two columns, with 2 partition columns: + // Columns 0,1: data columns + // Column 2: partitionValues[0] + // Column 3: partitionValues[1] + public void initBatch(MemoryMode memMode, StructType partitionColumns, + InternalRow partitionValues) { + StructType batchSchema = new StructType(); + for (StructField f: sparkSchema.fields()) { + batchSchema = batchSchema.add(f); + } + if (partitionColumns != null) { + for (StructField f : partitionColumns.fields()) { + batchSchema = batchSchema.add(f); + } + } + + columnarBatch = ColumnarBatch.allocate(batchSchema); + if (partitionColumns != null) { + int partitionIdx = sparkSchema.fields().length; + for (int i = 0; i < partitionColumns.fields().length; i++) { + ColumnVectorUtils.populate(columnarBatch.column(i + partitionIdx), partitionValues, i); + columnarBatch.column(i + partitionIdx).setIsConstant(); + } + } + + // Initialize missing columns with nulls. 
+ for (int i = 0; i < missingColumns.length; i++) { + if (missingColumns[i]) { + columnarBatch.column(i).putNulls(0, columnarBatch.capacity()); + columnarBatch.column(i).setIsConstant(); + } } + } + + public void initBatch() { + initBatch(DEFAULT_MEMORY_MODE, null, null); + } + + public void initBatch(StructType partitionColumns, InternalRow partitionValues) { + initBatch(DEFAULT_MEMORY_MODE, partitionColumns, partitionValues); + } + + public ColumnarBatch resultBatch() { + if (columnarBatch == null) initBatch(); return columnarBatch; } @@ -191,6 +238,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa int num = (int) Math.min((long) columnarBatch.capacity(), totalCountLoadedSoFar - rowsReturned); for (int i = 0; i < columnReaders.length; ++i) { + if (columnReaders[i] == null) continue; columnReaders[i].readBatch(num, columnarBatch.column(i)); } rowsReturned += num; @@ -205,6 +253,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa * Check that the requested schema is supported. */ OriginalType[] originalTypes = new OriginalType[requestedSchema.getFieldCount()]; + missingColumns = new boolean[requestedSchema.getFieldCount()]; for (int i = 0; i < requestedSchema.getFieldCount(); ++i) { Type t = requestedSchema.getFields().get(i); if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) { @@ -223,9 +272,19 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) { throw new IOException("Int96 not supported."); } - ColumnDescriptor fd = fileSchema.getColumnDescription(requestedSchema.getPaths().get(i)); - if (!fd.equals(requestedSchema.getColumns().get(i))) { - throw new IOException("Schema evolution not supported."); + String[] colPath = requestedSchema.getPaths().get(i); + if (fileSchema.containsPath(colPath)) { + ColumnDescriptor fd = fileSchema.getColumnDescription(colPath); + if (!fd.equals(requestedSchema.getColumns().get(i))) { + throw new IOException("Schema evolution not supported."); + } + missingColumns[i] = false; + } else { + if (requestedSchema.getColumns().get(i).getMaxDefinitionLevel() == 0) { + // Column is missing in data but the required data is non-nullable. This file is invalid. + throw new IOException("Required column is missing in data file. Col: " + colPath); + } + missingColumns[i] = true; } } } @@ -240,6 +299,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa List<ColumnDescriptor> columns = requestedSchema.getColumns(); columnReaders = new VectorizedColumnReader[columns.size()]; for (int i = 0; i < columns.size(); ++i) { + if (missingColumns[i]) continue; columnReaders[i] = new VectorizedColumnReader(columns.get(i), pages.getPageReader(columns.get(i))); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index 13bf4c5c77..74fa6323cc 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -256,6 +256,8 @@ public abstract class ColumnVector { * Resets this column for writing. The currently stored values are no longer accessible. 
*/ public void reset() { + if (isConstant) return; + if (childColumns != null) { for (ColumnVector c: childColumns) { c.reset(); @@ -823,6 +825,11 @@ public abstract class ColumnVector { public final boolean isArray() { return resultArray != null; } /** + * Marks this column as being constant. + */ + public final void setIsConstant() { isConstant = true; } + + /** * Maximum number of rows that can be stored in this column. */ protected int capacity; @@ -844,6 +851,12 @@ public abstract class ColumnVector { protected boolean anyNullsSet; /** + * True if this column's values are fixed. This means the column values never change, even + * across resets. + */ + protected boolean isConstant; + + /** * Default size of each array length value. This grows as necessary. */ protected static final int DEFAULT_ARRAY_LENGTH = 4; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java index 7ab4cda5a4..792e17911f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java @@ -33,7 +33,7 @@ import org.apache.spark.unsafe.types.UTF8String; /** * This class is the in memory representation of rows as they are streamed through operators. It * is designed to maximize CPU efficiency and not storage footprint. Since it is expected that - * each operator allocates one of thee objects, the storage footprint on the task is negligible. + * each operator allocates one of these objects, the storage footprint on the task is negligible. * * The layout is a columnar with values encoded in their native format. Each RowBatch contains * a horizontal partitioning of the data, split into columns. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 6116cce17e..e2d5f42f9c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -233,7 +233,6 @@ case class DataSource( "It must be specified manually") } - HadoopFsRelation( sqlContext, fileCatalog, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index e2cbbc34d9..bbe7f4abb1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.{Partition, TaskContext} -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{RDD, SqlNewHadoopRDDState} import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow @@ -31,13 +31,17 @@ case class PartitionedFile( partitionValues: InternalRow, filePath: String, start: Long, - length: Long) + length: Long) { + override def toString(): String = { + s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues" + } +} + /** * A collection of files that should be read as a single task possibly from multiple partitioned * directories. * - * IMPLEMENT ME: This is just a placeholder for a future implementation. 
* TODO: This currently does not take locality information about the files into account. */ case class FilePartition(val index: Int, files: Seq[PartitionedFile]) extends Partition @@ -48,10 +52,38 @@ class FileScanRDD( @transient val filePartitions: Seq[FilePartition]) extends RDD[InternalRow](sqlContext.sparkContext, Nil) { - override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { - throw new NotImplementedError("Not Implemented Yet") + val iterator = new Iterator[Object] with AutoCloseable { + private[this] val files = split.asInstanceOf[FilePartition].files.toIterator + private[this] var currentIterator: Iterator[Object] = null + + def hasNext = (currentIterator != null && currentIterator.hasNext) || nextIterator() + def next() = currentIterator.next() + + /** Advances to the next file. Returns true if a new non-empty iterator is available. */ + private def nextIterator(): Boolean = { + if (files.hasNext) { + val nextFile = files.next() + logInfo(s"Reading File $nextFile") + SqlNewHadoopRDDState.setInputFileName(nextFile.filePath) + currentIterator = readFunction(nextFile) + hasNext + } else { + SqlNewHadoopRDDState.unsetInputFileName() + false + } + } + + override def close() = { + SqlNewHadoopRDDState.unsetInputFileName() + } + } + + // Register an on-task-completion callback to close the input stream. + context.addTaskCompletionListener(context => iterator.close()) + + iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack. } - override protected def getPartitions: Array[Partition] = Array.empty + override protected def getPartitions: Array[Partition] = filePartitions.toArray } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 62576ea4b2..de89d5f1fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -57,7 +57,9 @@ import org.apache.spark.sql.types._ private[sql] object FileSourceStrategy extends Strategy with Logging { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(projects, filters, l@LogicalRelation(files: HadoopFsRelation, _, _)) - if files.fileFormat.toString == "TestFileFormat" => + if (files.fileFormat.toString == "TestFileFormat" || + files.fileFormat.isInstanceOf[parquet.DefaultSource]) && + files.sqlContext.conf.parquetFileScan => // Filters on this relation fall into four categories based on where we can use them to avoid // reading unneeded data: // - partition keys only - used to prune directories to read @@ -67,12 +69,15 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { val filterSet = ExpressionSet(filters) val partitionColumns = - AttributeSet( - l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver)) + l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver) + val partitionSet = AttributeSet(partitionColumns) val partitionKeyFilters = - ExpressionSet(filters.filter(_.references.subsetOf(partitionColumns))) + ExpressionSet(filters.filter(_.references.subsetOf(partitionSet))) logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}") + val dataColumns = + l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver) + val bucketColumns = AttributeSet( files.bucketSpec @@ -82,7 
+87,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { .getOrElse(sys.error("")))) // Partition keys are not available in the statistics of the files. - val dataFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty) + val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty) // Predicates with both partition keys and attributes need to be evaluated after the scan. val afterScanFilters = filterSet -- partitionKeyFilters @@ -92,11 +97,13 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { val filterAttributes = AttributeSet(afterScanFilters) val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects - val requiredAttributes = AttributeSet(requiredExpressions).map(_.name).toSet + val requiredAttributes = AttributeSet(requiredExpressions) - val prunedDataSchema = - StructType( - files.dataSchema.filter(f => requiredAttributes.contains(f.name))) + val readDataColumns = + dataColumns + .filter(requiredAttributes.contains) + .filterNot(partitionColumns.contains) + val prunedDataSchema = readDataColumns.toStructType logInfo(s"Pruned Data Schema: ${prunedDataSchema.simpleString(5)}") val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter) @@ -132,7 +139,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { val splitFiles = selectedPartitions.flatMap { partition => partition.files.flatMap { file => - assert(file.getLen != 0) + assert(file.getLen != 0, file.toString) (0L to file.getLen by maxSplitBytes).map { offset => val remaining = file.getLen - offset val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining @@ -180,17 +187,20 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { val scan = DataSourceScan( - l.output, + readDataColumns ++ partitionColumns, new FileScanRDD( files.sqlContext, readFile, plannedPartitions), files, - Map("format" -> files.fileFormat.toString)) + Map( + "Format" -> files.fileFormat.toString, + "PushedFilters" -> pushedDownFilters.mkString("[", ", ", "]"), + "ReadSchema" -> prunedDataSchema.simpleString)) val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And) val withFilter = afterScanFilter.map(execution.Filter(_, scan)).getOrElse(scan) - val withProjections = if (projects.forall(_.isInstanceOf[AttributeReference])) { + val withProjections = if (projects == withFilter.output) { withFilter } else { execution.Project(projects, withFilter) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala new file mode 100644 index 0000000000..f03ae94d55 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.mapreduce.RecordReader + +import org.apache.spark.sql.catalyst.InternalRow + +/** + * An adaptor from a Hadoop [[RecordReader]] to an [[Iterator]] over the values returned. + * + * Note that this returns [[Object]]s instead of [[InternalRow]] because we rely on erasure to pass + * column batches by pretending they are rows. + */ +class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T] { + private[this] var havePair = false + private[this] var finished = false + + override def hasNext: Boolean = { + if (!finished && !havePair) { + finished = !rowReader.nextKeyValue + if (finished) { + // Close and release the reader here; close() will also be called when the task + // completes, but for tasks that read from many files, it helps to release the + // resources early. + rowReader.close() + } + havePair = !finished + } + !finished + } + + override def next(): T = { + if (!hasNext) { + throw new java.util.NoSuchElementException("End of stream") + } + havePair = false + rowReader.getCurrentValue + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 3f0389beed..2f2d438f32 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -24,14 +24,16 @@ import java.util.logging.{Logger => JLogger} import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.{Failure, Try} +import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat -import org.apache.hadoop.mapreduce.task.JobContextImpl +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} +import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl} import org.apache.parquet.{Log => ApacheParquetLog} +import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.metadata.CompressionCodecName @@ -45,16 +47,21 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser -import org.apache.spark.sql.execution.datasources.{PartitionSpec, _} +import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import 
org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.{SerializableConfiguration, Utils} import org.apache.spark.util.collection.BitSet - -private[sql] class DefaultSource extends FileFormat with DataSourceRegister with Logging { +private[sql] class DefaultSource + extends FileFormat + with DataSourceRegister + with Logging + with Serializable { override def shortName(): String = "parquet" @@ -269,6 +276,137 @@ private[sql] class DefaultSource extends FileFormat with DataSourceRegister with file.getName == ParquetFileWriter.PARQUET_METADATA_FILE } + /** + * Returns a function that can be used to read a single file in as an Iterator of InternalRow. + * + * @param partitionSchema The schema of the partition column row that will be present in each + * PartitionedFile. These columns should be prepended to the rows that + * are produced by the iterator. + * @param dataSchema The schema of the data that should be output for each row. This may be a + * subset of the columns that are present in the file if column pruning has + * occurred. + * @param filters A set of filters than can optionally be used to reduce the number of rows output + * @param options A set of string -> string configuration options. + * @return + */ + override def buildReader( + sqlContext: SQLContext, + partitionSchema: StructType, + dataSchema: StructType, + filters: Seq[Filter], + options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = { + val parquetConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName) + parquetConf.set( + CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA, + CatalystSchemaConverter.checkFieldNames(dataSchema).json) + parquetConf.set( + CatalystWriteSupport.SPARK_ROW_SCHEMA, + CatalystSchemaConverter.checkFieldNames(dataSchema).json) + + // We want to clear this temporary metadata from saving into Parquet file. + // This metadata is only useful for detecting optional columns when pushdowning filters. + val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField, + dataSchema).asInstanceOf[StructType] + CatalystWriteSupport.setSchema(dataSchemaToWrite, parquetConf) + + // Sets flags for `CatalystSchemaConverter` + parquetConf.setBoolean( + SQLConf.PARQUET_BINARY_AS_STRING.key, + sqlContext.conf.getConf(SQLConf.PARQUET_BINARY_AS_STRING)) + parquetConf.setBoolean( + SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, + sqlContext.conf.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP)) + + // Try to push down filters when filter push-down is enabled. + val pushed = if (sqlContext.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) { + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. + .flatMap(ParquetFilters.createFilter(dataSchema, _)) + .reduceOption(FilterApi.and) + } else { + None + } + + val broadcastedConf = + sqlContext.sparkContext.broadcast(new SerializableConfiguration(parquetConf)) + + // TODO: if you move this into the closure it reverts to the default values. + // If true, enable using the custom RecordReader for parquet. This only works for + // a subset of the types (no complex types). 
+ val enableVectorizedParquetReader: Boolean = + sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean + val enableWholestageCodegen: Boolean = + sqlContext.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key).toBoolean + + (file: PartitionedFile) => { + assert(file.partitionValues.numFields == partitionSchema.size) + + val fileSplit = + new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty) + + val split = + new org.apache.parquet.hadoop.ParquetInputSplit( + fileSplit.getPath, + fileSplit.getStart, + fileSplit.getStart + fileSplit.getLength, + fileSplit.getLength, + fileSplit.getLocations, + null) + + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedConf.value.value, attemptId) + + val parquetReader = try { + if (!enableVectorizedParquetReader) sys.error("Vectorized reader turned off.") + val vectorizedReader = new VectorizedParquetRecordReader() + vectorizedReader.initialize(split, hadoopAttemptContext) + logDebug(s"Appending $partitionSchema ${file.partitionValues}") + vectorizedReader.initBatch(partitionSchema, file.partitionValues) + // Whole stage codegen (PhysicalRDD) is able to deal with batches directly + // TODO: fix column appending + if (enableWholestageCodegen) { + logDebug(s"Enabling batch returning") + vectorizedReader.enableReturningBatches() + } + vectorizedReader + } catch { + case NonFatal(e) => + logDebug(s"Falling back to parquet-mr: $e", e) + val reader = pushed match { + case Some(filter) => + new ParquetRecordReader[InternalRow]( + new CatalystReadSupport, + FilterCompat.get(filter, null)) + case _ => + new ParquetRecordReader[InternalRow](new CatalystReadSupport) + } + reader.initialize(split, hadoopAttemptContext) + reader + } + + val iter = new RecordReaderIterator(parquetReader) + + // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. + if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] && + enableVectorizedParquetReader) { + iter.asInstanceOf[Iterator[InternalRow]] + } else { + val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes + val joinedRow = new JoinedRow() + val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema) + + // This is a horrible erasure hack... if we type the iterator above, then it actually check + // the type in next() and we get a class cast exception. If we make that function return + // Object, then we can defer the cast until later! 
+ iter.asInstanceOf[Iterator[InternalRow]] + .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues))) + } + } + } + override def buildInternalScan( sqlContext: SQLContext, dataSchema: StructType, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 3d1d5b120a..61058eaeab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -288,6 +288,11 @@ object SQLConf { defaultValue = Some(true), doc = "Whether the query analyzer should be case sensitive or not.") + val PARQUET_FILE_SCAN = booleanConf("spark.sql.parquet.fileScan", + defaultValue = Some(true), + doc = "Use the new FileScanRDD path for reading parquet data.", + isPublic = false) + val PARQUET_SCHEMA_MERGING_ENABLED = booleanConf("spark.sql.parquet.mergeSchema", defaultValue = Some(false), doc = "When true, the Parquet data source merges schemas collected from all data files, " + @@ -555,6 +560,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION) + def parquetFileScan: Boolean = getConf(PARQUET_FILE_SCAN) + def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA) def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 6101b08702..1e02354edf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -415,7 +415,7 @@ case class HadoopFsRelation( def refresh(): Unit = location.refresh() override def toString: String = - s"$fileFormat part: ${partitionSchema.simpleString}, data: ${dataSchema.simpleString}" + s"HadoopFiles" /** Returns the list of files that will be read when scanning this relation. */ override def inputFiles: Array[String] = @@ -551,10 +551,13 @@ class HDFSFileCatalog( override def listFiles(filters: Seq[Expression]): Seq[Partition] = { if (partitionSpec().partitionColumns.isEmpty) { - Partition(InternalRow.empty, allFiles()) :: Nil + Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil } else { prunePartitions(filters, partitionSpec()).map { - case PartitionDirectory(values, path) => Partition(values, getStatus(path)) + case PartitionDirectory(values, path) => + Partition( + values, + getStatus(path).filterNot(_.getPath.getName startsWith "_")) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index ebdb105743..9746187d22 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -445,7 +445,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } } - test("SPARK-6352 DirectParquetOutputCommitter") { + testQuietly("SPARK-6352 DirectParquetOutputCommitter") { val clonedConf = new Configuration(hadoopConfiguration) // Write to a parquet file and let it fail. 
@@ -469,7 +469,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } } - test("SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible") { + testQuietly("SPARK-9849 DirectParquetOutputCommitter qualified name backwards compatiblity") { val clonedConf = new Configuration(hadoopConfiguration) // Write to a parquet file and let it fail. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index 2bce74571d..926fabe611 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.execution.Filter import org.apache.spark.util.Utils /** @@ -204,10 +205,11 @@ private[sql] trait SQLTestUtils */ protected def stripSparkFilter(df: DataFrame): DataFrame = { val schema = df.schema - val childRDD = df - .queryExecution - .sparkPlan.asInstanceOf[org.apache.spark.sql.execution.Filter] - .child + val withoutFilters = df.queryExecution.sparkPlan transform { + case Filter(_, child) => child + } + + val childRDD = withoutFilters .execute() .map(row => Row.fromSeq(row.copy().toSeq(schema))) |
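One note on the configuration surface added above: `SQLConf` gains an internal flag `spark.sql.parquet.fileScan` (default `true`) that gates the new `FileScanRDD` path. A hedged usage sketch, assuming only the standard string-keyed `setConf`/`getConf` on `SQLContext`:

```scala
// Sketch only: flip the flag this commit introduces to fall back to the old Parquet scan
// path, then restore the default. The key and default value come from the SQLConf diff
// above; sqlContext is assumed to be an ordinary SQLContext instance in scope.
sqlContext.setConf("spark.sql.parquet.fileScan", "false") // use the legacy buildInternalScan path
assert(sqlContext.getConf("spark.sql.parquet.fileScan") == "false")
sqlContext.setConf("spark.sql.parquet.fileScan", "true")  // default: new FileScanRDD path
```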