Diffstat (limited to 'sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java')
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java | 120
1 file changed, 66 insertions(+), 54 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 6cc2fda587..ea37a08ab5 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -27,6 +27,7 @@ import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.*;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.types.DataTypes;
@@ -115,57 +116,6 @@ public class VectorizedColumnReader {
}
/**
- * TODO: Hoist the useDictionary branch to decode*Batch and make the batch page aligned.
- */
- public boolean nextBoolean() {
- if (!useDictionary) {
- return dataColumn.readBoolean();
- } else {
- return dictionary.decodeToBoolean(dataColumn.readValueDictionaryId());
- }
- }
-
- public int nextInt() {
- if (!useDictionary) {
- return dataColumn.readInteger();
- } else {
- return dictionary.decodeToInt(dataColumn.readValueDictionaryId());
- }
- }
-
- public long nextLong() {
- if (!useDictionary) {
- return dataColumn.readLong();
- } else {
- return dictionary.decodeToLong(dataColumn.readValueDictionaryId());
- }
- }
-
- public float nextFloat() {
- if (!useDictionary) {
- return dataColumn.readFloat();
- } else {
- return dictionary.decodeToFloat(dataColumn.readValueDictionaryId());
- }
- }
-
- public double nextDouble() {
- if (!useDictionary) {
- return dataColumn.readDouble();
- } else {
- return dictionary.decodeToDouble(dataColumn.readValueDictionaryId());
- }
- }
-
- public Binary nextBinary() {
- if (!useDictionary) {
- return dataColumn.readBytes();
- } else {
- return dictionary.decodeToBinary(dataColumn.readValueDictionaryId());
- }
- }
-
- /**
* Advances to the next value. Returns true if the value is non-null.
*/
private boolean next() throws IOException {
@@ -200,8 +150,26 @@ public class VectorizedColumnReader {
ColumnVector dictionaryIds = column.reserveDictionaryIds(total);
defColumn.readIntegers(
num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
- decodeDictionaryIds(rowId, num, column, dictionaryIds);
+
+ if (column.hasDictionary() || (rowId == 0 &&
+ (descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32 ||
+ descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64 ||
+ descriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT ||
+ descriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE ||
+ descriptor.getType() == PrimitiveType.PrimitiveTypeName.BINARY))) {
+ // Column vector supports lazy decoding of dictionary values so just set the dictionary.
+ // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
+ // non-dictionary encoded values have already been added).
+ column.setDictionary(dictionary);
+ } else {
+ decodeDictionaryIds(rowId, num, column, dictionaryIds);
+ }
} else {
+ if (column.hasDictionary() && rowId != 0) {
+        // This batch already has dictionary-encoded values, but this new page is not dictionary
+        // encoded. The batch cannot mix the two, so eagerly decode the dictionary ids read so far.
+ decodeDictionaryIds(0, rowId, column, column.getDictionaryIds());
+ }
column.setDictionary(null);
switch (descriptor.getType()) {
case BOOLEAN:
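
For readability, the guard added above can be read as a single predicate over the batch state and the Parquet primitive type. The helper below is a hypothetical restatement, not part of the patch: LazyDecodeCondition and canLazilyDecode are illustrative names, while hasDictionary(), getType() and the listed primitive types come from the diff itself.

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.spark.sql.execution.vectorized.ColumnVector;

// Hypothetical helper, not in the patch: restates when readBatch() may keep
// values dictionary encoded and simply call column.setDictionary(dictionary).
final class LazyDecodeCondition {
  static boolean canLazilyDecode(ColumnDescriptor descriptor, ColumnVector column, int rowId) {
    PrimitiveTypeName type = descriptor.getType();
    boolean supportedType =
        type == PrimitiveTypeName.INT32 ||
        type == PrimitiveTypeName.INT64 ||
        type == PrimitiveTypeName.FLOAT ||
        type == PrimitiveTypeName.DOUBLE ||
        type == PrimitiveTypeName.BINARY;
    // Lazy decoding is safe if earlier pages of this batch were also dictionary
    // encoded, or if this is the first page written into the batch (rowId == 0).
    return column.hasDictionary() || (rowId == 0 && supportedType);
  }
}

Everything else falls through to decodeDictionaryIds(), which eagerly materializes values type by type, as shown in the next hunk.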
@@ -246,11 +214,45 @@ public class VectorizedColumnReader {
ColumnVector dictionaryIds) {
switch (descriptor.getType()) {
case INT32:
+ if (column.dataType() == DataTypes.IntegerType ||
+ DecimalType.is32BitDecimalType(column.dataType())) {
+ for (int i = rowId; i < rowId + num; ++i) {
+ column.putInt(i, dictionary.decodeToInt(dictionaryIds.getInt(i)));
+ }
+ } else if (column.dataType() == DataTypes.ByteType) {
+ for (int i = rowId; i < rowId + num; ++i) {
+ column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getInt(i)));
+ }
+ } else if (column.dataType() == DataTypes.ShortType) {
+ for (int i = rowId; i < rowId + num; ++i) {
+ column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getInt(i)));
+ }
+ } else {
+ throw new NotImplementedException("Unimplemented type: " + column.dataType());
+ }
+ break;
+
case INT64:
+ if (column.dataType() == DataTypes.LongType ||
+ DecimalType.is64BitDecimalType(column.dataType())) {
+ for (int i = rowId; i < rowId + num; ++i) {
+ column.putLong(i, dictionary.decodeToLong(dictionaryIds.getInt(i)));
+ }
+ } else {
+ throw new NotImplementedException("Unimplemented type: " + column.dataType());
+ }
+ break;
+
case FLOAT:
+ for (int i = rowId; i < rowId + num; ++i) {
+ column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getInt(i)));
+ }
+ break;
+
case DOUBLE:
- case BINARY:
- column.setDictionary(dictionary);
+ for (int i = rowId; i < rowId + num; ++i) {
+ column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getInt(i)));
+ }
break;
case INT96:
if (column.dataType() == DataTypes.TimestampType) {
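
The loops above are the eager fallback, materializing each value from the dictionary up front. When readBatch() takes the column.setDictionary(dictionary) path instead, values stay dictionary encoded and are resolved only when a row is read. Below is a minimal conceptual sketch of that lazy path, not the actual ColumnVector implementation: the fields and the getInt() body are assumptions made for illustration; only Dictionary.decodeToInt() appears in the patch.

import org.apache.parquet.column.Dictionary;

// Conceptual sketch of lazy dictionary decoding (not the real ColumnVector).
final class LazyDictionaryColumnSketch {
  private Dictionary dictionary;   // set through setDictionary(); null means eager
  private int[] dictionaryIds;     // ids filled by defColumn.readIntegers(...)
  private int[] decodedInts;       // values filled by decodeDictionaryIds(...)

  int getInt(int rowId) {
    if (dictionary != null) {
      // Lazy path: decode a single value on access instead of decoding the
      // whole page eagerly in decodeDictionaryIds().
      return dictionary.decodeToInt(dictionaryIds[rowId]);
    }
    // Eager path: the value was already materialized into the column.
    return decodedInts[rowId];
  }
}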
@@ -263,6 +265,16 @@ public class VectorizedColumnReader {
throw new NotImplementedException();
}
break;
+ case BINARY:
+        // TODO: this is very inefficient because it materializes every dictionary value here.
+        // A better approach would be to store the dictionary data in the ColumnVector and reuse
+        // it across batches, so that appending a byte array only needs to update the length
+        // and offset.
+ for (int i = rowId; i < rowId + num; ++i) {
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+ column.putByteArray(i, v.getBytes());
+ }
+ break;
case FIXED_LEN_BYTE_ARRAY:
// DecimalType written in the legacy mode
if (DecimalType.is32BitDecimalType(column.dataType())) {