aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNong Li <nong@databricks.com>2016-04-09 17:45:10 -0700
committerDavies Liu <davies.liu@gmail.com>2016-04-09 17:45:10 -0700
commit5989c85b535f7f623392d6456d8b37052487f24b (patch)
tree48fa983954f6e631b06954ec4177c45c7fcd84a6
parent5cb5edaf9c5054e42d41f20b2dd92dafcccbf0d6 (diff)
downloadspark-5989c85b535f7f623392d6456d8b37052487f24b.tar.gz
spark-5989c85b535f7f623392d6456d8b37052487f24b.tar.bz2
spark-5989c85b535f7f623392d6456d8b37052487f24b.zip
[SPARK-14217] [SQL] Fix bug if parquet data has columns that use dictionary encoding for some of the data
## What changes were proposed in this pull request? This PR is based on #12017 Currently, this causes batches where some values are dictionary encoded and some which are not. The non-dictionary encoded values cause us to remove the dictionary from the batch causing the first values to return garbage. This patch fixes the issue by first decoding the dictionary for the values that are already dictionary encoded before switching. A similar thing is done for the reverse case where the initial values are not dictionary encoded. ## How was this patch tested? This is difficult to test but replicated on a test cluster using a large tpcds data set. Author: Nong Li <nong@databricks.com> Author: Davies Liu <davies@databricks.com> Closes #12279 from davies/fix_dict.
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java120
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java12
2 files changed, 78 insertions, 54 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index 6cc2fda587..ea37a08ab5 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -27,6 +27,7 @@ import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.*;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.types.DataTypes;
@@ -115,57 +116,6 @@ public class VectorizedColumnReader {
}
/**
- * TODO: Hoist the useDictionary branch to decode*Batch and make the batch page aligned.
- */
- public boolean nextBoolean() {
- if (!useDictionary) {
- return dataColumn.readBoolean();
- } else {
- return dictionary.decodeToBoolean(dataColumn.readValueDictionaryId());
- }
- }
-
- public int nextInt() {
- if (!useDictionary) {
- return dataColumn.readInteger();
- } else {
- return dictionary.decodeToInt(dataColumn.readValueDictionaryId());
- }
- }
-
- public long nextLong() {
- if (!useDictionary) {
- return dataColumn.readLong();
- } else {
- return dictionary.decodeToLong(dataColumn.readValueDictionaryId());
- }
- }
-
- public float nextFloat() {
- if (!useDictionary) {
- return dataColumn.readFloat();
- } else {
- return dictionary.decodeToFloat(dataColumn.readValueDictionaryId());
- }
- }
-
- public double nextDouble() {
- if (!useDictionary) {
- return dataColumn.readDouble();
- } else {
- return dictionary.decodeToDouble(dataColumn.readValueDictionaryId());
- }
- }
-
- public Binary nextBinary() {
- if (!useDictionary) {
- return dataColumn.readBytes();
- } else {
- return dictionary.decodeToBinary(dataColumn.readValueDictionaryId());
- }
- }
-
- /**
* Advances to the next value. Returns true if the value is non-null.
*/
private boolean next() throws IOException {
@@ -200,8 +150,26 @@ public class VectorizedColumnReader {
ColumnVector dictionaryIds = column.reserveDictionaryIds(total);
defColumn.readIntegers(
num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
- decodeDictionaryIds(rowId, num, column, dictionaryIds);
+
+ if (column.hasDictionary() || (rowId == 0 &&
+ (descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32 ||
+ descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64 ||
+ descriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT ||
+ descriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE ||
+ descriptor.getType() == PrimitiveType.PrimitiveTypeName.BINARY))) {
+ // Column vector supports lazy decoding of dictionary values so just set the dictionary.
+ // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
+ // non-dictionary encoded values have already been added).
+ column.setDictionary(dictionary);
+ } else {
+ decodeDictionaryIds(rowId, num, column, dictionaryIds);
+ }
} else {
+ if (column.hasDictionary() && rowId != 0) {
+ // This batch already has dictionary encoded values but this new page is not. The batch
+ // does not support a mix of dictionary and not so we will decode the dictionary.
+ decodeDictionaryIds(0, rowId, column, column.getDictionaryIds());
+ }
column.setDictionary(null);
switch (descriptor.getType()) {
case BOOLEAN:
@@ -246,11 +214,45 @@ public class VectorizedColumnReader {
ColumnVector dictionaryIds) {
switch (descriptor.getType()) {
case INT32:
+ if (column.dataType() == DataTypes.IntegerType ||
+ DecimalType.is32BitDecimalType(column.dataType())) {
+ for (int i = rowId; i < rowId + num; ++i) {
+ column.putInt(i, dictionary.decodeToInt(dictionaryIds.getInt(i)));
+ }
+ } else if (column.dataType() == DataTypes.ByteType) {
+ for (int i = rowId; i < rowId + num; ++i) {
+ column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getInt(i)));
+ }
+ } else if (column.dataType() == DataTypes.ShortType) {
+ for (int i = rowId; i < rowId + num; ++i) {
+ column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getInt(i)));
+ }
+ } else {
+ throw new NotImplementedException("Unimplemented type: " + column.dataType());
+ }
+ break;
+
case INT64:
+ if (column.dataType() == DataTypes.LongType ||
+ DecimalType.is64BitDecimalType(column.dataType())) {
+ for (int i = rowId; i < rowId + num; ++i) {
+ column.putLong(i, dictionary.decodeToLong(dictionaryIds.getInt(i)));
+ }
+ } else {
+ throw new NotImplementedException("Unimplemented type: " + column.dataType());
+ }
+ break;
+
case FLOAT:
+ for (int i = rowId; i < rowId + num; ++i) {
+ column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getInt(i)));
+ }
+ break;
+
case DOUBLE:
- case BINARY:
- column.setDictionary(dictionary);
+ for (int i = rowId; i < rowId + num; ++i) {
+ column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getInt(i)));
+ }
break;
case INT96:
if (column.dataType() == DataTypes.TimestampType) {
@@ -263,6 +265,16 @@ public class VectorizedColumnReader {
throw new NotImplementedException();
}
break;
+ case BINARY:
+ // TODO: this is incredibly inefficient as it blows up the dictionary right here. We
+ // need to do this better. We should probably add the dictionary data to the ColumnVector
+ // and reuse it across batches. This should mean adding a ByteArray would just update
+ // the length and offset.
+ for (int i = rowId; i < rowId + num; ++i) {
+ Binary v = dictionary.decodeToBinary(dictionaryIds.getInt(i));
+ column.putByteArray(i, v.getBytes());
+ }
+ break;
case FIXED_LEN_BYTE_ARRAY:
// DecimalType written in the legacy mode
if (DecimalType.is32BitDecimalType(column.dataType())) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 0b276e6c77..ff1f6680a7 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -913,6 +913,11 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
+ * Returns true if this column has a dictionary.
+ */
+ public boolean hasDictionary() { return this.dictionary != null; }
+
+ /**
* Reserve a integer column for ids of dictionary.
*/
public ColumnVector reserveDictionaryIds(int capacity) {
@@ -927,6 +932,13 @@ public abstract class ColumnVector implements AutoCloseable {
}
/**
+ * Returns the underlying integer column for ids of dictionary.
+ */
+ public ColumnVector getDictionaryIds() {
+ return dictionaryIds;
+ }
+
+ /**
* Sets up the common state and also handles creating the child columns if this is a nested
* type.
*/