diff options
author | Davies Liu <davies@databricks.com> | 2015-10-21 19:20:31 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-10-21 19:20:31 -0700 |
commit | 1d9733271595596683a6d956a7433fa601df1cc1 (patch) | |
tree | dfe891e5f6bd28726f99dcc092d43e237ce154f3 /sql/catalyst | |
parent | 40a10d7675578f8370d07e23810d9fc5d58e0550 (diff) | |
download | spark-1d9733271595596683a6d956a7433fa601df1cc1.tar.gz spark-1d9733271595596683a6d956a7433fa601df1cc1.tar.bz2 spark-1d9733271595596683a6d956a7433fa601df1cc1.zip |
[SPARK-11243][SQL] output UnsafeRow from columnar cache
This PR change InMemoryTableScan to output UnsafeRow, and optimize the unrolling and scanning by coping the bytes for var-length types between UnsafeRow and ByteBuffer directly without creating the wrapper objects. When scanning the decimals in TPC-DS store_sales table, it's 80% faster (copy it as long without create Decimal objects).
Author: Davies Liu <davies@databricks.com>
Closes #9203 from davies/unsafe_cache.
Diffstat (limited to 'sql/catalyst')
4 files changed, 160 insertions, 108 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index 366615f6fe..850838af9b 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -402,7 +402,7 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS if (isNullAt(ordinal)) return null; final long offsetAndSize = getLong(ordinal); final int offset = (int) (offsetAndSize >> 32); - final int size = (int) (offsetAndSize & ((1L << 32) - 1)); + final int size = (int) offsetAndSize; return UTF8String.fromAddress(baseObject, baseOffset + offset, size); } @@ -413,7 +413,7 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS } else { final long offsetAndSize = getLong(ordinal); final int offset = (int) (offsetAndSize >> 32); - final int size = (int) (offsetAndSize & ((1L << 32) - 1)); + final int size = (int) offsetAndSize; final byte[] bytes = new byte[size]; Platform.copyMemory( baseObject, @@ -446,7 +446,7 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS } else { final long offsetAndSize = getLong(ordinal); final int offset = (int) (offsetAndSize >> 32); - final int size = (int) (offsetAndSize & ((1L << 32) - 1)); + final int size = (int) offsetAndSize; final UnsafeRow row = new UnsafeRow(); row.pointTo(baseObject, baseOffset + offset, numFields, size); return row; @@ -460,7 +460,7 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS } else { final long offsetAndSize = getLong(ordinal); final int offset = (int) (offsetAndSize >> 32); - final int size = (int) (offsetAndSize & ((1L << 32) - 1)); + final int size = (int) offsetAndSize; final UnsafeArrayData array = new UnsafeArrayData(); array.pointTo(baseObject, baseOffset + offset, size); return array; @@ -474,7 +474,7 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS } else { final long offsetAndSize = getLong(ordinal); final int offset = (int) (offsetAndSize >> 32); - final int size = (int) (offsetAndSize & ((1L << 32) - 1)); + final int size = (int) offsetAndSize; final UnsafeMapData map = new UnsafeMapData(); map.pointTo(baseObject, baseOffset + offset, size); return map; @@ -618,6 +618,27 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS buffer.position(pos + sizeInBytes); } + /** + * Write the bytes of var-length field into ByteBuffer + * + * Note: only work with HeapByteBuffer + */ + public void writeFieldTo(int ordinal, ByteBuffer buffer) { + final long offsetAndSize = getLong(ordinal); + final int offset = (int) (offsetAndSize >> 32); + final int size = (int) offsetAndSize; + + buffer.putInt(size); + int pos = buffer.position(); + buffer.position(pos + size); + Platform.copyMemory( + baseObject, + baseOffset + offset, + buffer.array(), + Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + pos, + size); + } + @Override public void writeExternal(ObjectOutput out) throws IOException { byte[] bytes = getBytes(); diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java index 7f2a1cb07a..7dd932d198 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalyst.expressions.codegen; -import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData; import org.apache.spark.sql.types.Decimal; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.types.CalendarInterval; @@ -64,29 +63,72 @@ public class UnsafeArrayWriter { Platform.putInt(holder.buffer, getElementOffset(ordinal), relativeOffset); } - public void writeCompactDecimal(int ordinal, Decimal input, int precision, int scale) { - // make sure Decimal object has the same scale as DecimalType - if (input.changePrecision(precision, scale)) { - Platform.putLong(holder.buffer, holder.cursor, input.toUnscaledLong()); - setOffset(ordinal); - holder.cursor += 8; - } else { - setNullAt(ordinal); + public void write(int ordinal, boolean value) { + Platform.putBoolean(holder.buffer, holder.cursor, value); + setOffset(ordinal); + holder.cursor += 1; + } + + public void write(int ordinal, byte value) { + Platform.putByte(holder.buffer, holder.cursor, value); + setOffset(ordinal); + holder.cursor += 1; + } + + public void write(int ordinal, short value) { + Platform.putShort(holder.buffer, holder.cursor, value); + setOffset(ordinal); + holder.cursor += 2; + } + + public void write(int ordinal, int value) { + Platform.putInt(holder.buffer, holder.cursor, value); + setOffset(ordinal); + holder.cursor += 4; + } + + public void write(int ordinal, long value) { + Platform.putLong(holder.buffer, holder.cursor, value); + setOffset(ordinal); + holder.cursor += 8; + } + + public void write(int ordinal, float value) { + if (Float.isNaN(value)) { + value = Float.NaN; + } + Platform.putFloat(holder.buffer, holder.cursor, value); + setOffset(ordinal); + holder.cursor += 4; + } + + public void write(int ordinal, double value) { + if (Double.isNaN(value)) { + value = Double.NaN; } + Platform.putDouble(holder.buffer, holder.cursor, value); + setOffset(ordinal); + holder.cursor += 8; } public void write(int ordinal, Decimal input, int precision, int scale) { // make sure Decimal object has the same scale as DecimalType if (input.changePrecision(precision, scale)) { - final byte[] bytes = input.toJavaBigDecimal().unscaledValue().toByteArray(); - assert bytes.length <= 16; - holder.grow(bytes.length); - - // Write the bytes to the variable length portion. - Platform.copyMemory( - bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, bytes.length); - setOffset(ordinal); - holder.cursor += bytes.length; + if (precision <= Decimal.MAX_LONG_DIGITS()) { + Platform.putLong(holder.buffer, holder.cursor, input.toUnscaledLong()); + setOffset(ordinal); + holder.cursor += 8; + } else { + final byte[] bytes = input.toJavaBigDecimal().unscaledValue().toByteArray(); + assert bytes.length <= 16; + holder.grow(bytes.length); + + // Write the bytes to the variable length portion. + Platform.copyMemory( + bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, bytes.length); + setOffset(ordinal); + holder.cursor += bytes.length; + } } else { setNullAt(ordinal); } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java index e1f5a05d1d..adbe262187 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java @@ -58,6 +58,10 @@ public class UnsafeRowWriter { } } + public boolean isNullAt(int ordinal) { + return BitSetMethods.isSet(holder.buffer, startingOffset, ordinal); + } + public void setNullAt(int ordinal) { BitSetMethods.set(holder.buffer, startingOffset, ordinal); Platform.putLong(holder.buffer, getFieldOffset(ordinal), 0L); @@ -95,41 +99,75 @@ public class UnsafeRowWriter { } } - public void writeCompactDecimal(int ordinal, Decimal input, int precision, int scale) { - // make sure Decimal object has the same scale as DecimalType - if (input.changePrecision(precision, scale)) { - Platform.putLong(holder.buffer, getFieldOffset(ordinal), input.toUnscaledLong()); - } else { - setNullAt(ordinal); - } + public void write(int ordinal, boolean value) { + Platform.putBoolean(holder.buffer, getFieldOffset(ordinal), value); } - public void write(int ordinal, Decimal input, int precision, int scale) { - // grow the global buffer before writing data. - holder.grow(16); + public void write(int ordinal, byte value) { + Platform.putByte(holder.buffer, getFieldOffset(ordinal), value); + } - // zero-out the bytes - Platform.putLong(holder.buffer, holder.cursor, 0L); - Platform.putLong(holder.buffer, holder.cursor + 8, 0L); + public void write(int ordinal, short value) { + Platform.putShort(holder.buffer, getFieldOffset(ordinal), value); + } - // Make sure Decimal object has the same scale as DecimalType. - // Note that we may pass in null Decimal object to set null for it. - if (input == null || !input.changePrecision(precision, scale)) { - BitSetMethods.set(holder.buffer, startingOffset, ordinal); - // keep the offset for future update - setOffsetAndSize(ordinal, 0L); - } else { - final byte[] bytes = input.toJavaBigDecimal().unscaledValue().toByteArray(); - assert bytes.length <= 16; + public void write(int ordinal, int value) { + Platform.putInt(holder.buffer, getFieldOffset(ordinal), value); + } - // Write the bytes to the variable length portion. - Platform.copyMemory( - bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, bytes.length); - setOffsetAndSize(ordinal, bytes.length); + public void write(int ordinal, long value) { + Platform.putLong(holder.buffer, getFieldOffset(ordinal), value); + } + + public void write(int ordinal, float value) { + if (Float.isNaN(value)) { + value = Float.NaN; } + Platform.putFloat(holder.buffer, getFieldOffset(ordinal), value); + } - // move the cursor forward. - holder.cursor += 16; + public void write(int ordinal, double value) { + if (Double.isNaN(value)) { + value = Double.NaN; + } + Platform.putDouble(holder.buffer, getFieldOffset(ordinal), value); + } + + public void write(int ordinal, Decimal input, int precision, int scale) { + if (precision <= Decimal.MAX_LONG_DIGITS()) { + // make sure Decimal object has the same scale as DecimalType + if (input.changePrecision(precision, scale)) { + Platform.putLong(holder.buffer, getFieldOffset(ordinal), input.toUnscaledLong()); + } else { + setNullAt(ordinal); + } + } else { + // grow the global buffer before writing data. + holder.grow(16); + + // zero-out the bytes + Platform.putLong(holder.buffer, holder.cursor, 0L); + Platform.putLong(holder.buffer, holder.cursor + 8, 0L); + + // Make sure Decimal object has the same scale as DecimalType. + // Note that we may pass in null Decimal object to set null for it. + if (input == null || !input.changePrecision(precision, scale)) { + BitSetMethods.set(holder.buffer, startingOffset, ordinal); + // keep the offset for future update + setOffsetAndSize(ordinal, 0L); + } else { + final byte[] bytes = input.toJavaBigDecimal().unscaledValue().toByteArray(); + assert bytes.length <= 16; + + // Write the bytes to the variable length portion. + Platform.copyMemory( + bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, bytes.length); + setOffsetAndSize(ordinal, bytes.length); + } + + // move the cursor forward. + holder.cursor += 16; + } } public void write(int ordinal, UTF8String input) { @@ -151,7 +189,10 @@ public class UnsafeRowWriter { } public void write(int ordinal, byte[] input) { - final int numBytes = input.length; + write(ordinal, input, 0, input.length); + } + + public void write(int ordinal, byte[] input, int offset, int numBytes) { final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes); // grow the global buffer before writing data. @@ -160,7 +201,8 @@ public class UnsafeRowWriter { zeroOutPaddingBytes(numBytes); // Write the bytes to the variable length portion. - Platform.copyMemory(input, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, numBytes); + Platform.copyMemory(input, Platform.BYTE_ARRAY_OFFSET + offset, + holder.buffer, holder.cursor, numBytes); setOffsetAndSize(ordinal, numBytes); diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala index dbe92d6a83..2136f82ba4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala @@ -89,7 +89,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro val setNull = dt match { case t: DecimalType if t.precision > Decimal.MAX_LONG_DIGITS => // Can't call setNullAt() for DecimalType with precision larger than 18. - s"$rowWriter.write($index, null, ${t.precision}, ${t.scale});" + s"$rowWriter.write($index, (Decimal) null, ${t.precision}, ${t.scale});" case _ => s"$rowWriter.setNullAt($index);" } @@ -124,17 +124,10 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro """ case _ if ctx.isPrimitiveType(dt) => - val fieldOffset = ctx.freshName("fieldOffset") s""" - final long $fieldOffset = $rowWriter.getFieldOffset($index); - Platform.putLong($bufferHolder.buffer, $fieldOffset, 0L); - ${writePrimitiveType(ctx, input.value, dt, s"$bufferHolder.buffer", fieldOffset)} + $rowWriter.write($index, ${input.value}); """ - case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS => - s"$rowWriter.writeCompactDecimal($index, ${input.value}, " + - s"${t.precision}, ${t.scale});" - case t: DecimalType => s"$rowWriter.write($index, ${input.value}, ${t.precision}, ${t.scale});" @@ -204,20 +197,6 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro ${writeMapToBuffer(ctx, element, kt, vt, bufferHolder)} """ - case _ if ctx.isPrimitiveType(et) => - // Should we do word align? - val dataSize = et.defaultSize - - s""" - $arrayWriter.setOffset($index); - ${writePrimitiveType(ctx, element, et, - s"$bufferHolder.buffer", s"$bufferHolder.cursor")} - $bufferHolder.cursor += $dataSize; - """ - - case t: DecimalType if t.precision <= Decimal.MAX_LONG_DIGITS => - s"$arrayWriter.writeCompactDecimal($index, $element, ${t.precision}, ${t.scale});" - case t: DecimalType => s"$arrayWriter.write($index, $element, ${t.precision}, ${t.scale});" @@ -296,38 +275,6 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro """ } - private def writePrimitiveType( - ctx: CodeGenContext, - input: String, - dt: DataType, - buffer: String, - offset: String) = { - assert(ctx.isPrimitiveType(dt)) - - val putMethod = s"put${ctx.primitiveTypeName(dt)}" - - dt match { - case FloatType | DoubleType => - val normalized = ctx.freshName("normalized") - val boxedType = ctx.boxedType(dt) - val handleNaN = - s""" - final ${ctx.javaType(dt)} $normalized; - if ($boxedType.isNaN($input)) { - $normalized = $boxedType.NaN; - } else { - $normalized = $input; - } - """ - - s""" - $handleNaN - Platform.$putMethod($buffer, $offset, $normalized); - """ - case _ => s"Platform.$putMethod($buffer, $offset, $input);" - } - } - def createCode(ctx: CodeGenContext, expressions: Seq[Expression]): GeneratedExpressionCode = { val exprEvals = expressions.map(e => e.gen(ctx)) val exprTypes = expressions.map(_.dataType) |