author     Nong Li <nong@databricks.com>        2016-02-26 12:43:50 -0800
committer  Davies Liu <davies.liu@gmail.com>    2016-02-26 12:43:50 -0800
commit     0598a2b81d1426dd2cf9e6fc32cef345364d18c6 (patch)
tree       a7341a42f902110e317a895968d7df7cd5e6ada4 /sql
parent     6df1e55a6594ae4bc7882f44af8d230aad9489b4 (diff)
[SPARK-13499] [SQL] Performance improvements for parquet reader.
## What changes were proposed in this pull request?

This patch includes these performance fixes:
- Remove unnecessary setNotNull() calls. The NULL bits are already cleared.
- Speed up RLE group decoding.
- Speed up dictionary decoding by decoding NULLs directly into the result.

## How was this patch tested?

In addition to the updated benchmarks, on TPCDS, the result of these changes running Q55 (sf40) is:

```
TPCDS:                              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)
---------------------------------------------------------------------------------
q55 (Before)                              6398 / 6616         18.0          55.5
q55 (After)                               4983 / 5189         23.1          43.3
```

Author: Nong Li <nong@databricks.com>

Closes #11375 from nongli/spark-13499.
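For context on the third fix, the dictionary path goes from decode-then-copy to writing null bits directly into the result column. A minimal, self-contained sketch with plain arrays standing in for ColumnVector (all names below are hypothetical, not the Spark API):

```
// Hypothetical stand-ins: `dictionary` maps ids to values, defLevels[i] ==
// maxDefLevel marks a present value, and boolean[] plays a vector's null bits.
final class DictionaryDecodeSketch {

  // Before: nulls are tracked on the id vector, so a third loop has to copy
  // them onto the result column -- the O(num) pass this patch removes.
  static void decodeBefore(int[] defLevels, int maxDefLevel, int[] ids,
                           int[] dictionary, int[] column, boolean[] columnNulls) {
    boolean[] idNulls = new boolean[defLevels.length];
    for (int i = 0; i < defLevels.length; i++) {
      idNulls[i] = defLevels[i] != maxDefLevel;          // nulls land on the ids
    }
    for (int i = 0; i < defLevels.length; i++) {
      if (!idNulls[i]) column[i] = dictionary[ids[i]];   // id remapping
    }
    for (int i = 0; i < defLevels.length; i++) {
      if (idNulls[i]) columnNulls[i] = true;             // the removed copy loop
    }
  }

  // After: the id decode writes null bits straight into the result column
  // (the new two-vector readIntegers overload), leaving only the remapping.
  static void decodeAfter(int[] defLevels, int maxDefLevel, int[] ids,
                          int[] dictionary, int[] column, boolean[] columnNulls) {
    for (int i = 0; i < defLevels.length; i++) {
      if (defLevels[i] != maxDefLevel) columnNulls[i] = true;  // nulls go direct
    }
    for (int i = 0; i < defLevels.length; i++) {
      if (!columnNulls[i]) column[i] = dictionary[ids[i]];     // id remapping
    }
  }
}
```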
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java  17
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java     66
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala        30
3 files changed, 59 insertions, 54 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
index 4576ac2a32..9d50cfab3b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -628,7 +628,8 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
dictionaryIds.reserve(total);
}
// Read and decode dictionary ids.
- readIntBatch(rowId, num, dictionaryIds);
+ defColumn.readIntegers(
+ num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
decodeDictionaryIds(rowId, num, column);
} else {
switch (descriptor.getType()) {
@@ -739,18 +740,6 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
default:
throw new NotImplementedException("Unsupported type: " + descriptor.getType());
}
-
- if (dictionaryIds.numNulls() > 0) {
- // Copy the NULLs over.
- // TODO: we can improve this by decoding the NULLs directly into column. This would
- // mean we decode the int ids into `dictionaryIds` and the NULLs into `column` and then
- // just do the ID remapping as above.
- for (int i = 0; i < num; ++i) {
- if (dictionaryIds.getIsNull(rowId + i)) {
- column.putNull(rowId + i);
- }
- }
- }
}
/**
@@ -769,7 +758,7 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
// TODO: implement remaining type conversions
if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType) {
defColumn.readIntegers(
- num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, 0);
+ num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (column.dataType() == DataTypes.ByteType) {
defColumn.readBytes(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
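The putNotNull()/putNotNulls() removals in the next file all lean on the invariant the commit message states: the NULL bits are already cleared when a batch is reserved, so the non-null path never needs to touch them. A tiny sketch of that invariant (hypothetical stand-in, not the Spark ColumnVector API):

```
import java.util.Arrays;

// Hypothetical stand-in for a column vector's null bits.
final class NullBitsSketch {
  private final boolean[] isNull;

  NullBitsSketch(int capacity) {
    isNull = new boolean[capacity];  // Java arrays start zeroed: all non-null
  }

  // A batch reset clears every null bit up front, once.
  void reset() {
    Arrays.fill(isNull, false);
  }

  void putNull(int rowId) {
    isNull[rowId] = true;
  }

  // After reset(), this only rewrites bits that are already false --
  // the per-value work the patch drops from every decode loop.
  void putNotNull(int rowId) {
    isNull[rowId] = false;
  }
}
```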
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 629959a73b..b2048c0e39 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.datasources.parquet;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.ValuesReader;
@@ -176,11 +175,11 @@ public final class VectorizedRleValuesReader extends ValuesReader
* if (this.readInt() == level) {
* c[rowId] = data.readInteger();
* } else {
- * c[rowId] = nullValue;
+ * c[rowId] = null;
* }
*/
public void readIntegers(int total, ColumnVector c, int rowId, int level,
- VectorizedValuesReader data, int nullValue) {
+ VectorizedValuesReader data) {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
@@ -189,7 +188,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
case RLE:
if (currentValue == level) {
data.readIntegers(n, c, rowId);
- c.putNotNulls(rowId, n);
} else {
c.putNulls(rowId, n);
}
@@ -198,9 +196,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putInt(rowId + i, data.readInteger());
- c.putNotNull(rowId + i);
} else {
- c.putInt(rowId + i, nullValue);
c.putNull(rowId + i);
}
}
@@ -223,7 +219,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
case RLE:
if (currentValue == level) {
data.readBooleans(n, c, rowId);
- c.putNotNulls(rowId, n);
} else {
c.putNulls(rowId, n);
}
@@ -232,7 +227,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putBoolean(rowId + i, data.readBoolean());
- c.putNotNull(rowId + i);
} else {
c.putNull(rowId + i);
}
@@ -257,7 +251,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
for (int i = 0; i < n; i++) {
c.putLong(rowId + i, data.readInteger());
}
- c.putNotNulls(rowId, n);
} else {
c.putNulls(rowId, n);
}
@@ -266,7 +259,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putLong(rowId + i, data.readInteger());
- c.putNotNull(rowId + i);
} else {
c.putNull(rowId + i);
}
@@ -289,7 +281,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
case RLE:
if (currentValue == level) {
data.readBytes(n, c, rowId);
- c.putNotNulls(rowId, n);
} else {
c.putNulls(rowId, n);
}
@@ -298,7 +289,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putByte(rowId + i, data.readByte());
- c.putNotNull(rowId + i);
} else {
c.putNull(rowId + i);
}
@@ -321,7 +311,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
case RLE:
if (currentValue == level) {
data.readLongs(n, c, rowId);
- c.putNotNulls(rowId, n);
} else {
c.putNulls(rowId, n);
}
@@ -330,7 +319,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putLong(rowId + i, data.readLong());
- c.putNotNull(rowId + i);
} else {
c.putNull(rowId + i);
}
@@ -353,7 +341,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
case RLE:
if (currentValue == level) {
data.readFloats(n, c, rowId);
- c.putNotNulls(rowId, n);
} else {
c.putNulls(rowId, n);
}
@@ -362,7 +349,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putFloat(rowId + i, data.readFloat());
- c.putNotNull(rowId + i);
} else {
c.putNull(rowId + i);
}
@@ -385,7 +371,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
case RLE:
if (currentValue == level) {
data.readDoubles(n, c, rowId);
- c.putNotNulls(rowId, n);
} else {
c.putNulls(rowId, n);
}
@@ -394,7 +379,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putDouble(rowId + i, data.readDouble());
- c.putNotNull(rowId + i);
} else {
c.putNull(rowId + i);
}
@@ -416,7 +400,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
switch (mode) {
case RLE:
if (currentValue == level) {
- c.putNotNulls(rowId, n);
data.readBinary(n, c, rowId);
} else {
c.putNulls(rowId, n);
@@ -425,7 +408,6 @@ public final class VectorizedRleValuesReader extends ValuesReader
case PACKED:
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
- c.putNotNull(rowId + i);
data.readBinary(1, c, rowId + i);
} else {
c.putNull(rowId + i);
@@ -439,6 +421,40 @@ public final class VectorizedRleValuesReader extends ValuesReader
}
}
+ /**
+ * Decoding for dictionary ids. The IDs are populated into `values` and the nullability is
+ * populated into `nulls`.
+ */
+ public void readIntegers(int total, ColumnVector values, ColumnVector nulls, int rowId, int level,
+ VectorizedValuesReader data) {
+ int left = total;
+ while (left > 0) {
+ if (this.currentCount == 0) this.readNextGroup();
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == level) {
+ data.readIntegers(n, values, rowId);
+ } else {
+ nulls.putNulls(rowId, n);
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (currentBuffer[currentBufferIdx++] == level) {
+ values.putInt(rowId + i, data.readInteger());
+ } else {
+ nulls.putNull(rowId + i);
+ }
+ }
+ break;
+ }
+ rowId += n;
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
// The RLE reader implements the vectorized decoding interface when used to decode dictionary
// IDs. This is different than the above APIs that decodes definitions levels along with values.
@@ -560,12 +576,14 @@ public final class VectorizedRleValuesReader extends ValuesReader
throw new RuntimeException("Unreachable");
}
+ private int ceil8(int value) {
+ return (value + 7) / 8;
+ }
+
/**
* Reads the next group.
*/
private void readNextGroup() {
- Preconditions.checkArgument(this.offset < this.end,
- "Reading past RLE/BitPacking stream. offset=" + this.offset + " end=" + this.end);
int header = readUnsignedVarInt();
this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
switch (mode) {
@@ -576,14 +594,12 @@ public final class VectorizedRleValuesReader extends ValuesReader
case PACKED:
int numGroups = header >>> 1;
this.currentCount = numGroups * 8;
+ int bytesToRead = ceil8(this.currentCount * this.bitWidth);
if (this.currentBuffer.length < this.currentCount) {
this.currentBuffer = new int[this.currentCount];
}
currentBufferIdx = 0;
- int bytesToRead = (int)Math.ceil((double)(this.currentCount * this.bitWidth) / 8.0D);
-
- bytesToRead = Math.min(bytesToRead, this.end - this.offset);
int valueIndex = 0;
for (int byteIndex = offset; valueIndex < this.currentCount; byteIndex += this.bitWidth) {
this.packer.unpack8Values(in, byteIndex, this.currentBuffer, valueIndex);
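The readNextGroup() hunk above also replaces a double-based Math.ceil with integer arithmetic when sizing a bit-packed group. A standalone sketch of the same ceil8 idiom, with an illustrative check against the old computation:

```
// Integer ceiling-by-8 from the patch: rounds bits up to whole bytes without
// the int -> double -> int round-trip of Math.ceil on the per-group hot path.
public final class Ceil8Sketch {
  static int ceil8(int value) {
    return (value + 7) / 8;                                 // == ceil(value / 8)
  }

  public static void main(String[] args) {
    int currentCount = 24, bitWidth = 3;                    // 24 values x 3 bits
    int bytesToRead = ceil8(currentCount * bitWidth);       // 72 bits -> 9 bytes
    int viaMathCeil = (int) Math.ceil((currentCount * bitWidth) / 8.0);
    System.out.println(bytesToRead + " == " + viaMathCeil); // prints "9 == 9"
  }
}
```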
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index dafc589999..660f0f173a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -150,21 +150,21 @@ object ParquetReadBenchmark {
/*
Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
- SQL Single Int Column Scan: Avg Time(ms) Avg Rate(M/s) Relative Rate
- -------------------------------------------------------------------------------
- SQL Parquet Reader 1350.56 11.65 1.00 X
- SQL Parquet MR 1844.09 8.53 0.73 X
- SQL Parquet Vectorized 1062.04 14.81 1.27 X
+ SQL Single Int Column Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ SQL Parquet Reader 1042 / 1208 15.1 66.2 1.0X
+ SQL Parquet MR 1544 / 1607 10.2 98.2 0.7X
+ SQL Parquet Vectorized 674 / 739 23.3 42.9 1.5X
*/
sqlBenchmark.run()
/*
Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
- Parquet Reader Single Int Column Scan: Avg Time(ms) Avg Rate(M/s) Relative Rate
- -------------------------------------------------------------------------------
- ParquetReader 610.40 25.77 1.00 X
- ParquetReader(Batched) 172.66 91.10 3.54 X
- ParquetReader(Batch -> Row) 192.28 81.80 3.17 X
+ Parquet Reader Single Int Column Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ ParquetReader 565 / 609 27.8 35.9 1.0X
+ ParquetReader(Batched) 165 / 174 95.3 10.5 3.4X
+ ParquetReader(Batch -> Row) 158 / 188 99.3 10.1 3.6X
*/
parquetReaderBenchmark.run()
}
@@ -218,12 +218,12 @@ object ParquetReadBenchmark {
/*
Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
- Int and String Scan: Avg Time(ms) Avg Rate(M/s) Relative Rate
+ Int and String Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------
- SQL Parquet Reader 1737.94 6.03 1.00 X
- SQL Parquet MR 2393.08 4.38 0.73 X
- SQL Parquet Vectorized 1442.99 7.27 1.20 X
- ParquetReader 1032.11 10.16 1.68 X
+ SQL Parquet Reader 1381 / 1679 7.6 131.7 1.0X
+ SQL Parquet MR 2005 / 2177 5.2 191.2 0.7X
+ SQL Parquet Vectorized 919 / 1044 11.4 87.6 1.5X
+ ParquetReader 1035 / 1163 10.1 98.7 1.3X
*/
benchmark.run()
}