diff options
Diffstat (limited to 'sql/core')
2 files changed, 36 insertions, 1 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java index 9d50cfab3b..e7f0ec2e77 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java @@ -765,6 +765,9 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas } else if (DecimalType.is64BitDecimalType(column.dataType())) { defColumn.readIntsAsLongs( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + } else if (column.dataType() == DataTypes.ShortType) { + defColumn.readShorts( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else { throw new NotImplementedException("Unimplemented type: " + column.dataType()); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index b2048c0e39..8613fcae0b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -301,6 +301,38 @@ public final class VectorizedRleValuesReader extends ValuesReader } } + public void readShorts(int total, ColumnVector c, + int rowId, int level, VectorizedValuesReader data) { + int left = total; + while (left > 0) { + if (this.currentCount == 0) this.readNextGroup(); + int n = Math.min(left, this.currentCount); + switch (mode) { + case RLE: + if (currentValue == level) { + for (int i = 0; i < n; i++) { + c.putShort(rowId + i, (short)data.readInteger()); + } + } else { + c.putNulls(rowId, n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (currentBuffer[currentBufferIdx++] == level) { + c.putShort(rowId + i, (short)data.readInteger()); + } else { + c.putNull(rowId + i); + } + } + break; + } + rowId += n; + left -= n; + currentCount -= n; + } + } + public void readLongs(int total, ColumnVector c, int rowId, int level, VectorizedValuesReader data) { int left = total; @@ -611,4 +643,4 @@ public final class VectorizedRleValuesReader extends ValuesReader throw new ParquetDecodingException("not a valid mode " + this.mode); } } -}
\ No newline at end of file +} |