aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core/src')
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java3
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java34
2 files changed, 36 insertions, 1 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
index 9d50cfab3b..e7f0ec2e77 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -765,6 +765,9 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
} else if (DecimalType.is64BitDecimalType(column.dataType())) {
defColumn.readIntsAsLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ } else if (column.dataType() == DataTypes.ShortType) {
+ defColumn.readShorts(
+ num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index b2048c0e39..8613fcae0b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -301,6 +301,38 @@ public final class VectorizedRleValuesReader extends ValuesReader
}
}
+ public void readShorts(int total, ColumnVector c,
+ int rowId, int level, VectorizedValuesReader data) {
+ int left = total;
+ while (left > 0) {
+ if (this.currentCount == 0) this.readNextGroup();
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == level) {
+ for (int i = 0; i < n; i++) {
+ c.putShort(rowId + i, (short)data.readInteger());
+ }
+ } else {
+ c.putNulls(rowId, n);
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (currentBuffer[currentBufferIdx++] == level) {
+ c.putShort(rowId + i, (short)data.readInteger());
+ } else {
+ c.putNull(rowId + i);
+ }
+ }
+ break;
+ }
+ rowId += n;
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
public void readLongs(int total, ColumnVector c, int rowId, int level,
VectorizedValuesReader data) {
int left = total;
@@ -611,4 +643,4 @@ public final class VectorizedRleValuesReader extends ValuesReader
throw new ParquetDecodingException("not a valid mode " + this.mode);
}
}
-} \ No newline at end of file
+}