author    Sameer Agarwal <sameerag@cs.berkeley.edu>  2016-09-02 15:16:16 -0700
committer Davies Liu <davies.liu@gmail.com>          2016-09-02 15:16:16 -0700
commit    a2c9acb0e54b2e38cb8ee6431f1ea0e0b4cd959a (patch)
tree      2f7b50a97fd5aa7143576b679745b796687af30c /sql
parent    ed9c884dcf925500ceb388b06b33bd2c95cd2ada (diff)
[SPARK-16334] Reusing same dictionary column for decoding consecutive row groups shouldn't throw an error
## What changes were proposed in this pull request?

This patch fixes a bug in the vectorized parquet reader that's caused by re-using the same dictionary column vector while reading consecutive row groups. Specifically, this issue manifests for a certain distribution of dictionary/plain encoded data while we read/populate the underlying bit-packed dictionary data into a column-vector based data structure.

## How was this patch tested?

Manually tested on datasets provided by the community. Thanks to Chris Perluss and Keith Kraus for their invaluable help in tracking down this issue!

Author: Sameer Agarwal <sameerag@cs.berkeley.edu>

Closes #14941 from sameeragarwal/parquet-exception-2.
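To make the failure mode concrete, here is a minimal, self-contained sketch. Plain arrays stand in for Spark's `ColumnVector` and Parquet's `Dictionary`; the names and data are illustrative assumptions, not the actual classes. When the reader reuses its ids buffer across row groups, a null slot can still hold a stale dictionary id that is out of range for the current, smaller dictionary, so decoding must skip null positions:

```java
// Illustrative sketch only: plain arrays stand in for Spark's ColumnVector
// and Parquet's Dictionary. The current row group's dictionary has 2 entries,
// but a null slot still holds a stale id (3) left over from a previous group.
public class DictionaryDecodeSketch {
  public static void main(String[] args) {
    int[] dictionary = {10, 20};                    // current dictionary
    int[] ids = {0, 1, 3, 0};                       // ids[2] is stale
    boolean[] isNull = {false, false, true, false}; // slot 2 is null
    int[] out = new int[ids.length];

    for (int i = 0; i < ids.length; i++) {
      // The guard this patch adds: skip null slots instead of decoding
      // whatever id happens to be left in the reused buffer.
      if (!isNull[i]) {
        out[i] = dictionary[ids[i]]; // unguarded, i == 2 would throw
                                     // ArrayIndexOutOfBoundsException
      }
    }
    System.out.println(java.util.Arrays.toString(out)); // [10, 20, 0, 10]
  }
}
```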
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java | 54
1 file changed, 38 insertions(+), 16 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 4ed59b08a4..cb51cb499e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -221,15 +221,21 @@ public class VectorizedColumnReader {
if (column.dataType() == DataTypes.IntegerType ||
DecimalType.is32BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
- column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i)));
+ if (!column.isNullAt(i)) {
+ column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i)));
+ }
}
} else if (column.dataType() == DataTypes.ByteType) {
for (int i = rowId; i < rowId + num; ++i) {
- column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
+ if (!column.isNullAt(i)) {
+ column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
+ }
}
} else if (column.dataType() == DataTypes.ShortType) {
for (int i = rowId; i < rowId + num; ++i) {
- column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
+ if (!column.isNullAt(i)) {
+ column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
+ }
}
} else {
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
@@ -240,7 +246,9 @@ public class VectorizedColumnReader {
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
- column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
+ if (!column.isNullAt(i)) {
+ column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
+ }
}
} else {
throw new UnsupportedOperationException("Unimplemented type: " + column.dataType());
@@ -249,21 +257,27 @@ public class VectorizedColumnReader {
case FLOAT:
for (int i = rowId; i < rowId + num; ++i) {
- column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getDictId(i)));
+ if (!column.isNullAt(i)) {
+ column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getDictId(i)));
+ }
}
break;
case DOUBLE:
for (int i = rowId; i < rowId + num; ++i) {
- column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getDictId(i)));
+ if (!column.isNullAt(i)) {
+ column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getDictId(i)));
+ }
}
break;
case INT96:
if (column.dataType() == DataTypes.TimestampType) {
for (int i = rowId; i < rowId + num; ++i) {
// TODO: Convert dictionary of Binaries to dictionary of Longs
- Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
- column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
+ if (!column.isNullAt(i)) {
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
+ column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
+ }
}
} else {
throw new UnsupportedOperationException();
}
break;
@@ -275,26 +289,34 @@ public class VectorizedColumnReader {
// and reuse it across batches. This should mean adding a ByteArray would just update
// the length and offset.
for (int i = rowId; i < rowId + num; ++i) {
- Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
- column.putByteArray(i, v.getBytes());
+ if (!column.isNullAt(i)) {
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
+ column.putByteArray(i, v.getBytes());
+ }
}
break;
case FIXED_LEN_BYTE_ARRAY:
// DecimalType written in the legacy mode
if (DecimalType.is32BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
- Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
- column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
+ if (!column.isNullAt(i)) {
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
+ column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
+ }
}
} else if (DecimalType.is64BitDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
- Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
- column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
+ if (!column.isNullAt(i)) {
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
+ column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
+ }
}
} else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
for (int i = rowId; i < rowId + num; ++i) {
- Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
- column.putByteArray(i, v.getBytes());
+ if (!column.isNullAt(i)) {
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
+ column.putByteArray(i, v.getBytes());
+ }
}
} else {
throw new UnsupportedOperationException();
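As a usage-level illustration, here is a hedged repro sketch using only Spark's public Java API. The file name and its page layout are assumptions for illustration; the actual repro datasets came from the community and are not part of this commit:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class Spark16334ReproSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[1]")
        .appName("SPARK-16334-repro")
        .getOrCreate();

    // "data.parquet" is a placeholder for a file whose consecutive row
    // groups mix dictionary- and plain-encoded pages and contain nulls,
    // so the vectorized reader reuses its dictionary-id column vector
    // across row groups.
    Dataset<Row> df = spark.read().parquet("data.parquet");

    // Before this patch, fully materializing such a file could fail while
    // decoding stale dictionary ids; with the fix it completes normally.
    df.collect();

    spark.stop();
  }
}
```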