diff options
author | Sameer Agarwal <sameerag@cs.berkeley.edu> | 2016-07-21 15:34:32 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-07-21 15:34:32 -0700 |
commit | 46f80a307392bee6743e5847eb5243bf5fcd00a4 (patch) | |
tree | 8df140866e2f368459e8396b94de7f876772acd7 | |
parent | 9abd99b3c318d0ec8b91124d40f3ab9e9d835dcf (diff) | |
download | spark-46f80a307392bee6743e5847eb5243bf5fcd00a4.tar.gz spark-46f80a307392bee6743e5847eb5243bf5fcd00a4.tar.bz2 spark-46f80a307392bee6743e5847eb5243bf5fcd00a4.zip |
[SPARK-16334] Maintain single dictionary per row-batch in vectorized parquet reader
## What changes were proposed in this pull request?
As part of the bugfix in https://github.com/apache/spark/pull/12279, if a row batch consists of both dictionary encoded and non-dictionary encoded pages, we explicitly decode the dictionary for the values that are already dictionary encoded. Currently we reset the dictionary while reading every page, which can potentially cause a `java.lang.ArrayIndexOutOfBoundsException` while decoding older pages. This patch fixes the problem by maintaining a single dictionary per row-batch in the vectorized parquet reader.
## How was this patch tested?
Manual tests against a number of hand-generated parquet files.
Author: Sameer Agarwal <sameerag@cs.berkeley.edu>
Closes #14225 from sameeragarwal/vectorized.
-rw-r--r-- | sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java | 21 |
1 files changed, 13 insertions, 8 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index a18b881c78..6c47dc09a8 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -59,7 +59,7 @@ public class VectorizedColumnReader { /** * If true, the current page is dictionary encoded. */ - private boolean useDictionary; + private boolean isCurrentPageDictionaryEncoded; /** * Maximum definition level for this column. @@ -100,13 +100,13 @@ public class VectorizedColumnReader { if (dictionaryPage != null) { try { this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage); - this.useDictionary = true; + this.isCurrentPageDictionaryEncoded = true; } catch (IOException e) { throw new IOException("could not decode the dictionary for " + descriptor, e); } } else { this.dictionary = null; - this.useDictionary = false; + this.isCurrentPageDictionaryEncoded = false; } this.totalValueCount = pageReader.getTotalValueCount(); if (totalValueCount == 0) { @@ -136,6 +136,13 @@ public class VectorizedColumnReader { */ void readBatch(int total, ColumnVector column) throws IOException { int rowId = 0; + ColumnVector dictionaryIds = null; + if (dictionary != null) { + // SPARK-16334: We only maintain a single dictionary per row batch, so that it can be used to + // decode all previous dictionary encoded pages if we ever encounter a non-dictionary encoded + // page. + dictionaryIds = column.reserveDictionaryIds(total); + } while (total > 0) { // Compute the number of values we want to read in this page. 
int leftInPage = (int) (endOfPageValueCount - valuesRead); @@ -144,12 +151,10 @@ public class VectorizedColumnReader { leftInPage = (int) (endOfPageValueCount - valuesRead); } int num = Math.min(total, leftInPage); - if (useDictionary) { + if (isCurrentPageDictionaryEncoded) { // Read and decode dictionary ids. - ColumnVector dictionaryIds = column.reserveDictionaryIds(total); defColumn.readIntegers( num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); - if (column.hasDictionary() || (rowId == 0 && (descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32 || descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64 || @@ -461,13 +466,13 @@ public class VectorizedColumnReader { throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding); } this.dataColumn = new VectorizedRleValuesReader(); - this.useDictionary = true; + this.isCurrentPageDictionaryEncoded = true; } else { if (dataEncoding != Encoding.PLAIN) { throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding); } this.dataColumn = new VectorizedPlainValuesReader(); - this.useDictionary = false; + this.isCurrentPageDictionaryEncoded = false; } try { |