aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-10-21 19:20:31 -0700
committerReynold Xin <rxin@databricks.com>2015-10-21 19:20:31 -0700
commit1d9733271595596683a6d956a7433fa601df1cc1 (patch)
treedfe891e5f6bd28726f99dcc092d43e237ce154f3 /sql/catalyst
parent40a10d7675578f8370d07e23810d9fc5d58e0550 (diff)
downloadspark-1d9733271595596683a6d956a7433fa601df1cc1.tar.gz
spark-1d9733271595596683a6d956a7433fa601df1cc1.tar.bz2
spark-1d9733271595596683a6d956a7433fa601df1cc1.zip
[SPARK-11243][SQL] output UnsafeRow from columnar cache
This PR changes InMemoryTableScan to output UnsafeRow, and optimizes the unrolling and scanning by copying the bytes for var-length types between UnsafeRow and ByteBuffer directly without creating the wrapper objects. When scanning the decimals in the TPC-DS store_sales table, it's 80% faster (each decimal is copied as a long without creating Decimal objects). Author: Davies Liu <davies@databricks.com> Closes #9203 from davies/unsafe_cache.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java31
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java78
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java102
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala57
4 files changed, 160 insertions, 108 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 366615f6fe..850838af9b 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -402,7 +402,7 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS
if (isNullAt(ordinal)) return null;
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
- final int size = (int) (offsetAndSize & ((1L << 32) - 1));
+ final int size = (int) offsetAndSize;
return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
}
@@ -413,7 +413,7 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS
} else {
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
- final int size = (int) (offsetAndSize & ((1L << 32) - 1));
+ final int size = (int) offsetAndSize;
final byte[] bytes = new byte[size];
Platform.copyMemory(
baseObject,
@@ -446,7 +446,7 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS
} else {
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
- final int size = (int) (offsetAndSize & ((1L << 32) - 1));
+ final int size = (int) offsetAndSize;
final UnsafeRow row = new UnsafeRow();
row.pointTo(baseObject, baseOffset + offset, numFields, size);
return row;
@@ -460,7 +460,7 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS
} else {
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
- final int size = (int) (offsetAndSize & ((1L << 32) - 1));
+ final int size = (int) offsetAndSize;
final UnsafeArrayData array = new UnsafeArrayData();
array.pointTo(baseObject, baseOffset + offset, size);
return array;
@@ -474,7 +474,7 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS
} else {
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
- final int size = (int) (offsetAndSize & ((1L << 32) - 1));
+ final int size = (int) offsetAndSize;
final UnsafeMapData map = new UnsafeMapData();
map.pointTo(baseObject, baseOffset + offset, size);
return map;
@@ -618,6 +618,27 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS
buffer.position(pos + sizeInBytes);
}
+ /**
+ * Writes the size (as an int) followed by the bytes of a var-length field into the ByteBuffer.
+ *
+ * Note: only works with a HeapByteBuffer, since the bytes are copied into buffer.array().
+ */
+ public void writeFieldTo(int ordinal, ByteBuffer buffer) {
+ final long offsetAndSize = getLong(ordinal);
+ final int offset = (int) (offsetAndSize >> 32);
+ final int size = (int) offsetAndSize;
+
+ buffer.putInt(size);
+ int pos = buffer.position();
+ buffer.position(pos + size);
+ Platform.copyMemory(
+ baseObject,
+ baseOffset + offset,
+ buffer.array(),
+ Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + pos,
+ size);
+ }
+
@Override
public void writeExternal(ObjectOutput out) throws IOException {
byte[] bytes = getBytes();
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
index 7f2a1cb07a..7dd932d198 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
@@ -17,7 +17,6 @@
package org.apache.spark.sql.catalyst.expressions.codegen;
-import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.types.CalendarInterval;
@@ -64,29 +63,72 @@ public class UnsafeArrayWriter {
Platform.putInt(holder.buffer, getElementOffset(ordinal), relativeOffset);
}
- public void writeCompactDecimal(int ordinal, Decimal input, int precision, int scale) {
- // make sure Decimal object has the same scale as DecimalType
- if (input.changePrecision(precision, scale)) {
- Platform.putLong(holder.buffer, holder.cursor, input.toUnscaledLong());
- setOffset(ordinal);
- holder.cursor += 8;
- } else {
- setNullAt(ordinal);
+ public void write(int ordinal, boolean value) {
+ Platform.putBoolean(holder.buffer, holder.cursor, value);
+ setOffset(ordinal);
+ holder.cursor += 1;
+ }
+
+ public void write(int ordinal, byte value) {
+ Platform.putByte(holder.buffer, holder.cursor, value);
+ setOffset(ordinal);
+ holder.cursor += 1;
+ }
+
+ public void write(int ordinal, short value) {
+ Platform.putShort(holder.buffer, holder.cursor, value);
+ setOffset(ordinal);
+ holder.cursor += 2;
+ }
+
+ public void write(int ordinal, int value) {
+ Platform.putInt(holder.buffer, holder.cursor, value);
+ setOffset(ordinal);
+ holder.cursor += 4;
+ }
+
+ public void write(int ordinal, long value) {
+ Platform.putLong(holder.buffer, holder.cursor, value);
+ setOffset(ordinal);
+ holder.cursor += 8;
+ }
+
+ public void write(int ordinal, float value) {
+ if (Float.isNaN(value)) {
+ value = Float.NaN;
+ }
+ Platform.putFloat(holder.buffer, holder.cursor, value);
+ setOffset(ordinal);
+ holder.cursor += 4;
+ }
+
+ public void write(int ordinal, double value) {
+ if (Double.isNaN(value)) {
+ value = Double.NaN;
}
+ Platform.putDouble(holder.buffer, holder.cursor, value);
+ setOffset(ordinal);
+ holder.cursor += 8;
}
public void write(int ordinal, Decimal input, int precision, int scale) {
// make sure Decimal object has the same scale as DecimalType
if (input.changePrecision(precision, scale)) {
- final byte[] bytes = input.toJavaBigDecimal().unscaledValue().toByteArray();
- assert bytes.length <= 16;
- holder.grow(bytes.length);
-
- // Write the bytes to the variable length portion.
- Platform.copyMemory(
- bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, bytes.length);
- setOffset(ordinal);
- holder.cursor += bytes.length;
+ if (precision <= Decimal.MAX_LONG_DIGITS()) {
+ Platform.putLong(holder.buffer, holder.cursor, input.toUnscaledLong());
+ setOffset(ordinal);
+ holder.cursor += 8;
+ } else {
+ final byte[] bytes = input.toJavaBigDecimal().unscaledValue().toByteArray();
+ assert bytes.length <= 16;
+ holder.grow(bytes.length);
+
+ // Write the bytes to the variable length portion.
+ Platform.copyMemory(
+ bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, bytes.length);
+ setOffset(ordinal);
+ holder.cursor += bytes.length;
+ }
} else {
setNullAt(ordinal);
}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
index e1f5a05d1d..adbe262187 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
@@ -58,6 +58,10 @@ public class UnsafeRowWriter {
}
}
+ public boolean isNullAt(int ordinal) {
+ return BitSetMethods.isSet(holder.buffer, startingOffset, ordinal);
+ }
+
public void setNullAt(int ordinal) {
BitSetMethods.set(holder.buffer, startingOffset, ordinal);
Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L);
@@ -95,41 +99,75 @@ public class UnsafeRowWriter {
}
}
- public void writeCompactDecimal(int ordinal, Decimal input, int precision, int scale) {
- // make sure Decimal object has the same scale as DecimalType
- if (input.changePrecision(precision, scale)) {
- Platform.putLong(holder.buffer, getFieldOffset(ordinal), input.toUnscaledLong());
- } else {
- setNullAt(ordinal);
- }
+ public void write(int ordinal, boolean value) {
+ Platform.putBoolean(holder.buffer, getFieldOffset(ordinal), value);
}
- public void write(int ordinal, Decimal input, int precision, int scale) {
- // grow the global buffer before writing data.
- holder.grow(16);
+ public void write(int ordinal, byte value) {
+ Platform.putByte(holder.buffer, getFieldOffset(ordinal), value);
+ }
- // zero-out the bytes
- Platform.putLong(holder.buffer, holder.cursor, 0L);
- Platform.putLong(holder.buffer, holder.cursor + 8, 0L);
+ public void write(int ordinal, short value) {
+ Platform.putShort(holder.buffer, getFieldOffset(ordinal), value);
+ }
- // Make sure Decimal object has the same scale as DecimalType.
- // Note that we may pass in null Decimal object to set null for it.
- if (input == null || !input.changePrecision(precision, scale)) {
- BitSetMethods.set(holder.buffer, startingOffset, ordinal);
- // keep the offset for future update
- setOffsetAndSize(ordinal, 0L);
- } else {
- final byte[] bytes = input.toJavaBigDecimal().unscaledValue().toByteArray();
- assert bytes.length <= 16;
+ public void write(int ordinal, int value) {
+ Platform.putInt(holder.buffer, getFieldOffset(ordinal), value);
+ }
- // Write the bytes to the variable length portion.
- Platform.copyMemory(
- bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, bytes.length);
- setOffsetAndSize(ordinal, bytes.length);
+ public void write(int ordinal, long value) {
+ Platform.putLong(holder.buffer, getFieldOffset(ordinal), value);
+ }
+
+ public void write(int ordinal, float value) {
+ if (Float.isNaN(value)) {
+ value = Float.NaN;
}
+ Platform.putFloat(holder.buffer, getFieldOffset(ordinal), value);
+ }
- // move the cursor forward.
- holder.cursor += 16;
+ public void write(int ordinal, double value) {
+ if (Double.isNaN(value)) {
+ value = Double.NaN;
+ }
+ Platform.putDouble(holder.buffer, getFieldOffset(ordinal), value);
+ }
+
+ public void write(int ordinal, Decimal input, int precision, int scale) {
+ if (precision <= Decimal.MAX_LONG_DIGITS()) {
+ // make sure Decimal object has the same scale as DecimalType
+ if (input.changePrecision(precision, scale)) {
+ Platform.putLong(holder.buffer, getFieldOffset(ordinal), input.toUnscaledLong());
+ } else {
+ setNullAt(ordinal);
+ }
+ } else {
+ // grow the global buffer before writing data.
+ holder.grow(16);
+
+ // zero-out the bytes
+ Platform.putLong(holder.buffer, holder.cursor, 0L);
+ Platform.putLong(holder.buffer, holder.cursor + 8, 0L);
+
+ // Make sure Decimal object has the same scale as DecimalType.
+ // Note that we may pass in null Decimal object to set null for it.
+ if (input == null || !input.changePrecision(precision, scale)) {
+ BitSetMethods.set(holder.buffer, startingOffset, ordinal);
+ // keep the offset for future update
+ setOffsetAndSize(ordinal, 0L);
+ } else {
+ final byte[] bytes = input.toJavaBigDecimal().unscaledValue().toByteArray();
+ assert bytes.length <= 16;
+
+ // Write the bytes to the variable length portion.
+ Platform.copyMemory(
+ bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, bytes.length);
+ setOffsetAndSize(ordinal, bytes.length);
+ }
+
+ // move the cursor forward.
+ holder.cursor += 16;
+ }
}
public void write(int ordinal, UTF8String input) {
@@ -151,7 +189,10 @@ public class UnsafeRowWriter {
}
public void write(int ordinal, byte[] input) {
- final int numBytes = input.length;
+ write(ordinal, input, 0, input.length);
+ }
+
+ public void write(int ordinal, byte[] input, int offset, int numBytes) {
final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes);
// grow the global buffer before writing data.
@@ -160,7 +201,8 @@ public class UnsafeRowWriter {
zeroOutPaddingBytes(numBytes);
// Write the bytes to the variable length portion.
- Platform.copyMemory(input, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, numBytes);
+ Platform.copyMemory(input, Platform.BYTE_ARRAY_OFFSET + offset,
+ holder.buffer, holder.cursor, numBytes);
setOffsetAndSize(ordinal, numBytes);
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
index dbe92d6a83..2136f82ba4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
@@ -89,7 +89,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
val setNull = dt match {
case t: DecimalType if t.precision > Decimal.MAX_LONG_DIGITS =>
// Can't call setNullAt() for DecimalType with precision larger than 18.
- s"$rowWriter.write($index, null, ${t.precision}, ${t.scale});"
+ s"$rowWriter.write($index, (Decimal) null, ${t.precision}, ${t.scale});"
case _ => s"$rowWriter.setNullAt($index);"
}
@@ -124,17 +124,10 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
"""
case _ if ctx.isPrimitiveType(dt) =>
- val fieldOffset = ctx.freshName("fieldOffset")
s"""
- final long $fieldOffset = $rowWriter.getFieldOffset($index);
- Platform.putLong($bufferHolder.buffer, $fieldOffset, 0L);
- ${writePrimitiveType(ctx, input.value, dt, s"$bufferHolder.buffer", fieldOffset)}
+ $rowWriter.write($index, ${input.value});
"""
- case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS =>
- s"$rowWriter.writeCompactDecimal($index, ${input.value}, " +
- s"${t.precision}, ${t.scale});"
-
case t: DecimalType =>
s"$rowWriter.write($index, ${input.value}, ${t.precision}, ${t.scale});"
@@ -204,20 +197,6 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
${writeMapToBuffer(ctx, element, kt, vt, bufferHolder)}
"""
- case _ if ctx.isPrimitiveType(et) =>
- // Should we do word align?
- val dataSize = et.defaultSize
-
- s"""
- $arrayWriter.setOffset($index);
- ${writePrimitiveType(ctx, element, et,
- s"$bufferHolder.buffer", s"$bufferHolder.cursor")}
- $bufferHolder.cursor += $dataSize;
- """
-
- case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS =>
- s"$arrayWriter.writeCompactDecimal($index, $element, ${t.precision}, ${t.scale});"
-
case t: DecimalType =>
s"$arrayWriter.write($index, $element, ${t.precision}, ${t.scale});"
@@ -296,38 +275,6 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
"""
}
- private def writePrimitiveType(
- ctx: CodeGenContext,
- input: String,
- dt: DataType,
- buffer: String,
- offset: String) = {
- assert(ctx.isPrimitiveType(dt))
-
- val putMethod = s"put${ctx.primitiveTypeName(dt)}"
-
- dt match {
- case FloatType | DoubleType =>
- val normalized = ctx.freshName("normalized")
- val boxedType = ctx.boxedType(dt)
- val handleNaN =
- s"""
- final ${ctx.javaType(dt)} $normalized;
- if ($boxedType.isNaN($input)) {
- $normalized = $boxedType.NaN;
- } else {
- $normalized = $input;
- }
- """
-
- s"""
- $handleNaN
- Platform.$putMethod($buffer, $offset, $normalized);
- """
- case _ => s"Platform.$putMethod($buffer, $offset, $input);"
- }
- }
-
def createCode(ctx: CodeGenContext, expressions: Seq[Expression]): GeneratedExpressionCode = {
val exprEvals = expressions.map(e => e.gen(ctx))
val exprTypes = expressions.map(_.dataType)