aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/main/java
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-01-25 16:23:59 -0800
committerDavies Liu <davies.liu@gmail.com>2016-01-25 16:23:59 -0800
commitbe375fcbd200fb0e210b8edcfceb5a1bcdbba94b (patch)
tree060e087c33e27b44b30fb97f9861c97d8a5a06af /sql/catalyst/src/main/java
parent6f0f1d9e04a8db47e2f6f8fcfe9dea9de0f633da (diff)
downloadspark-be375fcbd200fb0e210b8edcfceb5a1bcdbba94b.tar.gz
spark-be375fcbd200fb0e210b8edcfceb5a1bcdbba94b.tar.bz2
spark-be375fcbd200fb0e210b8edcfceb5a1bcdbba94b.zip
[SPARK-12879] [SQL] improve the unsafe row writing framework
As we begin to use unsafe row writing framework(`BufferHolder` and `UnsafeRowWriter`) in more and more places(`UnsafeProjection`, `UnsafeRowParquetRecordReader`, `GenerateColumnAccessor`, etc.), we should add more doc to it and make it easier to use. This PR abstract the technique used in `UnsafeRowParquetRecordReader`: avoid unnecessary operatition as more as possible. For example, do not always point the row to the buffer at the end, we only need to update the size of row. If all fields are of primitive type, we can even save the row size updating. Then we can apply this technique to more places easily. a local benchmark shows `UnsafeProjection` is up to 1.7x faster after this PR: **old version** ``` Intel(R) Core(TM) i7-4960HQ CPU 2.60GHz unsafe projection: Avg Time(ms) Avg Rate(M/s) Relative Rate ------------------------------------------------------------------------------- single long 2616.04 102.61 1.00 X single nullable long 3032.54 88.52 0.86 X primitive types 9121.05 29.43 0.29 X nullable primitive types 12410.60 21.63 0.21 X ``` **new version** ``` Intel(R) Core(TM) i7-4960HQ CPU 2.60GHz unsafe projection: Avg Time(ms) Avg Rate(M/s) Relative Rate ------------------------------------------------------------------------------- single long 1533.34 175.07 1.00 X single nullable long 2306.73 116.37 0.66 X primitive types 8403.93 31.94 0.18 X nullable primitive types 12448.39 21.56 0.12 X ``` For single non-nullable long(the best case), we can have about 1.7x speed up. Even it's nullable, we can still have 1.3x speed up. For other cases, it's not such a boost as the saved operations only take a little proportion of the whole process. The benchmark code is included in this PR. Author: Wenchen Fan <wenchen@databricks.com> Closes #10809 from cloud-fan/unsafe-projection.
Diffstat (limited to 'sql/catalyst/src/main/java')
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java44
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java58
2 files changed, 63 insertions, 39 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
index d26b1b187c..af61e2011f 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
@@ -21,24 +21,40 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.unsafe.Platform;
/**
- * A helper class to manage the row buffer when construct unsafe rows.
+ * A helper class to manage the data buffer for an unsafe row. The data buffer can grow and
+ * automatically re-point the unsafe row to it.
+ *
+ * This class can be used to build a one-pass unsafe row writing program, i.e. data will be written
+ * to the data buffer directly and no extra copy is needed. There should be only one instance of
+ * this class per writing program, so that the memory segment/data buffer can be reused. Note that
+ * for each incoming record, we should call `reset` of BufferHolder instance before write the record
+ * and reuse the data buffer.
+ *
+ * Generally we should call `UnsafeRow.setTotalSize` and pass in `BufferHolder.totalSize` to update
+ * the size of the result row, after writing a record to the buffer. However, we can skip this step
+ * if the fields of row are all fixed-length, as the size of result row is also fixed.
*/
public class BufferHolder {
public byte[] buffer;
public int cursor = Platform.BYTE_ARRAY_OFFSET;
+ private final UnsafeRow row;
+ private final int fixedSize;
- public BufferHolder() {
- this(64);
+ public BufferHolder(UnsafeRow row) {
+ this(row, 64);
}
- public BufferHolder(int size) {
- buffer = new byte[size];
+ public BufferHolder(UnsafeRow row, int initialSize) {
+ this.fixedSize = UnsafeRow.calculateBitSetWidthInBytes(row.numFields()) + 8 * row.numFields();
+ this.buffer = new byte[fixedSize + initialSize];
+ this.row = row;
+ this.row.pointTo(buffer, buffer.length);
}
/**
- * Grows the buffer to at least neededSize. If row is non-null, points the row to the buffer.
+ * Grows the buffer by at least neededSize and points the row to the buffer.
*/
- public void grow(int neededSize, UnsafeRow row) {
+ public void grow(int neededSize) {
final int length = totalSize() + neededSize;
if (buffer.length < length) {
// This will not happen frequently, because the buffer is re-used.
@@ -50,22 +66,12 @@ public class BufferHolder {
Platform.BYTE_ARRAY_OFFSET,
totalSize());
buffer = tmp;
- if (row != null) {
- row.pointTo(buffer, length * 2);
- }
+ row.pointTo(buffer, buffer.length);
}
}
- public void grow(int neededSize) {
- grow(neededSize, null);
- }
-
public void reset() {
- cursor = Platform.BYTE_ARRAY_OFFSET;
- }
- public void resetTo(int offset) {
- assert(offset <= buffer.length);
- cursor = Platform.BYTE_ARRAY_OFFSET + offset;
+ cursor = Platform.BYTE_ARRAY_OFFSET + fixedSize;
}
public int totalSize() {
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
index e227c0dec9..4776617043 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
@@ -26,38 +26,56 @@ import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
/**
- * A helper class to write data into global row buffer using `UnsafeRow` format,
- * used by {@link org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection}.
+ * A helper class to write data into global row buffer using `UnsafeRow` format.
+ *
+ * It will remember the offset of row buffer which it starts to write, and move the cursor of row
+ * buffer while writing. If new data(can be the input record if this is the outermost writer, or
+ * nested struct if this is an inner writer) comes, the starting cursor of row buffer may be
+ * changed, so we need to call `UnsafeRowWriter.reset` before writing, to update the
+ * `startingOffset` and clear out null bits.
+ *
+ * Note that if this is the outermost writer, which means we will always write from the very
+ * beginning of the global row buffer, we don't need to update `startingOffset` and can just call
+ * `zeroOutNullBytes` before writing new data.
*/
public class UnsafeRowWriter {
- private BufferHolder holder;
+ private final BufferHolder holder;
// The offset of the global buffer where we start to write this row.
private int startingOffset;
- private int nullBitsSize;
- private UnsafeRow row;
+ private final int nullBitsSize;
+ private final int fixedSize;
- public void initialize(BufferHolder holder, int numFields) {
+ public UnsafeRowWriter(BufferHolder holder, int numFields) {
this.holder = holder;
- this.startingOffset = holder.cursor;
this.nullBitsSize = UnsafeRow.calculateBitSetWidthInBytes(numFields);
+ this.fixedSize = nullBitsSize + 8 * numFields;
+ this.startingOffset = holder.cursor;
+ }
+
+ /**
+ * Resets the `startingOffset` according to the current cursor of row buffer, and clear out null
+ * bits. This should be called before we write a new nested struct to the row buffer.
+ */
+ public void reset() {
+ this.startingOffset = holder.cursor;
// grow the global buffer to make sure it has enough space to write fixed-length data.
- final int fixedSize = nullBitsSize + 8 * numFields;
- holder.grow(fixedSize, row);
+ holder.grow(fixedSize);
holder.cursor += fixedSize;
- // zero-out the null bits region
+ zeroOutNullBytes();
+ }
+
+ /**
+ * Clears out null bits. This should be called before we write a new row to row buffer.
+ */
+ public void zeroOutNullBytes() {
for (int i = 0; i < nullBitsSize; i += 8) {
Platform.putLong(holder.buffer, startingOffset + i, 0L);
}
}
- public void initialize(UnsafeRow row, BufferHolder holder, int numFields) {
- initialize(holder, numFields);
- this.row = row;
- }
-
private void zeroOutPaddingBytes(int numBytes) {
if ((numBytes & 0x07) > 0) {
Platform.putLong(holder.buffer, holder.cursor + ((numBytes >> 3) << 3), 0L);
@@ -98,7 +116,7 @@ public class UnsafeRowWriter {
if (remainder > 0) {
final int paddingBytes = 8 - remainder;
- holder.grow(paddingBytes, row);
+ holder.grow(paddingBytes);
for (int i = 0; i < paddingBytes; i++) {
Platform.putByte(holder.buffer, holder.cursor, (byte) 0);
@@ -161,7 +179,7 @@ public class UnsafeRowWriter {
}
} else {
// grow the global buffer before writing data.
- holder.grow(16, row);
+ holder.grow(16);
// zero-out the bytes
Platform.putLong(holder.buffer, holder.cursor, 0L);
@@ -193,7 +211,7 @@ public class UnsafeRowWriter {
final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes);
// grow the global buffer before writing data.
- holder.grow(roundedSize, row);
+ holder.grow(roundedSize);
zeroOutPaddingBytes(numBytes);
@@ -214,7 +232,7 @@ public class UnsafeRowWriter {
final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes);
// grow the global buffer before writing data.
- holder.grow(roundedSize, row);
+ holder.grow(roundedSize);
zeroOutPaddingBytes(numBytes);
@@ -230,7 +248,7 @@ public class UnsafeRowWriter {
public void write(int ordinal, CalendarInterval input) {
// grow the global buffer before writing data.
- holder.grow(16, row);
+ holder.grow(16);
// Write the months and microseconds fields of Interval to the variable length portion.
Platform.putLong(holder.buffer, holder.cursor, input.months);