aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSameer Agarwal <sameerag@cs.berkeley.edu>2016-07-21 15:34:32 -0700
committerReynold Xin <rxin@databricks.com>2016-07-21 15:34:32 -0700
commit46f80a307392bee6743e5847eb5243bf5fcd00a4 (patch)
tree8df140866e2f368459e8396b94de7f876772acd7
parent9abd99b3c318d0ec8b91124d40f3ab9e9d835dcf (diff)
downloadspark-46f80a307392bee6743e5847eb5243bf5fcd00a4.tar.gz
spark-46f80a307392bee6743e5847eb5243bf5fcd00a4.tar.bz2
spark-46f80a307392bee6743e5847eb5243bf5fcd00a4.zip
[SPARK-16334] Maintain single dictionary per row-batch in vectorized parquet reader
## What changes were proposed in this pull request? As part of the bugfix in https://github.com/apache/spark/pull/12279, if a row batch consists of both dictionary-encoded and non-dictionary-encoded pages, we explicitly decode the dictionary for the values that are already dictionary encoded. Currently we reset the dictionary while reading every page, which can potentially cause a `java.lang.ArrayIndexOutOfBoundsException` while decoding older pages. This patch fixes the problem by maintaining a single dictionary per row batch in the vectorized Parquet reader. ## How was this patch tested? Manual tests against a number of hand-generated Parquet files. Author: Sameer Agarwal <sameerag@cs.berkeley.edu> Closes #14225 from sameeragarwal/vectorized.
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java21
1 file changed, 13 insertions, 8 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index a18b881c78..6c47dc09a8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -59,7 +59,7 @@ public class VectorizedColumnReader {
/**
* If true, the current page is dictionary encoded.
*/
- private boolean useDictionary;
+ private boolean isCurrentPageDictionaryEncoded;
/**
* Maximum definition level for this column.
@@ -100,13 +100,13 @@ public class VectorizedColumnReader {
if (dictionaryPage != null) {
try {
this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
- this.useDictionary = true;
+ this.isCurrentPageDictionaryEncoded = true;
} catch (IOException e) {
throw new IOException("could not decode the dictionary for " + descriptor, e);
}
} else {
this.dictionary = null;
- this.useDictionary = false;
+ this.isCurrentPageDictionaryEncoded = false;
}
this.totalValueCount = pageReader.getTotalValueCount();
if (totalValueCount == 0) {
@@ -136,6 +136,13 @@ public class VectorizedColumnReader {
*/
void readBatch(int total, ColumnVector column) throws IOException {
int rowId = 0;
+ ColumnVector dictionaryIds = null;
+ if (dictionary != null) {
+ // SPARK-16334: We only maintain a single dictionary per row batch, so that it can be used to
+ // decode all previous dictionary encoded pages if we ever encounter a non-dictionary encoded
+ // page.
+ dictionaryIds = column.reserveDictionaryIds(total);
+ }
while (total > 0) {
// Compute the number of values we want to read in this page.
int leftInPage = (int) (endOfPageValueCount - valuesRead);
@@ -144,12 +151,10 @@ public class VectorizedColumnReader {
leftInPage = (int) (endOfPageValueCount - valuesRead);
}
int num = Math.min(total, leftInPage);
- if (useDictionary) {
+ if (isCurrentPageDictionaryEncoded) {
// Read and decode dictionary ids.
- ColumnVector dictionaryIds = column.reserveDictionaryIds(total);
defColumn.readIntegers(
num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
-
if (column.hasDictionary() || (rowId == 0 &&
(descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32 ||
descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64 ||
@@ -461,13 +466,13 @@ public class VectorizedColumnReader {
throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
}
this.dataColumn = new VectorizedRleValuesReader();
- this.useDictionary = true;
+ this.isCurrentPageDictionaryEncoded = true;
} else {
if (dataEncoding != Encoding.PLAIN) {
throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
}
this.dataColumn = new VectorizedPlainValuesReader();
- this.useDictionary = false;
+ this.isCurrentPageDictionaryEncoded = false;
}
try {