author     Michael Armbrust <michael@databricks.com>  2016-03-21 20:16:01 -0700
committer  Michael Armbrust <michael@databricks.com>  2016-03-21 20:16:01 -0700
commit     8014a516d1cbb0f0c7035e2149161aa32fb506f8 (patch)
tree       04e97f30e4e4ef519842589f0421ea21a8479186
parent     7299961657b5591a3257b21e40f3047db27f221c (diff)
download   spark-8014a516d1cbb0f0c7035e2149161aa32fb506f8.tar.gz
           spark-8014a516d1cbb0f0c7035e2149161aa32fb506f8.tar.bz2
           spark-8014a516d1cbb0f0c7035e2149161aa32fb506f8.zip
[SPARK-13883][SQL] Parquet Implementation of FileFormat.buildReader
This PR adds an implementation of the new `buildReader` interface for the Parquet `FileFormat`. A simple implementation of `FileScanRDD` is also included. This code should be covered by the many existing Parquet tests.

Author: Michael Armbrust <michael@databricks.com>
Author: Sameer Agarwal <sameer@databricks.com>
Author: Nong Li <nong@databricks.com>

Closes #11709 from marmbrus/parquetReader.
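For orientation before the diff: the heart of the change is that a `FileFormat` now produces a per-file read function (`PartitionedFile => Iterator[InternalRow]`), and `FileScanRDD` simply walks the files of a `FilePartition`, chaining the iterators returned for each file. The following is a minimal, self-contained Scala sketch of that shape only; `SimpleFile`, `SimplePartition`, and `Row` are toy stand-ins for `PartitionedFile`, `FilePartition`, and `InternalRow`, not the real Spark types, and the real implementation additionally handles vectorized batches, partition-value appending, and input-file-name tracking.

object BuildReaderSketch {
  // Toy stand-ins for PartitionedFile, FilePartition, and InternalRow.
  final case class SimpleFile(filePath: String, start: Long, length: Long)
  final case class SimplePartition(index: Int, files: Seq[SimpleFile])
  type Row = Seq[Any]

  // Analogue of FileFormat.buildReader: resolve per-scan state (schema, filters, conf) once,
  // then return a cheap per-file function that each task calls for every file it owns.
  def buildReader(requiredColumns: Seq[String]): SimpleFile => Iterator[Row] = {
    file => Iterator(requiredColumns.map(c => s"$c@${file.filePath}")) // one pretend row per file
  }

  // Analogue of FileScanRDD.compute: lazily chain the per-file iterators of one partition.
  def compute(split: SimplePartition, readFile: SimpleFile => Iterator[Row]): Iterator[Row] =
    split.files.iterator.flatMap(readFile)

  def main(args: Array[String]): Unit = {
    val readFile = buildReader(Seq("a", "b"))
    val part = SimplePartition(0, Seq(
      SimpleFile("part-00000.parquet", 0, 10),
      SimpleFile("part-00001.parquet", 0, 20)))
    compute(part, readFile).foreach(println)
  }
}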
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java  82
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java  13
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala  1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala  44
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala  36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala  55
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala  148
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala  9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala  10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala  1
13 files changed, 366 insertions, 46 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 9db5c4150f..9ac251391b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -29,8 +29,10 @@ import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.*;
/**
* A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
@@ -52,7 +54,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
private int numBatched = 0;
/**
- * For each request column, the reader to read this column.
+ * For each request column, the reader to read this column. This is NULL if this column
+ * is missing from the file, in which case we populate the attribute with NULL.
*/
private VectorizedColumnReader[] columnReaders;
@@ -67,6 +70,11 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
private long totalCountLoadedSoFar = 0;
/**
+ * For each column, true if the column is missing in the file and we'll instead return NULLs.
+ */
+ private boolean[] missingColumns;
+
+ /**
* columnBatch object that is used for batch decoding. This is created on first use and triggers
* batched decoding. It is not valid to interleave calls to the batched interface with the row
* by row RecordReader APIs.
@@ -163,14 +171,53 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
* This object is reused. Calling this enables the vectorized reader. This should be called
* before any calls to nextKeyValue/nextBatch.
*/
- public ColumnarBatch resultBatch() {
- return resultBatch(DEFAULT_MEMORY_MODE);
- }
- public ColumnarBatch resultBatch(MemoryMode memMode) {
- if (columnarBatch == null) {
- columnarBatch = ColumnarBatch.allocate(sparkSchema, memMode);
+ // Creates a columnar batch that includes the schema from the data files and the additional
+ // partition columns appended to the end of the batch.
+ // For example, if the data contains two columns, with 2 partition columns:
+ // Columns 0,1: data columns
+ // Column 2: partitionValues[0]
+ // Column 3: partitionValues[1]
+ public void initBatch(MemoryMode memMode, StructType partitionColumns,
+ InternalRow partitionValues) {
+ StructType batchSchema = new StructType();
+ for (StructField f: sparkSchema.fields()) {
+ batchSchema = batchSchema.add(f);
+ }
+ if (partitionColumns != null) {
+ for (StructField f : partitionColumns.fields()) {
+ batchSchema = batchSchema.add(f);
+ }
+ }
+
+ columnarBatch = ColumnarBatch.allocate(batchSchema);
+ if (partitionColumns != null) {
+ int partitionIdx = sparkSchema.fields().length;
+ for (int i = 0; i < partitionColumns.fields().length; i++) {
+ ColumnVectorUtils.populate(columnarBatch.column(i + partitionIdx), partitionValues, i);
+ columnarBatch.column(i + partitionIdx).setIsConstant();
+ }
+ }
+
+ // Initialize missing columns with nulls.
+ for (int i = 0; i < missingColumns.length; i++) {
+ if (missingColumns[i]) {
+ columnarBatch.column(i).putNulls(0, columnarBatch.capacity());
+ columnarBatch.column(i).setIsConstant();
+ }
}
+ }
+
+ public void initBatch() {
+ initBatch(DEFAULT_MEMORY_MODE, null, null);
+ }
+
+ public void initBatch(StructType partitionColumns, InternalRow partitionValues) {
+ initBatch(DEFAULT_MEMORY_MODE, partitionColumns, partitionValues);
+ }
+
+ public ColumnarBatch resultBatch() {
+ if (columnarBatch == null) initBatch();
return columnarBatch;
}
@@ -191,6 +238,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
int num = (int) Math.min((long) columnarBatch.capacity(), totalCountLoadedSoFar - rowsReturned);
for (int i = 0; i < columnReaders.length; ++i) {
+ if (columnReaders[i] == null) continue;
columnReaders[i].readBatch(num, columnarBatch.column(i));
}
rowsReturned += num;
@@ -205,6 +253,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
* Check that the requested schema is supported.
*/
OriginalType[] originalTypes = new OriginalType[requestedSchema.getFieldCount()];
+ missingColumns = new boolean[requestedSchema.getFieldCount()];
for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
Type t = requestedSchema.getFields().get(i);
if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
@@ -223,9 +272,19 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
throw new IOException("Int96 not supported.");
}
- ColumnDescriptor fd = fileSchema.getColumnDescription(requestedSchema.getPaths().get(i));
- if (!fd.equals(requestedSchema.getColumns().get(i))) {
- throw new IOException("Schema evolution not supported.");
+ String[] colPath = requestedSchema.getPaths().get(i);
+ if (fileSchema.containsPath(colPath)) {
+ ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
+ if (!fd.equals(requestedSchema.getColumns().get(i))) {
+ throw new IOException("Schema evolution not supported.");
+ }
+ missingColumns[i] = false;
+ } else {
+ if (requestedSchema.getColumns().get(i).getMaxDefinitionLevel() == 0) {
+ // Column is missing in data but the required data is non-nullable. This file is invalid.
+ throw new IOException("Required column is missing in data file. Col: " + colPath);
+ }
+ missingColumns[i] = true;
}
}
}
@@ -240,6 +299,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
List<ColumnDescriptor> columns = requestedSchema.getColumns();
columnReaders = new VectorizedColumnReader[columns.size()];
for (int i = 0; i < columns.size(); ++i) {
+ if (missingColumns[i]) continue;
columnReaders[i] = new VectorizedColumnReader(columns.get(i),
pages.getPageReader(columns.get(i)));
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 13bf4c5c77..74fa6323cc 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -256,6 +256,8 @@ public abstract class ColumnVector {
* Resets this column for writing. The currently stored values are no longer accessible.
*/
public void reset() {
+ if (isConstant) return;
+
if (childColumns != null) {
for (ColumnVector c: childColumns) {
c.reset();
@@ -823,6 +825,11 @@ public abstract class ColumnVector {
public final boolean isArray() { return resultArray != null; }
/**
+ * Marks this column as being constant.
+ */
+ public final void setIsConstant() { isConstant = true; }
+
+ /**
* Maximum number of rows that can be stored in this column.
*/
protected int capacity;
@@ -844,6 +851,12 @@ public abstract class ColumnVector {
protected boolean anyNullsSet;
/**
+ * True if this column's values are fixed. This means the column values never change, even
+ * across resets.
+ */
+ protected boolean isConstant;
+
+ /**
* Default size of each array length value. This grows as necessary.
*/
protected static final int DEFAULT_ARRAY_LENGTH = 4;
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 7ab4cda5a4..792e17911f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -33,7 +33,7 @@ import org.apache.spark.unsafe.types.UTF8String;
/**
* This class is the in memory representation of rows as they are streamed through operators. It
* is designed to maximize CPU efficiency and not storage footprint. Since it is expected that
- * each operator allocates one of thee objects, the storage footprint on the task is negligible.
+ * each operator allocates one of these objects, the storage footprint on the task is negligible.
*
* The layout is a columnar with values encoded in their native format. Each RowBatch contains
* a horizontal partitioning of the data, split into columns.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 6116cce17e..e2d5f42f9c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -233,7 +233,6 @@ case class DataSource(
"It must be specified manually")
}
-
HadoopFsRelation(
sqlContext,
fileCatalog,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index e2cbbc34d9..bbe7f4abb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.datasources
import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, SqlNewHadoopRDDState}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
@@ -31,13 +31,17 @@ case class PartitionedFile(
partitionValues: InternalRow,
filePath: String,
start: Long,
- length: Long)
+ length: Long) {
+ override def toString(): String = {
+ s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues"
+ }
+}
+
/**
* A collection of files that should be read as a single task possibly from multiple partitioned
* directories.
*
- * IMPLEMENT ME: This is just a placeholder for a future implementation.
* TODO: This currently does not take locality information about the files into account.
*/
case class FilePartition(val index: Int, files: Seq[PartitionedFile]) extends Partition
@@ -48,10 +52,38 @@ class FileScanRDD(
@transient val filePartitions: Seq[FilePartition])
extends RDD[InternalRow](sqlContext.sparkContext, Nil) {
-
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
- throw new NotImplementedError("Not Implemented Yet")
+ val iterator = new Iterator[Object] with AutoCloseable {
+ private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
+ private[this] var currentIterator: Iterator[Object] = null
+
+ def hasNext = (currentIterator != null && currentIterator.hasNext) || nextIterator()
+ def next() = currentIterator.next()
+
+ /** Advances to the next file. Returns true if a new non-empty iterator is available. */
+ private def nextIterator(): Boolean = {
+ if (files.hasNext) {
+ val nextFile = files.next()
+ logInfo(s"Reading File $nextFile")
+ SqlNewHadoopRDDState.setInputFileName(nextFile.filePath)
+ currentIterator = readFunction(nextFile)
+ hasNext
+ } else {
+ SqlNewHadoopRDDState.unsetInputFileName()
+ false
+ }
+ }
+
+ override def close() = {
+ SqlNewHadoopRDDState.unsetInputFileName()
+ }
+ }
+
+ // Register an on-task-completion callback to close the input stream.
+ context.addTaskCompletionListener(context => iterator.close())
+
+ iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack.
}
- override protected def getPartitions: Array[Partition] = Array.empty
+ override protected def getPartitions: Array[Partition] = filePartitions.toArray
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 62576ea4b2..de89d5f1fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -57,7 +57,9 @@ import org.apache.spark.sql.types._
private[sql] object FileSourceStrategy extends Strategy with Logging {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projects, filters, l@LogicalRelation(files: HadoopFsRelation, _, _))
- if files.fileFormat.toString == "TestFileFormat" =>
+ if (files.fileFormat.toString == "TestFileFormat" ||
+ files.fileFormat.isInstanceOf[parquet.DefaultSource]) &&
+ files.sqlContext.conf.parquetFileScan =>
// Filters on this relation fall into four categories based on where we can use them to avoid
// reading unneeded data:
// - partition keys only - used to prune directories to read
@@ -67,12 +69,15 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val filterSet = ExpressionSet(filters)
val partitionColumns =
- AttributeSet(
- l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver))
+ l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver)
+ val partitionSet = AttributeSet(partitionColumns)
val partitionKeyFilters =
- ExpressionSet(filters.filter(_.references.subsetOf(partitionColumns)))
+ ExpressionSet(filters.filter(_.references.subsetOf(partitionSet)))
logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
+ val dataColumns =
+ l.resolve(files.dataSchema, files.sqlContext.sessionState.analyzer.resolver)
+
val bucketColumns =
AttributeSet(
files.bucketSpec
@@ -82,7 +87,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
.getOrElse(sys.error(""))))
// Partition keys are not available in the statistics of the files.
- val dataFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
+ val dataFilters = filters.filter(_.references.intersect(partitionSet).isEmpty)
// Predicates with both partition keys and attributes need to be evaluated after the scan.
val afterScanFilters = filterSet -- partitionKeyFilters
@@ -92,11 +97,13 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val filterAttributes = AttributeSet(afterScanFilters)
val requiredExpressions: Seq[NamedExpression] = filterAttributes.toSeq ++ projects
- val requiredAttributes = AttributeSet(requiredExpressions).map(_.name).toSet
+ val requiredAttributes = AttributeSet(requiredExpressions)
- val prunedDataSchema =
- StructType(
- files.dataSchema.filter(f => requiredAttributes.contains(f.name)))
+ val readDataColumns =
+ dataColumns
+ .filter(requiredAttributes.contains)
+ .filterNot(partitionColumns.contains)
+ val prunedDataSchema = readDataColumns.toStructType
logInfo(s"Pruned Data Schema: ${prunedDataSchema.simpleString(5)}")
val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
@@ -132,7 +139,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val splitFiles = selectedPartitions.flatMap { partition =>
partition.files.flatMap { file =>
- assert(file.getLen != 0)
+ assert(file.getLen != 0, file.toString)
(0L to file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
@@ -180,17 +187,20 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val scan =
DataSourceScan(
- l.output,
+ readDataColumns ++ partitionColumns,
new FileScanRDD(
files.sqlContext,
readFile,
plannedPartitions),
files,
- Map("format" -> files.fileFormat.toString))
+ Map(
+ "Format" -> files.fileFormat.toString,
+ "PushedFilters" -> pushedDownFilters.mkString("[", ", ", "]"),
+ "ReadSchema" -> prunedDataSchema.simpleString))
val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
val withFilter = afterScanFilter.map(execution.Filter(_, scan)).getOrElse(scan)
- val withProjections = if (projects.forall(_.isInstanceOf[AttributeReference])) {
+ val withProjections = if (projects == withFilter.output) {
withFilter
} else {
execution.Project(projects, withFilter)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
new file mode 100644
index 0000000000..f03ae94d55
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/RecordReaderIterator.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.mapreduce.RecordReader
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+/**
+ * An adaptor from a Hadoop [[RecordReader]] to an [[Iterator]] over the values returned.
+ *
+ * Note that this returns [[Object]]s instead of [[InternalRow]] because we rely on erasure to pass
+ * column batches by pretending they are rows.
+ */
+class RecordReaderIterator[T](rowReader: RecordReader[_, T]) extends Iterator[T] {
+ private[this] var havePair = false
+ private[this] var finished = false
+
+ override def hasNext: Boolean = {
+ if (!finished && !havePair) {
+ finished = !rowReader.nextKeyValue
+ if (finished) {
+ // Close and release the reader here; close() will also be called when the task
+ // completes, but for tasks that read from many files, it helps to release the
+ // resources early.
+ rowReader.close()
+ }
+ havePair = !finished
+ }
+ !finished
+ }
+
+ override def next(): T = {
+ if (!hasNext) {
+ throw new java.util.NoSuchElementException("End of stream")
+ }
+ havePair = false
+ rowReader.getCurrentValue
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 3f0389beed..2f2d438f32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -24,14 +24,16 @@ import java.util.logging.{Logger => JLogger}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Try}
+import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.hadoop.mapreduce.task.JobContextImpl
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
import org.apache.parquet.{Log => ApacheParquetLog}
+import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.metadata.CompressionCodecName
@@ -45,16 +47,21 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
-import org.apache.spark.sql.execution.datasources.{PartitionSpec, _}
+import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.util.collection.BitSet
-
-private[sql] class DefaultSource extends FileFormat with DataSourceRegister with Logging {
+private[sql] class DefaultSource
+ extends FileFormat
+ with DataSourceRegister
+ with Logging
+ with Serializable {
override def shortName(): String = "parquet"
@@ -269,6 +276,137 @@ private[sql] class DefaultSource extends FileFormat with DataSourceRegister with
file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
}
+ /**
+ * Returns a function that can be used to read a single file in as an Iterator of InternalRow.
+ *
+ * @param partitionSchema The schema of the partition column row that will be present in each
+ * PartitionedFile. These columns should be prepended to the rows that
+ * are produced by the iterator.
+ * @param dataSchema The schema of the data that should be output for each row. This may be a
+ * subset of the columns that are present in the file if column pruning has
+ * occurred.
+ * @param filters A set of filters than can optionally be used to reduce the number of rows output
+ * @param options A set of string -> string configuration options.
+ * @return
+ */
+ override def buildReader(
+ sqlContext: SQLContext,
+ partitionSchema: StructType,
+ dataSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
+ val parquetConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+ parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
+ parquetConf.set(
+ CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
+ CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+ parquetConf.set(
+ CatalystWriteSupport.SPARK_ROW_SCHEMA,
+ CatalystSchemaConverter.checkFieldNames(dataSchema).json)
+
+ // We want to clear this temporary metadata from saving into Parquet file.
+ // This metadata is only useful for detecting optional columns when pushdowning filters.
+ val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
+ dataSchema).asInstanceOf[StructType]
+ CatalystWriteSupport.setSchema(dataSchemaToWrite, parquetConf)
+
+ // Sets flags for `CatalystSchemaConverter`
+ parquetConf.setBoolean(
+ SQLConf.PARQUET_BINARY_AS_STRING.key,
+ sqlContext.conf.getConf(SQLConf.PARQUET_BINARY_AS_STRING))
+ parquetConf.setBoolean(
+ SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+ sqlContext.conf.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
+
+ // Try to push down filters when filter push-down is enabled.
+ val pushed = if (sqlContext.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
+ filters
+ // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+ // is used here.
+ .flatMap(ParquetFilters.createFilter(dataSchema, _))
+ .reduceOption(FilterApi.and)
+ } else {
+ None
+ }
+
+ val broadcastedConf =
+ sqlContext.sparkContext.broadcast(new SerializableConfiguration(parquetConf))
+
+ // TODO: if you move this into the closure it reverts to the default values.
+ // If true, enable using the custom RecordReader for parquet. This only works for
+ // a subset of the types (no complex types).
+ val enableVectorizedParquetReader: Boolean =
+ sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean
+ val enableWholestageCodegen: Boolean =
+ sqlContext.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key).toBoolean
+
+ (file: PartitionedFile) => {
+ assert(file.partitionValues.numFields == partitionSchema.size)
+
+ val fileSplit =
+ new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
+
+ val split =
+ new org.apache.parquet.hadoop.ParquetInputSplit(
+ fileSplit.getPath,
+ fileSplit.getStart,
+ fileSplit.getStart + fileSplit.getLength,
+ fileSplit.getLength,
+ fileSplit.getLocations,
+ null)
+
+ val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+ val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedConf.value.value, attemptId)
+
+ val parquetReader = try {
+ if (!enableVectorizedParquetReader) sys.error("Vectorized reader turned off.")
+ val vectorizedReader = new VectorizedParquetRecordReader()
+ vectorizedReader.initialize(split, hadoopAttemptContext)
+ logDebug(s"Appending $partitionSchema ${file.partitionValues}")
+ vectorizedReader.initBatch(partitionSchema, file.partitionValues)
+ // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+ // TODO: fix column appending
+ if (enableWholestageCodegen) {
+ logDebug(s"Enabling batch returning")
+ vectorizedReader.enableReturningBatches()
+ }
+ vectorizedReader
+ } catch {
+ case NonFatal(e) =>
+ logDebug(s"Falling back to parquet-mr: $e", e)
+ val reader = pushed match {
+ case Some(filter) =>
+ new ParquetRecordReader[InternalRow](
+ new CatalystReadSupport,
+ FilterCompat.get(filter, null))
+ case _ =>
+ new ParquetRecordReader[InternalRow](new CatalystReadSupport)
+ }
+ reader.initialize(split, hadoopAttemptContext)
+ reader
+ }
+
+ val iter = new RecordReaderIterator(parquetReader)
+
+ // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
+ if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
+ enableVectorizedParquetReader) {
+ iter.asInstanceOf[Iterator[InternalRow]]
+ } else {
+ val fullSchema = dataSchema.toAttributes ++ partitionSchema.toAttributes
+ val joinedRow = new JoinedRow()
+ val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+ // This is a horrible erasure hack... if we type the iterator above, then it actually check
+ // the type in next() and we get a class cast exception. If we make that function return
+ // Object, then we can defer the cast until later!
+ iter.asInstanceOf[Iterator[InternalRow]]
+ .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+ }
+ }
+ }
+
override def buildInternalScan(
sqlContext: SQLContext,
dataSchema: StructType,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3d1d5b120a..61058eaeab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -288,6 +288,11 @@ object SQLConf {
defaultValue = Some(true),
doc = "Whether the query analyzer should be case sensitive or not.")
+ val PARQUET_FILE_SCAN = booleanConf("spark.sql.parquet.fileScan",
+ defaultValue = Some(true),
+ doc = "Use the new FileScanRDD path for reading parquet data.",
+ isPublic = false)
+
val PARQUET_SCHEMA_MERGING_ENABLED = booleanConf("spark.sql.parquet.mergeSchema",
defaultValue = Some(false),
doc = "When true, the Parquet data source merges schemas collected from all data files, " +
@@ -555,6 +560,8 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin
def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
+ def parquetFileScan: Boolean = getConf(PARQUET_FILE_SCAN)
+
def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)
def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
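As context for the SQLConf hunk above: the new scan path is gated by an internal flag, spark.sql.parquet.fileScan, that defaults to true. A hedged example of toggling it, assuming a spark-shell session of this vintage where a sqlContext is already in scope:

sqlContext.setConf("spark.sql.parquet.fileScan", "false") // fall back to the legacy buildInternalScan path
// ... run parquet queries against the old code path ...
sqlContext.setConf("spark.sql.parquet.fileScan", "true")  // re-enable the new FileScanRDD path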
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 6101b08702..1e02354edf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -415,7 +415,7 @@ case class HadoopFsRelation(
def refresh(): Unit = location.refresh()
override def toString: String =
- s"$fileFormat part: ${partitionSchema.simpleString}, data: ${dataSchema.simpleString}"
+ s"HadoopFiles"
/** Returns the list of files that will be read when scanning this relation. */
override def inputFiles: Array[String] =
@@ -551,10 +551,13 @@ class HDFSFileCatalog(
override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
if (partitionSpec().partitionColumns.isEmpty) {
- Partition(InternalRow.empty, allFiles()) :: Nil
+ Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
} else {
prunePartitions(filters, partitionSpec()).map {
- case PartitionDirectory(values, path) => Partition(values, getStatus(path))
+ case PartitionDirectory(values, path) =>
+ Partition(
+ values,
+ getStatus(path).filterNot(_.getPath.getName startsWith "_"))
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index ebdb105743..9746187d22 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -445,7 +445,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}
- test("SPARK-6352 DirectParquetOutputCommitter") {
+ testQuietly("SPARK-6352 DirectParquetOutputCommitter") {
val clonedConf = new Configuration(hadoopConfiguration)
// Write to a parquet file and let it fail.
@@ -469,7 +469,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}
- test("SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible") {
+ testQuietly("SPARK-9849 DirectParquetOutputCommitter qualified name backwards compatiblity") {
val clonedConf = new Configuration(hadoopConfiguration)
// Write to a parquet file and let it fail.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 2bce74571d..926fabe611 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.Filter
import org.apache.spark.util.Utils
/**
@@ -204,10 +205,11 @@ private[sql] trait SQLTestUtils
*/
protected def stripSparkFilter(df: DataFrame): DataFrame = {
val schema = df.schema
- val childRDD = df
- .queryExecution
- .sparkPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
- .child
+ val withoutFilters = df.queryExecution.sparkPlan transform {
+ case Filter(_, child) => child
+ }
+
+ val childRDD = withoutFilters
.execute()
.map(row => Row.fromSeq(row.copy().toSeq(schema)))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 2806b87f33..bc8896d4bd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1724,6 +1724,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
withTable("tbl10562") {
val df = Seq(2012 -> "a").toDF("Year", "val")
df.write.partitionBy("Year").saveAsTable("tbl10562")
+ checkAnswer(sql("SELECT year FROM tbl10562"), Row(2012))
checkAnswer(sql("SELECT Year FROM tbl10562"), Row(2012))
checkAnswer(sql("SELECT yEAr FROM tbl10562"), Row(2012))
checkAnswer(sql("SELECT val FROM tbl10562 WHERE Year > 2015"), Nil)