author     Sameer Agarwal <sameer@databricks.com>    2016-03-18 14:04:42 -0700
committer  Davies Liu <davies.liu@gmail.com>         2016-03-18 14:04:42 -0700
commit     54794113a6a906b0f9c6bfb9da322e18e007214c
tree       11310ec8f4d8cfc1c03a92797ef1ec407e29667e /sql/core
parent     238fb485be4cdf8337cacc58c31a9f885a99853c
[SPARK-13989] [SQL] Remove non-vectorized/unsafe-row parquet record reader
## What changes were proposed in this pull request?

This PR cleans up the new parquet record reader with the following changes:

1. Removes the non-vectorized parquet reader code from `UnsafeRowParquetRecordReader`.
2. Removes the non-vectorized column reader code from `ColumnReader`.
3. Renames `UnsafeRowParquetRecordReader` to `VectorizedParquetRecordReader` and `ColumnReader` to `VectorizedColumnReader` (see the usage sketch below).
4. Deprecates `PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED`.

## How was this patch tested?

Refactoring only; existing tests should reveal any problems.

Author: Sameer Agarwal <sameer@databricks.com>

Closes #11799 from sameeragarwal/vectorized-parquet.
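A minimal sketch of driving the renamed reader directly, mirroring the updated test and benchmark code in this diff; the parquet file path and the single integer `id` column are assumptions for illustration:

```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader

// Hypothetical path to a parquet file containing an integer "id" column.
val path = "/tmp/example/part-00000.parquet"

val reader = new VectorizedParquetRecordReader
try {
  // Project only the "id" column.
  reader.initialize(path, ("id" :: Nil).asJava)
  // resultBatch() creates the ColumnarBatch and switches the reader to batched decoding.
  val batch = reader.resultBatch()
  val col = batch.column(0)
  var sum = 0L
  while (reader.nextBatch()) {
    var i = 0
    while (i < batch.numRows()) {
      if (!col.isNullAt(i)) sum += col.getInt(i)
      i += 1
    }
  }
  println(s"sum(id) = $sum")
} finally {
  reader.close()
}
```

When whole-stage codegen is enabled, `SqlNewHadoopRDD` additionally calls `enableReturningBatches()` so `getCurrentValue` hands back whole `ColumnarBatch`es instead of per-row `InternalRow`s.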
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java (renamed from sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java) | 319
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala | 9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala | 20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala | 6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala | 14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala | 61
8 files changed, 75 insertions, 364 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 7234726633..0f00f56a3a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -18,13 +18,11 @@
package org.apache.spark.sql.execution.datasources.parquet;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.List;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
@@ -37,22 +35,17 @@ import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
-import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
-import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.unsafe.types.UTF8String;
-import static org.apache.parquet.column.ValuesType.*;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
/**
- * A specialized RecordReader that reads into UnsafeRows directly using the Parquet column APIs.
- *
- * This is somewhat based on parquet-mr's ColumnReader.
+ * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
+ * Parquet column APIs. This is somewhat based on parquet-mr's ColumnReader.
*
* TODO: handle complex types, decimal requiring more than 8 bytes, INT96. Schema mismatch.
* All of these can be handled efficiently and easily with codegen.
@@ -61,29 +54,18 @@ import static org.apache.parquet.column.ValuesType.*;
* enabled, this class returns ColumnarBatches which offers significant performance gains.
* TODO: make this always return ColumnarBatches.
*/
-public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBase<Object> {
+public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBase<Object> {
/**
- * Batch of unsafe rows that we assemble and the current index we've returned. Every time this
+ * Batch of rows that we assemble and the current index we've returned. Every time this
* batch is used up (batchIdx == numBatched), we populated the batch.
*/
- private UnsafeRow[] rows = new UnsafeRow[64];
private int batchIdx = 0;
private int numBatched = 0;
/**
- * Used to write variable length columns. Same length as `rows`.
- */
- private UnsafeRowWriter[] rowWriters = null;
- /**
- * True if the row contains variable length fields.
- */
- private boolean containsVarLenFields;
-
- /**
* For each request column, the reader to read this column.
- * columnsReaders[i] populated the UnsafeRow's attribute at i.
*/
- private ColumnReader[] columnReaders;
+ private VectorizedColumnReader[] columnReaders;
/**
* The number of rows that have been returned.
@@ -96,17 +78,6 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
private long totalCountLoadedSoFar = 0;
/**
- * For each column, the annotated original type.
- */
- private OriginalType[] originalTypes;
-
- /**
- * The default size for varlen columns. The row grows as necessary to accommodate the
- * largest column.
- */
- private static final int DEFAULT_VAR_LEN_SIZE = 32;
-
- /**
* columnBatch object that is used for batch decoding. This is created on first use and triggers
* batched decoding. It is not valid to interleave calls to the batched interface with the row
* by row RecordReader APIs.
@@ -176,14 +147,12 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
+ resultBatch();
+
if (returnColumnarBatch) return nextBatch();
if (batchIdx >= numBatched) {
- if (vectorizedDecode()) {
- if (!nextBatch()) return false;
- } else {
- if (!loadBatch()) return false;
- }
+ if (!nextBatch()) return false;
}
++batchIdx;
return true;
@@ -192,12 +161,7 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
@Override
public Object getCurrentValue() throws IOException, InterruptedException {
if (returnColumnarBatch) return columnarBatch;
-
- if (vectorizedDecode()) {
- return columnarBatch.getRow(batchIdx - 1);
- } else {
- return rows[batchIdx - 1];
- }
+ return columnarBatch.getRow(batchIdx - 1);
}
@Override
@@ -225,7 +189,6 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
* Can be called before any rows are returned to enable returning columnar batches directly.
*/
public void enableReturningBatches() {
- assert(vectorizedDecode());
returnColumnarBatch = true;
}
@@ -233,12 +196,11 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
* Advances to the next batch of rows. Returns false if there are no more.
*/
public boolean nextBatch() throws IOException {
- assert(vectorizedDecode());
columnarBatch.reset();
if (rowsReturned >= totalRowCount) return false;
checkEndOfRowGroup();
- int num = (int)Math.min((long) columnarBatch.capacity(), totalCountLoadedSoFar - rowsReturned);
+ int num = (int) Math.min((long) columnarBatch.capacity(), totalCountLoadedSoFar - rowsReturned);
for (int i = 0; i < columnReaders.length; ++i) {
columnReaders[i].readBatch(num, columnarBatch.column(i));
}
@@ -249,17 +211,11 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
return true;
}
- /**
- * Returns true if we are doing a vectorized decode.
- */
- private boolean vectorizedDecode() { return columnarBatch != null; }
-
private void initializeInternal() throws IOException {
/**
* Check that the requested schema is supported.
*/
- int numVarLenFields = 0;
- originalTypes = new OriginalType[requestedSchema.getFieldCount()];
+ OriginalType[] originalTypes = new OriginalType[requestedSchema.getFieldCount()];
for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
Type t = requestedSchema.getFields().get(i);
if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
@@ -286,197 +242,13 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
if (!fd.equals(requestedSchema.getColumns().get(i))) {
throw new IOException("Schema evolution not supported.");
}
-
- if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY) {
- ++numVarLenFields;
- }
- }
-
- /**
- * Initialize rows and rowWriters. These objects are reused across all rows in the relation.
- */
- containsVarLenFields = numVarLenFields > 0;
- rowWriters = new UnsafeRowWriter[rows.length];
-
- for (int i = 0; i < rows.length; ++i) {
- rows[i] = new UnsafeRow(requestedSchema.getFieldCount());
- BufferHolder holder = new BufferHolder(rows[i], numVarLenFields * DEFAULT_VAR_LEN_SIZE);
- rowWriters[i] = new UnsafeRowWriter(holder, requestedSchema.getFieldCount());
}
}
/**
- * Decodes a batch of values into `rows`. This function is the hot path.
- */
- private boolean loadBatch() throws IOException {
- // no more records left
- if (rowsReturned >= totalRowCount) { return false; }
- checkEndOfRowGroup();
-
- int num = (int)Math.min(rows.length, totalCountLoadedSoFar - rowsReturned);
- rowsReturned += num;
-
- if (containsVarLenFields) {
- for (int i = 0; i < rowWriters.length; ++i) {
- rowWriters[i].holder().reset();
- }
- }
-
- for (int i = 0; i < columnReaders.length; ++i) {
- switch (columnReaders[i].descriptor.getType()) {
- case BOOLEAN:
- decodeBooleanBatch(i, num);
- break;
- case INT32:
- if (originalTypes[i] == OriginalType.DECIMAL) {
- decodeIntAsDecimalBatch(i, num);
- } else {
- decodeIntBatch(i, num);
- }
- break;
- case INT64:
- Preconditions.checkState(originalTypes[i] == null
- || originalTypes[i] == OriginalType.DECIMAL,
- "Unexpected original type: " + originalTypes[i]);
- decodeLongBatch(i, num);
- break;
- case FLOAT:
- decodeFloatBatch(i, num);
- break;
- case DOUBLE:
- decodeDoubleBatch(i, num);
- break;
- case BINARY:
- decodeBinaryBatch(i, num);
- break;
- case FIXED_LEN_BYTE_ARRAY:
- Preconditions.checkState(originalTypes[i] == OriginalType.DECIMAL,
- "Unexpected original type: " + originalTypes[i]);
- decodeFixedLenArrayAsDecimalBatch(i, num);
- break;
- case INT96:
- throw new IOException("Unsupported " + columnReaders[i].descriptor.getType());
- }
- }
-
- numBatched = num;
- batchIdx = 0;
-
- // Update the total row lengths if the schema contained variable length. We did not maintain
- // this as we populated the columns.
- if (containsVarLenFields) {
- for (int i = 0; i < numBatched; ++i) {
- rows[i].setTotalSize(rowWriters[i].holder().totalSize());
- }
- }
-
- return true;
- }
-
- private void decodeBooleanBatch(int col, int num) throws IOException {
- for (int n = 0; n < num; ++n) {
- if (columnReaders[col].next()) {
- rows[n].setBoolean(col, columnReaders[col].nextBoolean());
- } else {
- rows[n].setNullAt(col);
- }
- }
- }
-
- private void decodeIntBatch(int col, int num) throws IOException {
- for (int n = 0; n < num; ++n) {
- if (columnReaders[col].next()) {
- rows[n].setInt(col, columnReaders[col].nextInt());
- } else {
- rows[n].setNullAt(col);
- }
- }
- }
-
- private void decodeIntAsDecimalBatch(int col, int num) throws IOException {
- for (int n = 0; n < num; ++n) {
- if (columnReaders[col].next()) {
- // Since this is stored as an INT, it is always a compact decimal. Just set it as a long.
- rows[n].setLong(col, columnReaders[col].nextInt());
- } else {
- rows[n].setNullAt(col);
- }
- }
- }
-
- private void decodeLongBatch(int col, int num) throws IOException {
- for (int n = 0; n < num; ++n) {
- if (columnReaders[col].next()) {
- rows[n].setLong(col, columnReaders[col].nextLong());
- } else {
- rows[n].setNullAt(col);
- }
- }
- }
-
- private void decodeFloatBatch(int col, int num) throws IOException {
- for (int n = 0; n < num; ++n) {
- if (columnReaders[col].next()) {
- rows[n].setFloat(col, columnReaders[col].nextFloat());
- } else {
- rows[n].setNullAt(col);
- }
- }
- }
-
- private void decodeDoubleBatch(int col, int num) throws IOException {
- for (int n = 0; n < num; ++n) {
- if (columnReaders[col].next()) {
- rows[n].setDouble(col, columnReaders[col].nextDouble());
- } else {
- rows[n].setNullAt(col);
- }
- }
- }
-
- private void decodeBinaryBatch(int col, int num) throws IOException {
- for (int n = 0; n < num; ++n) {
- if (columnReaders[col].next()) {
- ByteBuffer bytes = columnReaders[col].nextBinary().toByteBuffer();
- int len = bytes.remaining();
- if (originalTypes[col] == OriginalType.UTF8) {
- UTF8String str =
- UTF8String.fromBytes(bytes.array(), bytes.arrayOffset() + bytes.position(), len);
- rowWriters[n].write(col, str);
- } else {
- rowWriters[n].write(col, bytes.array(), bytes.arrayOffset() + bytes.position(), len);
- }
- rows[n].setNotNullAt(col);
- } else {
- rows[n].setNullAt(col);
- }
- }
- }
-
- private void decodeFixedLenArrayAsDecimalBatch(int col, int num) throws IOException {
- PrimitiveType type = requestedSchema.getFields().get(col).asPrimitiveType();
- int precision = type.getDecimalMetadata().getPrecision();
- int scale = type.getDecimalMetadata().getScale();
- Preconditions.checkState(precision <= Decimal.MAX_LONG_DIGITS(),
- "Unsupported precision.");
-
- for (int n = 0; n < num; ++n) {
- if (columnReaders[col].next()) {
- Binary v = columnReaders[col].nextBinary();
- // Constructs a `Decimal` with an unscaled `Long` value if possible.
- long unscaled = CatalystRowConverter.binaryToUnscaledLong(v);
- rows[n].setDecimal(col, Decimal.apply(unscaled, precision, scale), precision);
- } else {
- rows[n].setNullAt(col);
- }
- }
- }
-
- /**
- *
* Decoder to return values from a single column.
*/
- private final class ColumnReader {
+ private class VectorizedColumnReader {
/**
* Total number of values read.
*/
@@ -527,7 +299,7 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
private final PageReader pageReader;
private final ColumnDescriptor descriptor;
- public ColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
+ public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
throws IOException {
this.descriptor = descriptor;
this.pageReader = pageReader;
@@ -634,7 +406,7 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
int num = Math.min(total, leftInPage);
if (useDictionary) {
// Read and decode dictionary ids.
- ColumnVector dictionaryIds = column.reserveDictionaryIds(total);;
+ ColumnVector dictionaryIds = column.reserveDictionaryIds(total);
defColumn.readIntegers(
num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
decodeDictionaryIds(rowId, num, column, dictionaryIds);
@@ -836,7 +608,7 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
});
}
- private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset)throws IOException {
+ private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset) throws IOException {
this.endOfPageValueCount = valuesRead + pageValueCount;
if (dataEncoding.usesDictionary()) {
this.dataColumn = null;
@@ -845,27 +617,18 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
"could not read page in col " + descriptor +
" as the dictionary was missing for encoding " + dataEncoding);
}
- if (vectorizedDecode()) {
- @SuppressWarnings("deprecation")
- Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
- if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
- throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
- }
- this.dataColumn = new VectorizedRleValuesReader();
- } else {
- this.dataColumn = dataEncoding.getDictionaryBasedValuesReader(
- descriptor, VALUES, dictionary);
+ @SuppressWarnings("deprecation")
+ Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
+ if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
+ throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
}
+ this.dataColumn = new VectorizedRleValuesReader();
this.useDictionary = true;
} else {
- if (vectorizedDecode()) {
- if (dataEncoding != Encoding.PLAIN) {
- throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
- }
- this.dataColumn = new VectorizedPlainValuesReader();
- } else {
- this.dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
+ if (dataEncoding != Encoding.PLAIN) {
+ throw new NotImplementedException("Unsupported encoding: " + dataEncoding);
}
+ this.dataColumn = new VectorizedPlainValuesReader();
this.useDictionary = false;
}
@@ -882,16 +645,12 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
ValuesReader dlReader;
// Initialize the decoders.
- if (vectorizedDecode()) {
- if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
- throw new NotImplementedException("Unsupported encoding: " + page.getDlEncoding());
- }
- int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
- this.defColumn = new VectorizedRleValuesReader(bitWidth);
- dlReader = this.defColumn;
- } else {
- dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
+ if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
+ throw new NotImplementedException("Unsupported encoding: " + page.getDlEncoding());
}
+ int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+ this.defColumn = new VectorizedRleValuesReader(bitWidth);
+ dlReader = this.defColumn;
this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
try {
@@ -911,16 +670,11 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
this.repetitionLevelColumn = createRLEIterator(descriptor.getMaxRepetitionLevel(),
page.getRepetitionLevels(), descriptor);
- if (vectorizedDecode()) {
- int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
- this.defColumn = new VectorizedRleValuesReader(bitWidth);
- this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn);
- this.defColumn.initFromBuffer(
- this.pageValueCount, page.getDefinitionLevels().toByteArray());
- } else {
- this.definitionLevelColumn = createRLEIterator(descriptor.getMaxDefinitionLevel(),
- page.getDefinitionLevels(), descriptor);
- }
+ int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+ this.defColumn = new VectorizedRleValuesReader(bitWidth);
+ this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn);
+ this.defColumn.initFromBuffer(
+ this.pageValueCount, page.getDefinitionLevels().toByteArray());
try {
initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0);
} catch (IOException e) {
@@ -937,9 +691,10 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
+ rowsReturned + " out of " + totalRowCount);
}
List<ColumnDescriptor> columns = requestedSchema.getColumns();
- columnReaders = new ColumnReader[columns.size()];
+ columnReaders = new VectorizedColumnReader[columns.size()];
for (int i = 0; i < columns.size(); ++i) {
- columnReaders[i] = new ColumnReader(columns.get(i), pages.getPageReader(columns.get(i)));
+ columnReaders[i] = new VectorizedColumnReader(columns.get(i),
+ pages.getPageReader(columns.get(i)));
}
totalCountLoadedSoFar += pages.getRowCount();
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 2abfd14916..cd769d0137 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -160,6 +160,15 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
}
(keyValueOutput, runFunc)
+ case Some((SQLConf.Deprecated.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED, Some(value))) =>
+ val runFunc = (sqlContext: SQLContext) => {
+ logWarning(
+ s"Property ${SQLConf.Deprecated.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED} is " +
+ s"deprecated and will be ignored. Vectorized parquet reader will be used instead.")
+ Seq(Row(SQLConf.PARQUET_VECTORIZED_READER_ENABLED, "true"))
+ }
+ (keyValueOutput, runFunc)
+
// Configures a single property.
case Some((key, Some(value))) =>
val runFunc = (sqlContext: SQLContext) => {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index e848f423eb..f3514cd14c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -33,9 +33,9 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader
+import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
@@ -99,8 +99,6 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
// If true, enable using the custom RecordReader for parquet. This only works for
// a subset of the types (no complex types).
- protected val enableUnsafeRowParquetReader: Boolean =
- sqlContext.getConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key).toBoolean
protected val enableVectorizedParquetReader: Boolean =
sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean
protected val enableWholestageCodegen: Boolean =
@@ -174,19 +172,17 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
* fails (for example, unsupported schema), try with the normal reader.
* TODO: plumb this through a different way?
*/
- if (enableUnsafeRowParquetReader &&
+ if (enableVectorizedParquetReader &&
format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") {
- val parquetReader: UnsafeRowParquetRecordReader = new UnsafeRowParquetRecordReader()
+ val parquetReader: VectorizedParquetRecordReader = new VectorizedParquetRecordReader()
if (!parquetReader.tryInitialize(
split.serializableHadoopSplit.value, hadoopAttemptContext)) {
parquetReader.close()
} else {
reader = parquetReader.asInstanceOf[RecordReader[Void, V]]
- if (enableVectorizedParquetReader) {
- parquetReader.resultBatch()
- // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
- if (enableWholestageCodegen) parquetReader.enableReturningBatches();
- }
+ parquetReader.resultBatch()
+ // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+ if (enableWholestageCodegen) parquetReader.enableReturningBatches()
}
}
@@ -203,7 +199,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
private[this] var finished = false
override def hasNext: Boolean = {
- if (context.isInterrupted) {
+ if (context.isInterrupted()) {
throw new TaskKilledException
}
if (!finished && !havePair) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c308161413..473cde56fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -345,11 +345,6 @@ object SQLConf {
"option must be set in Hadoop Configuration. 2. This option overrides " +
"\"spark.sql.sources.outputCommitterClass\".")
- val PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED = booleanConf(
- key = "spark.sql.parquet.enableUnsafeRowRecordReader",
- defaultValue = Some(true),
- doc = "Enables using the custom ParquetUnsafeRowRecordReader.")
-
val PARQUET_VECTORIZED_READER_ENABLED = booleanConf(
key = "spark.sql.parquet.enableVectorizedReader",
defaultValue = Some(true),
@@ -527,6 +522,7 @@ object SQLConf {
val CODEGEN_ENABLED = "spark.sql.codegen"
val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
val SORTMERGE_JOIN = "spark.sql.planner.sortMergeJoin"
+ val PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED = "spark.sql.parquet.enableUnsafeRowRecordReader"
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
index 29318d8b56..f42c7546c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetEncodingSuite.scala
@@ -36,7 +36,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
List.fill(n)(ROW).toDF.repartition(1).write.parquet(dir.getCanonicalPath)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
- val reader = new UnsafeRowParquetRecordReader
+ val reader = new VectorizedParquetRecordReader
reader.initialize(file.asInstanceOf[String], null)
val batch = reader.resultBatch()
assert(reader.nextBatch())
@@ -61,7 +61,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
data.repartition(1).write.parquet(dir.getCanonicalPath)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).toArray.head
- val reader = new UnsafeRowParquetRecordReader
+ val reader = new VectorizedParquetRecordReader
reader.initialize(file.asInstanceOf[String], null)
val batch = reader.resultBatch()
assert(reader.nextBatch())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index b394ffb366..51183e970d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -57,7 +57,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
val output = predicate.collect { case a: Attribute => a }.distinct
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
- withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
val query = df
.select(output.map(e => Column(e)): _*)
.where(Column(predicate))
@@ -446,7 +446,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
test("SPARK-11661 Still pushdown filters returned by unhandledFilters") {
import testImplicits._
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
- withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/part=1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
@@ -520,7 +520,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
test("SPARK-11164: test the parquet filter in") {
import testImplicits._
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
- withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table1"
(1 to 5).map(i => (i.toFloat, i%3)).toDF("a", "b").write.parquet(path)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 4d9a8d7eb1..ebdb105743 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -656,7 +656,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
var hash1: Int = 0
var hash2: Int = 0
(false :: true :: Nil).foreach { v =>
- withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> v.toString) {
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> v.toString) {
val df = sqlContext.read.parquet(dir.getCanonicalPath)
val rows = df.queryExecution.toRdd.map(_.copy()).collect()
val unsafeRows = rows.map(_.asInstanceOf[UnsafeRow])
@@ -672,13 +672,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}
- test("UnsafeRowParquetRecordReader - direct path read") {
- val data = (0 to 10).map(i => (i, ((i + 'a').toChar.toString)))
+ test("VectorizedParquetRecordReader - direct path read") {
+ val data = (0 to 10).map(i => (i, (i + 'a').toChar.toString))
withTempPath { dir =>
sqlContext.createDataFrame(data).repartition(1).write.parquet(dir.getCanonicalPath)
val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0);
{
- val reader = new UnsafeRowParquetRecordReader
+ val reader = new VectorizedParquetRecordReader
try {
reader.initialize(file, null)
val result = mutable.ArrayBuffer.empty[(Int, String)]
@@ -695,7 +695,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
// Project just one column
{
- val reader = new UnsafeRowParquetRecordReader
+ val reader = new VectorizedParquetRecordReader
try {
reader.initialize(file, ("_2" :: Nil).asJava)
val result = mutable.ArrayBuffer.empty[(String)]
@@ -711,7 +711,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
// Project columns in opposite order
{
- val reader = new UnsafeRowParquetRecordReader
+ val reader = new VectorizedParquetRecordReader
try {
reader.initialize(file, ("_2" :: "_1" :: Nil).asJava)
val result = mutable.ArrayBuffer.empty[(String, Int)]
@@ -728,7 +728,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
// Empty projection
{
- val reader = new UnsafeRowParquetRecordReader
+ val reader = new VectorizedParquetRecordReader
try {
reader.initialize(file, List[String]().asJava)
var result = 0
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index 15bf00e6f4..070c4004c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -82,38 +82,17 @@ object ParquetReadBenchmark {
}
sqlBenchmark.addCase("SQL Parquet MR") { iter =>
- withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
- sqlContext.sql("select sum(id) from tempTable").collect()
- }
- }
-
- sqlBenchmark.addCase("SQL Parquet Non-Vectorized") { iter =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
sqlContext.sql("select sum(id) from tempTable").collect()
}
}
val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
- // Driving the parquet reader directly without Spark.
- parquetReaderBenchmark.addCase("ParquetReader Non-Vectorized") { num =>
- var sum = 0L
- files.map(_.asInstanceOf[String]).foreach { p =>
- val reader = new UnsafeRowParquetRecordReader
- reader.initialize(p, ("id" :: Nil).asJava)
-
- while (reader.nextKeyValue()) {
- val record = reader.getCurrentValue.asInstanceOf[InternalRow]
- if (!record.isNullAt(0)) sum += record.getInt(0)
- }
- reader.close()
- }
- }
-
// Driving the parquet reader in batch mode directly.
parquetReaderBenchmark.addCase("ParquetReader Vectorized") { num =>
var sum = 0L
files.map(_.asInstanceOf[String]).foreach { p =>
- val reader = new UnsafeRowParquetRecordReader
+ val reader = new VectorizedParquetRecordReader
try {
reader.initialize(p, ("id" :: Nil).asJava)
val batch = reader.resultBatch()
@@ -136,7 +115,7 @@ object ParquetReadBenchmark {
parquetReaderBenchmark.addCase("ParquetReader Vectorized -> Row") { num =>
var sum = 0L
files.map(_.asInstanceOf[String]).foreach { p =>
- val reader = new UnsafeRowParquetRecordReader
+ val reader = new VectorizedParquetRecordReader
try {
reader.initialize(p, ("id" :: Nil).asJava)
val batch = reader.resultBatch()
@@ -159,7 +138,6 @@ object ParquetReadBenchmark {
-------------------------------------------------------------------------------------------
SQL Parquet Vectorized 215 / 262 73.0 13.7 1.0X
SQL Parquet MR 1946 / 2083 8.1 123.7 0.1X
- SQL Parquet Non-Vectorized 1079 / 1213 14.6 68.6 0.2X
*/
sqlBenchmark.run()
@@ -167,9 +145,8 @@ object ParquetReadBenchmark {
Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
Parquet Reader Single Int Column Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
- ParquetReader Non-Vectorized 610 / 737 25.8 38.8 1.0X
- ParquetReader Vectorized 123 / 152 127.8 7.8 5.0X
- ParquetReader Vectorized -> Row 165 / 180 95.2 10.5 3.7X
+ ParquetReader Vectorized 123 / 152 127.8 7.8 1.0X
+ ParquetReader Vectorized -> Row 165 / 180 95.2 10.5 0.7X
*/
parquetReaderBenchmark.run()
}
@@ -191,32 +168,12 @@ object ParquetReadBenchmark {
}
benchmark.addCase("SQL Parquet MR") { iter =>
- withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
- sqlContext.sql("select sum(c1), sum(length(c2)) from tempTable").collect
- }
- }
-
- benchmark.addCase("SQL Parquet Non-vectorized") { iter =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
sqlContext.sql("select sum(c1), sum(length(c2)) from tempTable").collect
}
}
val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
- benchmark.addCase("ParquetReader Non-vectorized") { num =>
- var sum1 = 0L
- var sum2 = 0L
- files.map(_.asInstanceOf[String]).foreach { p =>
- val reader = new UnsafeRowParquetRecordReader
- reader.initialize(p, null)
- while (reader.nextKeyValue()) {
- val record = reader.getCurrentValue.asInstanceOf[InternalRow]
- if (!record.isNullAt(0)) sum1 += record.getInt(0)
- if (!record.isNullAt(1)) sum2 += record.getUTF8String(1).numBytes()
- }
- reader.close()
- }
- }
/*
Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
@@ -224,8 +181,6 @@ object ParquetReadBenchmark {
-------------------------------------------------------------------------------------------
SQL Parquet Vectorized 628 / 720 16.7 59.9 1.0X
SQL Parquet MR 1905 / 2239 5.5 181.7 0.3X
- SQL Parquet Non-vectorized 1429 / 1732 7.3 136.3 0.4X
- ParquetReader Non-vectorized 989 / 1357 10.6 94.3 0.6X
*/
benchmark.run()
}
@@ -247,7 +202,7 @@ object ParquetReadBenchmark {
}
benchmark.addCase("SQL Parquet MR") { iter =>
- withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+ withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
sqlContext.sql("select sum(length(c1)) from tempTable").collect
}
}
@@ -293,7 +248,7 @@ object ParquetReadBenchmark {
Read data column 191 / 250 82.1 12.2 1.0X
Read partition column 82 / 86 192.4 5.2 2.3X
Read both columns 220 / 248 71.5 14.0 0.9X
- */
+ */
benchmark.run()
}
}
@@ -319,7 +274,7 @@ object ParquetReadBenchmark {
benchmark.addCase("PR Vectorized") { num =>
var sum = 0
files.map(_.asInstanceOf[String]).foreach { p =>
- val reader = new UnsafeRowParquetRecordReader
+ val reader = new VectorizedParquetRecordReader
try {
reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
val batch = reader.resultBatch()
@@ -340,7 +295,7 @@ object ParquetReadBenchmark {
benchmark.addCase("PR Vectorized (Null Filtering)") { num =>
var sum = 0L
files.map(_.asInstanceOf[String]).foreach { p =>
- val reader = new UnsafeRowParquetRecordReader
+ val reader = new VectorizedParquetRecordReader
try {
reader.initialize(p, ("c1" :: "c2" :: Nil).asJava)
val batch = reader.resultBatch()