 core/src/main/scala/org/apache/spark/util/Benchmark.scala                                                    |   6
 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java  | 146
 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java   |  66
 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java     | 274
 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java        |  37
 sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java                           |   9
 sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java                          |  13
 sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java                    |   2
 sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java                     |   1
 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala        |  93
 sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala               |   7
 sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala                   |  27
 12 files changed, 625 insertions(+), 56 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
index 457a1a05a1..d484cec7ae 100644
--- a/core/src/main/scala/org/apache/spark/util/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -62,10 +62,10 @@ private[spark] class Benchmark(
val firstRate = results.head.avgRate
// The results are going to be processor specific so it is useful to include that.
println(Benchmark.getProcessorName())
- printf("%-24s %16s %16s %14s\n", name + ":", "Avg Time(ms)", "Avg Rate(M/s)", "Relative Rate")
- println("-------------------------------------------------------------------------")
+ printf("%-30s %16s %16s %14s\n", name + ":", "Avg Time(ms)", "Avg Rate(M/s)", "Relative Rate")
+ println("-------------------------------------------------------------------------------")
results.zip(benchmarks).foreach { r =>
- printf("%-24s %16s %16s %14s\n",
+ printf("%-30s %16s %16s %14s\n",
r._2.name,
"%10.2f" format r._1.avgMs,
"%10.2f" format r._1.avgRate,
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
index 47818c0939..80805f15a8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -21,10 +21,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.Encoding;
@@ -35,9 +35,12 @@ import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
+import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.types.UTF8String;
@@ -103,6 +106,25 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
private static final int DEFAULT_VAR_LEN_SIZE = 32;
/**
+ * The ColumnarBatch object that is used for batch decoding. This is created on first use and
+ * triggers batched decoding. It is not valid to interleave calls to the batched interface with
+ * the row-by-row RecordReader APIs.
+ * This is still a work in progress and is only enabled with additional flags for development;
+ * currently unsupported cases will fail with potentially difficult-to-diagnose errors. It should
+ * only be turned on when working on this feature.
+ *
+ * TODOs:
+ * - Implement all the encodings needed for vectorized decoding.
+ * - Implement v2 page formats (just make sure we create the correct decoders).
+ */
+ private ColumnarBatch columnarBatch;
+
+ /**
+ * The default memory mode that decides whether columnarBatch is allocated on or off heap.
+ */
+ private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
+
+ /**
* Tries to initialize the reader for this split. Returns true if this reader supports reading
* this split and false otherwise.
*/
@@ -136,6 +158,15 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
}
@Override
+ public void close() throws IOException {
+ if (columnarBatch != null) {
+ columnarBatch.close();
+ columnarBatch = null;
+ }
+ super.close();
+ }
+
+ @Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (batchIdx >= numBatched) {
if (!loadBatch()) return false;
@@ -154,6 +185,46 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
return (float) rowsReturned / totalRowCount;
}
+ /**
+ * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
+ * This object is reused. Calling this enables the vectorized reader. This should be called
+ * before any calls to nextKeyValue/nextBatch.
+ */
+ public ColumnarBatch resultBatch() {
+ return resultBatch(DEFAULT_MEMORY_MODE);
+ }
+
+ public ColumnarBatch resultBatch(MemoryMode memMode) {
+ if (columnarBatch == null) {
+ columnarBatch = ColumnarBatch.allocate(sparkSchema, memMode);
+ }
+ return columnarBatch;
+ }
+
+ /**
+ * Advances to the next batch of rows. Returns false if there are no more.
+ */
+ public boolean nextBatch() throws IOException {
+ assert(columnarBatch != null);
+ columnarBatch.reset();
+ if (rowsReturned >= totalRowCount) return false;
+ checkEndOfRowGroup();
+
+ int num = (int)Math.min((long) columnarBatch.capacity(), totalRowCount - rowsReturned);
+ for (int i = 0; i < columnReaders.length; ++i) {
+ switch (columnReaders[i].descriptor.getType()) {
+ case INT32:
+ columnReaders[i].readIntBatch(num, columnarBatch.column(i));
+ break;
+ default:
+ throw new IOException("Unsupported type: " + columnReaders[i].descriptor.getType());
+ }
+ }
+ rowsReturned += num;
+ columnarBatch.setNumRows(num);
+ return true;
+ }
+
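A rough usage sketch of the batched API above (not part of this patch): the helper name and the
"id" column are illustrative assumptions, mirroring the Scala benchmark added later in this change.

    static long sumFirstColumn(String path) throws Exception {
      UnsafeRowParquetRecordReader reader = new UnsafeRowParquetRecordReader();
      try {
        reader.initialize(path, java.util.Arrays.asList("id")); // same call the benchmark uses
        ColumnarBatch batch = reader.resultBatch();             // must precede nextBatch()
        ColumnVector col = batch.column(0);
        long sum = 0;
        while (reader.nextBatch()) {                            // up to batch.capacity() rows per call
          for (int i = 0; i < batch.numRows(); i++) {
            if (!col.getIsNull(i)) sum += col.getInt(i);
          }
        }
        return sum;
      } finally {
        reader.close();
      }
    }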
private void initializeInternal() throws IOException {
/**
* Check that the requested schema is supported.
@@ -382,7 +453,7 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
*
* Decoder to return values from a single column.
*/
- private static final class ColumnReader {
+ private final class ColumnReader {
/**
* Total number of values read.
*/
@@ -416,6 +487,10 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
private IntIterator definitionLevelColumn;
private ValuesReader dataColumn;
+ // Only set if vectorized decoding is enabled. This is used instead of the row-by-row decoding
+ // with `definitionLevelColumn`.
+ private VectorizedRleValuesReader defColumn;
+
/**
* Total number of values in this column (in this row group).
*/
@@ -521,6 +596,35 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
return definitionLevelColumn.nextInt() == maxDefLevel;
}
+ /**
+ * Reads `total` values from this columnReader into column.
+ * TODO: implement the other encodings.
+ */
+ private void readIntBatch(int total, ColumnVector column) throws IOException {
+ int rowId = 0;
+ while (total > 0) {
+ // Compute the number of values we want to read in this page.
+ int leftInPage = (int)(endOfPageValueCount - valuesRead);
+ if (leftInPage == 0) {
+ readPage();
+ leftInPage = (int)(endOfPageValueCount - valuesRead);
+ }
+ int num = Math.min(total, leftInPage);
+ defColumn.readIntegers(
+ num, column, rowId, maxDefLevel, (VectorizedValuesReader)dataColumn, 0);
+
+ // Remap the values if it is dictionary encoded.
+ if (useDictionary) {
+ for (int i = rowId; i < rowId + num; ++i) {
+ column.putInt(i, dictionary.decodeToInt(column.getInt(i)));
+ }
+ }
+ valuesRead += num;
+ rowId += num;
+ total -= num;
+ }
+ }
+
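A worked example (not from the patch) of the dictionary remapping step above, with a plain
int[] standing in for the Parquet dictionary:

    int[] dict = {100, 200, 300};            // stands in for dictionary.decodeToInt(id)
    int[] ids  = {2, 0, 1};                  // what defColumn.readIntegers left in the column
    for (int i = 0; i < ids.length; i++) {
      ids[i] = dict[ids[i]];                 // in place, ids becomes {300, 100, 200}
    }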
private void readPage() throws IOException {
DataPage page = pageReader.readPage();
// TODO: Why is this a visitor?
@@ -547,21 +651,28 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
});
}
- private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount)
- throws IOException {
- this.pageValueCount = valueCount;
+ private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset) throws IOException {
this.endOfPageValueCount = valuesRead + pageValueCount;
if (dataEncoding.usesDictionary()) {
+ this.dataColumn = null;
if (dictionary == null) {
throw new IOException(
"could not read page in col " + descriptor +
" as the dictionary was missing for encoding " + dataEncoding);
}
- this.dataColumn = dataEncoding.getDictionaryBasedValuesReader(
- descriptor, VALUES, dictionary);
+ if (columnarBatch != null && dataEncoding == Encoding.PLAIN_DICTIONARY) {
+ this.dataColumn = new VectorizedRleValuesReader();
+ } else {
+ this.dataColumn = dataEncoding.getDictionaryBasedValuesReader(
+ descriptor, VALUES, dictionary);
+ }
this.useDictionary = true;
} else {
- this.dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
+ if (columnarBatch != null && dataEncoding == Encoding.PLAIN) {
+ this.dataColumn = new VectorizedPlainValuesReader(4);
+ } else {
+ this.dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
+ }
this.useDictionary = false;
}
@@ -573,8 +684,19 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
}
private void readPageV1(DataPageV1 page) throws IOException {
+ this.pageValueCount = page.getValueCount();
ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
- ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
+ ValuesReader dlReader;
+
+ // Initialize the decoders. Use custom ones if vectorized decoding is enabled.
+ if (columnarBatch != null && page.getDlEncoding() == Encoding.RLE) {
+ int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+ assert(bitWidth != 0); // not implemented
+ this.defColumn = new VectorizedRleValuesReader(bitWidth);
+ dlReader = this.defColumn;
+ } else {
+ dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
+ }
this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
try {
@@ -583,20 +705,20 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
int next = rlReader.getNextOffset();
dlReader.initFromPage(pageValueCount, bytes, next);
next = dlReader.getNextOffset();
- initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
+ initDataReader(page.getValueEncoding(), bytes, next);
} catch (IOException e) {
throw new IOException("could not read page " + page + " in col " + descriptor, e);
}
}
private void readPageV2(DataPageV2 page) throws IOException {
+ this.pageValueCount = page.getValueCount();
this.repetitionLevelColumn = createRLEIterator(descriptor.getMaxRepetitionLevel(),
page.getRepetitionLevels(), descriptor);
this.definitionLevelColumn = createRLEIterator(descriptor.getMaxDefinitionLevel(),
page.getDefinitionLevels(), descriptor);
try {
- initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0,
- page.getValueCount());
+ initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0);
} catch (IOException e) {
throw new IOException("could not read page " + page + " in col " + descriptor, e);
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
new file mode 100644
index 0000000000..dac0c52ebd
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.io.IOException;
+
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.unsafe.Platform;
+
+import org.apache.parquet.column.values.ValuesReader;
+
+/**
+ * An implementation of the Parquet PLAIN decoder that supports the vectorized interface.
+ */
+public class VectorizedPlainValuesReader extends ValuesReader implements VectorizedValuesReader {
+ private byte[] buffer;
+ private int offset;
+ private final int byteSize;
+
+ public VectorizedPlainValuesReader(int byteSize) {
+ this.byteSize = byteSize;
+ }
+
+ @Override
+ public void initFromPage(int valueCount, byte[] bytes, int offset) throws IOException {
+ this.buffer = bytes;
+ this.offset = offset + Platform.BYTE_ARRAY_OFFSET;
+ }
+
+ @Override
+ public void skip() {
+ offset += byteSize;
+ }
+
+ @Override
+ public void skip(int n) {
+ offset += n * byteSize;
+ }
+
+ @Override
+ public void readIntegers(int total, ColumnVector c, int rowId) {
+ c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
+ offset += 4 * total;
+ }
+
+ @Override
+ public int readInteger() {
+ int v = Platform.getInt(buffer, offset);
+ offset += 4;
+ return v;
+ }
+}
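A minimal sketch (not part of the patch) of driving this decoder against a hand-built PLAIN page
of 4-byte little-endian ints, assuming the ColumnVector and MemoryMode APIs used elsewhere in
this change and org.apache.spark.sql.types.DataTypes:

    byte[] page = {1, 0, 0, 0,  2, 0, 0, 0,  3, 0, 0, 0};   // PLAIN-encoded ints 1, 2, 3
    VectorizedPlainValuesReader values = new VectorizedPlainValuesReader(4);
    values.initFromPage(3, page, 0);                         // declared to throw IOException
    ColumnVector col = ColumnVector.allocate(3, DataTypes.IntegerType, MemoryMode.ON_HEAP);
    values.readIntegers(3, col, 0);                          // bulk copy via putIntsLittleEndian
    // col.getInt(0) == 1, col.getInt(1) == 2, col.getInt(2) == 3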
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
new file mode 100644
index 0000000000..493ec9deed
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+
+/**
+ * A values reader for Parquet's run-length encoded data. This is based on the version in
+ * parquet-mr with these changes:
+ * - Supports the vectorized interface.
+ * - Works on byte arrays (byte[]) instead of byte streams.
+ *
+ * This encoding is used in multiple places:
+ * - Definition/Repetition levels
+ * - Dictionary ids.
+ */
+public final class VectorizedRleValuesReader extends ValuesReader {
+ // Current decoding mode. The encoded data contains groups of either run length encoded data
+ // (RLE) or bit packed data. Each group contains a header that indicates which mode the group
+ // uses and the number of values in the group.
+ // More details here: https://github.com/Parquet/parquet-format/blob/master/Encodings.md
+ private enum MODE {
+ RLE,
+ PACKED
+ }
+
+ // Encoded data.
+ private byte[] in;
+ private int end;
+ private int offset;
+
+ // bit/byte width of decoded data and utility to batch unpack them.
+ private int bitWidth;
+ private int bytesWidth;
+ private BytePacker packer;
+
+ // Current decoding mode and values
+ private MODE mode;
+ private int currentCount;
+ private int currentValue;
+
+ // Buffer of decoded values if the values are PACKED.
+ private int[] currentBuffer = new int[16];
+ private int currentBufferIdx = 0;
+
+ // If true, the bit width is fixed. This decoder is used in different places and this also
+ // controls whether we need to read the bit width from the beginning of the data stream.
+ private final boolean fixedWidth;
+
+ public VectorizedRleValuesReader() {
+ fixedWidth = false;
+ }
+
+ public VectorizedRleValuesReader(int bitWidth) {
+ fixedWidth = true;
+ init(bitWidth);
+ }
+
+ @Override
+ public void initFromPage(int valueCount, byte[] page, int start) {
+ this.offset = start;
+ this.in = page;
+ if (fixedWidth) {
+ int length = readIntLittleEndian();
+ this.end = this.offset + length;
+ } else {
+ this.end = page.length;
+ if (this.end != this.offset) init(page[this.offset++] & 255);
+ }
+ this.currentCount = 0;
+ }
+
+ /**
+ * Initializes the internal state for decoding ints of `bitWidth`.
+ */
+ private void init(int bitWidth) {
+ Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
+ this.bitWidth = bitWidth;
+ this.bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth);
+ this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
+ }
+
+ @Override
+ public int getNextOffset() {
+ return this.end;
+ }
+
+ @Override
+ public boolean readBoolean() {
+ return this.readInteger() != 0;
+ }
+
+ @Override
+ public void skip() {
+ this.readInteger();
+ }
+
+ @Override
+ public int readValueDictionaryId() {
+ return readInteger();
+ }
+
+ @Override
+ public int readInteger() {
+ if (this.currentCount == 0) { this.readNextGroup(); }
+
+ this.currentCount--;
+ switch (mode) {
+ case RLE:
+ return this.currentValue;
+ case PACKED:
+ return this.currentBuffer[currentBufferIdx++];
+ }
+ throw new RuntimeException("Unreachable");
+ }
+
+ /**
+ * Reads `total` ints into `c`, filling them in starting at `c[rowId]`. This reader
+ * reads the definition levels and then reads from `data` for the non-null values.
+ * If a value is null, `c` is populated with `nullValue`.
+ *
+ * This is a batched version of this logic:
+ * if (this.readInt() == level) {
+ * c[rowId] = data.readInteger();
+ * } else {
+ * c[rowId] = nullValue;
+ * }
+ */
+ public void readIntegers(int total, ColumnVector c, int rowId, int level,
+ VectorizedValuesReader data, int nullValue) {
+ int left = total;
+ while (left > 0) {
+ if (this.currentCount == 0) this.readNextGroup();
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == level) {
+ data.readIntegers(n, c, rowId);
+ c.putNotNulls(rowId, n);
+ } else {
+ c.putNulls(rowId, n);
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (currentBuffer[currentBufferIdx++] == level) {
+ c.putInt(rowId + i, data.readInteger());
+ c.putNotNull(rowId + i);
+ } else {
+ c.putInt(rowId + i, nullValue);
+ c.putNull(rowId + i);
+ }
+ }
+ break;
+ }
+ rowId += n;
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ /**
+ * Reads the next varint encoded int.
+ */
+ private int readUnsignedVarInt() {
+ int value = 0;
+ int shift = 0;
+ int b;
+ do {
+ b = in[offset++] & 255;
+ value |= (b & 0x7F) << shift;
+ shift += 7;
+ } while ((b & 0x80) != 0);
+ return value;
+ }
+
+ /**
+ * Reads the next 4 byte little endian int.
+ */
+ private int readIntLittleEndian() {
+ int ch4 = in[offset] & 255;
+ int ch3 = in[offset + 1] & 255;
+ int ch2 = in[offset + 2] & 255;
+ int ch1 = in[offset + 3] & 255;
+ offset += 4;
+ return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+ }
+
+ /**
+ * Reads the next little endian int padded to `bytesWidth` bytes.
+ */
+ private int readIntLittleEndianPaddedOnBitWidth() {
+ switch (bytesWidth) {
+ case 0:
+ return 0;
+ case 1:
+ return in[offset++] & 255;
+ case 2: {
+ int ch2 = in[offset] & 255;
+ int ch1 = in[offset + 1] & 255;
+ offset += 2;
+ return (ch1 << 8) + ch2;
+ }
+ case 3: {
+ int ch3 = in[offset] & 255;
+ int ch2 = in[offset + 1] & 255;
+ int ch1 = in[offset + 2] & 255;
+ offset += 3;
+ return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
+ }
+ case 4: {
+ return readIntLittleEndian();
+ }
+ }
+ throw new RuntimeException("Unreachable");
+ }
+
+ /**
+ * Reads the next group.
+ */
+ private void readNextGroup() {
+ Preconditions.checkArgument(this.offset < this.end,
+ "Reading past RLE/BitPacking stream. offset=" + this.offset + " end=" + this.end);
+ int header = readUnsignedVarInt();
+ this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
+ switch (mode) {
+ case RLE:
+ this.currentCount = header >>> 1;
+ this.currentValue = readIntLittleEndianPaddedOnBitWidth();
+ return;
+ case PACKED:
+ int numGroups = header >>> 1;
+ this.currentCount = numGroups * 8;
+
+ if (this.currentBuffer.length < this.currentCount) {
+ this.currentBuffer = new int[this.currentCount];
+ }
+ currentBufferIdx = 0;
+ int bytesToRead = (int)Math.ceil((double)(this.currentCount * this.bitWidth) / 8.0D);
+
+ bytesToRead = Math.min(bytesToRead, this.end - this.offset);
+ int valueIndex = 0;
+ for (int byteIndex = offset; valueIndex < this.currentCount; byteIndex += this.bitWidth) {
+ this.packer.unpack8Values(in, byteIndex, this.currentBuffer, valueIndex);
+ valueIndex += 8;
+ }
+ offset += bytesToRead;
+ return;
+ default:
+ throw new ParquetDecodingException("not a valid mode " + this.mode);
+ }
+ }
+}
\ No newline at end of file
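A small worked example (not from the patch) of the hybrid header parsed by readNextGroup(): the
low bit selects the mode and the remaining bits carry a count.

    int header = 0x06;                       // binary 110
    boolean rle = (header & 1) == 0;         // true: a run-length-encoded group
    int count = rle ? header >>> 1           // 3 copies of the single value that follows
                    : (header >>> 1) * 8;    // for PACKED, groups of 8 bit-packed values
    // header 0x03 (binary 11) would instead mean PACKED with (0x03 >>> 1) * 8 = 8 values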
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
new file mode 100644
index 0000000000..49a9ed83d5
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+
+/**
+ * Interface for value decoding that supports vectorized (aka batched) decoding.
+ * TODO: merge this into parquet-mr.
+ */
+public interface VectorizedValuesReader {
+ int readInteger();
+
+ /*
+ * Reads `total` values into `c` starting at `c[rowId]`.
+ */
+ void readIntegers(int total, ColumnVector c, int rowId);
+
+ // TODO: add all the other parquet types.
+
+ void skip(int n);
+}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index d9dde92ceb..85509751db 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.execution.vectorized;
+import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.types.DataType;
/**
@@ -33,8 +34,8 @@ public abstract class ColumnVector {
/**
* Allocates a column with each element of size `width` either on or off heap.
*/
- public static ColumnVector allocate(int capacity, DataType type, boolean offHeap) {
- if (offHeap) {
+ public static ColumnVector allocate(int capacity, DataType type, MemoryMode mode) {
+ if (mode == MemoryMode.OFF_HEAP) {
return new OffHeapColumnVector(capacity, type);
} else {
return new OnHeapColumnVector(capacity, type);
@@ -111,7 +112,7 @@ public abstract class ColumnVector {
public abstract void putInts(int rowId, int count, int[] src, int srcIndex);
/**
- * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+ * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
* The data in src must be 4-byte little endian ints.
*/
public abstract void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
@@ -138,7 +139,7 @@ public abstract class ColumnVector {
public abstract void putDoubles(int rowId, int count, double[] src, int srcIndex);
/**
- * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+ * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
* The data in src must be IEEE formatted doubles.
*/
public abstract void putDoubles(int rowId, int count, byte[] src, int srcIndex);
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 47defac453..2c55f854c2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.vectorized;
import java.util.Arrays;
import java.util.Iterator;
+import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.MapData;
@@ -59,12 +60,12 @@ public final class ColumnarBatch {
// Total number of rows that have been filtered.
private int numRowsFiltered = 0;
- public static ColumnarBatch allocate(StructType schema, boolean offHeap) {
- return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, offHeap);
+ public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
+ return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
}
- public static ColumnarBatch allocate(StructType schema, boolean offHeap, int maxRows) {
- return new ColumnarBatch(schema, maxRows, offHeap);
+ public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
+ return new ColumnarBatch(schema, maxRows, memMode);
}
/**
@@ -282,7 +283,7 @@ public final class ColumnarBatch {
++numRowsFiltered;
}
- private ColumnarBatch(StructType schema, int maxRows, boolean offHeap) {
+ private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode) {
this.schema = schema;
this.capacity = maxRows;
this.columns = new ColumnVector[schema.size()];
@@ -290,7 +291,7 @@ public final class ColumnarBatch {
for (int i = 0; i < schema.fields().length; ++i) {
StructField field = schema.fields()[i];
- columns[i] = ColumnVector.allocate(maxRows, field.dataType(), offHeap);
+ columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
}
}
}
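A quick sketch (not part of the patch) of allocating a batch directly with the new MemoryMode
argument, using only APIs that appear in this change; the one-column schema is illustrative:

    StructType schema = new StructType().add("i", DataTypes.IntegerType);
    ColumnarBatch batch = ColumnarBatch.allocate(schema, MemoryMode.ON_HEAP);
    batch.column(0).putInt(0, 42);
    batch.setNumRows(1);
    assert batch.column(0).getInt(0) == 42;
    batch.reset();                           // clears the batch so it can be refilled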
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 2a9a2d1104..6180dd308e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -49,6 +49,7 @@ public final class OffHeapColumnVector extends ColumnVector {
} else {
throw new RuntimeException("Unhandled " + type);
}
+ anyNullsSet = true;
reset();
}
@@ -98,6 +99,7 @@ public final class OffHeapColumnVector extends ColumnVector {
@Override
public final void putNotNulls(int rowId, int count) {
+ if (!anyNullsSet) return;
long offset = nulls + rowId;
for (int i = 0; i < count; ++i, ++offset) {
Platform.putByte(null, offset, (byte) 0);
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index a7b3addf11..76d9956c38 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -97,6 +97,7 @@ public final class OnHeapColumnVector extends ColumnVector {
@Override
public final void putNotNulls(int rowId, int count) {
+ if (!anyNullsSet) return;
for (int i = 0; i < count; ++i) {
nulls[rowId + i] = (byte)0;
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index ae95b50e1e..14be9eec9a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -59,24 +59,31 @@ object ParquetReadBenchmark {
}
def intScanBenchmark(values: Int): Unit = {
+ // Benchmarks running through Spark SQL.
+ val sqlBenchmark = new Benchmark("SQL Single Int Column Scan", values)
+ // Benchmarks driving the reader component directly.
+ val parquetReaderBenchmark = new Benchmark("Parquet Reader Single Int Column Scan", values)
+
withTempPath { dir =>
- sqlContext.range(values).write.parquet(dir.getCanonicalPath)
- withTempTable("tempTable") {
+ withTempTable("t1", "tempTable") {
+ sqlContext.range(values).registerTempTable("t1")
+ sqlContext.sql("select cast(id as INT) as id from t1")
+ .write.parquet(dir.getCanonicalPath)
sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")
- val benchmark = new Benchmark("Single Int Column Scan", values)
- benchmark.addCase("SQL Parquet Reader") { iter =>
+ sqlBenchmark.addCase("SQL Parquet Reader") { iter =>
sqlContext.sql("select sum(id) from tempTable").collect()
}
- benchmark.addCase("SQL Parquet MR") { iter =>
+ sqlBenchmark.addCase("SQL Parquet MR") { iter =>
withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
sqlContext.sql("select sum(id) from tempTable").collect()
}
}
val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
- benchmark.addCase("ParquetReader") { num =>
+ // Driving the parquet reader directly without Spark.
+ parquetReaderBenchmark.addCase("ParquetReader") { num =>
var sum = 0L
files.map(_.asInstanceOf[String]).foreach { p =>
val reader = new UnsafeRowParquetRecordReader
@@ -87,26 +94,82 @@ object ParquetReadBenchmark {
if (!record.isNullAt(0)) sum += record.getInt(0)
}
reader.close()
- }}
+ }
+ }
+
+ // Driving the parquet reader in batch mode directly.
+ parquetReaderBenchmark.addCase("ParquetReader(Batched)") { num =>
+ var sum = 0L
+ files.map(_.asInstanceOf[String]).foreach { p =>
+ val reader = new UnsafeRowParquetRecordReader
+ try {
+ reader.initialize(p, ("id" :: Nil).asJava)
+ val batch = reader.resultBatch()
+ val col = batch.column(0)
+ while (reader.nextBatch()) {
+ val numRows = batch.numRows()
+ var i = 0
+ while (i < numRows) {
+ if (!col.getIsNull(i)) sum += col.getInt(i)
+ i += 1
+ }
+ }
+ } finally {
+ reader.close()
+ }
+ }
+ }
+
+ // Decoding vectorized, but having the reader return rows.
+ parquetReaderBenchmark.addCase("ParquetReader(Batch -> Row)") { num =>
+ var sum = 0L
+ files.map(_.asInstanceOf[String]).foreach { p =>
+ val reader = new UnsafeRowParquetRecordReader
+ try {
+ reader.initialize(p, ("id" :: Nil).asJava)
+ val batch = reader.resultBatch()
+ while (reader.nextBatch()) {
+ val it = batch.rowIterator()
+ while (it.hasNext) {
+ val record = it.next()
+ if (!record.isNullAt(0)) sum += record.getInt(0)
+ }
+ }
+ } finally {
+ reader.close()
+ }
+ }
+ }
/*
- Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
- Single Int Column Scan: Avg Time(ms) Avg Rate(M/s) Relative Rate
- -------------------------------------------------------------------------
- SQL Parquet Reader 1910.0 13.72 1.00 X
- SQL Parquet MR 2330.0 11.25 0.82 X
- ParquetReader 1252.6 20.93 1.52 X
+ Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
+ Single Int Column Scan: Avg Time(ms) Avg Rate(M/s) Relative Rate
+ -------------------------------------------------------------------------------
+ SQL Parquet Reader 1682.6 15.58 1.00 X
+ SQL Parquet MR 2379.6 11.02 0.71 X
*/
- benchmark.run()
+ sqlBenchmark.run()
+
+ /*
+ Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
+ Parquet Reader Single Int Column Scan: Avg Time(ms) Avg Rate(M/s) Relative Rate
+ -------------------------------------------------------------------------------
+ ParquetReader 610.40 25.77 1.00 X
+ ParquetReader(Batched) 172.66 91.10 3.54 X
+ ParquetReader(Batch -> Row) 192.28 81.80 3.17 X
+ */
+ parquetReaderBenchmark.run()
}
}
}
def intStringScanBenchmark(values: Int): Unit = {
+ val benchmark = new Benchmark("Int and String Scan", values)
+
withTempPath { dir =>
withTempTable("t1", "tempTable") {
sqlContext.range(values).registerTempTable("t1")
- sqlContext.sql("select id as c1, cast(id as STRING) as c2 from t1")
+ sqlContext.sql("select cast(id as INT) as c1, cast(id as STRING) as c2 from t1")
.write.parquet(dir.getCanonicalPath)
sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
index e28153d12a..bfe944d835 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.nio.ByteBuffer
+import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.vectorized.ColumnVector
import org.apache.spark.sql.types.IntegerType
@@ -136,7 +137,7 @@ object ColumnarBatchBenchmark {
// Access through the column API with on heap memory
val columnOnHeap = { i: Int =>
- val col = ColumnVector.allocate(count, IntegerType, false)
+ val col = ColumnVector.allocate(count, IntegerType, MemoryMode.ON_HEAP)
var sum = 0L
for (n <- 0L until iters) {
var i = 0
@@ -155,7 +156,7 @@ object ColumnarBatchBenchmark {
// Access through the column API with off heap memory
def columnOffHeap = { i: Int => {
- val col = ColumnVector.allocate(count, IntegerType, true)
+ val col = ColumnVector.allocate(count, IntegerType, MemoryMode.OFF_HEAP)
var sum = 0L
for (n <- 0L until iters) {
var i = 0
@@ -174,7 +175,7 @@ object ColumnarBatchBenchmark {
// Access by directly getting the buffer backing the column.
val columnOffheapDirect = { i: Int =>
- val col = ColumnVector.allocate(count, IntegerType, true)
+ val col = ColumnVector.allocate(count, IntegerType, MemoryMode.OFF_HEAP)
var sum = 0L
for (n <- 0L until iters) {
var addr = col.valuesNativeAddress()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 305a83e3e4..d5e517c7f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable
import scala.util.Random
import org.apache.spark.SparkFunSuite
+import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
@@ -28,10 +29,10 @@ import org.apache.spark.unsafe.Platform
class ColumnarBatchSuite extends SparkFunSuite {
test("Null Apis") {
- (false :: true :: Nil).foreach { offHeap => {
+ (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
val reference = mutable.ArrayBuffer.empty[Boolean]
- val column = ColumnVector.allocate(1024, IntegerType, offHeap)
+ val column = ColumnVector.allocate(1024, IntegerType, memMode)
var idx = 0
assert(column.anyNullsSet() == false)
@@ -64,7 +65,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
reference.zipWithIndex.foreach { v =>
assert(v._1 == column.getIsNull(v._2))
- if (offHeap) {
+ if (memMode == MemoryMode.OFF_HEAP) {
val addr = column.nullsNativeAddress()
assert(v._1 == (Platform.getByte(null, addr + v._2) == 1), "index=" + v._2)
}
@@ -74,12 +75,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
}
test("Int Apis") {
- (false :: true :: Nil).foreach { offHeap => {
+ (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
val seed = System.currentTimeMillis()
val random = new Random(seed)
val reference = mutable.ArrayBuffer.empty[Int]
- val column = ColumnVector.allocate(1024, IntegerType, offHeap)
+ val column = ColumnVector.allocate(1024, IntegerType, memMode)
var idx = 0
val values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).toArray
@@ -131,8 +132,8 @@ class ColumnarBatchSuite extends SparkFunSuite {
}
reference.zipWithIndex.foreach { v =>
- assert(v._1 == column.getInt(v._2), "Seed = " + seed + " Off Heap=" + offHeap)
- if (offHeap) {
+ assert(v._1 == column.getInt(v._2), "Seed = " + seed + " Mem Mode=" + memMode)
+ if (memMode == MemoryMode.OFF_HEAP) {
val addr = column.valuesNativeAddress()
assert(v._1 == Platform.getInt(null, addr + 4 * v._2))
}
@@ -142,12 +143,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
}
test("Double APIs") {
- (false :: true :: Nil).foreach { offHeap => {
+ (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
val seed = System.currentTimeMillis()
val random = new Random(seed)
val reference = mutable.ArrayBuffer.empty[Double]
- val column = ColumnVector.allocate(1024, DoubleType, offHeap)
+ val column = ColumnVector.allocate(1024, DoubleType, memMode)
var idx = 0
val values = (1.0 :: 2.0 :: 3.0 :: 4.0 :: 5.0 :: Nil).toArray
@@ -198,8 +199,8 @@ class ColumnarBatchSuite extends SparkFunSuite {
}
reference.zipWithIndex.foreach { v =>
- assert(v._1 == column.getDouble(v._2), "Seed = " + seed + " Off Heap=" + offHeap)
- if (offHeap) {
+ assert(v._1 == column.getDouble(v._2), "Seed = " + seed + " MemMode=" + memMode)
+ if (memMode == MemoryMode.OFF_HEAP) {
val addr = column.valuesNativeAddress()
assert(v._1 == Platform.getDouble(null, addr + 8 * v._2))
}
@@ -209,13 +210,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
}
test("ColumnarBatch basic") {
- (false :: true :: Nil).foreach { offHeap => {
+ (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
val schema = new StructType()
.add("intCol", IntegerType)
.add("doubleCol", DoubleType)
.add("intCol2", IntegerType)
- val batch = ColumnarBatch.allocate(schema, offHeap)
+ val batch = ColumnarBatch.allocate(schema, memMode)
assert(batch.numCols() == 3)
assert(batch.numRows() == 0)
assert(batch.numValidRows() == 0)