path: root/sql/catalyst
author    Davies Liu <davies@databricks.com>  2015-12-30 22:16:37 -0800
committer Davies Liu <davies.liu@gmail.com>  2015-12-30 22:16:37 -0800
commit    e6c77874b915691dead91e8d96ad9f58ba3a73db (patch)
tree      0e7bffd6e78f115f22f8168d2ddea54aa9b50d12 /sql/catalyst
parent    93b52abca708aba90b6d0d36c92b920f6f8338ad (diff)
[SPARK-12585] [SQL] move numFields to constructor of UnsafeRow
Right now, numFields is passed in through pointTo(), and bitSetWidthInBytes is recalculated on every call, which makes pointTo() needlessly heavy. numFields should instead be part of the constructor of UnsafeRow.

Author: Davies Liu <davies@databricks.com>

Closes #10528 from davies/numFields.
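In sketch form, the API change looks like the following (a minimal illustration, assuming baseObject, baseOffset, numFields, and sizeInBytes are already in scope; the real signatures are in the hunks below):

    // Before this patch: the field count travels with every pointTo()
    // call, and the bitset width is recomputed each time.
    UnsafeRow before = new UnsafeRow();
    before.pointTo(baseObject, baseOffset, numFields, sizeInBytes);

    // After this patch: the field count is fixed at construction, the
    // bitset width is computed once, and pointTo() only rebinds the
    // row to a memory region.
    UnsafeRow after = new UnsafeRow(numFields);
    after.pointTo(baseObject, baseOffset, sizeInBytes);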
Diffstat (limited to 'sql/catalyst')
 sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java                              |  4
 sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java                                    | 88
 sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java                                 | 16
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala           |  4
 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala            |  4
 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerBitsetSuite.scala |  4
 6 files changed, 41 insertions(+), 79 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
index 3513960b41..3d80df2271 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeArrayData.java
@@ -270,8 +270,8 @@ public class UnsafeArrayData extends ArrayData {
final int offset = getElementOffset(ordinal);
if (offset < 0) return null;
final int size = getElementSize(offset, ordinal);
- final UnsafeRow row = new UnsafeRow();
- row.pointTo(baseObject, baseOffset + offset, numFields, size);
+ final UnsafeRow row = new UnsafeRow(numFields);
+ row.pointTo(baseObject, baseOffset + offset, size);
return row;
}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index b6979d0c82..7492b88c47 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -17,11 +17,7 @@
package org.apache.spark.sql.catalyst.expressions;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.io.OutputStream;
+import java.io.*;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
@@ -30,26 +26,12 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
-import org.apache.spark.sql.types.ArrayType;
-import org.apache.spark.sql.types.BinaryType;
-import org.apache.spark.sql.types.BooleanType;
-import org.apache.spark.sql.types.ByteType;
-import org.apache.spark.sql.types.CalendarIntervalType;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.DateType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.DecimalType;
-import org.apache.spark.sql.types.DoubleType;
-import org.apache.spark.sql.types.FloatType;
-import org.apache.spark.sql.types.IntegerType;
-import org.apache.spark.sql.types.LongType;
-import org.apache.spark.sql.types.MapType;
-import org.apache.spark.sql.types.NullType;
-import org.apache.spark.sql.types.ShortType;
-import org.apache.spark.sql.types.StringType;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.types.TimestampType;
-import org.apache.spark.sql.types.UserDefinedType;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.bitset.BitSetMethods;
@@ -57,23 +39,9 @@ import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
-import static org.apache.spark.sql.types.DataTypes.BooleanType;
-import static org.apache.spark.sql.types.DataTypes.ByteType;
-import static org.apache.spark.sql.types.DataTypes.DateType;
-import static org.apache.spark.sql.types.DataTypes.DoubleType;
-import static org.apache.spark.sql.types.DataTypes.FloatType;
-import static org.apache.spark.sql.types.DataTypes.IntegerType;
-import static org.apache.spark.sql.types.DataTypes.LongType;
-import static org.apache.spark.sql.types.DataTypes.NullType;
-import static org.apache.spark.sql.types.DataTypes.ShortType;
-import static org.apache.spark.sql.types.DataTypes.TimestampType;
+import static org.apache.spark.sql.types.DataTypes.*;
import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.KryoSerializable;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
/**
* An Unsafe implementation of Row which is backed by raw memory instead of Java objects.
*
@@ -167,8 +135,16 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable {
/**
* Construct a new UnsafeRow. The resulting row won't be usable until `pointTo()` has been called,
* since the value returned by this constructor is equivalent to a null pointer.
+ *
+ * @param numFields the number of fields in this row
*/
- public UnsafeRow() { }
+ public UnsafeRow(int numFields) {
+ this.numFields = numFields;
+ this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
+ }
+
+ // for serializer
+ public UnsafeRow() {}
public Object getBaseObject() { return baseObject; }
public long getBaseOffset() { return baseOffset; }
@@ -182,15 +158,12 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable {
*
* @param baseObject the base object
* @param baseOffset the offset within the base object
- * @param numFields the number of fields in this row
* @param sizeInBytes the size of this row's backing data, in bytes
*/
- public void pointTo(Object baseObject, long baseOffset, int numFields, int sizeInBytes) {
+ public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
assert numFields >= 0 : "numFields (" + numFields + ") should >= 0";
- this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
this.baseObject = baseObject;
this.baseOffset = baseOffset;
- this.numFields = numFields;
this.sizeInBytes = sizeInBytes;
}
@@ -198,23 +171,12 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable {
* Update this UnsafeRow to point to the underlying byte array.
*
* @param buf byte array to point to
- * @param numFields the number of fields in this row
- * @param sizeInBytes the number of bytes valid in the byte array
- */
- public void pointTo(byte[] buf, int numFields, int sizeInBytes) {
- pointTo(buf, Platform.BYTE_ARRAY_OFFSET, numFields, sizeInBytes);
- }
-
- /**
- * Updates this UnsafeRow preserving the number of fields.
- * @param buf byte array to point to
* @param sizeInBytes the number of bytes valid in the byte array
*/
public void pointTo(byte[] buf, int sizeInBytes) {
- pointTo(buf, numFields, sizeInBytes);
+ pointTo(buf, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
}
-
public void setNotNullAt(int i) {
assertIndexIsValid(i);
BitSetMethods.unset(baseObject, baseOffset, i);
@@ -489,8 +451,8 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable {
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
final int size = (int) offsetAndSize;
- final UnsafeRow row = new UnsafeRow();
- row.pointTo(baseObject, baseOffset + offset, numFields, size);
+ final UnsafeRow row = new UnsafeRow(numFields);
+ row.pointTo(baseObject, baseOffset + offset, size);
return row;
}
}
@@ -529,7 +491,7 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable {
*/
@Override
public UnsafeRow copy() {
- UnsafeRow rowCopy = new UnsafeRow();
+ UnsafeRow rowCopy = new UnsafeRow(numFields);
final byte[] rowDataCopy = new byte[sizeInBytes];
Platform.copyMemory(
baseObject,
@@ -538,7 +500,7 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable {
Platform.BYTE_ARRAY_OFFSET,
sizeInBytes
);
- rowCopy.pointTo(rowDataCopy, Platform.BYTE_ARRAY_OFFSET, numFields, sizeInBytes);
+ rowCopy.pointTo(rowDataCopy, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
return rowCopy;
}
@@ -547,8 +509,8 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable {
* The returned row is invalid until we call copyFrom on it.
*/
public static UnsafeRow createFromByteArray(int numBytes, int numFields) {
- final UnsafeRow row = new UnsafeRow();
- row.pointTo(new byte[numBytes], numFields, numBytes);
+ final UnsafeRow row = new UnsafeRow(numFields);
+ row.pointTo(new byte[numBytes], numBytes);
return row;
}
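Taken together, the UnsafeRow changes above mean callers fix the schema width when a row object is created and only rebind buffers afterwards. A hedged usage sketch (the 64-byte size and 3-field width are illustrative values, not from the patch):

    // Allocate backing storage and a row over it in one step; the
    // field count is now part of the row's identity.
    UnsafeRow row = UnsafeRow.createFromByteArray(64, 3);

    // Re-pointing the row at a fresh buffer no longer repeats the
    // field count; only the buffer and its valid length are needed.
    byte[] buf = new byte[64];
    row.pointTo(buf, 64);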
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 352002b349..27ae62f121 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -26,10 +26,9 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
-import org.apache.spark.sql.catalyst.util.AbstractScalaRowIterator;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.catalyst.util.AbstractScalaRowIterator;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.collection.unsafe.sort.PrefixComparator;
@@ -123,7 +122,7 @@ final class UnsafeExternalRowSorter {
return new AbstractScalaRowIterator<UnsafeRow>() {
private final int numFields = schema.length();
- private UnsafeRow row = new UnsafeRow();
+ private UnsafeRow row = new UnsafeRow(numFields);
@Override
public boolean hasNext() {
@@ -137,7 +136,6 @@ final class UnsafeExternalRowSorter {
row.pointTo(
sortedIterator.getBaseObject(),
sortedIterator.getBaseOffset(),
- numFields,
sortedIterator.getRecordLength());
if (!hasNext()) {
UnsafeRow copy = row.copy(); // so that we don't have dangling pointers to freed page
@@ -173,19 +171,21 @@ final class UnsafeExternalRowSorter {
private static final class RowComparator extends RecordComparator {
private final Ordering<InternalRow> ordering;
private final int numFields;
- private final UnsafeRow row1 = new UnsafeRow();
- private final UnsafeRow row2 = new UnsafeRow();
+ private final UnsafeRow row1;
+ private final UnsafeRow row2;
public RowComparator(Ordering<InternalRow> ordering, int numFields) {
this.numFields = numFields;
+ this.row1 = new UnsafeRow(numFields);
+ this.row2 = new UnsafeRow(numFields);
this.ordering = ordering;
}
@Override
public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
// TODO: Why are the sizes -1?
- row1.pointTo(baseObj1, baseOff1, numFields, -1);
- row2.pointTo(baseObj2, baseOff2, numFields, -1);
+ row1.pointTo(baseObj1, baseOff1, -1);
+ row2.pointTo(baseObj2, baseOff2, -1);
return ordering.compare(row1, row2);
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
index c1defe12b0..d0e031f279 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
@@ -289,7 +289,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafeProjection] {
val exprTypes = expressions.map(_.dataType)
val result = ctx.freshName("result")
- ctx.addMutableState("UnsafeRow", result, s"this.$result = new UnsafeRow();")
+ ctx.addMutableState("UnsafeRow", result, s"$result = new UnsafeRow(${expressions.length});")
val bufferHolder = ctx.freshName("bufferHolder")
val holderClass = classOf[BufferHolder].getName
ctx.addMutableState(holderClass, bufferHolder, s"this.$bufferHolder = new $holderClass();")
@@ -303,7 +303,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafeProjection] {
$subexprReset
${writeExpressionsToBuffer(ctx, ctx.INPUT_ROW, exprEvals, exprTypes, bufferHolder)}
- $result.pointTo($bufferHolder.buffer, ${expressions.length}, $bufferHolder.totalSize());
+ $result.pointTo($bufferHolder.buffer, $bufferHolder.totalSize());
"""
GeneratedExpressionCode(code, "false", result)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
index da602d9b4b..c9ff357bf3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
@@ -165,7 +165,7 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), UnsafeRowJoiner] {
|
|class SpecificUnsafeRowJoiner extends ${classOf[UnsafeRowJoiner].getName} {
| private byte[] buf = new byte[64];
- | private UnsafeRow out = new UnsafeRow();
+ | private UnsafeRow out = new UnsafeRow(${schema1.size + schema2.size});
|
| public UnsafeRow join(UnsafeRow row1, UnsafeRow row2) {
| // row1: ${schema1.size} fields, $bitset1Words words in bitset
@@ -188,7 +188,7 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), UnsafeRowJoiner] {
| $copyVariableLengthRow2
| $updateOffset
|
- | out.pointTo(buf, ${schema1.size + schema2.size}, sizeInBytes - $sizeReduction);
+ | out.pointTo(buf, sizeInBytes - $sizeReduction);
|
| return out;
| }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerBitsetSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerBitsetSuite.scala
index 796d60032e..f8342214d9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerBitsetSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerBitsetSuite.scala
@@ -90,13 +90,13 @@ class GenerateUnsafeRowJoinerBitsetSuite extends SparkFunSuite {
}
private def createUnsafeRow(numFields: Int): UnsafeRow = {
- val row = new UnsafeRow
+ val row = new UnsafeRow(numFields)
val sizeInBytes = numFields * 8 + ((numFields + 63) / 64) * 8
// Allocate a larger buffer than needed and point the UnsafeRow to somewhere in the middle.
// This way we can test the joiner when the input UnsafeRows are not the entire arrays.
val offset = numFields * 8
val buf = new Array[Byte](sizeInBytes + offset)
- row.pointTo(buf, Platform.BYTE_ARRAY_OFFSET + offset, numFields, sizeInBytes)
+ row.pointTo(buf, Platform.BYTE_ARRAY_OFFSET + offset, sizeInBytes)
row
}
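As a closing note, the sizeInBytes arithmetic in the test above mirrors calculateBitSetWidthInBytes, which the new constructor now runs exactly once: the fixed-width region is one 8-byte slot per field plus a null bitset rounded up to whole 64-bit words. A small sketch, reconstructed from the formula in the test rather than copied from UnsafeRow itself:

    // Null bitset: one bit per field, rounded up to 64-bit words,
    // expressed in bytes; matches ((numFields + 63) / 64) * 8 above.
    static int calculateBitSetWidthInBytes(int numFields) {
      return ((numFields + 63) / 64) * 8;
    }

    // Minimum backing size for a row with no variable-length data:
    // the bitset words plus one 8-byte word per field.
    static int minRowSize(int numFields) {
      return calculateBitSetWidthInBytes(numFields) + numFields * 8;
    }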