-rw-r--r--  sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java   7
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala   3
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala   94
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala   4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala   5
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java   630
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java   126
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java   70
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java   157
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java   169
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala   4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala   78
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala   397
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala   3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala   3
-rw-r--r--  unsafe/src/main/java/org/apache/spark/unsafe/Platform.java   11
16 files changed, 1671 insertions, 90 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 1a351933a3..a88bcbfdb7 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -68,6 +68,10 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS
return ((numFields + 63)/ 64) * 8;
}
+ public static int calculateFixedPortionByteSize(int numFields) {
+ return 8 * numFields + calculateBitSetWidthInBytes(numFields);
+ }
+
/**
* Field types that can be updated in place in UnsafeRows (e.g. we support set() for these types)
*/
@@ -596,10 +600,9 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS
public String toString() {
StringBuilder build = new StringBuilder("[");
for (int i = 0; i < sizeInBytes; i += 8) {
+ if (i != 0) build.append(',');
build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
- build.append(',');
}
- build.deleteCharAt(build.length() - 1);
build.append(']');
return build.toString();
}
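
For reference, the new calculateFixedPortionByteSize() is just the 8-byte field slots plus the
null-tracking bitset (one bit per field, rounded up to whole 64-bit words). A minimal Scala sketch
of the same arithmetic (helper names here are illustrative, not part of the patch):

    def bitSetWidthInBytes(numFields: Int): Int = ((numFields + 63) / 64) * 8
    def fixedPortionByteSize(numFields: Int): Int = 8 * numFields + bitSetWidthInBytes(numFields)

    assert(fixedPortionByteSize(5) == 48)    // 40 bytes of field slots + 8-byte null bitset
    assert(fixedPortionByteSize(65) == 536)  // 520 bytes of field slots + 16-byte null bitset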
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
index 475cbe005a..4615c55d67 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
/**
* A parent class for mutable container objects that are reused when the values are changed,
@@ -212,6 +211,8 @@ final class SpecificMutableRow(val values: Array[MutableValue])
def this() = this(Seq.empty)
+ def this(schema: StructType) = this(schema.fields.map(_.dataType))
+
override def numFields: Int = values.length
override def setNullAt(i: Int): Unit = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
index 7614f055e9..55efea80d1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
@@ -21,6 +21,7 @@ import java.lang.Double.longBitsToDouble
import java.lang.Float.intBitsToFloat
import java.math.MathContext
+import scala.collection.mutable
import scala.util.Random
import org.apache.spark.sql.catalyst.CatalystTypeConverters
@@ -74,14 +75,48 @@ object RandomDataGenerator {
* @param numFields the number of fields in this schema
* @param acceptedTypes types to draw from.
*/
- def randomSchema(numFields: Int, acceptedTypes: Seq[DataType]): StructType = {
+ def randomSchema(rand: Random, numFields: Int, acceptedTypes: Seq[DataType]): StructType = {
StructType(Seq.tabulate(numFields) { i =>
- val dt = acceptedTypes(Random.nextInt(acceptedTypes.size))
- StructField("col_" + i, dt, nullable = true)
+ val dt = acceptedTypes(rand.nextInt(acceptedTypes.size))
+ StructField("col_" + i, dt, nullable = rand.nextBoolean())
})
}
/**
+ * Returns a random nested schema. This will randomly generate structs and arrays drawn from
+ * acceptedTypes.
+ */
+ def randomNestedSchema(rand: Random, totalFields: Int, acceptedTypes: Seq[DataType]):
+ StructType = {
+ val fields = mutable.ArrayBuffer.empty[StructField]
+ var i = 0
+ var numFields = totalFields
+ while (numFields > 0) {
+ val v = rand.nextInt(3)
+ if (v == 0) {
+ // Simple type:
+ val dt = acceptedTypes(rand.nextInt(acceptedTypes.size))
+ fields += new StructField("col_" + i, dt, rand.nextBoolean())
+ numFields -= 1
+ } else if (v == 1) {
+ // Array
+ val dt = acceptedTypes(rand.nextInt(acceptedTypes.size))
+ fields += new StructField("col_" + i, ArrayType(dt), rand.nextBoolean())
+ numFields -= 1
+ } else {
+ // Struct
+ // TODO: do empty structs make sense?
+ val n = Math.max(rand.nextInt(numFields), 1)
+ val nested = randomNestedSchema(rand, n, acceptedTypes)
+ fields += new StructField("col_" + i, nested, rand.nextBoolean())
+ numFields -= n
+ }
+ i += 1
+ }
+ StructType(fields)
+ }
+
+ /**
* Returns a function which generates random values for the given [[DataType]], or `None` if no
* random data generator is defined for that data type. The generated values will use an external
* representation of the data type; for example, the random generator for [[DateType]] will return
@@ -90,16 +125,13 @@ object RandomDataGenerator {
*
* @param dataType the type to generate values for
* @param nullable whether null values should be generated
- * @param seed an optional seed for the random number generator
+ * @param rand an optional random number generator
* @return a function which can be called to generate random values.
*/
def forType(
dataType: DataType,
nullable: Boolean = true,
- seed: Option[Long] = None): Option[() => Any] = {
- val rand = new Random()
- seed.foreach(rand.setSeed)
-
+ rand: Random = new Random): Option[() => Any] = {
val valueGenerator: Option[() => Any] = dataType match {
case StringType => Some(() => rand.nextString(rand.nextInt(MAX_STR_LEN)))
case BinaryType => Some(() => {
@@ -165,15 +197,15 @@ object RandomDataGenerator {
rand, _.nextInt().toShort, Seq(Short.MinValue, Short.MaxValue, 0.toShort))
case NullType => Some(() => null)
case ArrayType(elementType, containsNull) => {
- forType(elementType, nullable = containsNull, seed = Some(rand.nextLong())).map {
+ forType(elementType, nullable = containsNull, rand).map {
elementGenerator => () => Seq.fill(rand.nextInt(MAX_ARR_SIZE))(elementGenerator())
}
}
case MapType(keyType, valueType, valueContainsNull) => {
for (
- keyGenerator <- forType(keyType, nullable = false, seed = Some(rand.nextLong()));
+ keyGenerator <- forType(keyType, nullable = false, rand);
valueGenerator <-
- forType(valueType, nullable = valueContainsNull, seed = Some(rand.nextLong()))
+ forType(valueType, nullable = valueContainsNull, rand)
) yield {
() => {
Seq.fill(rand.nextInt(MAX_MAP_SIZE))((keyGenerator(), valueGenerator())).toMap
@@ -182,7 +214,7 @@ object RandomDataGenerator {
}
case StructType(fields) => {
val maybeFieldGenerators: Seq[Option[() => Any]] = fields.map { field =>
- forType(field.dataType, nullable = field.nullable, seed = Some(rand.nextLong()))
+ forType(field.dataType, nullable = field.nullable, rand)
}
if (maybeFieldGenerators.forall(_.isDefined)) {
val fieldGenerators: Seq[() => Any] = maybeFieldGenerators.map(_.get)
@@ -192,7 +224,7 @@ object RandomDataGenerator {
}
}
case udt: UserDefinedType[_] => {
- val maybeSqlTypeGenerator = forType(udt.sqlType, nullable, seed)
+ val maybeSqlTypeGenerator = forType(udt.sqlType, nullable, rand)
// Because random data generator at here returns scala value, we need to
// convert it to catalyst value to call udt's deserialize.
val toCatalystType = CatalystTypeConverters.createToCatalystConverter(udt.sqlType)
@@ -229,4 +261,40 @@ object RandomDataGenerator {
}
}
}
+
+ // Generates a random row for `schema`.
+ def randomRow(rand: Random, schema: StructType): Row = {
+ val fields = mutable.ArrayBuffer.empty[Any]
+ schema.fields.foreach { f =>
+ f.dataType match {
+ case ArrayType(childType, nullable) => {
+ val data = if (f.nullable && rand.nextFloat() <= PROBABILITY_OF_NULL) {
+ null
+ } else {
+ val arr = mutable.ArrayBuffer.empty[Any]
+ val n = 1// rand.nextInt(10)
+ var i = 0
+ val generator = RandomDataGenerator.forType(childType, nullable, rand)
+ assert(generator.isDefined, "Unsupported type")
+ val gen = generator.get
+ while (i < n) {
+ arr += gen()
+ i += 1
+ }
+ arr
+ }
+ fields += data
+ }
+ case StructType(children) => {
+ fields += randomRow(rand, StructType(children))
+ }
+ case _ =>
+ val generator = RandomDataGenerator.forType(f.dataType, f.nullable, rand)
+ assert(generator.isDefined, "Unsupported type")
+ val gen = generator.get
+ fields += gen()
+ }
+ }
+ Row.fromSeq(fields)
+ }
}
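
The generators now thread a caller-supplied Random through both schema and row generation, so a
single seed reproduces the shape and the contents of the test data. A usage sketch, assuming the
catalyst test helpers above are on the classpath (the seed and accepted types are arbitrary):

    import scala.util.Random
    import org.apache.spark.sql.RandomDataGenerator
    import org.apache.spark.sql.types._

    val rand = new Random(42)
    // Nested schema with roughly 10 fields in total across the nesting.
    val schema = RandomDataGenerator.randomNestedSchema(rand, 10,
      Seq(IntegerType, LongType, DoubleType, StringType))
    // A single external Row matching that schema, with nulls per field nullability.
    val row = RandomDataGenerator.randomRow(rand, schema)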
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala
index cccac7efa0..b8ccdf7516 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql
+import scala.util.Random
+
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.types._
@@ -32,7 +34,7 @@ class RandomDataGeneratorSuite extends SparkFunSuite {
*/
def testRandomDataGeneration(dataType: DataType, nullable: Boolean = true): Unit = {
val toCatalyst = CatalystTypeConverters.createToCatalystConverter(dataType)
- val generator = RandomDataGenerator.forType(dataType, nullable, Some(33)).getOrElse {
+ val generator = RandomDataGenerator.forType(dataType, nullable, new Random(33)).getOrElse {
fail(s"Random data generator was not defined for $dataType")
}
if (nullable) {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala
index 59729e7646..9f19745cef 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala
@@ -74,8 +74,9 @@ class GenerateUnsafeRowJoinerSuite extends SparkFunSuite {
private def testConcatOnce(numFields1: Int, numFields2: Int, candidateTypes: Seq[DataType]) {
info(s"schema size $numFields1, $numFields2")
- val schema1 = RandomDataGenerator.randomSchema(numFields1, candidateTypes)
- val schema2 = RandomDataGenerator.randomSchema(numFields2, candidateTypes)
+ val random = new Random()
+ val schema1 = RandomDataGenerator.randomSchema(random, numFields1, candidateTypes)
+ val schema2 = RandomDataGenerator.randomSchema(random, numFields2, candidateTypes)
// Create the converters needed to convert from external row to internal row and to UnsafeRows.
val internalConverter1 = CatalystTypeConverters.createToCatalystConverter(schema1)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 85509751db..c119758d68 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -17,22 +17,45 @@
package org.apache.spark.sql.execution.vectorized;
import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import org.apache.commons.lang.NotImplementedException;
/**
* This class represents a column of values and provides the main APIs to access the data
* values. It supports all the types and contains get/put APIs as well as their batched versions.
* The batched versions are preferable whenever possible.
*
- * Most of the APIs take the rowId as a parameter. This is the local 0-based row id for values
+ * To handle nested schemas, ColumnVector has two types: Arrays and Structs. In both cases these
+ * columns have child columns. All of the data is stored in the child columns and the parent column
+ * contains nullability, and in the case of Arrays, the lengths and offsets into the child column.
+ * Lengths and offsets are encoded identically to INTs.
+ * Maps are just a special case of a two field struct.
+ * Strings are handled as an Array of ByteType.
+ *
+ * Capacity: The data stored is dense but the arrays are not fixed capacity. It is the
+ * responsibility of the caller to call reserve() to ensure there is enough room before adding
+ * elements. This means that the put() APIs do not check capacity, since in the common case (i.e.
+ * flat schemas) the lengths are known up front.
+ *
+ * Most of the APIs take the rowId as a parameter. This is the batch local 0-based row id for values
* in the current RowBatch.
*
* A ColumnVector should be considered immutable once originally created. In other words, it is not
* valid to call put APIs after reads until reset() is called.
+ *
+ * ColumnVectors are intended to be reused.
*/
public abstract class ColumnVector {
/**
- * Allocates a column with each element of size `width` either on or off heap.
+ * Allocates a column to store elements of `type` on or off heap.
+ * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
+ * in number of elements, not number of bytes.
*/
public static ColumnVector allocate(int capacity, DataType type, MemoryMode mode) {
if (mode == MemoryMode.OFF_HEAP) {
@@ -42,13 +65,265 @@ public abstract class ColumnVector {
}
}
+ /**
+ * Holder object to return an array. This object is intended to be reused. Callers should
+ * copy the data out if it needs to be stored.
+ */
+ public static final class Array extends ArrayData {
+ // The data for this array. This array contains elements from
+ // data[offset] to data[offset + length).
+ public final ColumnVector data;
+ public int length;
+ public int offset;
+
+ // Populate if binary data is required for the Array. This is stored here as an optimization
+ // for string data.
+ public byte[] byteArray;
+ public int byteArrayOffset;
+
+ // Reused staging buffer, used for loading from offheap.
+ protected byte[] tmpByteArray = new byte[1];
+
+ protected Array(ColumnVector data) {
+ this.data = data;
+ }
+
+ @Override
+ public final int numElements() { return length; }
+
+ @Override
+ public ArrayData copy() {
+ throw new NotImplementedException();
+ }
+
+ // TODO: this is extremely expensive.
+ @Override
+ public Object[] array() {
+ DataType dt = data.dataType();
+ Object[] list = new Object[length];
+
+ if (dt instanceof ByteType) {
+ for (int i = 0; i < length; i++) {
+ if (!data.getIsNull(offset + i)) {
+ list[i] = data.getByte(offset + i);
+ }
+ }
+ } else if (dt instanceof IntegerType) {
+ for (int i = 0; i < length; i++) {
+ if (!data.getIsNull(offset + i)) {
+ list[i] = data.getInt(offset + i);
+ }
+ }
+ } else if (dt instanceof DoubleType) {
+ for (int i = 0; i < length; i++) {
+ if (!data.getIsNull(offset + i)) {
+ list[i] = data.getDouble(offset + i);
+ }
+ }
+ } else if (dt instanceof LongType) {
+ for (int i = 0; i < length; i++) {
+ if (!data.getIsNull(offset + i)) {
+ list[i] = data.getLong(offset + i);
+ }
+ }
+ } else if (dt instanceof StringType) {
+ for (int i = 0; i < length; i++) {
+ if (!data.getIsNull(offset + i)) {
+ list[i] = ColumnVectorUtils.toString(data.getByteArray(offset + i));
+ }
+ }
+ } else {
+ throw new NotImplementedException("Type " + dt);
+ }
+ return list;
+ }
+
+ @Override
+ public final boolean isNullAt(int ordinal) { return data.getIsNull(offset + ordinal); }
+
+ @Override
+ public final boolean getBoolean(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public byte getByte(int ordinal) { return data.getByte(offset + ordinal); }
+
+ @Override
+ public short getShort(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public int getInt(int ordinal) { return data.getInt(offset + ordinal); }
+
+ @Override
+ public long getLong(int ordinal) { return data.getLong(offset + ordinal); }
+
+ @Override
+ public float getFloat(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public double getDouble(int ordinal) { return data.getDouble(offset + ordinal); }
+
+ @Override
+ public Decimal getDecimal(int ordinal, int precision, int scale) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public UTF8String getUTF8String(int ordinal) {
+ Array child = data.getByteArray(offset + ordinal);
+ return UTF8String.fromBytes(child.byteArray, child.byteArrayOffset, child.length);
+ }
+
+ @Override
+ public byte[] getBinary(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public CalendarInterval getInterval(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public InternalRow getStruct(int ordinal, int numFields) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public ArrayData getArray(int ordinal) {
+ return data.getArray(offset + ordinal);
+ }
+
+ @Override
+ public MapData getMap(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Object get(int ordinal, DataType dataType) {
+ throw new NotImplementedException();
+ }
+ }
+
+ /**
+ * Holder object to return a struct. This object is intended to be reused.
+ */
+ public static final class Struct extends InternalRow {
+ // The fields that make up this struct. For example, if the struct had 2 int fields, the access
+ // to it would be:
+ // int f1 = fields[0].getInt[rowId]
+ // int f2 = fields[1].getInt[rowId]
+ public final ColumnVector[] fields;
+
+ @Override
+ public boolean isNullAt(int fieldIdx) { return fields[fieldIdx].getIsNull(rowId); }
+
+ @Override
+ public boolean getBoolean(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ public byte getByte(int fieldIdx) { return fields[fieldIdx].getByte(rowId); }
+
+ @Override
+ public short getShort(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ public int getInt(int fieldIdx) { return fields[fieldIdx].getInt(rowId); }
+ public long getLong(int fieldIdx) { return fields[fieldIdx].getLong(rowId); }
+
+ @Override
+ public float getFloat(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ public double getDouble(int fieldIdx) { return fields[fieldIdx].getDouble(rowId); }
+
+ @Override
+ public Decimal getDecimal(int ordinal, int precision, int scale) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public UTF8String getUTF8String(int ordinal) {
+ Array a = getByteArray(ordinal);
+ return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
+ }
+
+ @Override
+ public byte[] getBinary(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public CalendarInterval getInterval(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public InternalRow getStruct(int ordinal, int numFields) {
+ return fields[ordinal].getStruct(rowId);
+ }
+
+ public Array getArray(int fieldIdx) { return fields[fieldIdx].getArray(rowId); }
+
+ @Override
+ public MapData getMap(int ordinal) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Object get(int ordinal, DataType dataType) {
+ throw new NotImplementedException();
+ }
+
+ public Array getByteArray(int fieldIdx) { return fields[fieldIdx].getByteArray(rowId); }
+ public Struct getStruct(int fieldIdx) { return fields[fieldIdx].getStruct(rowId); }
+
+ @Override
+ public final int numFields() {
+ return fields.length;
+ }
+
+ @Override
+ public InternalRow copy() {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean anyNull() {
+ throw new NotImplementedException();
+ }
+
+ protected int rowId;
+
+ protected Struct(ColumnVector[] fields) {
+ this.fields = fields;
+ }
+ }
+
+ /**
+ * Returns the data type of this column.
+ */
public final DataType dataType() { return type; }
/**
* Resets this column for writing. The currently stored values are no longer accessible.
*/
public void reset() {
+ if (childColumns != null) {
+ for (ColumnVector c: childColumns) {
+ c.reset();
+ }
+ }
numNulls = 0;
+ elementsAppended = 0;
if (anyNullsSet) {
putNotNulls(0, capacity);
anyNullsSet = false;
@@ -61,6 +336,12 @@ public abstract class ColumnVector {
*/
public abstract void close();
+ /*
+ * Ensures that there is enough storage to store capacity elements. That is, the put() APIs
+ * must work for all rowIds < capacity.
+ */
+ public abstract void reserve(int capacity);
+
/**
* Returns the number of nulls in this column.
*/
@@ -99,6 +380,26 @@ public abstract class ColumnVector {
/**
* Sets the value at rowId to `value`.
*/
+ public abstract void putByte(int rowId, byte value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to value.
+ */
+ public abstract void putBytes(int rowId, int count, byte value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+ */
+ public abstract void putBytes(int rowId, int count, byte[] src, int srcIndex);
+
+ /**
+ * Returns the value for rowId.
+ */
+ public abstract byte getByte(int rowId);
+
+ /**
+ * Sets the value at rowId to `value`.
+ */
public abstract void putInt(int rowId, int value);
/**
@@ -118,13 +419,39 @@ public abstract class ColumnVector {
public abstract void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
/**
- * Returns the integer for rowId.
+ * Returns the value for rowId.
*/
public abstract int getInt(int rowId);
/**
* Sets the value at rowId to `value`.
*/
+ public abstract void putLong(int rowId, long value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to value.
+ */
+ public abstract void putLongs(int rowId, int count, long value);
+
+ /**
+ * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+ */
+ public abstract void putLongs(int rowId, int count, long[] src, int srcIndex);
+
+ /**
+ * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
+ * The data in src must be 8-byte little endian longs.
+ */
+ public abstract void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
+
+ /**
+ * Returns the value for rowId.
+ */
+ public abstract long getLong(int rowId);
+
+ /**
+ * Sets the value at rowId to `value`.
+ */
public abstract void putDouble(int rowId, double value);
/**
@@ -145,14 +472,248 @@ public abstract class ColumnVector {
public abstract void putDoubles(int rowId, int count, byte[] src, int srcIndex);
/**
- * Returns the double for rowId.
+ * Returns the value for rowId.
*/
public abstract double getDouble(int rowId);
/**
+   * Puts an array at rowId whose data already exists in this column's child data, given its offset and length.
+ */
+ public abstract void putArray(int rowId, int offset, int length);
+
+ /**
+   * Returns the length of the array at rowId.
+ */
+ public abstract int getArrayLength(int rowId);
+
+ /**
+   * Returns the offset of the array at rowId.
+ */
+ public abstract int getArrayOffset(int rowId);
+
+ /**
+ * Returns a utility object to get structs.
+ */
+ public Struct getStruct(int rowId) {
+ resultStruct.rowId = rowId;
+ return resultStruct;
+ }
+
+ /**
+   * Returns the array at rowId.
+ */
+ public final Array getArray(int rowId) {
+ resultArray.length = getArrayLength(rowId);
+ resultArray.offset = getArrayOffset(rowId);
+ return resultArray;
+ }
+
+ /**
+ * Loads the data into array.byteArray.
+ */
+ public abstract void loadBytes(Array array);
+
+ /**
+   * Copies `value` into the child byte data and records it at rowId; returns the offset of the copied bytes.
+ */
+ public abstract int putByteArray(int rowId, byte[] value, int offset, int count);
+ public final int putByteArray(int rowId, byte[] value) {
+ return putByteArray(rowId, value, 0, value.length);
+ }
+
+ /**
+   * Returns the byte array at rowId, with the bytes loaded into the returned holder.
+ */
+ public final Array getByteArray(int rowId) {
+ Array array = getArray(rowId);
+ array.data.loadBytes(array);
+ return array;
+ }
+
+ /**
+ * Append APIs. These APIs all behave similarly and will append data to the current vector. It
+ * is not valid to mix the put and append APIs. The append APIs are slower and should only be
+ * used if the sizes are not known up front.
+ * In all these cases, the return value is the rowId for the first appended element.
+ */
+ public final int appendNull() {
+ assert (!(dataType() instanceof StructType)); // Use appendStruct()
+ reserve(elementsAppended + 1);
+ putNull(elementsAppended);
+ return elementsAppended++;
+ }
+
+ public final int appendNotNull() {
+ reserve(elementsAppended + 1);
+ putNotNull(elementsAppended);
+ return elementsAppended++;
+ }
+
+ public final int appendNulls(int count) {
+ assert (!(dataType() instanceof StructType));
+ reserve(elementsAppended + count);
+ int result = elementsAppended;
+ putNulls(elementsAppended, count);
+ elementsAppended += count;
+ return result;
+ }
+
+ public final int appendNotNulls(int count) {
+ assert (!(dataType() instanceof StructType));
+ reserve(elementsAppended + count);
+ int result = elementsAppended;
+ putNotNulls(elementsAppended, count);
+ elementsAppended += count;
+ return result;
+ }
+
+ public final int appendByte(byte v) {
+ reserve(elementsAppended + 1);
+ putByte(elementsAppended, v);
+ return elementsAppended++;
+ }
+
+ public final int appendBytes(int count, byte v) {
+ reserve(elementsAppended + count);
+ int result = elementsAppended;
+ putBytes(elementsAppended, count, v);
+ elementsAppended += count;
+ return result;
+ }
+
+ public final int appendBytes(int length, byte[] src, int offset) {
+ reserve(elementsAppended + length);
+ int result = elementsAppended;
+ putBytes(elementsAppended, length, src, offset);
+ elementsAppended += length;
+ return result;
+ }
+
+ public final int appendInt(int v) {
+ reserve(elementsAppended + 1);
+ putInt(elementsAppended, v);
+ return elementsAppended++;
+ }
+
+ public final int appendInts(int count, int v) {
+ reserve(elementsAppended + count);
+ int result = elementsAppended;
+ putInts(elementsAppended, count, v);
+ elementsAppended += count;
+ return result;
+ }
+
+ public final int appendInts(int length, int[] src, int offset) {
+ reserve(elementsAppended + length);
+ int result = elementsAppended;
+ putInts(elementsAppended, length, src, offset);
+ elementsAppended += length;
+ return result;
+ }
+
+ public final int appendLong(long v) {
+ reserve(elementsAppended + 1);
+ putLong(elementsAppended, v);
+ return elementsAppended++;
+ }
+
+ public final int appendLongs(int count, long v) {
+ reserve(elementsAppended + count);
+ int result = elementsAppended;
+ putLongs(elementsAppended, count, v);
+ elementsAppended += count;
+ return result;
+ }
+
+ public final int appendLongs(int length, long[] src, int offset) {
+ reserve(elementsAppended + length);
+ int result = elementsAppended;
+ putLongs(elementsAppended, length, src, offset);
+ elementsAppended += length;
+ return result;
+ }
+
+ public final int appendDouble(double v) {
+ reserve(elementsAppended + 1);
+ putDouble(elementsAppended, v);
+ return elementsAppended++;
+ }
+
+ public final int appendDoubles(int count, double v) {
+ reserve(elementsAppended + count);
+ int result = elementsAppended;
+ putDoubles(elementsAppended, count, v);
+ elementsAppended += count;
+ return result;
+ }
+
+ public final int appendDoubles(int length, double[] src, int offset) {
+ reserve(elementsAppended + length);
+ int result = elementsAppended;
+ putDoubles(elementsAppended, length, src, offset);
+ elementsAppended += length;
+ return result;
+ }
+
+ public final int appendByteArray(byte[] value, int offset, int length) {
+ int copiedOffset = arrayData().appendBytes(length, value, offset);
+ reserve(elementsAppended + 1);
+ putArray(elementsAppended, copiedOffset, length);
+ return elementsAppended++;
+ }
+
+ public final int appendArray(int length) {
+ reserve(elementsAppended + 1);
+ putArray(elementsAppended, arrayData().elementsAppended, length);
+ return elementsAppended++;
+ }
+
+ /**
+ * Appends a NULL struct. This *has* to be used for structs instead of appendNull() as this
+ * recursively appends a NULL to its children.
+   * This logic is not folded into the general appendNull() implementation so that the more
+   * common non-struct case stays fast.
+ */
+ public final int appendStruct(boolean isNull) {
+ if (isNull) {
+ appendNull();
+ for (ColumnVector c: childColumns) {
+ if (c.type instanceof StructType) {
+ c.appendStruct(true);
+ } else {
+ c.appendNull();
+ }
+ }
+ } else {
+ appendNotNull();
+ }
+ return elementsAppended;
+ }
+
+ /**
+ * Returns the data for the underlying array.
+ */
+ public final ColumnVector arrayData() { return childColumns[0]; }
+
+ /**
+ * Returns the ordinal's child data column.
+ */
+ public final ColumnVector getChildColumn(int ordinal) { return childColumns[ordinal]; }
+
+ /**
+ * Returns the elements appended.
+ */
+ public int getElementsAppended() { return elementsAppended; }
+
+ /**
* Maximum number of rows that can be stored in this column.
*/
- protected final int capacity;
+ protected int capacity;
+
+ /**
+ * Data type for this column.
+ */
+ protected final DataType type;
/**
* Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
@@ -166,12 +727,63 @@ public abstract class ColumnVector {
protected boolean anyNullsSet;
/**
- * Data type for this column.
+   * Default number of child elements to allocate per array element; the child column grows as necessary.
*/
- protected final DataType type;
+ protected static final int DEFAULT_ARRAY_LENGTH = 4;
+
+ /**
+ * Current write cursor (row index) when appending data.
+ */
+ protected int elementsAppended;
- protected ColumnVector(int capacity, DataType type) {
+ /**
+ * If this is a nested type (array or struct), the column for the child data.
+ */
+ protected final ColumnVector[] childColumns;
+
+ /**
+ * Reusable Array holder for getArray().
+ */
+ protected final Array resultArray;
+
+ /**
+ * Reusable Struct holder for getStruct().
+ */
+ protected final Struct resultStruct;
+
+ /**
+ * Sets up the common state and also handles creating the child columns if this is a nested
+ * type.
+ */
+ protected ColumnVector(int capacity, DataType type, MemoryMode memMode) {
this.capacity = capacity;
this.type = type;
+
+ if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType) {
+ DataType childType;
+ int childCapacity = capacity;
+ if (type instanceof ArrayType) {
+ childType = ((ArrayType)type).elementType();
+ } else {
+ childType = DataTypes.ByteType;
+ childCapacity *= DEFAULT_ARRAY_LENGTH;
+ }
+ this.childColumns = new ColumnVector[1];
+ this.childColumns[0] = ColumnVector.allocate(childCapacity, childType, memMode);
+ this.resultArray = new Array(this.childColumns[0]);
+ this.resultStruct = null;
+ } else if (type instanceof StructType) {
+ StructType st = (StructType)type;
+ this.childColumns = new ColumnVector[st.fields().length];
+ for (int i = 0; i < childColumns.length; ++i) {
+ this.childColumns[i] = ColumnVector.allocate(capacity, st.fields()[i].dataType(), memMode);
+ }
+ this.resultArray = null;
+ this.resultStruct = new Struct(this.childColumns);
+ } else {
+ this.childColumns = null;
+ this.resultArray = null;
+ this.resultStruct = null;
+ }
}
}
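
A usage sketch (Scala, on-heap) of the nested layout the class comment describes: the parent array
column stores per-row lengths and offsets, the child column returned by arrayData() stores the
values, and with the append APIs the array entry is recorded first, then its elements are appended
to the child. Names and values below are illustrative only:

    import org.apache.spark.memory.MemoryMode
    import org.apache.spark.sql.execution.vectorized.ColumnVector
    import org.apache.spark.sql.types.{ArrayType, IntegerType}

    val col = ColumnVector.allocate(16, ArrayType(IntegerType, true), MemoryMode.ON_HEAP)
    // Row 0 holds [10, 20, 30]: record the length/offset, then append the values to the child.
    col.appendArray(3)
    col.arrayData().appendInts(3, Array(10, 20, 30), 0)
    // Row 1 is a null array.
    col.appendNull()

    val arr = col.getArray(0)   // reused holder; copy data out before the next getArray() call
    assert(arr.numElements() == 3 && arr.getInt(1) == 20)
    assert(col.getIsNull(1))
    col.close()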
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
new file mode 100644
index 0000000000..6c651a759d
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.*;
+
+import org.apache.commons.lang.NotImplementedException;
+
+/**
+ * Utilities to help manipulate data associated with ColumnVectors. These should be used mostly
+ * for debugging or other non-performance critical paths.
+ * These utilities are mostly used to convert ColumnVectors into other formats.
+ */
+public class ColumnVectorUtils {
+ public static String toString(ColumnVector.Array a) {
+ return new String(a.byteArray, a.byteArrayOffset, a.length);
+ }
+
+ /**
+   * Returns the array data as a Java primitive array.
+ * For example, an array of IntegerType will return an int[].
+ * Throws exceptions for unhandled schemas.
+ */
+ public static Object toPrimitiveJavaArray(ColumnVector.Array array) {
+ DataType dt = array.data.dataType();
+ if (dt instanceof IntegerType) {
+ int[] result = new int[array.length];
+ ColumnVector data = array.data;
+ for (int i = 0; i < result.length; i++) {
+ if (data.getIsNull(array.offset + i)) {
+ throw new RuntimeException("Cannot handle NULL values.");
+ }
+ result[i] = data.getInt(array.offset + i);
+ }
+ return result;
+ } else {
+ throw new NotImplementedException();
+ }
+ }
+
+ private static void appendValue(ColumnVector dst, DataType t, Object o) {
+ if (o == null) {
+ dst.appendNull();
+ } else {
+ if (t == DataTypes.ByteType) {
+ dst.appendByte(((Byte)o).byteValue());
+ } else if (t == DataTypes.IntegerType) {
+ dst.appendInt(((Integer)o).intValue());
+ } else if (t == DataTypes.LongType) {
+ dst.appendLong(((Long)o).longValue());
+ } else if (t == DataTypes.DoubleType) {
+ dst.appendDouble(((Double)o).doubleValue());
+ } else if (t == DataTypes.StringType) {
+ byte[] b =((String)o).getBytes();
+ dst.appendByteArray(b, 0, b.length);
+ } else {
+ throw new NotImplementedException("Type " + t);
+ }
+ }
+ }
+
+ private static void appendValue(ColumnVector dst, DataType t, Row src, int fieldIdx) {
+ if (t instanceof ArrayType) {
+ ArrayType at = (ArrayType)t;
+ if (src.isNullAt(fieldIdx)) {
+ dst.appendNull();
+ } else {
+ List<Object> values = src.getList(fieldIdx);
+ dst.appendArray(values.size());
+ for (Object o : values) {
+ appendValue(dst.arrayData(), at.elementType(), o);
+ }
+ }
+ } else if (t instanceof StructType) {
+ StructType st = (StructType)t;
+ if (src.isNullAt(fieldIdx)) {
+ dst.appendStruct(true);
+ } else {
+ dst.appendStruct(false);
+ Row c = src.getStruct(fieldIdx);
+ for (int i = 0; i < st.fields().length; i++) {
+ appendValue(dst.getChildColumn(i), st.fields()[i].dataType(), c, i);
+ }
+ }
+ } else {
+ appendValue(dst, t, src.get(fieldIdx));
+ }
+ }
+
+ /**
+   * Converts an iterator of rows into a single ColumnarBatch.
+ */
+ public static ColumnarBatch toBatch(
+ StructType schema, MemoryMode memMode, Iterator<Row> row) {
+ ColumnarBatch batch = ColumnarBatch.allocate(schema, memMode);
+ int n = 0;
+ while (row.hasNext()) {
+ Row r = row.next();
+ for (int i = 0; i < schema.fields().length; i++) {
+ appendValue(batch.column(i), schema.fields()[i].dataType(), r, i);
+ }
+ n++;
+ }
+ batch.setNumRows(n);
+ return batch;
+ }
+}
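
A sketch of the row-to-batch conversion path, useful in tests: toBatch() walks each external Row
and appends field by field into the batch's columns. The flat schema and values below are
illustrative; note the Java iterator expected by the API:

    import scala.collection.JavaConverters._
    import org.apache.spark.memory.MemoryMode
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils
    import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

    val schema = new StructType().add("id", IntegerType).add("name", StringType)
    val rows = Seq(Row(1, "a"), Row(2, null), Row(3, "c"))
    val batch = ColumnVectorUtils.toBatch(schema, MemoryMode.ON_HEAP, rows.iterator.asJava)
    assert(batch.numRows() == 3)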
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 2c55f854c2..d558dae50c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -21,12 +21,10 @@ import java.util.Iterator;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;
@@ -48,6 +46,7 @@ import org.apache.commons.lang.NotImplementedException;
*/
public final class ColumnarBatch {
private static final int DEFAULT_BATCH_SIZE = 4 * 1024;
+ private static MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
private final StructType schema;
private final int capacity;
@@ -64,6 +63,10 @@ public final class ColumnarBatch {
return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
}
+ public static ColumnarBatch allocate(StructType type) {
+ return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+ }
+
public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
return new ColumnarBatch(schema, maxRows, memMode);
}
@@ -82,25 +85,53 @@ public final class ColumnarBatch {
* Adapter class to interop with existing components that expect internal row. A lot of
* performance is lost with this translation.
*/
- public final class Row extends InternalRow {
+ public static final class Row extends InternalRow {
private int rowId;
+ private final ColumnarBatch parent;
+ private final int fixedLenRowSize;
+
+ private Row(ColumnarBatch parent) {
+ this.parent = parent;
+ this.fixedLenRowSize = UnsafeRow.calculateFixedPortionByteSize(parent.numCols());
+ }
/**
* Marks this row as being filtered out. This means a subsequent iteration over the rows
* in this batch will not include this row.
*/
public final void markFiltered() {
- ColumnarBatch.this.markFiltered(rowId);
+ parent.markFiltered(rowId);
}
@Override
public final int numFields() {
- return ColumnarBatch.this.numCols();
+ return parent.numCols();
}
@Override
+ /**
+ * Revisit this. This is expensive.
+ */
public final InternalRow copy() {
- throw new NotImplementedException();
+ UnsafeRow row = new UnsafeRow(parent.numCols());
+ row.pointTo(new byte[fixedLenRowSize], fixedLenRowSize);
+ for (int i = 0; i < parent.numCols(); i++) {
+ if (isNullAt(i)) {
+ row.setNullAt(i);
+ } else {
+ DataType dt = parent.schema.fields()[i].dataType();
+ if (dt instanceof IntegerType) {
+ row.setInt(i, getInt(i));
+ } else if (dt instanceof LongType) {
+ row.setLong(i, getLong(i));
+ } else if (dt instanceof DoubleType) {
+ row.setDouble(i, getDouble(i));
+ } else {
+ throw new RuntimeException("Not implemented.");
+ }
+ }
+ }
+ return row;
}
@Override
@@ -110,7 +141,7 @@ public final class ColumnarBatch {
@Override
public final boolean isNullAt(int ordinal) {
- return ColumnarBatch.this.column(ordinal).getIsNull(rowId);
+ return parent.column(ordinal).getIsNull(rowId);
}
@Override
@@ -119,9 +150,7 @@ public final class ColumnarBatch {
}
@Override
- public final byte getByte(int ordinal) {
- throw new NotImplementedException();
- }
+ public final byte getByte(int ordinal) { return parent.column(ordinal).getByte(rowId); }
@Override
public final short getShort(int ordinal) {
@@ -130,13 +159,11 @@ public final class ColumnarBatch {
@Override
public final int getInt(int ordinal) {
- return ColumnarBatch.this.column(ordinal).getInt(rowId);
+ return parent.column(ordinal).getInt(rowId);
}
@Override
- public final long getLong(int ordinal) {
- throw new NotImplementedException();
- }
+ public final long getLong(int ordinal) { return parent.column(ordinal).getLong(rowId); }
@Override
public final float getFloat(int ordinal) {
@@ -145,7 +172,7 @@ public final class ColumnarBatch {
@Override
public final double getDouble(int ordinal) {
- return ColumnarBatch.this.column(ordinal).getDouble(rowId);
+ return parent.column(ordinal).getDouble(rowId);
}
@Override
@@ -155,7 +182,8 @@ public final class ColumnarBatch {
@Override
public final UTF8String getUTF8String(int ordinal) {
- throw new NotImplementedException();
+ ColumnVector.Array a = parent.column(ordinal).getByteArray(rowId);
+ return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
}
@Override
@@ -170,12 +198,12 @@ public final class ColumnarBatch {
@Override
public final InternalRow getStruct(int ordinal, int numFields) {
- throw new NotImplementedException();
+ return parent.column(ordinal).getStruct(rowId);
}
@Override
public final ArrayData getArray(int ordinal) {
- throw new NotImplementedException();
+ return parent.column(ordinal).getArray(rowId);
}
@Override
@@ -194,7 +222,7 @@ public final class ColumnarBatch {
*/
public Iterator<Row> rowIterator() {
final int maxRows = ColumnarBatch.this.numRows();
- final Row row = new Row();
+ final Row row = new Row(this);
return new Iterator<Row>() {
int rowId = 0;
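
The Row adapter above is a single reused object handed back by rowIterator(), so callers that need
to hold on to a row must call copy(); in this patch copy() only covers the fixed-length types it
materializes into an UnsafeRow (int, long, double). A small sketch under those assumptions,
reusing the toBatch() helper shown earlier:

    import scala.collection.JavaConverters._
    import org.apache.spark.memory.MemoryMode
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils
    import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}

    val schema = new StructType().add("x", IntegerType).add("y", DoubleType)
    val batch = ColumnVectorUtils.toBatch(schema, MemoryMode.ON_HEAP,
      Seq(Row(1, 1.5), Row(2, 2.5)).iterator.asJava)

    // copy() materializes each reused adapter row into an independent UnsafeRow.
    val copies = batch.rowIterator().asScala.map(_.copy()).toArray
    assert(copies(0).getInt(0) == 1 && copies(1).getDouble(1) == 2.5)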
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 6180dd308e..335124fd5a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -18,14 +18,20 @@ package org.apache.spark.sql.execution.vectorized;
import java.nio.ByteOrder;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DoubleType;
import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.types.UTF8String;
import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang.NotImplementedException;
+
/**
* Column data backed using offheap memory.
*/
@@ -35,21 +41,21 @@ public final class OffHeapColumnVector extends ColumnVector {
private long nulls;
private long data;
+ // Set iff the type is array.
+ private long lengthData;
+ private long offsetData;
+
protected OffHeapColumnVector(int capacity, DataType type) {
- super(capacity, type);
+ super(capacity, type, MemoryMode.OFF_HEAP);
if (!ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)) {
throw new NotImplementedException("Only little endian is supported.");
}
+ nulls = 0;
+ data = 0;
+ lengthData = 0;
+ offsetData = 0;
- this.nulls = Platform.allocateMemory(capacity);
- if (type instanceof IntegerType) {
- this.data = Platform.allocateMemory(capacity * 4);
- } else if (type instanceof DoubleType) {
- this.data = Platform.allocateMemory(capacity * 8);
- } else {
- throw new RuntimeException("Unhandled " + type);
- }
- anyNullsSet = true;
+ reserveInternal(capacity);
reset();
}
@@ -67,8 +73,12 @@ public final class OffHeapColumnVector extends ColumnVector {
public final void close() {
Platform.freeMemory(nulls);
Platform.freeMemory(data);
+ Platform.freeMemory(lengthData);
+ Platform.freeMemory(offsetData);
nulls = 0;
data = 0;
+ lengthData = 0;
+ offsetData = 0;
}
//
@@ -112,6 +122,33 @@ public final class OffHeapColumnVector extends ColumnVector {
}
//
+ // APIs dealing with Bytes
+ //
+
+ @Override
+ public final void putByte(int rowId, byte value) {
+ Platform.putByte(null, data + rowId, value);
+ }
+
+ @Override
+ public final void putBytes(int rowId, int count, byte value) {
+ for (int i = 0; i < count; ++i) {
+ Platform.putByte(null, data + rowId + i, value);
+ }
+ }
+
+ @Override
+ public final void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+ Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, null, data + rowId, count);
+ }
+
+ @Override
+ public final byte getByte(int rowId) {
+ return Platform.getByte(null, data + rowId);
+ }
+
+ //
// APIs dealing with ints
//
@@ -146,6 +183,40 @@ public final class OffHeapColumnVector extends ColumnVector {
}
//
+ // APIs dealing with Longs
+ //
+
+ @Override
+ public final void putLong(int rowId, long value) {
+ Platform.putLong(null, data + 8 * rowId, value);
+ }
+
+ @Override
+ public final void putLongs(int rowId, int count, long value) {
+ long offset = data + 8 * rowId;
+ for (int i = 0; i < count; ++i, offset += 8) {
+ Platform.putLong(null, offset, value);
+ }
+ }
+
+ @Override
+ public final void putLongs(int rowId, int count, long[] src, int srcIndex) {
+ Platform.copyMemory(src, Platform.LONG_ARRAY_OFFSET + srcIndex * 8,
+ null, data + 8 * rowId, count * 8);
+ }
+
+ @Override
+ public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+ Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
+ null, data + 8 * rowId, count * 8);
+ }
+
+ @Override
+ public final long getLong(int rowId) {
+ return Platform.getLong(null, data + 8 * rowId);
+ }
+
+ //
// APIs dealing with doubles
//
@@ -178,4 +249,70 @@ public final class OffHeapColumnVector extends ColumnVector {
public final double getDouble(int rowId) {
return Platform.getDouble(null, data + rowId * 8);
}
+
+ //
+ // APIs dealing with Arrays.
+ //
+ @Override
+ public final void putArray(int rowId, int offset, int length) {
+ assert(offset >= 0 && offset + length <= childColumns[0].capacity);
+ Platform.putInt(null, lengthData + 4 * rowId, length);
+ Platform.putInt(null, offsetData + 4 * rowId, offset);
+ }
+
+ @Override
+ public final int getArrayLength(int rowId) {
+ return Platform.getInt(null, lengthData + 4 * rowId);
+ }
+
+ @Override
+ public final int getArrayOffset(int rowId) {
+ return Platform.getInt(null, offsetData + 4 * rowId);
+ }
+
+ // APIs dealing with ByteArrays
+ @Override
+ public final int putByteArray(int rowId, byte[] value, int offset, int length) {
+ int result = arrayData().appendBytes(length, value, offset);
+ Platform.putInt(null, lengthData + 4 * rowId, length);
+ Platform.putInt(null, offsetData + 4 * rowId, result);
+ return result;
+ }
+
+ @Override
+ public final void loadBytes(Array array) {
+ if (array.tmpByteArray.length < array.length) array.tmpByteArray = new byte[array.length];
+ Platform.copyMemory(
+ null, data + array.offset, array.tmpByteArray, Platform.BYTE_ARRAY_OFFSET, array.length);
+ array.byteArray = array.tmpByteArray;
+ array.byteArrayOffset = 0;
+ }
+
+ @Override
+ public final void reserve(int requiredCapacity) {
+ if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2);
+ }
+
+ // Split out the slow path.
+ private final void reserveInternal(int newCapacity) {
+ if (this.resultArray != null) {
+ this.lengthData =
+ Platform.reallocateMemory(lengthData, elementsAppended * 4, newCapacity * 4);
+ this.offsetData =
+ Platform.reallocateMemory(offsetData, elementsAppended * 4, newCapacity * 4);
+ } else if (type instanceof ByteType) {
+ this.data = Platform.reallocateMemory(data, elementsAppended, newCapacity);
+ } else if (type instanceof IntegerType) {
+ this.data = Platform.reallocateMemory(data, elementsAppended * 4, newCapacity * 4);
+ } else if (type instanceof LongType || type instanceof DoubleType) {
+ this.data = Platform.reallocateMemory(data, elementsAppended * 8, newCapacity * 8);
+ } else if (resultStruct != null) {
+ // Nothing to store.
+ } else {
+ throw new RuntimeException("Unhandled " + type);
+ }
+ this.nulls = Platform.reallocateMemory(nulls, elementsAppended, newCapacity);
+ Platform.setMemory(nulls + elementsAppended, (byte)0, newCapacity - elementsAppended);
+ capacity = newCapacity;
+ }
}
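
Off-heap usage note: the vector now starts with no allocation, grows through reserve() (doubling
the requested capacity on reallocation), and its native buffers are only released by close(). A
brief sketch, assuming a little-endian platform as required by the constructor above:

    import org.apache.spark.memory.MemoryMode
    import org.apache.spark.sql.execution.vectorized.ColumnVector
    import org.apache.spark.sql.types.LongType

    val col = ColumnVector.allocate(4, LongType, MemoryMode.OFF_HEAP)
    var i = 0
    while (i < 1000) { col.appendLong(i); i += 1 }   // triggers several reallocations
    assert(col.getLong(999) == 999L)
    col.close()   // frees the nulls and data buffers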
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 76d9956c38..8197fa11cd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -16,13 +16,10 @@
*/
package org.apache.spark.sql.execution.vectorized;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.DoubleType;
-import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.Platform;
-import java.nio.ByteBuffer;
-import java.nio.DoubleBuffer;
import java.util.Arrays;
/**
@@ -37,19 +34,18 @@ public final class OnHeapColumnVector extends ColumnVector {
private byte[] nulls;
// Array for each type. Only 1 is populated for any type.
+ private byte[] byteData;
private int[] intData;
+ private long[] longData;
private double[] doubleData;
+ // Only set if type is Array.
+ private int[] arrayLengths;
+ private int[] arrayOffsets;
+
protected OnHeapColumnVector(int capacity, DataType type) {
- super(capacity, type);
- if (type instanceof IntegerType) {
- this.intData = new int[capacity];
- } else if (type instanceof DoubleType) {
- this.doubleData = new double[capacity];
- } else {
- throw new RuntimeException("Unhandled " + type);
- }
- this.nulls = new byte[capacity];
+ super(capacity, type, MemoryMode.ON_HEAP);
+ reserveInternal(capacity);
reset();
}
@@ -109,6 +105,32 @@ public final class OnHeapColumnVector extends ColumnVector {
}
//
+ // APIs dealing with Bytes
+ //
+
+ @Override
+ public final void putByte(int rowId, byte value) {
+ byteData[rowId] = value;
+ }
+
+ @Override
+ public final void putBytes(int rowId, int count, byte value) {
+ for (int i = 0; i < count; ++i) {
+ byteData[i + rowId] = value;
+ }
+ }
+
+ @Override
+ public final void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+ System.arraycopy(src, srcIndex, byteData, rowId, count);
+ }
+
+ @Override
+ public final byte getByte(int rowId) {
+ return byteData[rowId];
+ }
+
+ //
// APIs dealing with Ints
//
@@ -145,6 +167,43 @@ public final class OnHeapColumnVector extends ColumnVector {
}
//
+ // APIs dealing with Longs
+ //
+
+ @Override
+ public final void putLong(int rowId, long value) {
+ longData[rowId] = value;
+ }
+
+ @Override
+ public final void putLongs(int rowId, int count, long value) {
+ for (int i = 0; i < count; ++i) {
+ longData[i + rowId] = value;
+ }
+ }
+
+ @Override
+ public final void putLongs(int rowId, int count, long[] src, int srcIndex) {
+ System.arraycopy(src, srcIndex, longData, rowId, count);
+ }
+
+ @Override
+ public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+ int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
+ for (int i = 0; i < count; ++i) {
+ longData[i + rowId] = Platform.getLong(src, srcOffset);
+ srcIndex += 8;
+ srcOffset += 8;
+ }
+ }
+
+ @Override
+ public final long getLong(int rowId) {
+ return longData[rowId];
+ }
+
+ //
// APIs dealing with doubles
//
@@ -173,4 +232,86 @@ public final class OnHeapColumnVector extends ColumnVector {
public final double getDouble(int rowId) {
return doubleData[rowId];
}
+
+ //
+ // APIs dealing with Arrays
+ //
+
+ @Override
+ public final int getArrayLength(int rowId) {
+ return arrayLengths[rowId];
+ }
+ @Override
+ public final int getArrayOffset(int rowId) {
+ return arrayOffsets[rowId];
+ }
+
+ @Override
+ public final void putArray(int rowId, int offset, int length) {
+ arrayOffsets[rowId] = offset;
+ arrayLengths[rowId] = length;
+ }
+
+ @Override
+ public final void loadBytes(Array array) {
+ array.byteArray = byteData;
+ array.byteArrayOffset = array.offset;
+ }
+
+ //
+ // APIs dealing with Byte Arrays
+ //
+
+ @Override
+ public final int putByteArray(int rowId, byte[] value, int offset, int length) {
+ int result = arrayData().appendBytes(length, value, offset);
+ arrayOffsets[rowId] = result;
+ arrayLengths[rowId] = length;
+ return result;
+ }
+
+ @Override
+ public final void reserve(int requiredCapacity) {
+ if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2);
+ }
+
+  // Split this function out since it is the slow path.
+ private final void reserveInternal(int newCapacity) {
+ if (this.resultArray != null) {
+ int[] newLengths = new int[newCapacity];
+ int[] newOffsets = new int[newCapacity];
+ if (this.arrayLengths != null) {
+ System.arraycopy(this.arrayLengths, 0, newLengths, 0, elementsAppended);
+ System.arraycopy(this.arrayOffsets, 0, newOffsets, 0, elementsAppended);
+ }
+ arrayLengths = newLengths;
+ arrayOffsets = newOffsets;
+ } else if (type instanceof ByteType) {
+ byte[] newData = new byte[newCapacity];
+ if (byteData != null) System.arraycopy(byteData, 0, newData, 0, elementsAppended);
+ byteData = newData;
+ } else if (type instanceof IntegerType) {
+ int[] newData = new int[newCapacity];
+ if (intData != null) System.arraycopy(intData, 0, newData, 0, elementsAppended);
+ intData = newData;
+ } else if (type instanceof LongType) {
+ long[] newData = new long[newCapacity];
+ if (longData != null) System.arraycopy(longData, 0, newData, 0, elementsAppended);
+ longData = newData;
+ } else if (type instanceof DoubleType) {
+ double[] newData = new double[newCapacity];
+ if (doubleData != null) System.arraycopy(doubleData, 0, newData, 0, elementsAppended);
+ doubleData = newData;
+ } else if (resultStruct != null) {
+ // Nothing to store.
+ } else {
+ throw new RuntimeException("Unhandled " + type);
+ }
+
+ byte[] newNulls = new byte[newCapacity];
+ if (nulls != null) System.arraycopy(nulls, 0, newNulls, 0, elementsAppended);
+ nulls = newNulls;
+
+ capacity = newCapacity;
+ }
}
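
One behavioral difference worth calling out: on-heap loadBytes() simply aliases the child byte[]
(no copy), while the off-heap version copies into a reused staging buffer; either way the returned
Array holder is shared across calls, so convert or copy the bytes before the next lookup. A short
sketch with illustrative values:

    import org.apache.spark.memory.MemoryMode
    import org.apache.spark.sql.execution.vectorized.ColumnVector
    import org.apache.spark.sql.types.StringType

    val col = ColumnVector.allocate(4, StringType, MemoryMode.ON_HEAP)
    col.appendByteArray("hello".getBytes, 0, 5)
    col.appendByteArray("world".getBytes, 0, 5)
    val a = col.getByteArray(1)
    assert(new String(a.byteArray, a.byteArrayOffset, a.length) == "world")
    col.close()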
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 95c9550aeb..8a95359d9d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -40,8 +40,8 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
private val rand = new Random(42)
for (i <- 0 until 6) {
- val keySchema = RandomDataGenerator.randomSchema(rand.nextInt(10) + 1, keyTypes)
- val valueSchema = RandomDataGenerator.randomSchema(rand.nextInt(10) + 1, valueTypes)
+ val keySchema = RandomDataGenerator.randomSchema(rand, rand.nextInt(10) + 1, keyTypes)
+ val valueSchema = RandomDataGenerator.randomSchema(rand, rand.nextInt(10) + 1, valueTypes)
testKVSorter(keySchema, valueSchema, spill = i > 3)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
index bfe944d835..8efdf8adb0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
@@ -18,10 +18,12 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.nio.ByteBuffer
+import scala.util.Random
+
import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.vectorized.ColumnVector
-import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.{BinaryType, IntegerType}
import org.apache.spark.unsafe.Platform
import org.apache.spark.util.Benchmark
import org.apache.spark.util.collection.BitSet
@@ -239,6 +241,26 @@ object ColumnarBatchBenchmark {
Platform.freeMemory(buffer)
}
+ // Add values by appending instead of putting at an index.
+ val onHeapAppend = { i: Int =>
+ val col = ColumnVector.allocate(count, IntegerType, MemoryMode.ON_HEAP)
+ var sum = 0L
+ for (n <- 0L until iters) {
+ var i = 0
+ while (i < count) {
+ col.appendInt(i)
+ i += 1
+ }
+ i = 0
+ while (i < count) {
+ sum += col.getInt(i)
+ i += 1
+ }
+ col.reset()
+ }
+ col.close
+ }
+
/*
Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
Int Read/Write: Avg Time(ms) Avg Rate(M/s) Relative Rate
@@ -253,6 +275,7 @@ object ColumnarBatchBenchmark {
Column(off heap direct) 237.6 1379.12 1.05 X
UnsafeRow (on heap) 414.6 790.35 0.60 X
UnsafeRow (off heap) 487.2 672.58 0.51 X
+ Column On Heap Append 530.1 618.14 0.59 X
*/
val benchmark = new Benchmark("Int Read/Write", count * iters)
benchmark.addCase("Java Array")(javaArray)
@@ -265,6 +288,7 @@ object ColumnarBatchBenchmark {
benchmark.addCase("Column(off heap direct)")(columnOffheapDirect)
benchmark.addCase("UnsafeRow (on heap)")(unsafeRowOnheap)
benchmark.addCase("UnsafeRow (off heap)")(unsafeRowOffheap)
+ benchmark.addCase("Column On Heap Append")(onHeapAppend)
benchmark.run()
}
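
Each case above is a closure registered with Spark's internal Benchmark helper. Going only by the calls visible in this file (a constructor taking a name and the number of values per iteration, addCase taking a case name and a closure over the iteration index, and run()), a stripped-down usage looks roughly like the sketch below; treat the exact signatures as assumptions taken from this diff, and the workload as a stand-in.

import org.apache.spark.util.Benchmark

// Sketch of the harness pattern used above: one closure per case, registered by
// name, then run() prints the timing table.
object ArraySumBenchmark {
  def main(args: Array[String]): Unit = {
    val count = 1024 * 1024
    val iters = 10L

    val javaArraySum = { _: Int =>
      val data = Array.tabulate(count)(identity)
      var sum = 0L
      for (_ <- 0L until iters) {
        var j = 0
        while (j < count) {
          sum += data(j)
          j += 1
        }
      }
    }

    val benchmark = new Benchmark("Array Sum", count.toLong * iters)
    benchmark.addCase("Java Array")(javaArraySum)
    benchmark.run()
  }
}
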
@@ -314,8 +338,60 @@ object ColumnarBatchBenchmark {
benchmark.run()
}
+ def stringAccess(iters: Long): Unit = {
+ val chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+ val random = new Random(0)
+
+ def randomString(min: Int, max: Int): String = {
+ val len = random.nextInt(max - min) + min
+ val sb = new StringBuilder(len)
+ var i = 0
+ while (i < len) {
+ sb.append(chars.charAt(random.nextInt(chars.length())));
+ i += 1
+ }
+ return sb.toString
+ }
+
+ val minString = 3
+ val maxString = 32
+ val count = 4 * 1000
+
+ val data = Seq.fill(count)(randomString(minString, maxString)).map(_.getBytes).toArray
+
+ def column(memoryMode: MemoryMode) = { i: Int =>
+ val column = ColumnVector.allocate(count, BinaryType, memoryMode)
+ var sum = 0L
+ for (n <- 0L until iters) {
+ var i = 0
+ while (i < count) {
+ column.putByteArray(i, data(i))
+ i += 1
+ }
+ i = 0
+ while (i < count) {
+ sum += column.getByteArray(i).length
+ i += 1
+ }
+ column.reset()
+ }
+ }
+
+ /*
+ String Read/Write: Avg Time(ms) Avg Rate(M/s) Relative Rate
+ -------------------------------------------------------------------------------------
+ On Heap 457.0 35.85 1.00 X
+ Off Heap 1206.0 13.59 0.38 X
+ */
+ val benchmark = new Benchmark("String Read/Write", count * iters)
+ benchmark.addCase("On Heap")(column(MemoryMode.ON_HEAP))
+ benchmark.addCase("Off Heap")(column(MemoryMode.OFF_HEAP))
+ benchmark.run
+ }
+
def main(args: Array[String]): Unit = {
intAccess(1024 * 40)
booleanAccess(1024 * 40)
+ stringAccess(1024 * 4)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index d5e517c7f5..215ca9ab6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -17,14 +17,15 @@
package org.apache.spark.sql.execution.vectorized
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Random
import org.apache.spark.SparkFunSuite
import org.apache.spark.memory.MemoryMode
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
+import org.apache.spark.sql.types._
import org.apache.spark.unsafe.Platform
class ColumnarBatchSuite extends SparkFunSuite {
@@ -74,6 +75,45 @@ class ColumnarBatchSuite extends SparkFunSuite {
}}
}
+ test("Byte Apis") {
+ (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+ val reference = mutable.ArrayBuffer.empty[Byte]
+
+ val column = ColumnVector.allocate(1024, ByteType, memMode)
+ var idx = 0
+
+ val values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).map(_.toByte).toArray
+ column.putBytes(idx, 2, values, 0)
+ reference += 1
+ reference += 2
+ idx += 2
+
+ column.putBytes(idx, 3, values, 2)
+ reference += 3
+ reference += 4
+ reference += 5
+ idx += 3
+
+ column.putByte(idx, 9)
+ reference += 9
+ idx += 1
+
+ column.putBytes(idx, 3, 4)
+ reference += 4
+ reference += 4
+ reference += 4
+ idx += 3
+
+ reference.zipWithIndex.foreach { v =>
+ assert(v._1 == column.getByte(v._2), "MemoryMode=" + memMode)
+ if (memMode == MemoryMode.OFF_HEAP) {
+ val addr = column.valuesNativeAddress()
+ assert(v._1 == Platform.getByte(null, addr + v._2))
+ }
+ }
+ }}
+ }
+
test("Int Apis") {
(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
val seed = System.currentTimeMillis()
@@ -142,6 +182,76 @@ class ColumnarBatchSuite extends SparkFunSuite {
}}
}
+ test("Long Apis") {
+ (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+ val seed = System.currentTimeMillis()
+ val random = new Random(seed)
+ val reference = mutable.ArrayBuffer.empty[Long]
+
+ val column = ColumnVector.allocate(1024, LongType, memMode)
+ var idx = 0
+
+ val values = (1L :: 2L :: 3L :: 4L :: 5L :: Nil).toArray
+ column.putLongs(idx, 2, values, 0)
+ reference += 1
+ reference += 2
+ idx += 2
+
+ column.putLongs(idx, 3, values, 2)
+ reference += 3
+ reference += 4
+ reference += 5
+ idx += 3
+
+ val littleEndian = new Array[Byte](16)
+ littleEndian(0) = 7
+ littleEndian(1) = 1
+ littleEndian(8) = 6
+ littleEndian(10) = 1
+
+ column.putLongsLittleEndian(idx, 1, littleEndian, 8)
+ column.putLongsLittleEndian(idx + 1, 1, littleEndian, 0)
+ reference += 6 + (1 << 16)
+ reference += 7 + (1 << 8)
+ idx += 2
+
+ column.putLongsLittleEndian(idx, 2, littleEndian, 0)
+ reference += 7 + (1 << 8)
+ reference += 6 + (1 << 16)
+ idx += 2
+
+ while (idx < column.capacity) {
+ val single = random.nextBoolean()
+ if (single) {
+ val v = random.nextLong()
+ column.putLong(idx, v)
+ reference += v
+ idx += 1
+ } else {
+
+ val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx)
+ column.putLongs(idx, n, n + 1)
+ var i = 0
+ while (i < n) {
+ reference += (n + 1)
+ i += 1
+ }
+ idx += n
+ }
+ }
+
+
+ reference.zipWithIndex.foreach { v =>
+ assert(v._1 == column.getLong(v._2), "idx=" + v._2 +
+ " Seed = " + seed + " MemMode=" + memMode)
+ if (memMode == MemoryMode.OFF_HEAP) {
+ val addr = column.valuesNativeAddress()
+ assert(v._1 == Platform.getLong(null, addr + 8 * v._2))
+ }
+ }
+ }}
+ }
+
test("Double APIs") {
(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
val seed = System.currentTimeMillis()
@@ -209,15 +319,150 @@ class ColumnarBatchSuite extends SparkFunSuite {
}}
}
+ test("String APIs") {
+ (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+ val reference = mutable.ArrayBuffer.empty[String]
+
+ val column = ColumnVector.allocate(6, BinaryType, memMode)
+ assert(column.arrayData().elementsAppended == 0)
+ var idx = 0
+
+ val values = ("Hello" :: "abc" :: Nil).toArray
+ column.putByteArray(idx, values(0).getBytes, 0, values(0).getBytes().length)
+ reference += values(0)
+ idx += 1
+ assert(column.arrayData().elementsAppended == 5)
+
+ column.putByteArray(idx, values(1).getBytes, 0, values(1).getBytes().length)
+ reference += values(1)
+ idx += 1
+ assert(column.arrayData().elementsAppended == 8)
+
+ // Put just "llo", a slice of the first value.
+ val offset = column.putByteArray(idx, values(0).getBytes, 2, values(0).getBytes().length - 2)
+ reference += "llo"
+ idx += 1
+ assert(column.arrayData().elementsAppended == 11)
+
+ // Put the same "ll" at offset. This should not allocate more memory in the column.
+ column.putArray(idx, offset, 2)
+ reference += "ll"
+ idx += 1
+ assert(column.arrayData().elementsAppended == 11)
+
+ // Put a long string
+ val s = "abcdefghijklmnopqrstuvwxyz"
+ column.putByteArray(idx, (s + s).getBytes)
+ reference += (s + s)
+ idx += 1
+ assert(column.arrayData().elementsAppended == 11 + (s + s).length)
+
+ reference.zipWithIndex.foreach { v =>
+ assert(v._1.length == column.getArrayLength(v._2), "MemoryMode=" + memMode)
+ assert(v._1 == ColumnVectorUtils.toString(column.getByteArray(v._2)),
+ "MemoryMode" + memMode)
+ }
+
+ column.reset()
+ assert(column.arrayData().elementsAppended == 0)
+ }}
+ }
+
+ test("Int Array") {
+ (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+ val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), memMode)
+
+ // Fill the underlying data with all the arrays back to back.
+ val data = column.arrayData();
+ var i = 0
+ while (i < 6) {
+ data.putInt(i, i)
+ i += 1
+ }
+
+ // Populate it with arrays [0], [1, 2], [], [3, 4, 5]
+ column.putArray(0, 0, 1)
+ column.putArray(1, 1, 2)
+ column.putArray(2, 2, 0)
+ column.putArray(3, 3, 3)
+
+ val a1 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
+ val a2 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(1)).asInstanceOf[Array[Int]]
+ val a3 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(2)).asInstanceOf[Array[Int]]
+ val a4 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(3)).asInstanceOf[Array[Int]]
+ assert(a1 === Array(0))
+ assert(a2 === Array(1, 2))
+ assert(a3 === Array.empty[Int])
+ assert(a4 === Array(3, 4, 5))
+
+ // Verify the ArrayData APIs
+ assert(column.getArray(0).length == 1)
+ assert(column.getArray(0).getInt(0) == 0)
+
+ assert(column.getArray(1).length == 2)
+ assert(column.getArray(1).getInt(0) == 1)
+ assert(column.getArray(1).getInt(1) == 2)
+
+ assert(column.getArray(2).length == 0)
+
+ assert(column.getArray(3).length == 3)
+ assert(column.getArray(3).getInt(0) == 3)
+ assert(column.getArray(3).getInt(1) == 4)
+ assert(column.getArray(3).getInt(2) == 5)
+
+ // Add a longer array which requires resizing
+ column.reset
+ val array = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
+ assert(data.capacity == 10)
+ data.reserve(array.length)
+ assert(data.capacity == array.length * 2)
+ data.putInts(0, array.length, array, 0)
+ column.putArray(0, 0, array.length)
+ assert(ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
+ === array)
+ }}
+ }
+
+ test("Struct Column") {
+ (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+ val schema = new StructType().add("int", IntegerType).add("double", DoubleType)
+ val column = ColumnVector.allocate(1024, schema, memMode)
+
+ val c1 = column.getChildColumn(0)
+ val c2 = column.getChildColumn(1)
+ assert(c1.dataType() == IntegerType)
+ assert(c2.dataType() == DoubleType)
+
+ c1.putInt(0, 123)
+ c2.putDouble(0, 3.45)
+ c1.putInt(1, 456)
+ c2.putDouble(1, 5.67)
+
+ val s = column.getStruct(0)
+ assert(s.fields(0).getInt(0) == 123)
+ assert(s.fields(0).getInt(1) == 456)
+ assert(s.fields(1).getDouble(0) == 3.45)
+ assert(s.fields(1).getDouble(1) == 5.67)
+
+ assert(s.getInt(0) == 123)
+ assert(s.getDouble(1) == 3.45)
+
+ val s2 = column.getStruct(1)
+ assert(s2.getInt(0) == 456)
+ assert(s2.getDouble(1) == 5.67)
+ }}
+ }
+
test("ColumnarBatch basic") {
(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
val schema = new StructType()
.add("intCol", IntegerType)
.add("doubleCol", DoubleType)
.add("intCol2", IntegerType)
+ .add("string", BinaryType)
val batch = ColumnarBatch.allocate(schema, memMode)
- assert(batch.numCols() == 3)
+ assert(batch.numCols() == 4)
assert(batch.numRows() == 0)
assert(batch.numValidRows() == 0)
assert(batch.capacity() > 0)
@@ -227,10 +472,11 @@ class ColumnarBatchSuite extends SparkFunSuite {
batch.column(0).putInt(0, 1)
batch.column(1).putDouble(0, 1.1)
batch.column(2).putNull(0)
+ batch.column(3).putByteArray(0, "Hello".getBytes)
batch.setNumRows(1)
// Verify the results of the row.
- assert(batch.numCols() == 3)
+ assert(batch.numCols() == 4)
assert(batch.numRows() == 1)
assert(batch.numValidRows() == 1)
assert(batch.rowIterator().hasNext == true)
@@ -241,6 +487,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(batch.column(1).getDouble(0) == 1.1)
assert(batch.column(1).getIsNull(0) == false)
assert(batch.column(2).getIsNull(0) == true)
+ assert(ColumnVectorUtils.toString(batch.column(3).getByteArray(0)) == "Hello")
// Verify the iterator works correctly.
val it = batch.rowIterator()
@@ -251,6 +498,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(row.getDouble(1) == 1.1)
assert(row.isNullAt(1) == false)
assert(row.isNullAt(2) == true)
+ assert(ColumnVectorUtils.toString(batch.column(3).getByteArray(0)) == "Hello")
assert(it.hasNext == false)
assert(it.hasNext == false)
@@ -260,24 +508,27 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(batch.numValidRows() == 0)
assert(batch.rowIterator().hasNext == false)
- // Reset and add 3 throws
+ // Reset and add 3 rows
batch.reset()
assert(batch.numRows() == 0)
assert(batch.numValidRows() == 0)
assert(batch.rowIterator().hasNext == false)
- // Add rows [NULL, 2.2, 2], [3, NULL, 3], [4, 4.4, 4]
+ // Add rows [NULL, 2.2, 2, "abc"], [3, NULL, 3, ""], [4, 4.4, 4, "world"]
batch.column(0).putNull(0)
batch.column(1).putDouble(0, 2.2)
batch.column(2).putInt(0, 2)
+ batch.column(3).putByteArray(0, "abc".getBytes)
batch.column(0).putInt(1, 3)
batch.column(1).putNull(1)
batch.column(2).putInt(1, 3)
+ batch.column(3).putByteArray(1, "".getBytes)
batch.column(0).putInt(2, 4)
batch.column(1).putDouble(2, 4.4)
batch.column(2).putInt(2, 4)
+ batch.column(3).putByteArray(2, "world".getBytes)
batch.setNumRows(3)
def rowEquals(x: InternalRow, y: Row): Unit = {
@@ -289,30 +540,152 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(x.isNullAt(2) == y.isNullAt(2))
if (!x.isNullAt(2)) assert(x.getInt(2) == y.getInt(2))
+
+ assert(x.isNullAt(3) == y.isNullAt(3))
+ if (!x.isNullAt(3)) assert(x.getString(3) == y.getString(3))
}
+
// Verify
assert(batch.numRows() == 3)
assert(batch.numValidRows() == 3)
val it2 = batch.rowIterator()
- rowEquals(it2.next(), Row(null, 2.2, 2))
- rowEquals(it2.next(), Row(3, null, 3))
- rowEquals(it2.next(), Row(4, 4.4, 4))
+ rowEquals(it2.next(), Row(null, 2.2, 2, "abc"))
+ rowEquals(it2.next(), Row(3, null, 3, ""))
+ rowEquals(it2.next(), Row(4, 4.4, 4, "world"))
assert(!it.hasNext)
// Filter out some rows and verify
batch.markFiltered(1)
assert(batch.numValidRows() == 2)
val it3 = batch.rowIterator()
- rowEquals(it3.next(), Row(null, 2.2, 2))
- rowEquals(it3.next(), Row(4, 4.4, 4))
+ rowEquals(it3.next(), Row(null, 2.2, 2, "abc"))
+ rowEquals(it3.next(), Row(4, 4.4, 4, "world"))
assert(!it.hasNext)
batch.markFiltered(2)
assert(batch.numValidRows() == 1)
val it4 = batch.rowIterator()
- rowEquals(it4.next(), Row(null, 2.2, 2))
+ rowEquals(it4.next(), Row(null, 2.2, 2, "abc"))
batch.close
}}
}
+
+
+ private def doubleEquals(d1: Double, d2: Double): Boolean = {
+ if (d1.isNaN && d2.isNaN) {
+ true
+ } else {
+ d1 == d2
+ }
+ }
+
+ private def compareStruct(fields: Seq[StructField], r1: InternalRow, r2: Row, seed: Long) {
+ fields.zipWithIndex.foreach { v => {
+ assert(r1.isNullAt(v._2) == r2.isNullAt(v._2), "Seed = " + seed)
+ if (!r1.isNullAt(v._2)) {
+ v._1.dataType match {
+ case ByteType => assert(r1.getByte(v._2) == r2.getByte(v._2), "Seed = " + seed)
+ case IntegerType => assert(r1.getInt(v._2) == r2.getInt(v._2), "Seed = " + seed)
+ case LongType => assert(r1.getLong(v._2) == r2.getLong(v._2), "Seed = " + seed)
+ case DoubleType => assert(doubleEquals(r1.getDouble(v._2), r2.getDouble(v._2)),
+ "Seed = " + seed)
+ case StringType =>
+ assert(r1.getString(v._2) == r2.getString(v._2), "Seed = " + seed)
+ case ArrayType(childType, n) =>
+ val a1 = r1.getArray(v._2).array
+ val a2 = r2.getList(v._2).toArray
+ assert(a1.length == a2.length, "Seed = " + seed)
+ childType match {
+ case DoubleType => {
+ var i = 0
+ while (i < a1.length) {
+ assert(doubleEquals(a1(i).asInstanceOf[Double], a2(i).asInstanceOf[Double]),
+ "Seed = " + seed)
+ i += 1
+ }
+ }
+ case _ => assert(a1 === a2, "Seed = " + seed)
+ }
+ case StructType(childFields) =>
+ compareStruct(childFields, r1.getStruct(v._2, fields.length), r2.getStruct(v._2), seed)
+ case _ =>
+ throw new NotImplementedError("Not implemented " + v._1.dataType)
+ }
+ }
+ }}
+ }
+
+ test("Convert rows") {
+ (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+ val rows = Row(1, 2L, "a", 1.2, 'b'.toByte) :: Row(4, 5L, "cd", 2.3, 'a'.toByte) :: Nil
+ val schema = new StructType()
+ .add("i1", IntegerType)
+ .add("l2", LongType)
+ .add("string", StringType)
+ .add("d", DoubleType)
+ .add("b", ByteType)
+
+ val batch = ColumnVectorUtils.toBatch(schema, memMode, rows.iterator.asJava)
+ assert(batch.numRows() == 2)
+ assert(batch.numCols() == 5)
+
+ val it = batch.rowIterator()
+ val referenceIt = rows.iterator
+ while (it.hasNext) {
+ compareStruct(schema, it.next(), referenceIt.next(), 0)
+ }
+ batch.close()
+ }
+ }}
+
+ /**
+ * This test generates rows for a randomly generated schema, converts them to column batches,
+ * and verifies the results.
+ */
+ def testRandomRows(flatSchema: Boolean, numFields: Int) {
+ // TODO: add remaining types. Figure out why StringType doesn't work on Jenkins.
+ val types = Array(ByteType, IntegerType, LongType, DoubleType)
+ val seed = System.nanoTime()
+ val NUM_ROWS = 500
+ val NUM_ITERS = 1000
+ val random = new Random(seed)
+ var i = 0
+ while (i < NUM_ITERS) {
+ val schema = if (flatSchema) {
+ RandomDataGenerator.randomSchema(random, numFields, types)
+ } else {
+ RandomDataGenerator.randomNestedSchema(random, numFields, types)
+ }
+ val rows = mutable.ArrayBuffer.empty[Row]
+ var j = 0
+ while (j < NUM_ROWS) {
+ val row = RandomDataGenerator.randomRow(random, schema)
+ rows += row
+ j += 1
+ }
+ (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+ val batch = ColumnVectorUtils.toBatch(schema, memMode, rows.iterator.asJava)
+ assert(batch.numRows() == NUM_ROWS)
+
+ val it = batch.rowIterator()
+ val referenceIt = rows.iterator
+ var k = 0
+ while (it.hasNext) {
+ compareStruct(schema, it.next(), referenceIt.next(), seed)
+ k += 1
+ }
+ batch.close()
+ }}
+ i += 1
+ }
+ }
+
+ test("Random flat schema") {
+ testRandomRows(true, 10)
+ }
+
+ test("Random nested schema") {
+ testRandomRows(false, 30)
+ }
}
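
The String/Binary and Array tests above all exercise the same layout for variable-length values: each row holds an (offset, length) pair into a child data column that stores all element bytes back to back, which is why putByteArray returns the offset it appended at and putArray can point a new row at bytes that already exist. A minimal sketch of that idea, with hypothetical names rather than the real ColumnVector API:

import scala.collection.mutable.ArrayBuffer

// Minimal model of a variable-length column: per-row (offset, length) entries plus
// one shared byte buffer with all values appended back to back.
class ByteArrayColumn {
  private val bytes = ArrayBuffer.empty[Byte]
  private val offsets = ArrayBuffer.empty[Int]
  private val lengths = ArrayBuffer.empty[Int]

  // Append the value's bytes and record where this row's data starts.
  def putByteArray(rowId: Int, value: Array[Byte]): Int = {
    val offset = bytes.length
    bytes ++= value
    setEntry(rowId, offset, value.length)
    offset
  }

  // Point a row at bytes that were already appended (no new allocation), mirroring
  // how putArray(rowId, offset, length) is used in the tests above.
  def putArray(rowId: Int, offset: Int, length: Int): Unit =
    setEntry(rowId, offset, length)

  def getByteArray(rowId: Int): Array[Byte] =
    bytes.slice(offsets(rowId), offsets(rowId) + lengths(rowId)).toArray

  private def setEntry(rowId: Int, offset: Int, length: Int): Unit = {
    while (offsets.length <= rowId) { offsets += 0; lengths += 0 }
    offsets(rowId) = offset
    lengths(rowId) = length
  }
}

object ByteArrayColumnDemo extends App {
  val col = new ByteArrayColumn
  val off = col.putByteArray(0, "Hello".getBytes)   // row 0 -> "Hello"
  col.putArray(1, off + 2, 3)                       // row 1 reuses existing bytes: "llo"
  assert(new String(col.getByteArray(0)) == "Hello")
  assert(new String(col.getByteArray(1)) == "llo")
}
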
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 76b36aa891..3e4cf3f79e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.hive.execution
import scala.collection.JavaConverters._
+import scala.util.Random
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -879,7 +880,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
RandomDataGenerator.forType(
dataType = schemaForGenerator,
nullable = true,
- seed = Some(System.nanoTime()))
+ new Random(System.nanoTime()))
val dataGenerator =
maybeDataGenerator
.getOrElse(fail(s"Failed to create data generator for schema $schemaForGenerator"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 3f9ecf6965..1a4b3ece72 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.sources
import scala.collection.JavaConverters._
+import scala.util.Random
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -122,7 +123,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
val dataGenerator = RandomDataGenerator.forType(
dataType = dataType,
nullable = true,
- seed = Some(System.nanoTime())
+ new Random(System.nanoTime())
).getOrElse {
fail(s"Failed to create data generator for schema $dataType")
}
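
These two hunks, like the sorter suite change earlier, are the call-site half of a RandomDataGenerator API change: instead of an Option[Long] seed, callers now pass the scala.util.Random instance themselves, so one seeded Random drives both schema and value generation and the seed can be logged when an assertion fails. A hedged sketch of the new calling convention, based only on the call shapes visible in this diff (the helper name and error handling are illustrative):

import scala.util.Random

import org.apache.spark.sql.RandomDataGenerator
import org.apache.spark.sql.types.DataType

object ReproducibleGen {
  // Hypothetical helper: build a value generator for `dataType` from an explicit
  // seed, so a failing test can be replayed from the logged seed.
  def generatorFor(dataType: DataType, seed: Long) =
    RandomDataGenerator.forType(dataType = dataType, nullable = true, new Random(seed))
      .getOrElse(sys.error(s"no generator for $dataType (seed = $seed)"))
}
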
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 0d6b215fe5..b29bf6a464 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -105,6 +105,17 @@ public final class Platform {
_UNSAFE.freeMemory(address);
}
+ public static long reallocateMemory(long address, long oldSize, long newSize) {
+ long newMemory = _UNSAFE.allocateMemory(newSize);
+ copyMemory(null, address, null, newMemory, oldSize);
+ freeMemory(address);
+ return newMemory;
+ }
+
+ public static void setMemory(long address, byte value, long size) {
+ _UNSAFE.setMemory(address, size, value);
+ }
+
public static void copyMemory(
Object src, long srcOffset, Object dst, long dstOffset, long length) {
// Check if dstOffset is before or after srcOffset to determine if we should copy