 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java | 17
 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java    | 66
 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala       | 30
 3 files changed, 59 insertions(+), 54 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
index 4576ac2a32..9d50cfab3b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -628,7 +628,8 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
dictionaryIds.reserve(total);
}
// Read and decode dictionary ids.
- readIntBatch(rowId, num, dictionaryIds);
+ defColumn.readIntegers(
+ num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
decodeDictionaryIds(rowId, num, column);
} else {
switch (descriptor.getType()) {
@@ -739,18 +740,6 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
default:
throw new NotImplementedException("Unsupported type: " + descriptor.getType());
}
-
- if (dictionaryIds.numNulls() > 0) {
- // Copy the NULLs over.
- // TODO: we can improve this by decoding the NULLs directly into column. This would
- // mean we decode the int ids into `dictionaryIds` and the NULLs into `column` and then
- // just do the ID remapping as above.
- for (int i = 0; i < num; ++i) {
- if (dictionaryIds.getIsNull(rowId + i)) {
- column.putNull(rowId + i);
- }
- }
- }
}
/**
@@ -769,7 +758,7 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
// TODO: implement remaining type conversions
if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType) {
defColumn.readIntegers(
- num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, 0);
+ num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (column.dataType() == DataTypes.ByteType) {
defColumn.readBytes(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
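
Note on the hunks above: dictionary-encoded columns are now decoded by defColumn.readIntegers, which writes the dictionary IDs into `dictionaryIds` while marking NULLs directly on `column`, so the NULL-copy loop deleted further down is no longer needed before decodeDictionaryIds remaps the IDs. A minimal stand-alone sketch of that two-step flow, using a made-up IntVector holder rather than Spark's actual ColumnVector API:

    import java.util.Arrays;

    // Hypothetical, simplified stand-in for a column vector: an int buffer plus a null flag per row.
    final class IntVector {
        final int[] data;
        final boolean[] isNull;
        IntVector(int capacity) { data = new int[capacity]; isNull = new boolean[capacity]; }
    }

    public class DictionaryDecodeSketch {
        public static void main(String[] args) {
            int maxDefLevel = 1;
            int[] defLevels = {1, 0, 1, 1, 0};   // 0 => NULL at that row
            int[] encodedIds = {2, 0, 1};        // one dictionary ID per non-null row
            int[] dictionary = {100, 200, 300};  // id -> decoded value

            int num = defLevels.length;
            IntVector dictionaryIds = new IntVector(num);
            IntVector column = new IntVector(num);

            // Step 1 (the role of defColumn.readIntegers here): decode IDs into the scratch
            // `dictionaryIds` vector and write NULLs straight into the destination `column`.
            int in = 0;
            for (int i = 0; i < num; i++) {
                if (defLevels[i] == maxDefLevel) {
                    dictionaryIds.data[i] = encodedIds[in++];
                } else {
                    column.isNull[i] = true;
                }
            }

            // Step 2 (the role of decodeDictionaryIds): remap IDs to values for non-null rows only.
            for (int i = 0; i < num; i++) {
                if (!column.isNull[i]) {
                    column.data[i] = dictionary[dictionaryIds.data[i]];
                }
            }

            System.out.println(Arrays.toString(column.data));    // [300, 0, 100, 200, 0]
            System.out.println(Arrays.toString(column.isNull));  // [false, true, false, false, true]
        }
    }
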
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 629959a73b..b2048c0e39 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.datasources.parquet;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.ValuesReader;
@@ -176,11 +175,11 @@ public final class VectorizedRleValuesReader extends ValuesReader
* if (this.readInt() == level) {
* c[rowId] = data.readInteger();
* } else {
- * c[rowId] = nullValue;
+ * c[rowId] = null;
* }
*/
public void readIntegers(int total, ColumnVector c, int rowId, int level,
- VectorizedValuesReader data, int nullValue) {
+ VectorizedValuesReader data) {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -189,7 +188,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
case RLE:
if (currentValue == level) {
data.readIntegers(n, c, rowId);
- c.putNotNulls(rowId, n);
} else {
c.putNulls(rowId, n);
}
@@ -198,9 +196,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putInt(rowId + i, data.readInteger());
- c.putNotNull(rowId + i);
} else {
- c.putInt(rowId + i, nullValue);
c.putNull(rowId + i);
}
}
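
The RLE branch above can drop the per-row putNotNull calls because a run means n consecutive rows share one definition level: the run is either entirely non-null (bulk data.readIntegers) or entirely null (bulk c.putNulls); only bit-packed groups need the per-value check. A rough stand-alone illustration of that dispatch, with hypothetical run and buffer types in place of Spark's readers:

    public class RleDefLevelSketch {
        public static void main(String[] args) {
            int maxDefLevel = 1;
            // Hypothetical RLE runs of definition levels, each as {level, count}.
            int[][] runs = { {1, 3}, {0, 2}, {1, 1} };
            int[] encoded = { 7, 8, 9, 10 };   // one stored value per non-null row

            int total = 6;
            int[] values = new int[total];
            boolean[] isNull = new boolean[total];

            int rowId = 0;
            int in = 0;
            for (int[] run : runs) {
                int level = run[0], n = run[1];
                if (level == maxDefLevel) {
                    // Whole run is non-null: bulk-read n values (data.readIntegers(n, c, rowId) in the reader).
                    for (int i = 0; i < n; i++) values[rowId + i] = encoded[in++];
                } else {
                    // Whole run is null: bulk-mark n nulls (c.putNulls(rowId, n) in the reader).
                    for (int i = 0; i < n; i++) isNull[rowId + i] = true;
                }
                rowId += n;
            }
            System.out.println(java.util.Arrays.toString(values)); // [7, 8, 9, 0, 0, 10]
            System.out.println(java.util.Arrays.toString(isNull)); // [false, false, false, true, true, false]
        }
    }
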
@@ -223,7 +219,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
case RLE:
if (currentValue == level) {
data.readBooleans(n, c, rowId);
- c.putNotNulls(rowId, n);
} else {
c.putNulls(rowId, n);
}
@@ -232,7 +227,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putBoolean(rowId + i, data.readBoolean());
- c.putNotNull(rowId + i);
} else {
c.putNull(rowId + i);
}
@@ -257,7 +251,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
for (int i = 0; i < n; i++) {
c.putLong(rowId + i, data.readInteger());
}
- c.putNotNulls(rowId, n);
} else {
c.putNulls(rowId, n);
}
@@ -266,7 +259,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putLong(rowId + i, data.readInteger());
- c.putNotNull(rowId + i);
} else {
c.putNull(rowId + i);
}
@@ -289,7 +281,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
case RLE:
if (currentValue == level) {
data.readBytes(n, c, rowId);
- c.putNotNulls(rowId, n);
} else {
c.putNulls(rowId, n);
}
@@ -298,7 +289,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putByte(rowId + i, data.readByte());
- c.putNotNull(rowId + i);
} else {
c.putNull(rowId + i);
}
@@ -321,7 +311,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
case RLE:
if (currentValue == level) {
data.readLongs(n, c, rowId);
- c.putNotNulls(rowId, n);
} else {
c.putNulls(rowId, n);
}
@@ -330,7 +319,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putLong(rowId + i, data.readLong());
- c.putNotNull(rowId + i);
} else {
c.putNull(rowId + i);
}
@@ -353,7 +341,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
case RLE:
if (currentValue == level) {
data.readFloats(n, c, rowId);
- c.putNotNulls(rowId, n);
} else {
c.putNulls(rowId, n);
}
@@ -362,7 +349,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putFloat(rowId + i, data.readFloat());
- c.putNotNull(rowId + i);
} else {
c.putNull(rowId + i);
}
@@ -385,7 +371,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
case RLE:
if (currentValue == level) {
data.readDoubles(n, c, rowId);
- c.putNotNulls(rowId, n);
} else {
c.putNulls(rowId, n);
}
@@ -394,7 +379,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putDouble(rowId + i, data.readDouble());
- c.putNotNull(rowId + i);
} else {
c.putNull(rowId + i);
}
@@ -416,7 +400,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
switch (mode) {
case RLE:
if (currentValue == level) {
- c.putNotNulls(rowId, n);
data.readBinary(n, c, rowId);
} else {
c.putNulls(rowId, n);
@@ -425,7 +408,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
case PACKED:
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
- c.putNotNull(rowId + i);
data.readBinary(1, c, rowId + i);
} else {
c.putNull(rowId + i);
@@ -439,6 +421,40 @@ public final class VectorizedRleValuesReader extends ValuesReader
}
}
+ /**
+ * Decoding for dictionary ids. The IDs are populated into `values` and the nullability is
+ * populated into `nulls`.
+ */
+ public void readIntegers(int total, ColumnVector values, ColumnVector nulls, int rowId, int level,
+ VectorizedValuesReader data) {
+ int left = total;
+ while (left > 0) {
+ if (this.currentCount == 0) this.readNextGroup();
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == level) {
+ data.readIntegers(n, values, rowId);
+ } else {
+ nulls.putNulls(rowId, n);
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (currentBuffer[currentBufferIdx++] == level) {
+ values.putInt(rowId + i, data.readInteger());
+ } else {
+ nulls.putNull(rowId + i);
+ }
+ }
+ break;
+ }
+ rowId += n;
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
// The RLE reader implements the vectorized decoding interface when used to decode dictionary
// IDs. This is different than the above APIs that decodes definitions levels along with values.
@@ -560,12 +576,14 @@ public final class VectorizedRleValuesReader extends ValuesReader
throw new RuntimeException("Unreachable");
}
+ private int ceil8(int value) {
+ return (value + 7) / 8;
+ }
+
/**
* Reads the next group.
*/
private void readNextGroup() {
- Preconditions.checkArgument(this.offset < this.end,
- "Reading past RLE/BitPacking stream. offset=" + this.offset + " end=" + this.end);
int header = readUnsignedVarInt();
this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
switch (mode) {
@@ -576,14 +594,12 @@ public final class VectorizedRleValuesReader extends ValuesReader
case PACKED:
int numGroups = header >>> 1;
this.currentCount = numGroups * 8;
+ int bytesToRead = ceil8(this.currentCount * this.bitWidth);
if (this.currentBuffer.length < this.currentCount) {
this.currentBuffer = new int[this.currentCount];
}
currentBufferIdx = 0;
- int bytesToRead = (int)Math.ceil((double)(this.currentCount * this.bitWidth) / 8.0D);
-
- bytesToRead = Math.min(bytesToRead, this.end - this.offset);
int valueIndex = 0;
for (int byteIndex = offset; valueIndex < this.currentCount; byteIndex += this.bitWidth) {
this.packer.unpack8Values(in, byteIndex, this.currentBuffer, valueIndex);
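
On the readNextGroup change above: a bit-packed header encodes the number of 8-value groups, so currentCount = numGroups * 8, and those values occupy ceil8(currentCount * bitWidth) bytes; the new integer-only ceil8 replaces the floating-point Math.ceil round trip. A small worked example of that size arithmetic (the header and bit width below are made-up illustration values):

    public class PackedGroupSizeSketch {
        public static void main(String[] args) {
            // Example header for a bit-packed run (LSB set means PACKED in the RLE/bit-packing hybrid).
            int header = (3 << 1) | 1;        // 3 groups of 8 values each
            int bitWidth = 5;                 // e.g. dictionary IDs that need 5 bits

            int numGroups = header >>> 1;
            int currentCount = numGroups * 8; // 24 values
            // ceil8 from the patch: integer ceiling of bits / 8.
            int bytesToRead = (currentCount * bitWidth + 7) / 8;  // ceil(24 * 5 / 8) = 15 bytes

            System.out.println(currentCount + " values, " + bytesToRead + " bytes");
        }
    }
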
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index dafc589999..660f0f173a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -150,21 +150,21 @@ object ParquetReadBenchmark {
/*
Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
- SQL Single Int Column Scan: Avg Time(ms) Avg Rate(M/s) Relative Rate
- -------------------------------------------------------------------------------
- SQL Parquet Reader 1350.56 11.65 1.00 X
- SQL Parquet MR 1844.09 8.53 0.73 X
- SQL Parquet Vectorized 1062.04 14.81 1.27 X
+ SQL Single Int Column Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ SQL Parquet Reader 1042 / 1208 15.1 66.2 1.0X
+ SQL Parquet MR 1544 / 1607 10.2 98.2 0.7X
+ SQL Parquet Vectorized 674 / 739 23.3 42.9 1.5X
*/
sqlBenchmark.run()
/*
Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
- Parquet Reader Single Int Column Scan: Avg Time(ms) Avg Rate(M/s) Relative Rate
- -------------------------------------------------------------------------------
- ParquetReader 610.40 25.77 1.00 X
- ParquetReader(Batched) 172.66 91.10 3.54 X
- ParquetReader(Batch -> Row) 192.28 81.80 3.17 X
+ Parquet Reader Single Int Column Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ ParquetReader 565 / 609 27.8 35.9 1.0X
+ ParquetReader(Batched) 165 / 174 95.3 10.5 3.4X
+ ParquetReader(Batch -> Row) 158 / 188 99.3 10.1 3.6X
*/
parquetReaderBenchmark.run()
}
@@ -218,12 +218,12 @@ object ParquetReadBenchmark {
/*
Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
- Int and String Scan: Avg Time(ms) Avg Rate(M/s) Relative Rate
+ Int and String Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------
- SQL Parquet Reader 1737.94 6.03 1.00 X
- SQL Parquet MR 2393.08 4.38 0.73 X
- SQL Parquet Vectorized 1442.99 7.27 1.20 X
- ParquetReader 1032.11 10.16 1.68 X
+ SQL Parquet Reader 1381 / 1679 7.6 131.7 1.0X
+ SQL Parquet MR 2005 / 2177 5.2 191.2 0.7X
+ SQL Parquet Vectorized 919 / 1044 11.4 87.6 1.5X
+ ParquetReader 1035 / 1163 10.1 98.7 1.3X
*/
benchmark.run()
}
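
For reading the updated benchmark columns: Per Row(ns) is the inverse of Rate(M/s), and Relative is the baseline's best time divided by the row's best time. A quick check against the first table above (figures copied from it; rounding accounts for small differences):

    public class BenchmarkColumnsCheck {
        public static void main(String[] args) {
            // Figures from the "SQL Single Int Column Scan" table.
            double bestMsReader = 1042, rateReader = 15.1;        // SQL Parquet Reader (baseline)
            double bestMsVectorized = 674, rateVectorized = 23.3; // SQL Parquet Vectorized

            // Per Row(ns) = 1000 / Rate(M/s): 1000 / 23.3 ~= 42.9 ns.
            System.out.printf("per-row ns = %.1f%n", 1000.0 / rateVectorized);
            // Relative = baseline best time / this entry's best time: 1042 / 674 ~= 1.5X.
            System.out.printf("relative   = %.1fX%n", bestMsReader / bestMsVectorized);
        }
    }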