From a2c9acb0e54b2e38cb8ee6431f1ea0e0b4cd959a Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Fri, 2 Sep 2016 15:16:16 -0700 Subject: [SPARK-16334] Reusing same dictionary column for decoding consecutive row groups shouldn't throw an error ## What changes were proposed in this pull request? This patch fixes a bug in the vectorized parquet reader that's caused by re-using the same dictionary column vector while reading consecutive row groups. Specifically, this issue manifests for a certain distribution of dictionary/plain encoded data while we read/populate the underlying bit packed dictionary data into a column-vector based data structure. ## How was this patch tested? Manually tested on datasets provided by the community. Thanks to Chris Perluss and Keith Kraus for their invaluable help in tracking down this issue! Author: Sameer Agarwal Closes #14941 from sameeragarwal/parquet-exception-2. --- .../parquet/VectorizedColumnReader.java | 54 +++++++++++++++------- 1 file changed, 38 insertions(+), 16 deletions(-) (limited to 'sql') diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 4ed59b08a4..cb51cb499e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -221,15 +221,21 @@ public class VectorizedColumnReader { if (column.dataType() == DataTypes.IntegerType || DecimalType.is32BitDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { - column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i))); + if (!column.isNullAt(i)) { + column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i))); + } } } else if (column.dataType() == DataTypes.ByteType) { for (int i = rowId; i < rowId + num; ++i) { - column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getDictId(i))); + if (!column.isNullAt(i)) { + column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getDictId(i))); + } } } else if (column.dataType() == DataTypes.ShortType) { for (int i = rowId; i < rowId + num; ++i) { - column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i))); + if (!column.isNullAt(i)) { + column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i))); + } } } else { throw new UnsupportedOperationException("Unimplemented type: " + column.dataType()); @@ -240,7 +246,9 @@ public class VectorizedColumnReader { if (column.dataType() == DataTypes.LongType || DecimalType.is64BitDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { - column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i))); + if (!column.isNullAt(i)) { + column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i))); + } } } else { throw new UnsupportedOperationException("Unimplemented type: " + column.dataType()); @@ -249,21 +257,27 @@ public class VectorizedColumnReader { case FLOAT: for (int i = rowId; i < rowId + num; ++i) { - column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getDictId(i))); + if (!column.isNullAt(i)) { + column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getDictId(i))); + } } break; case DOUBLE: for (int i = rowId; i < rowId + num; ++i) { - column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getDictId(i))); + if (!column.isNullAt(i)) { + column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getDictId(i))); + } } break; case INT96: if (column.dataType() == DataTypes.TimestampType) { for (int i = rowId; i < rowId + num; ++i) { // TODO: Convert dictionary of Binaries to dictionary of Longs - Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); - column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v)); + if (!column.isNullAt(i)) { + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); + column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v)); + } } } else { throw new UnsupportedOperationException(); @@ -275,26 +289,34 @@ public class VectorizedColumnReader { // and reuse it across batches. This should mean adding a ByteArray would just update // the length and offset. for (int i = rowId; i < rowId + num; ++i) { - Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); - column.putByteArray(i, v.getBytes()); + if (!column.isNullAt(i)) { + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); + column.putByteArray(i, v.getBytes()); + } } break; case FIXED_LEN_BYTE_ARRAY: // DecimalType written in the legacy mode if (DecimalType.is32BitDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { - Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); - column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v)); + if (!column.isNullAt(i)) { + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); + column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v)); + } } } else if (DecimalType.is64BitDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { - Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); - column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v)); + if (!column.isNullAt(i)) { + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); + column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v)); + } } } else if (DecimalType.isByteArrayDecimalType(column.dataType())) { for (int i = rowId; i < rowId + num; ++i) { - Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); - column.putByteArray(i, v.getBytes()); + if (!column.isNullAt(i)) { + Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); + column.putByteArray(i, v.getBytes()); + } } } else { throw new UnsupportedOperationException(); -- cgit v1.2.3