diff options
author | Davies Liu <davies@databricks.com> | 2015-12-30 22:16:37 -0800 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-12-30 22:16:37 -0800 |
commit | e6c77874b915691dead91e8d96ad9f58ba3a73db (patch) | |
tree | 0e7bffd6e78f115f22f8168d2ddea54aa9b50d12 /sql/catalyst | |
parent | 93b52abca708aba90b6d0d36c92b920f6f8338ad (diff) | |
download | spark-e6c77874b915691dead91e8d96ad9f58ba3a73db.tar.gz spark-e6c77874b915691dead91e8d96ad9f58ba3a73db.tar.bz2 spark-e6c77874b915691dead91e8d96ad9f58ba3a73db.zip |
[SPARK-12585] [SQL] move numFields to constructor of UnsafeRow
Right now, numFields is passed in via pointTo(), and then bitSetWidthInBytes is calculated from it, which makes pointTo() a little bit heavy.
It should be part of the constructor of UnsafeRow.
Author: Davies Liu <davies@databricks.com>
Closes #10528 from davies/numFields.
Diffstat (limited to 'sql/catalyst')
6 files changed, 41 insertions, 79 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java index 3513960b41..3d80df2271 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java @@ -270,8 +270,8 @@ public class UnsafeArrayData extends ArrayData { final int offset = getElementOffset(ordinal); if (offset < 0) return null; final int size = getElementSize(offset, ordinal); - final UnsafeRow row = new UnsafeRow(); - row.pointTo(baseObject, baseOffset + offset, numFields, size); + final UnsafeRow row = new UnsafeRow(numFields); + row.pointTo(baseObject, baseOffset + offset, size); return row; } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index b6979d0c82..7492b88c47 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -17,11 +17,7 @@ package org.apache.spark.sql.catalyst.expressions; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.io.OutputStream; +import java.io.*; import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; @@ -30,26 +26,12 @@ import java.util.Collections; import java.util.HashSet; import java.util.Set; -import org.apache.spark.sql.types.ArrayType; -import org.apache.spark.sql.types.BinaryType; -import org.apache.spark.sql.types.BooleanType; -import org.apache.spark.sql.types.ByteType; -import org.apache.spark.sql.types.CalendarIntervalType; -import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.DateType; 
-import org.apache.spark.sql.types.Decimal; -import org.apache.spark.sql.types.DecimalType; -import org.apache.spark.sql.types.DoubleType; -import org.apache.spark.sql.types.FloatType; -import org.apache.spark.sql.types.IntegerType; -import org.apache.spark.sql.types.LongType; -import org.apache.spark.sql.types.MapType; -import org.apache.spark.sql.types.NullType; -import org.apache.spark.sql.types.ShortType; -import org.apache.spark.sql.types.StringType; -import org.apache.spark.sql.types.StructType; -import org.apache.spark.sql.types.TimestampType; -import org.apache.spark.sql.types.UserDefinedType; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoSerializable; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.array.ByteArrayMethods; import org.apache.spark.unsafe.bitset.BitSetMethods; @@ -57,23 +39,9 @@ import org.apache.spark.unsafe.hash.Murmur3_x86_32; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; -import static org.apache.spark.sql.types.DataTypes.BooleanType; -import static org.apache.spark.sql.types.DataTypes.ByteType; -import static org.apache.spark.sql.types.DataTypes.DateType; -import static org.apache.spark.sql.types.DataTypes.DoubleType; -import static org.apache.spark.sql.types.DataTypes.FloatType; -import static org.apache.spark.sql.types.DataTypes.IntegerType; -import static org.apache.spark.sql.types.DataTypes.LongType; -import static org.apache.spark.sql.types.DataTypes.NullType; -import static org.apache.spark.sql.types.DataTypes.ShortType; -import static org.apache.spark.sql.types.DataTypes.TimestampType; +import static org.apache.spark.sql.types.DataTypes.*; import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET; -import com.esotericsoftware.kryo.Kryo; -import 
com.esotericsoftware.kryo.KryoSerializable; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; - /** * An Unsafe implementation of Row which is backed by raw memory instead of Java objects. * @@ -167,8 +135,16 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS /** * Construct a new UnsafeRow. The resulting row won't be usable until `pointTo()` has been called, * since the value returned by this constructor is equivalent to a null pointer. + * + * @param numFields the number of fields in this row */ - public UnsafeRow() { } + public UnsafeRow(int numFields) { + this.numFields = numFields; + this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields); + } + + // for serializer + public UnsafeRow() {} public Object getBaseObject() { return baseObject; } public long getBaseOffset() { return baseOffset; } @@ -182,15 +158,12 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS * * @param baseObject the base object * @param baseOffset the offset within the base object - * @param numFields the number of fields in this row * @param sizeInBytes the size of this row's backing data, in bytes */ - public void pointTo(Object baseObject, long baseOffset, int numFields, int sizeInBytes) { + public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) { assert numFields >= 0 : "numFields (" + numFields + ") should >= 0"; - this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields); this.baseObject = baseObject; this.baseOffset = baseOffset; - this.numFields = numFields; this.sizeInBytes = sizeInBytes; } @@ -198,23 +171,12 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS * Update this UnsafeRow to point to the underlying byte array. 
* * @param buf byte array to point to - * @param numFields the number of fields in this row - * @param sizeInBytes the number of bytes valid in the byte array - */ - public void pointTo(byte[] buf, int numFields, int sizeInBytes) { - pointTo(buf, Platform.BYTE_ARRAY_OFFSET, numFields, sizeInBytes); - } - - /** - * Updates this UnsafeRow preserving the number of fields. - * @param buf byte array to point to * @param sizeInBytes the number of bytes valid in the byte array */ public void pointTo(byte[] buf, int sizeInBytes) { - pointTo(buf, numFields, sizeInBytes); + pointTo(buf, Platform.BYTE_ARRAY_OFFSET, sizeInBytes); } - public void setNotNullAt(int i) { assertIndexIsValid(i); BitSetMethods.unset(baseObject, baseOffset, i); @@ -489,8 +451,8 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS final long offsetAndSize = getLong(ordinal); final int offset = (int) (offsetAndSize >> 32); final int size = (int) offsetAndSize; - final UnsafeRow row = new UnsafeRow(); - row.pointTo(baseObject, baseOffset + offset, numFields, size); + final UnsafeRow row = new UnsafeRow(numFields); + row.pointTo(baseObject, baseOffset + offset, size); return row; } } @@ -529,7 +491,7 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS */ @Override public UnsafeRow copy() { - UnsafeRow rowCopy = new UnsafeRow(); + UnsafeRow rowCopy = new UnsafeRow(numFields); final byte[] rowDataCopy = new byte[sizeInBytes]; Platform.copyMemory( baseObject, @@ -538,7 +500,7 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS Platform.BYTE_ARRAY_OFFSET, sizeInBytes ); - rowCopy.pointTo(rowDataCopy, Platform.BYTE_ARRAY_OFFSET, numFields, sizeInBytes); + rowCopy.pointTo(rowDataCopy, Platform.BYTE_ARRAY_OFFSET, sizeInBytes); return rowCopy; } @@ -547,8 +509,8 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS * The returned row is invalid until we call copyFrom on it. 
*/ public static UnsafeRow createFromByteArray(int numBytes, int numFields) { - final UnsafeRow row = new UnsafeRow(); - row.pointTo(new byte[numBytes], numFields, numBytes); + final UnsafeRow row = new UnsafeRow(numFields); + row.pointTo(new byte[numBytes], numBytes); return row; } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java index 352002b349..27ae62f121 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java @@ -26,10 +26,9 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.spark.SparkEnv; import org.apache.spark.TaskContext; -import org.apache.spark.sql.catalyst.util.AbstractScalaRowIterator; import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; import org.apache.spark.sql.catalyst.expressions.UnsafeRow; +import org.apache.spark.sql.catalyst.util.AbstractScalaRowIterator; import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.Platform; import org.apache.spark.util.collection.unsafe.sort.PrefixComparator; @@ -123,7 +122,7 @@ final class UnsafeExternalRowSorter { return new AbstractScalaRowIterator<UnsafeRow>() { private final int numFields = schema.length(); - private UnsafeRow row = new UnsafeRow(); + private UnsafeRow row = new UnsafeRow(numFields); @Override public boolean hasNext() { @@ -137,7 +136,6 @@ final class UnsafeExternalRowSorter { row.pointTo( sortedIterator.getBaseObject(), sortedIterator.getBaseOffset(), - numFields, sortedIterator.getRecordLength()); if (!hasNext()) { UnsafeRow copy = row.copy(); // so that we don't have dangling pointers to freed page @@ -173,19 +171,21 @@ final class UnsafeExternalRowSorter { private static final class RowComparator extends 
RecordComparator { private final Ordering<InternalRow> ordering; private final int numFields; - private final UnsafeRow row1 = new UnsafeRow(); - private final UnsafeRow row2 = new UnsafeRow(); + private final UnsafeRow row1; + private final UnsafeRow row2; public RowComparator(Ordering<InternalRow> ordering, int numFields) { this.numFields = numFields; + this.row1 = new UnsafeRow(numFields); + this.row2 = new UnsafeRow(numFields); this.ordering = ordering; } @Override public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) { // TODO: Why are the sizes -1? - row1.pointTo(baseObj1, baseOff1, numFields, -1); - row2.pointTo(baseObj2, baseOff2, numFields, -1); + row1.pointTo(baseObj1, baseOff1, -1); + row2.pointTo(baseObj2, baseOff2, -1); return ordering.compare(row1, row2); } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala index c1defe12b0..d0e031f279 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala @@ -289,7 +289,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro val exprTypes = expressions.map(_.dataType) val result = ctx.freshName("result") - ctx.addMutableState("UnsafeRow", result, s"this.$result = new UnsafeRow();") + ctx.addMutableState("UnsafeRow", result, s"$result = new UnsafeRow(${expressions.length});") val bufferHolder = ctx.freshName("bufferHolder") val holderClass = classOf[BufferHolder].getName ctx.addMutableState(holderClass, bufferHolder, s"this.$bufferHolder = new $holderClass();") @@ -303,7 +303,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro $subexprReset 
${writeExpressionsToBuffer(ctx, ctx.INPUT_ROW, exprEvals, exprTypes, bufferHolder)} - $result.pointTo($bufferHolder.buffer, ${expressions.length}, $bufferHolder.totalSize()); + $result.pointTo($bufferHolder.buffer, $bufferHolder.totalSize()); """ GeneratedExpressionCode(code, "false", result) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala index da602d9b4b..c9ff357bf3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala @@ -165,7 +165,7 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U | |class SpecificUnsafeRowJoiner extends ${classOf[UnsafeRowJoiner].getName} { | private byte[] buf = new byte[64]; - | private UnsafeRow out = new UnsafeRow(); + | private UnsafeRow out = new UnsafeRow(${schema1.size + schema2.size}); | | public UnsafeRow join(UnsafeRow row1, UnsafeRow row2) { | // row1: ${schema1.size} fields, $bitset1Words words in bitset @@ -188,7 +188,7 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U | $copyVariableLengthRow2 | $updateOffset | - | out.pointTo(buf, ${schema1.size + schema2.size}, sizeInBytes - $sizeReduction); + | out.pointTo(buf, sizeInBytes - $sizeReduction); | | return out; | } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerBitsetSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerBitsetSuite.scala index 796d60032e..f8342214d9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerBitsetSuite.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerBitsetSuite.scala @@ -90,13 +90,13 @@ class GenerateUnsafeRowJoinerBitsetSuite extends SparkFunSuite { } private def createUnsafeRow(numFields: Int): UnsafeRow = { - val row = new UnsafeRow + val row = new UnsafeRow(numFields) val sizeInBytes = numFields * 8 + ((numFields + 63) / 64) * 8 // Allocate a larger buffer than needed and point the UnsafeRow to somewhere in the middle. // This way we can test the joiner when the input UnsafeRows are not the entire arrays. val offset = numFields * 8 val buf = new Array[Byte](sizeInBytes + offset) - row.pointTo(buf, Platform.BYTE_ARRAY_OFFSET + offset, numFields, sizeInBytes) + row.pointTo(buf, Platform.BYTE_ARRAY_OFFSET + offset, sizeInBytes) row } |