diff options
author | Liang-Chi Hsieh <viirya@gmail.com> | 2016-02-27 11:41:35 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-02-27 11:41:35 -0800 |
commit | 3814d0bcf6f1697a94123be4b224cbd7554025a9 (patch) | |
tree | e00539be5455cf7ba1db60d215c7d92c5d0e9321 | |
parent | ec0cc75e158335a4110c86cf63c19d7d45167834 (diff) | |
download | spark-3814d0bcf6f1697a94123be4b224cbd7554025a9.tar.gz spark-3814d0bcf6f1697a94123be4b224cbd7554025a9.tar.bz2 spark-3814d0bcf6f1697a94123be4b224cbd7554025a9.zip |
[SPARK-13530][SQL] Add ShortType support to UnsafeRowParquetRecordReader
JIRA: https://issues.apache.org/jira/browse/SPARK-13530
## What changes were proposed in this pull request?
By enabling the vectorized parquet scanner by default, the unit test `ParquetHadoopFsRelationSuite` based on `HadoopFsRelationTest` will fail due to the lack of short type support in `UnsafeRowParquetRecordReader`. We should fix it.
The error exception:
[info] ParquetHadoopFsRelationSuite:
[info] - test all data types - StringType (499 milliseconds)
[info] - test all data types - BinaryType (447 milliseconds)
[info] - test all data types - BooleanType (520 milliseconds)
[info] - test all data types - ByteType (418 milliseconds)
00:22:58.920 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 124.0 (TID 1949)
org.apache.commons.lang.NotImplementedException: Unimplemented type: ShortType
at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader$ColumnReader.readIntBatch(UnsafeRowParquetRecordReader.java:769)
at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader$ColumnReader.readBatch(UnsafeRowParquetRecordReader.java:640)
at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader$ColumnReader.access$000(UnsafeRowParquetRecordReader.java:461)
at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader.nextBatch(UnsafeRowParquetRecordReader.java:224)
## How was this patch tested?
The unit test `ParquetHadoopFsRelationSuite` based on `HadoopFsRelationTest` will [fail](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/52110/consoleFull) due to the lack of short type support in UnsafeRowParquetRecordReader. By adding this support, the test can be passed.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes #11412 from viirya/add-shorttype-support.
2 files changed, 36 insertions, 1 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java index 9d50cfab3b..e7f0ec2e77 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java @@ -765,6 +765,9 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas } else if (DecimalType.is64BitDecimalType(column.dataType())) { defColumn.readIntsAsLongs( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + } else if (column.dataType() == DataTypes.ShortType) { + defColumn.readShorts( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else { throw new NotImplementedException("Unimplemented type: " + column.dataType()); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index b2048c0e39..8613fcae0b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -301,6 +301,38 @@ public final class VectorizedRleValuesReader extends ValuesReader } } + public void readShorts(int total, ColumnVector c, + int rowId, int level, VectorizedValuesReader data) { + int left = total; + while (left > 0) { + if (this.currentCount == 0) this.readNextGroup(); + int n = Math.min(left, this.currentCount); + switch (mode) { + case RLE: + if (currentValue == level) { + for (int i = 0; i < n; i++) { + c.putShort(rowId + i, (short)data.readInteger()); + } + } else { + 
c.putNulls(rowId, n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (currentBuffer[currentBufferIdx++] == level) { + c.putShort(rowId + i, (short)data.readInteger()); + } else { + c.putNull(rowId + i); + } + } + break; + } + rowId += n; + left -= n; + currentCount -= n; + } + } + public void readLongs(int total, ColumnVector c, int rowId, int level, VectorizedValuesReader data) { int left = total; @@ -611,4 +643,4 @@ public final class VectorizedRleValuesReader extends ValuesReader throw new ParquetDecodingException("not a valid mode " + this.mode); } } -}
\ No newline at end of file +} |