about summary refs log tree commit diff
path: root/sql/core/src
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@gmail.com>2016-02-27 11:41:35 -0800
committerReynold Xin <rxin@databricks.com>2016-02-27 11:41:35 -0800
commit3814d0bcf6f1697a94123be4b224cbd7554025a9 (patch)
treee00539be5455cf7ba1db60d215c7d92c5d0e9321 /sql/core/src
parentec0cc75e158335a4110c86cf63c19d7d45167834 (diff)
downloadspark-3814d0bcf6f1697a94123be4b224cbd7554025a9.tar.gz
spark-3814d0bcf6f1697a94123be4b224cbd7554025a9.tar.bz2
spark-3814d0bcf6f1697a94123be4b224cbd7554025a9.zip
[SPARK-13530][SQL] Add ShortType support to UnsafeRowParquetRecordReader
JIRA: https://issues.apache.org/jira/browse/SPARK-13530 ## What changes were proposed in this pull request? By enabling the vectorized parquet scanner by default, the unit test `ParquetHadoopFsRelationSuite` based on `HadoopFsRelationTest` will fail due to the lack of short type support in `UnsafeRowParquetRecordReader`. We should fix it. The error exception: [info] ParquetHadoopFsRelationSuite: [info] - test all data types - StringType (499 milliseconds) [info] - test all data types - BinaryType (447 milliseconds) [info] - test all data types - BooleanType (520 milliseconds) [info] - test all data types - ByteType (418 milliseconds) 00:22:58.920 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 124.0 (TID 1949) org.apache.commons.lang.NotImplementedException: Unimplemented type: ShortType at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader$ColumnReader.readIntBatch(UnsafeRowParquetRecordReader.java:769) at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader$ColumnReader.readBatch(UnsafeRowParquetRecordReader.java:640) at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader$ColumnReader.access$000(UnsafeRowParquetRecordReader.java:461) at org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader.nextBatch(UnsafeRowParquetRecordReader.java:224) ## How was this patch tested? The unit test `ParquetHadoopFsRelationSuite` based on `HadoopFsRelationTest` would [fail](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/52110/consoleFull) due to the lack of short type support in UnsafeRowParquetRecordReader. By adding this support, the test can be passed. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #11412 from viirya/add-shorttype-support.
Diffstat (limited to 'sql/core/src')
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java3
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java34
2 files changed, 36 insertions, 1 deletion
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
index 9d50cfab3b..e7f0ec2e77 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -765,6 +765,9 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
} else if (DecimalType.is64BitDecimalType(column.dataType())) {
defColumn.readIntsAsLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
+ } else if (column.dataType() == DataTypes.ShortType) {
+ defColumn.readShorts(
+ num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else {
throw new NotImplementedException("Unimplemented type: " + column.dataType());
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index b2048c0e39..8613fcae0b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -301,6 +301,38 @@ public final class VectorizedRleValuesReader extends ValuesReader
}
}
+ public void readShorts(int total, ColumnVector c,
+ int rowId, int level, VectorizedValuesReader data) {
+ int left = total;
+ while (left > 0) {
+ if (this.currentCount == 0) this.readNextGroup();
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == level) {
+ for (int i = 0; i < n; i++) {
+ c.putShort(rowId + i, (short)data.readInteger());
+ }
+ } else {
+ c.putNulls(rowId, n);
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (currentBuffer[currentBufferIdx++] == level) {
+ c.putShort(rowId + i, (short)data.readInteger());
+ } else {
+ c.putNull(rowId + i);
+ }
+ }
+ break;
+ }
+ rowId += n;
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
public void readLongs(int total, ColumnVector c, int rowId, int level,
VectorizedValuesReader data) {
int left = total;
@@ -611,4 +643,4 @@ public final class VectorizedRleValuesReader extends ValuesReader
throw new ParquetDecodingException("not a valid mode " + this.mode);
}
}
-} \ No newline at end of file
+}