diff options
3 files changed, 59 insertions, 54 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java index 4576ac2a32..9d50cfab3b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java @@ -628,7 +628,8 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas dictionaryIds.reserve(total); } // Read and decode dictionary ids. - readIntBatch(rowId, num, dictionaryIds); + defColumn.readIntegers( + num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); decodeDictionaryIds(rowId, num, column); } else { switch (descriptor.getType()) { @@ -739,18 +740,6 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas default: throw new NotImplementedException("Unsupported type: " + descriptor.getType()); } - - if (dictionaryIds.numNulls() > 0) { - // Copy the NULLs over. - // TODO: we can improve this by decoding the NULLs directly into column. This would - // mean we decode the int ids into `dictionaryIds` and the NULLs into `column` and then - // just do the ID remapping as above. - for (int i = 0; i < num; ++i) { - if (dictionaryIds.getIsNull(rowId + i)) { - column.putNull(rowId + i); - } - } - } } /** @@ -769,7 +758,7 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas // TODO: implement remaining type conversions if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType) { defColumn.readIntegers( - num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, 0); + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else if (column.dataType() == DataTypes.ByteType) { defColumn.readBytes( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index 629959a73b..b2048c0e39 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.datasources.parquet; -import org.apache.commons.lang.NotImplementedException; import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.ValuesReader; @@ -176,11 +175,11 @@ public final class VectorizedRleValuesReader extends ValuesReader * if (this.readInt() == level) { * c[rowId] = data.readInteger(); * } else { - * c[rowId] = nullValue; + * c[rowId] = null; * } */ public void readIntegers(int total, ColumnVector c, int rowId, int level, - VectorizedValuesReader data, int nullValue) { + VectorizedValuesReader data) { int left = total; while (left > 0) { if (this.currentCount == 0) this.readNextGroup(); @@ -189,7 +188,6 @@ public final class VectorizedRleValuesReader extends ValuesReader case RLE: if (currentValue == level) { data.readIntegers(n, c, rowId); - c.putNotNulls(rowId, n); } else { c.putNulls(rowId, n); } @@ -198,9 +196,7 @@ public final class VectorizedRleValuesReader extends ValuesReader for (int i = 0; i < n; ++i) { if (currentBuffer[currentBufferIdx++] == level) { c.putInt(rowId + i, data.readInteger()); - c.putNotNull(rowId + i); } else { - c.putInt(rowId + i, nullValue); c.putNull(rowId + i); } } @@ -223,7 +219,6 @@ public final class VectorizedRleValuesReader extends ValuesReader case RLE: if (currentValue == level) { data.readBooleans(n, c, rowId); - c.putNotNulls(rowId, n); } else { c.putNulls(rowId, n); } @@ -232,7 +227,6 @@ public final class VectorizedRleValuesReader extends ValuesReader for (int i = 0; i < n; ++i) { if (currentBuffer[currentBufferIdx++] == level) { c.putBoolean(rowId + i, data.readBoolean()); - c.putNotNull(rowId + i); } else { c.putNull(rowId + i); } @@ -257,7 +251,6 @@ public final class VectorizedRleValuesReader extends ValuesReader for (int i = 0; i < n; i++) { c.putLong(rowId + i, data.readInteger()); } - c.putNotNulls(rowId, n); } else { c.putNulls(rowId, n); } @@ -266,7 +259,6 @@ public final class VectorizedRleValuesReader extends ValuesReader for (int i = 0; i < n; ++i) { if (currentBuffer[currentBufferIdx++] == level) { c.putLong(rowId + i, data.readInteger()); - c.putNotNull(rowId + i); } else { c.putNull(rowId + i); } @@ -289,7 +281,6 @@ public final class VectorizedRleValuesReader extends ValuesReader case RLE: if (currentValue == level) { data.readBytes(n, c, rowId); - c.putNotNulls(rowId, n); } else { c.putNulls(rowId, n); } @@ -298,7 +289,6 @@ public final class VectorizedRleValuesReader extends ValuesReader for (int i = 0; i < n; ++i) { if (currentBuffer[currentBufferIdx++] == level) { c.putByte(rowId + i, data.readByte()); - c.putNotNull(rowId + i); } else { c.putNull(rowId + i); } @@ -321,7 +311,6 @@ public final class VectorizedRleValuesReader extends ValuesReader case RLE: if (currentValue == level) { data.readLongs(n, c, rowId); - c.putNotNulls(rowId, n); } else { c.putNulls(rowId, n); } @@ -330,7 +319,6 @@ public final class VectorizedRleValuesReader extends ValuesReader for (int i = 0; i < n; ++i) { if (currentBuffer[currentBufferIdx++] == level) { c.putLong(rowId + i, data.readLong()); - c.putNotNull(rowId + i); } else { c.putNull(rowId + i); } @@ -353,7 +341,6 @@ public final class VectorizedRleValuesReader extends ValuesReader case RLE: if (currentValue == level) { data.readFloats(n, c, rowId); - c.putNotNulls(rowId, n); } else { c.putNulls(rowId, n); } @@ -362,7 +349,6 @@ public final class VectorizedRleValuesReader extends ValuesReader for (int i = 0; i < n; ++i) { if (currentBuffer[currentBufferIdx++] == level) { c.putFloat(rowId + i, data.readFloat()); - c.putNotNull(rowId + i); } else { c.putNull(rowId + i); } @@ -385,7 +371,6 @@ public final class VectorizedRleValuesReader extends ValuesReader case RLE: if (currentValue == level) { data.readDoubles(n, c, rowId); - c.putNotNulls(rowId, n); } else { c.putNulls(rowId, n); } @@ -394,7 +379,6 @@ public final class VectorizedRleValuesReader extends ValuesReader for (int i = 0; i < n; ++i) { if (currentBuffer[currentBufferIdx++] == level) { c.putDouble(rowId + i, data.readDouble()); - c.putNotNull(rowId + i); } else { c.putNull(rowId + i); } @@ -416,7 +400,6 @@ public final class VectorizedRleValuesReader extends ValuesReader switch (mode) { case RLE: if (currentValue == level) { - c.putNotNulls(rowId, n); data.readBinary(n, c, rowId); } else { c.putNulls(rowId, n); @@ -425,7 +408,6 @@ public final class VectorizedRleValuesReader extends ValuesReader case PACKED: for (int i = 0; i < n; ++i) { if (currentBuffer[currentBufferIdx++] == level) { - c.putNotNull(rowId + i); data.readBinary(1, c, rowId + i); } else { c.putNull(rowId + i); @@ -439,6 +421,40 @@ public final class VectorizedRleValuesReader extends ValuesReader } } + /** + * Decoding for dictionary ids. The IDs are populated into `values` and the nullability is + * populated into `nulls`. + */ + public void readIntegers(int total, ColumnVector values, ColumnVector nulls, int rowId, int level, + VectorizedValuesReader data) { + int left = total; + while (left > 0) { + if (this.currentCount == 0) this.readNextGroup(); + int n = Math.min(left, this.currentCount); + switch (mode) { + case RLE: + if (currentValue == level) { + data.readIntegers(n, values, rowId); + } else { + nulls.putNulls(rowId, n); + } + break; + case PACKED: + for (int i = 0; i < n; ++i) { + if (currentBuffer[currentBufferIdx++] == level) { + values.putInt(rowId + i, data.readInteger()); + } else { + nulls.putNull(rowId + i); + } + } + break; + } + rowId += n; + left -= n; + currentCount -= n; + } + } + // The RLE reader implements the vectorized decoding interface when used to decode dictionary // IDs. This is different than the above APIs that decodes definitions levels along with values. @@ -560,12 +576,14 @@ public final class VectorizedRleValuesReader extends ValuesReader throw new RuntimeException("Unreachable"); } + private int ceil8(int value) { + return (value + 7) / 8; + } + /** * Reads the next group. */ private void readNextGroup() { - Preconditions.checkArgument(this.offset < this.end, - "Reading past RLE/BitPacking stream. offset=" + this.offset + " end=" + this.end); int header = readUnsignedVarInt(); this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED; switch (mode) { @@ -576,14 +594,12 @@ public final class VectorizedRleValuesReader extends ValuesReader case PACKED: int numGroups = header >>> 1; this.currentCount = numGroups * 8; + int bytesToRead = ceil8(this.currentCount * this.bitWidth); if (this.currentBuffer.length < this.currentCount) { this.currentBuffer = new int[this.currentCount]; } currentBufferIdx = 0; - int bytesToRead = (int)Math.ceil((double)(this.currentCount * this.bitWidth) / 8.0D); - - bytesToRead = Math.min(bytesToRead, this.end - this.offset); int valueIndex = 0; for (int byteIndex = offset; valueIndex < this.currentCount; byteIndex += this.bitWidth) { this.packer.unpack8Values(in, byteIndex, this.currentBuffer, valueIndex); diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala index dafc589999..660f0f173a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala @@ -150,21 +150,21 @@ object ParquetReadBenchmark { /* Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz - SQL Single Int Column Scan: Avg Time(ms) Avg Rate(M/s) Relative Rate - ------------------------------------------------------------------------------- - SQL Parquet Reader 1350.56 11.65 1.00 X - SQL Parquet MR 1844.09 8.53 0.73 X - SQL Parquet Vectorized 1062.04 14.81 1.27 X + SQL Single Int Column Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + SQL Parquet Reader 1042 / 1208 15.1 66.2 1.0X + SQL Parquet MR 1544 / 1607 10.2 98.2 0.7X + SQL Parquet Vectorized 674 / 739 23.3 42.9 1.5X */ sqlBenchmark.run() /* Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz - Parquet Reader Single Int Column Scan: Avg Time(ms) Avg Rate(M/s) Relative Rate - ------------------------------------------------------------------------------- - ParquetReader 610.40 25.77 1.00 X - ParquetReader(Batched) 172.66 91.10 3.54 X - ParquetReader(Batch -> Row) 192.28 81.80 3.17 X + Parquet Reader Single Int Column Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + ParquetReader 565 / 609 27.8 35.9 1.0X + ParquetReader(Batched) 165 / 174 95.3 10.5 3.4X + ParquetReader(Batch -> Row) 158 / 188 99.3 10.1 3.6X */ parquetReaderBenchmark.run() } @@ -218,12 +218,12 @@ object ParquetReadBenchmark { /* Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz - Int and String Scan: Avg Time(ms) Avg Rate(M/s) Relative Rate + Int and String Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------- - SQL Parquet Reader 1737.94 6.03 1.00 X - SQL Parquet MR 2393.08 4.38 0.73 X - SQL Parquet Vectorized 1442.99 7.27 1.20 X - ParquetReader 1032.11 10.16 1.68 X + SQL Parquet Reader 1381 / 1679 7.6 131.7 1.0X + SQL Parquet MR 2005 / 2177 5.2 191.2 0.7X + SQL Parquet Vectorized 919 / 1044 11.4 87.6 1.5X + ParquetReader 1035 / 1163 10.1 98.7 1.3X */ benchmark.run() } |