diff options
author | Sameer Agarwal <sameer@databricks.com> | 2016-03-21 18:19:54 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-03-21 18:19:54 -0700 |
commit | 7299961657b5591a3257b21e40f3047db27f221c (patch) | |
tree | 8a7601edb15be76e10a32dcd8801bcbf1c63e097 /sql | |
parent | 43ef1e52bfe359f0f051a607a8dc77cc3b269508 (diff) | |
download | spark-7299961657b5591a3257b21e40f3047db27f221c.tar.gz spark-7299961657b5591a3257b21e40f3047db27f221c.tar.bz2 spark-7299961657b5591a3257b21e40f3047db27f221c.zip |
[SPARK-14016][SQL] Support high-precision decimals in vectorized parquet reader
## What changes were proposed in this pull request?
This patch adds support for reading `DecimalTypes` with high (> 18) precision in `VectorizedColumnReader`.
## How was this patch tested?
1. `VectorizedColumnReader` initially had a gating condition on `primitiveType.getDecimalMetadata().getPrecision() > Decimal.MAX_LONG_DIGITS()` that made us fall back on parquet-mr for handling high-precision decimals. This condition is now removed.
2. In particular, the `ParquetHadoopFsRelationSuite` (that tests for all supported hive types -- including `DecimalType(25, 5)`) fails when the gating condition is removed (https://github.com/apache/spark/pull/11808) and should now pass with this change.
Author: Sameer Agarwal <sameer@databricks.com>
Closes #11869 from sameeragarwal/bigdecimal-parquet.
Diffstat (limited to 'sql')
2 files changed, 13 insertions, 4 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 46c84c5dd4..2c23ccc357 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -262,6 +262,11 @@ public class VectorizedColumnReader {
         Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
         column.putLong(i, CatalystRowConverter.binaryToUnscaledLong(v));
       }
+    } else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
+      for (int i = rowId; i < rowId + num; ++i) {
+        Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+        column.putByteArray(i, v.getBytes());
+      }
     } else {
       throw new NotImplementedException();
     }
@@ -368,6 +373,14 @@ public class VectorizedColumnReader {
           column.putNull(rowId + i);
         }
       }
+    } else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
+      for (int i = 0; i < num; i++) {
+        if (defColumn.readInteger() == maxDefLevel) {
+          column.putByteArray(rowId + i, data.readBinary(arrayLen).getBytes());
+        } else {
+          column.putNull(rowId + i);
+        }
+      }
     } else {
       throw new NotImplementedException("Unimplemented type: " + column.dataType());
     }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index ef44b62a8b..9db5c4150f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -220,10 +220,6 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
           originalTypes[i] != OriginalType.INT_8 && originalTypes[i] != OriginalType.INT_16) {
         throw new IOException("Unsupported type: " + t);
       }
-      if (originalTypes[i] == OriginalType.DECIMAL &&
-          primitiveType.getDecimalMetadata().getPrecision() > Decimal.MAX_LONG_DIGITS()) {
-        throw new IOException("Decimal with high precision is not supported.");
-      }
       if (primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
         throw new IOException("Int96 not supported.");
       }