aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/main/java
diff options
context:
space:
mode:
authorNong Li <nong@databricks.com>2015-11-18 18:38:45 -0800
committerReynold Xin <rxin@databricks.com>2015-11-18 18:38:45 -0800
commit6d0848b53bbe6c5acdcf5c033cd396b1ae6e293d (patch)
treecf1c2b5184a996d4e931d1837dd7899199c2ba72 /sql/catalyst/src/main/java
parente61367b9f9bfc8e123369d55d7ca5925568b98a7 (diff)
downloadspark-6d0848b53bbe6c5acdcf5c033cd396b1ae6e293d.tar.gz
spark-6d0848b53bbe6c5acdcf5c033cd396b1ae6e293d.tar.bz2
spark-6d0848b53bbe6c5acdcf5c033cd396b1ae6e293d.zip
[SPARK-11787][SQL] Improve Parquet scan performance when using flat schemas.
This patch adds an alternate to the Parquet RecordReader from the parquet-mr project that is much faster for flat schemas. Instead of using the general converter mechanism from parquet-mr, this directly uses the lower level APIs from parquet-columnar and a custom RecordReader that directly assembles into UnsafeRows. This is optionally disabled and only used for supported schemas. Using the tpcds store sales table and doing a sum of increasingly more columns, the results are: For 1 Column: Before: 11.3M rows/second After: 18.2M rows/second For 2 Columns: Before: 7.2M rows/second After: 11.2M rows/second For 5 Columns: Before: 2.9M rows/second After: 4.5M rows/second Author: Nong Li <nong@databricks.com> Closes #9774 from nongli/parquet.
Diffstat (limited to 'sql/catalyst/src/main/java')
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java9
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java32
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java20
3 files changed, 49 insertions, 12 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 5ba14ebdb6..33769363a0 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -178,6 +178,15 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS
pointTo(buf, Platform.BYTE_ARRAY_OFFSET, numFields, sizeInBytes);
}
+ /**
+ * Updates this UnsafeRow preserving the number of fields.
+ * @param buf byte array to point to
+ * @param sizeInBytes the number of bytes valid in the byte array
+ */
+ public void pointTo(byte[] buf, int sizeInBytes) {
+ pointTo(buf, numFields, sizeInBytes);
+ }
+
@Override
public void setNullAt(int i) {
assertIndexIsValid(i);
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
index 9c94686780..d26b1b187c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
@@ -17,19 +17,28 @@
package org.apache.spark.sql.catalyst.expressions.codegen;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.unsafe.Platform;
/**
- * A helper class to manage the row buffer used in `GenerateUnsafeProjection`.
- *
- * Note that it is only used in `GenerateUnsafeProjection`, so it's safe to mark member variables
- * public for ease of use.
+ * A helper class to manage the row buffer when constructing unsafe rows.
*/
public class BufferHolder {
- public byte[] buffer = new byte[64];
+ public byte[] buffer;
public int cursor = Platform.BYTE_ARRAY_OFFSET;
- public void grow(int neededSize) {
+ public BufferHolder() {
+ this(64);
+ }
+
+ public BufferHolder(int size) {
+ buffer = new byte[size];
+ }
+
+ /**
+ * Grows the buffer to at least neededSize. If row is non-null, points the row to the buffer.
+ */
+ public void grow(int neededSize, UnsafeRow row) {
final int length = totalSize() + neededSize;
if (buffer.length < length) {
// This will not happen frequently, because the buffer is re-used.
@@ -41,12 +50,23 @@ public class BufferHolder {
Platform.BYTE_ARRAY_OFFSET,
totalSize());
buffer = tmp;
+ if (row != null) {
+ row.pointTo(buffer, length * 2);
+ }
}
}
+ public void grow(int neededSize) {
+ grow(neededSize, null);
+ }
+
public void reset() {
cursor = Platform.BYTE_ARRAY_OFFSET;
}
+ public void resetTo(int offset) {
+ assert(offset <= buffer.length);
+ cursor = Platform.BYTE_ARRAY_OFFSET + offset;
+ }
public int totalSize() {
return cursor - Platform.BYTE_ARRAY_OFFSET;
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
index 048b7749d8..e227c0dec9 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
@@ -35,6 +35,7 @@ public class UnsafeRowWriter {
// The offset of the global buffer where we start to write this row.
private int startingOffset;
private int nullBitsSize;
+ private UnsafeRow row;
public void initialize(BufferHolder holder, int numFields) {
this.holder = holder;
@@ -43,7 +44,7 @@ public class UnsafeRowWriter {
// grow the global buffer to make sure it has enough space to write fixed-length data.
final int fixedSize = nullBitsSize + 8 * numFields;
- holder.grow(fixedSize);
+ holder.grow(fixedSize, row);
holder.cursor += fixedSize;
// zero-out the null bits region
@@ -52,12 +53,19 @@ public class UnsafeRowWriter {
}
}
+ public void initialize(UnsafeRow row, BufferHolder holder, int numFields) {
+ initialize(holder, numFields);
+ this.row = row;
+ }
+
private void zeroOutPaddingBytes(int numBytes) {
if ((numBytes & 0x07) > 0) {
Platform.putLong(holder.buffer, holder.cursor + ((numBytes >> 3) << 3), 0L);
}
}
+ public BufferHolder holder() { return holder; }
+
public boolean isNullAt(int ordinal) {
return BitSetMethods.isSet(holder.buffer, startingOffset, ordinal);
}
@@ -90,7 +98,7 @@ public class UnsafeRowWriter {
if (remainder > 0) {
final int paddingBytes = 8 - remainder;
- holder.grow(paddingBytes);
+ holder.grow(paddingBytes, row);
for (int i = 0; i < paddingBytes; i++) {
Platform.putByte(holder.buffer, holder.cursor, (byte) 0);
@@ -153,7 +161,7 @@ public class UnsafeRowWriter {
}
} else {
// grow the global buffer before writing data.
- holder.grow(16);
+ holder.grow(16, row);
// zero-out the bytes
Platform.putLong(holder.buffer, holder.cursor, 0L);
@@ -185,7 +193,7 @@ public class UnsafeRowWriter {
final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes);
// grow the global buffer before writing data.
- holder.grow(roundedSize);
+ holder.grow(roundedSize, row);
zeroOutPaddingBytes(numBytes);
@@ -206,7 +214,7 @@ public class UnsafeRowWriter {
final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes);
// grow the global buffer before writing data.
- holder.grow(roundedSize);
+ holder.grow(roundedSize, row);
zeroOutPaddingBytes(numBytes);
@@ -222,7 +230,7 @@ public class UnsafeRowWriter {
public void write(int ordinal, CalendarInterval input) {
// grow the global buffer before writing data.
- holder.grow(16);
+ holder.grow(16, row);
// Write the months and microseconds fields of Interval to the variable length portion.
Platform.putLong(holder.buffer, holder.cursor, input.months);