16 files changed, 1671 insertions, 90 deletions
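For orientation before the per-file hunks: the heart of this change is nested-type support in ColumnVector. Array columns store all element data densely in a single child column, with each parent row holding an (offset, length) pair encoded like INTs; struct columns fan out into one child column per field. A minimal sketch of the array layout, mirroring the "Int Array" test added to ColumnarBatchSuite below (the values are illustrative):

import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.execution.vectorized.ColumnVector
import org.apache.spark.sql.types._

val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), MemoryMode.ON_HEAP)

// Element data is written back-to-back into the single child column.
val data = column.arrayData()
var i = 0
while (i < 6) { data.putInt(i, i); i += 1 }

// Parent rows are (offset, length) views over that data: [0], [1, 2], [], [3, 4, 5].
column.putArray(0, 0, 1)
column.putArray(1, 1, 2)
column.putArray(2, 2, 0)
column.putArray(3, 3, 3)

assert(column.getArray(3).getInt(2) == 5)
column.close()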
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index 1a351933a3..a88bcbfdb7 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -68,6 +68,10 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS return ((numFields + 63)/ 64) * 8; } + public static int calculateFixedPortionByteSize(int numFields) { + return 8 * numFields + calculateBitSetWidthInBytes(numFields); + } + /** * Field types that can be updated in place in UnsafeRows (e.g. we support set() for these types) */ @@ -596,10 +600,9 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS public String toString() { StringBuilder build = new StringBuilder("["); for (int i = 0; i < sizeInBytes; i += 8) { + if (i != 0) build.append(','); build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i))); - build.append(','); } - build.deleteCharAt(build.length() - 1); build.append(']'); return build.toString(); } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala index 475cbe005a..4615c55d67 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String /** * A parent class for mutable container objects that are reused when the values are changed, @@ -212,6 +211,8 @@ final class SpecificMutableRow(val values: Array[MutableValue]) def this() = this(Seq.empty) + def this(schema: StructType) = this(schema.fields.map(_.dataType)) + override def numFields: Int = values.length override def setNullAt(i: Int): Unit = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala index 7614f055e9..55efea80d1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala @@ -21,6 +21,7 @@ import java.lang.Double.longBitsToDouble import java.lang.Float.intBitsToFloat import java.math.MathContext +import scala.collection.mutable import scala.util.Random import org.apache.spark.sql.catalyst.CatalystTypeConverters @@ -74,14 +75,48 @@ object RandomDataGenerator { * @param numFields the number of fields in this schema * @param acceptedTypes types to draw from. */ - def randomSchema(numFields: Int, acceptedTypes: Seq[DataType]): StructType = { + def randomSchema(rand: Random, numFields: Int, acceptedTypes: Seq[DataType]): StructType = { StructType(Seq.tabulate(numFields) { i => - val dt = acceptedTypes(Random.nextInt(acceptedTypes.size)) - StructField("col_" + i, dt, nullable = true) + val dt = acceptedTypes(rand.nextInt(acceptedTypes.size)) + StructField("col_" + i, dt, nullable = rand.nextBoolean()) }) } /** + * Returns a random nested schema. 
This will randomly generate structs and arrays drawn from + * acceptedTypes. + */ + def randomNestedSchema(rand: Random, totalFields: Int, acceptedTypes: Seq[DataType]): + StructType = { + val fields = mutable.ArrayBuffer.empty[StructField] + var i = 0 + var numFields = totalFields + while (numFields > 0) { + val v = rand.nextInt(3) + if (v == 0) { + // Simple type: + val dt = acceptedTypes(rand.nextInt(acceptedTypes.size)) + fields += new StructField("col_" + i, dt, rand.nextBoolean()) + numFields -= 1 + } else if (v == 1) { + // Array + val dt = acceptedTypes(rand.nextInt(acceptedTypes.size)) + fields += new StructField("col_" + i, ArrayType(dt), rand.nextBoolean()) + numFields -= 1 + } else { + // Struct + // TODO: do empty structs make sense? + val n = Math.max(rand.nextInt(numFields), 1) + val nested = randomNestedSchema(rand, n, acceptedTypes) + fields += new StructField("col_" + i, nested, rand.nextBoolean()) + numFields -= n + } + i += 1 + } + StructType(fields) + } + + /** * Returns a function which generates random values for the given [[DataType]], or `None` if no * random data generator is defined for that data type. The generated values will use an external * representation of the data type; for example, the random generator for [[DateType]] will return @@ -90,16 +125,13 @@ object RandomDataGenerator { * * @param dataType the type to generate values for * @param nullable whether null values should be generated - * @param seed an optional seed for the random number generator + * @param rand an optional random number generator * @return a function which can be called to generate random values. */ def forType( dataType: DataType, nullable: Boolean = true, - seed: Option[Long] = None): Option[() => Any] = { - val rand = new Random() - seed.foreach(rand.setSeed) - + rand: Random = new Random): Option[() => Any] = { val valueGenerator: Option[() => Any] = dataType match { case StringType => Some(() => rand.nextString(rand.nextInt(MAX_STR_LEN))) case BinaryType => Some(() => { @@ -165,15 +197,15 @@ object RandomDataGenerator { rand, _.nextInt().toShort, Seq(Short.MinValue, Short.MaxValue, 0.toShort)) case NullType => Some(() => null) case ArrayType(elementType, containsNull) => { - forType(elementType, nullable = containsNull, seed = Some(rand.nextLong())).map { + forType(elementType, nullable = containsNull, rand).map { elementGenerator => () => Seq.fill(rand.nextInt(MAX_ARR_SIZE))(elementGenerator()) } } case MapType(keyType, valueType, valueContainsNull) => { for ( - keyGenerator <- forType(keyType, nullable = false, seed = Some(rand.nextLong())); + keyGenerator <- forType(keyType, nullable = false, rand); valueGenerator <- - forType(valueType, nullable = valueContainsNull, seed = Some(rand.nextLong())) + forType(valueType, nullable = valueContainsNull, rand) ) yield { () => { Seq.fill(rand.nextInt(MAX_MAP_SIZE))((keyGenerator(), valueGenerator())).toMap @@ -182,7 +214,7 @@ object RandomDataGenerator { } case StructType(fields) => { val maybeFieldGenerators: Seq[Option[() => Any]] = fields.map { field => - forType(field.dataType, nullable = field.nullable, seed = Some(rand.nextLong())) + forType(field.dataType, nullable = field.nullable, rand) } if (maybeFieldGenerators.forall(_.isDefined)) { val fieldGenerators: Seq[() => Any] = maybeFieldGenerators.map(_.get) @@ -192,7 +224,7 @@ object RandomDataGenerator { } } case udt: UserDefinedType[_] => { - val maybeSqlTypeGenerator = forType(udt.sqlType, nullable, seed) + val maybeSqlTypeGenerator = forType(udt.sqlType, nullable, rand) // 
Because random data generator at here returns scala value, we need to // convert it to catalyst value to call udt's deserialize. val toCatalystType = CatalystTypeConverters.createToCatalystConverter(udt.sqlType) @@ -229,4 +261,40 @@ object RandomDataGenerator { } } } + + // Generates a random row for `schema`. + def randomRow(rand: Random, schema: StructType): Row = { + val fields = mutable.ArrayBuffer.empty[Any] + schema.fields.foreach { f => + f.dataType match { + case ArrayType(childType, nullable) => { + val data = if (f.nullable && rand.nextFloat() <= PROBABILITY_OF_NULL) { + null + } else { + val arr = mutable.ArrayBuffer.empty[Any] + val n = 1// rand.nextInt(10) + var i = 0 + val generator = RandomDataGenerator.forType(childType, nullable, rand) + assert(generator.isDefined, "Unsupported type") + val gen = generator.get + while (i < n) { + arr += gen() + i += 1 + } + arr + } + fields += data + } + case StructType(children) => { + fields += randomRow(rand, StructType(children)) + } + case _ => + val generator = RandomDataGenerator.forType(f.dataType, f.nullable, rand) + assert(generator.isDefined, "Unsupported type") + val gen = generator.get + fields += gen() + } + } + Row.fromSeq(fields) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala index cccac7efa0..b8ccdf7516 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import scala.util.Random + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.CatalystTypeConverters import org.apache.spark.sql.types._ @@ -32,7 +34,7 @@ class RandomDataGeneratorSuite extends SparkFunSuite { */ def testRandomDataGeneration(dataType: DataType, nullable: Boolean = true): Unit = { val toCatalyst = CatalystTypeConverters.createToCatalystConverter(dataType) - val generator = RandomDataGenerator.forType(dataType, nullable, Some(33)).getOrElse { + val generator = RandomDataGenerator.forType(dataType, nullable, new Random(33)).getOrElse { fail(s"Random data generator was not defined for $dataType") } if (nullable) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala index 59729e7646..9f19745cef 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala @@ -74,8 +74,9 @@ class GenerateUnsafeRowJoinerSuite extends SparkFunSuite { private def testConcatOnce(numFields1: Int, numFields2: Int, candidateTypes: Seq[DataType]) { info(s"schema size $numFields1, $numFields2") - val schema1 = RandomDataGenerator.randomSchema(numFields1, candidateTypes) - val schema2 = RandomDataGenerator.randomSchema(numFields2, candidateTypes) + val random = new Random() + val schema1 = RandomDataGenerator.randomSchema(random, numFields1, candidateTypes) + val schema2 = RandomDataGenerator.randomSchema(random, numFields2, candidateTypes) // Create the converters needed to convert from external row to internal row and to UnsafeRows. 
val internalConverter1 = CatalystTypeConverters.createToCatalystConverter(schema1) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index 85509751db..c119758d68 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -17,22 +17,45 @@ package org.apache.spark.sql.execution.vectorized; import org.apache.spark.memory.MemoryMode; -import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.util.ArrayData; +import org.apache.spark.sql.catalyst.util.MapData; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.types.CalendarInterval; +import org.apache.spark.unsafe.types.UTF8String; + +import org.apache.commons.lang.NotImplementedException; /** * This class represents a column of values and provides the main APIs to access the data * values. It supports all the types and contains get/put APIs as well as their batched versions. * The batched versions are preferable whenever possible. * - * Most of the APIs take the rowId as a parameter. This is the local 0-based row id for values + * To handle nested schemas, ColumnVector has two types: Arrays and Structs. In both cases these + * columns have child columns. All of the data is stored in the child columns and the parent column + * contains nullability, and in the case of Arrays, the lengths and offsets into the child column. + * Lengths and offsets are encoded identically to INTs. + * Maps are just a special case of a two field struct. + * Strings are handled as an Array of ByteType. + * + * Capacity: The data stored is dense but the arrays are not fixed capacity. It is the + * responsibility of the caller to call reserve() to ensure there is enough room before adding + * elements. This means that the put() APIs do not check as in common cases (i.e. flat schemas), + * the lengths are known up front. + * + * Most of the APIs take the rowId as a parameter. This is the batch local 0-based row id for values * in the current RowBatch. * * A ColumnVector should be considered immutable once originally created. In other words, it is not * valid to call put APIs after reads until reset() is called. + * + * ColumnVectors are intended to be reused. */ public abstract class ColumnVector { /** - * Allocates a column with each element of size `width` either on or off heap. + * Allocates a column to store elements of `type` on or off heap. + * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is + * in number of elements, not number of bytes. */ public static ColumnVector allocate(int capacity, DataType type, MemoryMode mode) { if (mode == MemoryMode.OFF_HEAP) { @@ -42,13 +65,265 @@ public abstract class ColumnVector { } } + /** + * Holder object to return an array. This object is intended to be reused. Callers should + * copy the data out if it needs to be stored. + */ + public static final class Array extends ArrayData { + // The data for this array. This array contains elements from + // data[offset] to data[offset + length). + public final ColumnVector data; + public int length; + public int offset; + + // Populate if binary data is required for the Array. This is stored here as an optimization + // for string data. 
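+  // loadBytes() fills these: the on-heap vector hands out its byte[] directly (with the
+  // array's offset), while the off-heap vector copies into the reused tmpByteArray below.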
+ public byte[] byteArray; + public int byteArrayOffset; + + // Reused staging buffer, used for loading from offheap. + protected byte[] tmpByteArray = new byte[1]; + + protected Array(ColumnVector data) { + this.data = data; + } + + @Override + public final int numElements() { return length; } + + @Override + public ArrayData copy() { + throw new NotImplementedException(); + } + + // TODO: this is extremely expensive. + @Override + public Object[] array() { + DataType dt = data.dataType(); + Object[] list = new Object[length]; + + if (dt instanceof ByteType) { + for (int i = 0; i < length; i++) { + if (!data.getIsNull(offset + i)) { + list[i] = data.getByte(offset + i); + } + } + } else if (dt instanceof IntegerType) { + for (int i = 0; i < length; i++) { + if (!data.getIsNull(offset + i)) { + list[i] = data.getInt(offset + i); + } + } + } else if (dt instanceof DoubleType) { + for (int i = 0; i < length; i++) { + if (!data.getIsNull(offset + i)) { + list[i] = data.getDouble(offset + i); + } + } + } else if (dt instanceof LongType) { + for (int i = 0; i < length; i++) { + if (!data.getIsNull(offset + i)) { + list[i] = data.getLong(offset + i); + } + } + } else if (dt instanceof StringType) { + for (int i = 0; i < length; i++) { + if (!data.getIsNull(offset + i)) { + list[i] = ColumnVectorUtils.toString(data.getByteArray(offset + i)); + } + } + } else { + throw new NotImplementedException("Type " + dt); + } + return list; + } + + @Override + public final boolean isNullAt(int ordinal) { return data.getIsNull(offset + ordinal); } + + @Override + public final boolean getBoolean(int ordinal) { + throw new NotImplementedException(); + } + + @Override + public byte getByte(int ordinal) { return data.getByte(offset + ordinal); } + + @Override + public short getShort(int ordinal) { + throw new NotImplementedException(); + } + + @Override + public int getInt(int ordinal) { return data.getInt(offset + ordinal); } + + @Override + public long getLong(int ordinal) { return data.getLong(offset + ordinal); } + + @Override + public float getFloat(int ordinal) { + throw new NotImplementedException(); + } + + @Override + public double getDouble(int ordinal) { return data.getDouble(offset + ordinal); } + + @Override + public Decimal getDecimal(int ordinal, int precision, int scale) { + throw new NotImplementedException(); + } + + @Override + public UTF8String getUTF8String(int ordinal) { + Array child = data.getByteArray(offset + ordinal); + return UTF8String.fromBytes(child.byteArray, child.byteArrayOffset, child.length); + } + + @Override + public byte[] getBinary(int ordinal) { + throw new NotImplementedException(); + } + + @Override + public CalendarInterval getInterval(int ordinal) { + throw new NotImplementedException(); + } + + @Override + public InternalRow getStruct(int ordinal, int numFields) { + throw new NotImplementedException(); + } + + @Override + public ArrayData getArray(int ordinal) { + return data.getArray(offset + ordinal); + } + + @Override + public MapData getMap(int ordinal) { + throw new NotImplementedException(); + } + + @Override + public Object get(int ordinal, DataType dataType) { + throw new NotImplementedException(); + } + } + + /** + * Holder object to return a struct. This object is intended to be reused. + */ + public static final class Struct extends InternalRow { + // The fields that make up this struct. 
For example, if the struct had 2 int fields, the access + // to it would be: + // int f1 = fields[0].getInt[rowId] + // int f2 = fields[1].getInt[rowId] + public final ColumnVector[] fields; + + @Override + public boolean isNullAt(int fieldIdx) { return fields[fieldIdx].getIsNull(rowId); } + + @Override + public boolean getBoolean(int ordinal) { + throw new NotImplementedException(); + } + + public byte getByte(int fieldIdx) { return fields[fieldIdx].getByte(rowId); } + + @Override + public short getShort(int ordinal) { + throw new NotImplementedException(); + } + + public int getInt(int fieldIdx) { return fields[fieldIdx].getInt(rowId); } + public long getLong(int fieldIdx) { return fields[fieldIdx].getLong(rowId); } + + @Override + public float getFloat(int ordinal) { + throw new NotImplementedException(); + } + + public double getDouble(int fieldIdx) { return fields[fieldIdx].getDouble(rowId); } + + @Override + public Decimal getDecimal(int ordinal, int precision, int scale) { + throw new NotImplementedException(); + } + + @Override + public UTF8String getUTF8String(int ordinal) { + Array a = getByteArray(ordinal); + return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length); + } + + @Override + public byte[] getBinary(int ordinal) { + throw new NotImplementedException(); + } + + @Override + public CalendarInterval getInterval(int ordinal) { + throw new NotImplementedException(); + } + + @Override + public InternalRow getStruct(int ordinal, int numFields) { + return fields[ordinal].getStruct(rowId); + } + + public Array getArray(int fieldIdx) { return fields[fieldIdx].getArray(rowId); } + + @Override + public MapData getMap(int ordinal) { + throw new NotImplementedException(); + } + + @Override + public Object get(int ordinal, DataType dataType) { + throw new NotImplementedException(); + } + + public Array getByteArray(int fieldIdx) { return fields[fieldIdx].getByteArray(rowId); } + public Struct getStruct(int fieldIdx) { return fields[fieldIdx].getStruct(rowId); } + + @Override + public final int numFields() { + return fields.length; + } + + @Override + public InternalRow copy() { + throw new NotImplementedException(); + } + + @Override + public boolean anyNull() { + throw new NotImplementedException(); + } + + protected int rowId; + + protected Struct(ColumnVector[] fields) { + this.fields = fields; + } + } + + /** + * Returns the data type of this column. + */ public final DataType dataType() { return type; } /** * Resets this column for writing. The currently stored values are no longer accessible. */ public void reset() { + if (childColumns != null) { + for (ColumnVector c: childColumns) { + c.reset(); + } + } numNulls = 0; + elementsAppended = 0; if (anyNullsSet) { putNotNulls(0, capacity); anyNullsSet = false; @@ -61,6 +336,12 @@ public abstract class ColumnVector { */ public abstract void close(); + /* + * Ensures that there is enough storage to store capcity elements. That is, the put() APIs + * must work for all rowIds < capcity. + */ + public abstract void reserve(int capacity); + /** * Returns the number of nulls in this column. */ @@ -99,6 +380,26 @@ public abstract class ColumnVector { /** * Sets the value at rowId to `value`. */ + public abstract void putByte(int rowId, byte value); + + /** + * Sets values from [rowId, rowId + count) to value. 
+ */ + public abstract void putBytes(int rowId, int count, byte value); + + /** + * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count) + */ + public abstract void putBytes(int rowId, int count, byte[] src, int srcIndex); + + /** + * Returns the value for rowId. + */ + public abstract byte getByte(int rowId); + + /** + * Sets the value at rowId to `value`. + */ public abstract void putInt(int rowId, int value); /** @@ -118,13 +419,39 @@ public abstract class ColumnVector { public abstract void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex); /** - * Returns the integer for rowId. + * Returns the value for rowId. */ public abstract int getInt(int rowId); /** * Sets the value at rowId to `value`. */ + public abstract void putLong(int rowId, long value); + + /** + * Sets values from [rowId, rowId + count) to value. + */ + public abstract void putLongs(int rowId, int count, long value); + + /** + * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count) + */ + public abstract void putLongs(int rowId, int count, long[] src, int srcIndex); + + /** + * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count]) + * The data in src must be 8-byte little endian longs. + */ + public abstract void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex); + + /** + * Returns the value for rowId. + */ + public abstract long getLong(int rowId); + + /** + * Sets the value at rowId to `value`. + */ public abstract void putDouble(int rowId, double value); /** @@ -145,14 +472,248 @@ public abstract class ColumnVector { public abstract void putDoubles(int rowId, int count, byte[] src, int srcIndex); /** - * Returns the double for rowId. + * Returns the value for rowId. */ public abstract double getDouble(int rowId); /** + * Puts a byte array that already exists in this column. + */ + public abstract void putArray(int rowId, int offset, int length); + + /** + * Returns the length of the array at rowid. + */ + public abstract int getArrayLength(int rowId); + + /** + * Returns the offset of the array at rowid. + */ + public abstract int getArrayOffset(int rowId); + + /** + * Returns a utility object to get structs. + */ + public Struct getStruct(int rowId) { + resultStruct.rowId = rowId; + return resultStruct; + } + + /** + * Returns the array at rowid. + */ + public final Array getArray(int rowId) { + resultArray.length = getArrayLength(rowId); + resultArray.offset = getArrayOffset(rowId); + return resultArray; + } + + /** + * Loads the data into array.byteArray. + */ + public abstract void loadBytes(Array array); + + /** + * Sets the value at rowId to `value`. + */ + public abstract int putByteArray(int rowId, byte[] value, int offset, int count); + public final int putByteArray(int rowId, byte[] value) { + return putByteArray(rowId, value, 0, value.length); + } + + /** + * Returns the value for rowId. + */ + public final Array getByteArray(int rowId) { + Array array = getArray(rowId); + array.data.loadBytes(array); + return array; + } + + /** + * Append APIs. These APIs all behave similarly and will append data to the current vector. It + * is not valid to mix the put and append APIs. The append APIs are slower and should only be + * used if the sizes are not known up front. + * In all these cases, the return value is the rowId for the first appended element. 
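+   * For example, appendInt(7) on a freshly reset vector stores 7 at row 0, returns 0, and
+   * leaves elementsAppended at 1.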
+ */ + public final int appendNull() { + assert (!(dataType() instanceof StructType)); // Use appendStruct() + reserve(elementsAppended + 1); + putNull(elementsAppended); + return elementsAppended++; + } + + public final int appendNotNull() { + reserve(elementsAppended + 1); + putNotNull(elementsAppended); + return elementsAppended++; + } + + public final int appendNulls(int count) { + assert (!(dataType() instanceof StructType)); + reserve(elementsAppended + count); + int result = elementsAppended; + putNulls(elementsAppended, count); + elementsAppended += count; + return result; + } + + public final int appendNotNulls(int count) { + assert (!(dataType() instanceof StructType)); + reserve(elementsAppended + count); + int result = elementsAppended; + putNotNulls(elementsAppended, count); + elementsAppended += count; + return result; + } + + public final int appendByte(byte v) { + reserve(elementsAppended + 1); + putByte(elementsAppended, v); + return elementsAppended++; + } + + public final int appendBytes(int count, byte v) { + reserve(elementsAppended + count); + int result = elementsAppended; + putBytes(elementsAppended, count, v); + elementsAppended += count; + return result; + } + + public final int appendBytes(int length, byte[] src, int offset) { + reserve(elementsAppended + length); + int result = elementsAppended; + putBytes(elementsAppended, length, src, offset); + elementsAppended += length; + return result; + } + + public final int appendInt(int v) { + reserve(elementsAppended + 1); + putInt(elementsAppended, v); + return elementsAppended++; + } + + public final int appendInts(int count, int v) { + reserve(elementsAppended + count); + int result = elementsAppended; + putInts(elementsAppended, count, v); + elementsAppended += count; + return result; + } + + public final int appendInts(int length, int[] src, int offset) { + reserve(elementsAppended + length); + int result = elementsAppended; + putInts(elementsAppended, length, src, offset); + elementsAppended += length; + return result; + } + + public final int appendLong(long v) { + reserve(elementsAppended + 1); + putLong(elementsAppended, v); + return elementsAppended++; + } + + public final int appendLongs(int count, long v) { + reserve(elementsAppended + count); + int result = elementsAppended; + putLongs(elementsAppended, count, v); + elementsAppended += count; + return result; + } + + public final int appendLongs(int length, long[] src, int offset) { + reserve(elementsAppended + length); + int result = elementsAppended; + putLongs(elementsAppended, length, src, offset); + elementsAppended += length; + return result; + } + + public final int appendDouble(double v) { + reserve(elementsAppended + 1); + putDouble(elementsAppended, v); + return elementsAppended++; + } + + public final int appendDoubles(int count, double v) { + reserve(elementsAppended + count); + int result = elementsAppended; + putDoubles(elementsAppended, count, v); + elementsAppended += count; + return result; + } + + public final int appendDoubles(int length, double[] src, int offset) { + reserve(elementsAppended + length); + int result = elementsAppended; + putDoubles(elementsAppended, length, src, offset); + elementsAppended += length; + return result; + } + + public final int appendByteArray(byte[] value, int offset, int length) { + int copiedOffset = arrayData().appendBytes(length, value, offset); + reserve(elementsAppended + 1); + putArray(elementsAppended, copiedOffset, length); + return elementsAppended++; + } + + public final int appendArray(int 
length) { + reserve(elementsAppended + 1); + putArray(elementsAppended, arrayData().elementsAppended, length); + return elementsAppended++; + } + + /** + * Appends a NULL struct. This *has* to be used for structs instead of appendNull() as this + * recursively appends a NULL to its children. + * We don't have this logic as the general appendNull implementation to optimize the more + * common non-struct case. + */ + public final int appendStruct(boolean isNull) { + if (isNull) { + appendNull(); + for (ColumnVector c: childColumns) { + if (c.type instanceof StructType) { + c.appendStruct(true); + } else { + c.appendNull(); + } + } + } else { + appendNotNull(); + } + return elementsAppended; + } + + /** + * Returns the data for the underlying array. + */ + public final ColumnVector arrayData() { return childColumns[0]; } + + /** + * Returns the ordinal's child data column. + */ + public final ColumnVector getChildColumn(int ordinal) { return childColumns[ordinal]; } + + /** + * Returns the elements appended. + */ + public int getElementsAppended() { return elementsAppended; } + + /** * Maximum number of rows that can be stored in this column. */ - protected final int capacity; + protected int capacity; + + /** + * Data type for this column. + */ + protected final DataType type; /** * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks. @@ -166,12 +727,63 @@ public abstract class ColumnVector { protected boolean anyNullsSet; /** - * Data type for this column. + * Default size of each array length value. This grows as necessary. */ - protected final DataType type; + protected static final int DEFAULT_ARRAY_LENGTH = 4; + + /** + * Current write cursor (row index) when appending data. + */ + protected int elementsAppended; - protected ColumnVector(int capacity, DataType type) { + /** + * If this is a nested type (array or struct), the column for the child data. + */ + protected final ColumnVector[] childColumns; + + /** + * Reusable Array holder for getArray(). + */ + protected final Array resultArray; + + /** + * Reusable Struct holder for getStruct(). + */ + protected final Struct resultStruct; + + /** + * Sets up the common state and also handles creating the child columns if this is a nested + * type. 
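+   * String and binary columns get a single byte-typed child sized at
+   * capacity * DEFAULT_ARRAY_LENGTH; struct columns get one child vector per field.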
+ */ + protected ColumnVector(int capacity, DataType type, MemoryMode memMode) { this.capacity = capacity; this.type = type; + + if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType) { + DataType childType; + int childCapacity = capacity; + if (type instanceof ArrayType) { + childType = ((ArrayType)type).elementType(); + } else { + childType = DataTypes.ByteType; + childCapacity *= DEFAULT_ARRAY_LENGTH; + } + this.childColumns = new ColumnVector[1]; + this.childColumns[0] = ColumnVector.allocate(childCapacity, childType, memMode); + this.resultArray = new Array(this.childColumns[0]); + this.resultStruct = null; + } else if (type instanceof StructType) { + StructType st = (StructType)type; + this.childColumns = new ColumnVector[st.fields().length]; + for (int i = 0; i < childColumns.length; ++i) { + this.childColumns[i] = ColumnVector.allocate(capacity, st.fields()[i].dataType(), memMode); + } + this.resultArray = null; + this.resultStruct = new Struct(this.childColumns); + } else { + this.childColumns = null; + this.resultArray = null; + this.resultStruct = null; + } } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java new file mode 100644 index 0000000000..6c651a759d --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.vectorized; + +import java.util.Iterator; +import java.util.List; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.types.*; + +import org.apache.commons.lang.NotImplementedException; + +/** + * Utilities to help manipulate data associate with ColumnVectors. These should be used mostly + * for debugging or other non-performance critical paths. + * These utilities are mostly used to convert ColumnVectors into other formats. + */ +public class ColumnVectorUtils { + public static String toString(ColumnVector.Array a) { + return new String(a.byteArray, a.byteArrayOffset, a.length); + } + + /** + * Returns the array data as the java primitive array. + * For example, an array of IntegerType will return an int[]. + * Throws exceptions for unhandled schemas. 
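+   * Currently only IntegerType is supported, and NULL elements throw a RuntimeException.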
+ */ + public static Object toPrimitiveJavaArray(ColumnVector.Array array) { + DataType dt = array.data.dataType(); + if (dt instanceof IntegerType) { + int[] result = new int[array.length]; + ColumnVector data = array.data; + for (int i = 0; i < result.length; i++) { + if (data.getIsNull(array.offset + i)) { + throw new RuntimeException("Cannot handle NULL values."); + } + result[i] = data.getInt(array.offset + i); + } + return result; + } else { + throw new NotImplementedException(); + } + } + + private static void appendValue(ColumnVector dst, DataType t, Object o) { + if (o == null) { + dst.appendNull(); + } else { + if (t == DataTypes.ByteType) { + dst.appendByte(((Byte)o).byteValue()); + } else if (t == DataTypes.IntegerType) { + dst.appendInt(((Integer)o).intValue()); + } else if (t == DataTypes.LongType) { + dst.appendLong(((Long)o).longValue()); + } else if (t == DataTypes.DoubleType) { + dst.appendDouble(((Double)o).doubleValue()); + } else if (t == DataTypes.StringType) { + byte[] b =((String)o).getBytes(); + dst.appendByteArray(b, 0, b.length); + } else { + throw new NotImplementedException("Type " + t); + } + } + } + + private static void appendValue(ColumnVector dst, DataType t, Row src, int fieldIdx) { + if (t instanceof ArrayType) { + ArrayType at = (ArrayType)t; + if (src.isNullAt(fieldIdx)) { + dst.appendNull(); + } else { + List<Object> values = src.getList(fieldIdx); + dst.appendArray(values.size()); + for (Object o : values) { + appendValue(dst.arrayData(), at.elementType(), o); + } + } + } else if (t instanceof StructType) { + StructType st = (StructType)t; + if (src.isNullAt(fieldIdx)) { + dst.appendStruct(true); + } else { + dst.appendStruct(false); + Row c = src.getStruct(fieldIdx); + for (int i = 0; i < st.fields().length; i++) { + appendValue(dst.getChildColumn(i), st.fields()[i].dataType(), c, i); + } + } + } else { + appendValue(dst, t, src.get(fieldIdx)); + } + } + + /** + * Converts an iterator of rows into a single ColumnBatch. 
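+   * Rows are appended one field at a time, so this is only suitable for tests and other
+   * non-performance critical paths.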
+ */ + public static ColumnarBatch toBatch( + StructType schema, MemoryMode memMode, Iterator<Row> row) { + ColumnarBatch batch = ColumnarBatch.allocate(schema, memMode); + int n = 0; + while (row.hasNext()) { + Row r = row.next(); + for (int i = 0; i < schema.fields().length; i++) { + appendValue(batch.column(i), schema.fields()[i].dataType(), r, i); + } + n++; + } + batch.setNumRows(n); + return batch; + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java index 2c55f854c2..d558dae50c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java @@ -21,12 +21,10 @@ import java.util.Iterator; import org.apache.spark.memory.MemoryMode; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; import org.apache.spark.sql.catalyst.util.ArrayData; import org.apache.spark.sql.catalyst.util.MapData; -import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.Decimal; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; @@ -48,6 +46,7 @@ import org.apache.commons.lang.NotImplementedException; */ public final class ColumnarBatch { private static final int DEFAULT_BATCH_SIZE = 4 * 1024; + private static MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP; private final StructType schema; private final int capacity; @@ -64,6 +63,10 @@ public final class ColumnarBatch { return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode); } + public static ColumnarBatch allocate(StructType type) { + return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE); + } + public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) { return new ColumnarBatch(schema, maxRows, memMode); } @@ -82,25 +85,53 @@ public final class ColumnarBatch { * Adapter class to interop with existing components that expect internal row. A lot of * performance is lost with this translation. */ - public final class Row extends InternalRow { + public static final class Row extends InternalRow { private int rowId; + private final ColumnarBatch parent; + private final int fixedLenRowSize; + + private Row(ColumnarBatch parent) { + this.parent = parent; + this.fixedLenRowSize = UnsafeRow.calculateFixedPortionByteSize(parent.numCols()); + } /** * Marks this row as being filtered out. This means a subsequent iteration over the rows * in this batch will not include this row. */ public final void markFiltered() { - ColumnarBatch.this.markFiltered(rowId); + parent.markFiltered(rowId); } @Override public final int numFields() { - return ColumnarBatch.this.numCols(); + return parent.numCols(); } @Override + /** + * Revisit this. This is expensive. 
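+     * The copy is materialized as a fixed-length UnsafeRow, so only fixed-length column
+     * types (int, long, double) are handled here.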
+ */ public final InternalRow copy() { - throw new NotImplementedException(); + UnsafeRow row = new UnsafeRow(parent.numCols()); + row.pointTo(new byte[fixedLenRowSize], fixedLenRowSize); + for (int i = 0; i < parent.numCols(); i++) { + if (isNullAt(i)) { + row.setNullAt(i); + } else { + DataType dt = parent.schema.fields()[i].dataType(); + if (dt instanceof IntegerType) { + row.setInt(i, getInt(i)); + } else if (dt instanceof LongType) { + row.setLong(i, getLong(i)); + } else if (dt instanceof DoubleType) { + row.setDouble(i, getDouble(i)); + } else { + throw new RuntimeException("Not implemented."); + } + } + } + return row; } @Override @@ -110,7 +141,7 @@ public final class ColumnarBatch { @Override public final boolean isNullAt(int ordinal) { - return ColumnarBatch.this.column(ordinal).getIsNull(rowId); + return parent.column(ordinal).getIsNull(rowId); } @Override @@ -119,9 +150,7 @@ public final class ColumnarBatch { } @Override - public final byte getByte(int ordinal) { - throw new NotImplementedException(); - } + public final byte getByte(int ordinal) { return parent.column(ordinal).getByte(rowId); } @Override public final short getShort(int ordinal) { @@ -130,13 +159,11 @@ public final class ColumnarBatch { @Override public final int getInt(int ordinal) { - return ColumnarBatch.this.column(ordinal).getInt(rowId); + return parent.column(ordinal).getInt(rowId); } @Override - public final long getLong(int ordinal) { - throw new NotImplementedException(); - } + public final long getLong(int ordinal) { return parent.column(ordinal).getLong(rowId); } @Override public final float getFloat(int ordinal) { @@ -145,7 +172,7 @@ public final class ColumnarBatch { @Override public final double getDouble(int ordinal) { - return ColumnarBatch.this.column(ordinal).getDouble(rowId); + return parent.column(ordinal).getDouble(rowId); } @Override @@ -155,7 +182,8 @@ public final class ColumnarBatch { @Override public final UTF8String getUTF8String(int ordinal) { - throw new NotImplementedException(); + ColumnVector.Array a = parent.column(ordinal).getByteArray(rowId); + return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length); } @Override @@ -170,12 +198,12 @@ public final class ColumnarBatch { @Override public final InternalRow getStruct(int ordinal, int numFields) { - throw new NotImplementedException(); + return parent.column(ordinal).getStruct(rowId); } @Override public final ArrayData getArray(int ordinal) { - throw new NotImplementedException(); + return parent.column(ordinal).getArray(rowId); } @Override @@ -194,7 +222,7 @@ public final class ColumnarBatch { */ public Iterator<Row> rowIterator() { final int maxRows = ColumnarBatch.this.numRows(); - final Row row = new Row(); + final Row row = new Row(this); return new Iterator<Row>() { int rowId = 0; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 6180dd308e..335124fd5a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -18,14 +18,20 @@ package org.apache.spark.sql.execution.vectorized; import java.nio.ByteOrder; +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.types.ByteType; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DoubleType; import org.apache.spark.sql.types.IntegerType; 
+import org.apache.spark.sql.types.LongType; import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.types.UTF8String; import org.apache.commons.lang.NotImplementedException; +import org.apache.commons.lang.NotImplementedException; + /** * Column data backed using offheap memory. */ @@ -35,21 +41,21 @@ public final class OffHeapColumnVector extends ColumnVector { private long nulls; private long data; + // Set iff the type is array. + private long lengthData; + private long offsetData; + protected OffHeapColumnVector(int capacity, DataType type) { - super(capacity, type); + super(capacity, type, MemoryMode.OFF_HEAP); if (!ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)) { throw new NotImplementedException("Only little endian is supported."); } + nulls = 0; + data = 0; + lengthData = 0; + offsetData = 0; - this.nulls = Platform.allocateMemory(capacity); - if (type instanceof IntegerType) { - this.data = Platform.allocateMemory(capacity * 4); - } else if (type instanceof DoubleType) { - this.data = Platform.allocateMemory(capacity * 8); - } else { - throw new RuntimeException("Unhandled " + type); - } - anyNullsSet = true; + reserveInternal(capacity); reset(); } @@ -67,8 +73,12 @@ public final class OffHeapColumnVector extends ColumnVector { public final void close() { Platform.freeMemory(nulls); Platform.freeMemory(data); + Platform.freeMemory(lengthData); + Platform.freeMemory(offsetData); nulls = 0; data = 0; + lengthData = 0; + offsetData = 0; } // @@ -112,6 +122,33 @@ public final class OffHeapColumnVector extends ColumnVector { } // + // APIs dealing with Bytes + // + + @Override + public final void putByte(int rowId, byte value) { + Platform.putByte(null, data + rowId, value); + + } + + @Override + public final void putBytes(int rowId, int count, byte value) { + for (int i = 0; i < count; ++i) { + Platform.putByte(null, data + rowId + i, value); + } + } + + @Override + public final void putBytes(int rowId, int count, byte[] src, int srcIndex) { + Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, null, data + rowId, count); + } + + @Override + public final byte getByte(int rowId) { + return Platform.getByte(null, data + rowId); + } + + // // APIs dealing with ints // @@ -146,6 +183,40 @@ public final class OffHeapColumnVector extends ColumnVector { } // + // APIs dealing with Longs + // + + @Override + public final void putLong(int rowId, long value) { + Platform.putLong(null, data + 8 * rowId, value); + } + + @Override + public final void putLongs(int rowId, int count, long value) { + long offset = data + 8 * rowId; + for (int i = 0; i < count; ++i, offset += 8) { + Platform.putLong(null, offset, value); + } + } + + @Override + public final void putLongs(int rowId, int count, long[] src, int srcIndex) { + Platform.copyMemory(src, Platform.LONG_ARRAY_OFFSET + srcIndex * 8, + null, data + 8 * rowId, count * 8); + } + + @Override + public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) { + Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET, + null, data + 8 * rowId, count * 8); + } + + @Override + public final long getLong(int rowId) { + return Platform.getLong(null, data + 8 * rowId); + } + + // // APIs dealing with doubles // @@ -178,4 +249,70 @@ public final class OffHeapColumnVector extends ColumnVector { public final double getDouble(int rowId) { return Platform.getDouble(null, data + rowId * 8); } + + // + // APIs dealing with Arrays. 
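+  // Offsets and lengths are stored as two parallel native int buffers (offsetData and
+  // lengthData), matching the INT encoding described in ColumnVector's class comment.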
+ // + @Override + public final void putArray(int rowId, int offset, int length) { + assert(offset >= 0 && offset + length <= childColumns[0].capacity); + Platform.putInt(null, lengthData + 4 * rowId, length); + Platform.putInt(null, offsetData + 4 * rowId, offset); + } + + @Override + public final int getArrayLength(int rowId) { + return Platform.getInt(null, lengthData + 4 * rowId); + } + + @Override + public final int getArrayOffset(int rowId) { + return Platform.getInt(null, offsetData + 4 * rowId); + } + + // APIs dealing with ByteArrays + @Override + public final int putByteArray(int rowId, byte[] value, int offset, int length) { + int result = arrayData().appendBytes(length, value, offset); + Platform.putInt(null, lengthData + 4 * rowId, length); + Platform.putInt(null, offsetData + 4 * rowId, result); + return result; + } + + @Override + public final void loadBytes(Array array) { + if (array.tmpByteArray.length < array.length) array.tmpByteArray = new byte[array.length]; + Platform.copyMemory( + null, data + array.offset, array.tmpByteArray, Platform.BYTE_ARRAY_OFFSET, array.length); + array.byteArray = array.tmpByteArray; + array.byteArrayOffset = 0; + } + + @Override + public final void reserve(int requiredCapacity) { + if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2); + } + + // Split out the slow path. + private final void reserveInternal(int newCapacity) { + if (this.resultArray != null) { + this.lengthData = + Platform.reallocateMemory(lengthData, elementsAppended * 4, newCapacity * 4); + this.offsetData = + Platform.reallocateMemory(offsetData, elementsAppended * 4, newCapacity * 4); + } else if (type instanceof ByteType) { + this.data = Platform.reallocateMemory(data, elementsAppended, newCapacity); + } else if (type instanceof IntegerType) { + this.data = Platform.reallocateMemory(data, elementsAppended * 4, newCapacity * 4); + } else if (type instanceof LongType || type instanceof DoubleType) { + this.data = Platform.reallocateMemory(data, elementsAppended * 8, newCapacity * 8); + } else if (resultStruct != null) { + // Nothing to store. + } else { + throw new RuntimeException("Unhandled " + type); + } + this.nulls = Platform.reallocateMemory(nulls, elementsAppended, newCapacity); + Platform.setMemory(nulls + elementsAppended, (byte)0, newCapacity - elementsAppended); + capacity = newCapacity; + } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 76d9956c38..8197fa11cd 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -16,13 +16,10 @@ */ package org.apache.spark.sql.execution.vectorized; -import org.apache.spark.sql.types.DataType; -import org.apache.spark.sql.types.DoubleType; -import org.apache.spark.sql.types.IntegerType; +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.Platform; -import java.nio.ByteBuffer; -import java.nio.DoubleBuffer; import java.util.Arrays; /** @@ -37,19 +34,18 @@ public final class OnHeapColumnVector extends ColumnVector { private byte[] nulls; // Array for each type. Only 1 is populated for any type. + private byte[] byteData; private int[] intData; + private long[] longData; private double[] doubleData; + // Only set if type is Array. 
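+  // arrayOffsets[i] and arrayLengths[i] together describe row i's slice of the child data.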
+ private int[] arrayLengths; + private int[] arrayOffsets; + protected OnHeapColumnVector(int capacity, DataType type) { - super(capacity, type); - if (type instanceof IntegerType) { - this.intData = new int[capacity]; - } else if (type instanceof DoubleType) { - this.doubleData = new double[capacity]; - } else { - throw new RuntimeException("Unhandled " + type); - } - this.nulls = new byte[capacity]; + super(capacity, type, MemoryMode.ON_HEAP); + reserveInternal(capacity); reset(); } @@ -109,6 +105,32 @@ public final class OnHeapColumnVector extends ColumnVector { } // + // APIs dealing with Bytes + // + + @Override + public final void putByte(int rowId, byte value) { + byteData[rowId] = value; + } + + @Override + public final void putBytes(int rowId, int count, byte value) { + for (int i = 0; i < count; ++i) { + byteData[i + rowId] = value; + } + } + + @Override + public final void putBytes(int rowId, int count, byte[] src, int srcIndex) { + System.arraycopy(src, srcIndex, byteData, rowId, count); + } + + @Override + public final byte getByte(int rowId) { + return byteData[rowId]; + } + + // // APIs dealing with Ints // @@ -145,6 +167,43 @@ public final class OnHeapColumnVector extends ColumnVector { } // + // APIs dealing with Longs + // + + @Override + public final void putLong(int rowId, long value) { + longData[rowId] = value; + } + + @Override + public final void putLongs(int rowId, int count, long value) { + for (int i = 0; i < count; ++i) { + longData[i + rowId] = value; + } + } + + @Override + public final void putLongs(int rowId, int count, long[] src, int srcIndex) { + System.arraycopy(src, srcIndex, longData, rowId, count); + } + + @Override + public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) { + int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET; + for (int i = 0; i < count; ++i) { + longData[i + rowId] = Platform.getLong(src, srcOffset); + srcIndex += 8; + srcOffset += 8; + } + } + + @Override + public final long getLong(int rowId) { + return longData[rowId]; + } + + + // // APIs dealing with doubles // @@ -173,4 +232,86 @@ public final class OnHeapColumnVector extends ColumnVector { public final double getDouble(int rowId) { return doubleData[rowId]; } + + // + // APIs dealing with Arrays + // + + @Override + public final int getArrayLength(int rowId) { + return arrayLengths[rowId]; + } + @Override + public final int getArrayOffset(int rowId) { + return arrayOffsets[rowId]; + } + + @Override + public final void putArray(int rowId, int offset, int length) { + arrayOffsets[rowId] = offset; + arrayLengths[rowId] = length; + } + + @Override + public final void loadBytes(Array array) { + array.byteArray = byteData; + array.byteArrayOffset = array.offset; + } + + // + // APIs dealing with Byte Arrays + // + + @Override + public final int putByteArray(int rowId, byte[] value, int offset, int length) { + int result = arrayData().appendBytes(length, value, offset); + arrayOffsets[rowId] = result; + arrayLengths[rowId] = length; + return result; + } + + @Override + public final void reserve(int requiredCapacity) { + if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2); + } + + // Spilt this function out since it is the slow path. 
+  private final void reserveInternal(int newCapacity) {
+    if (this.resultArray != null) {
+      int[] newLengths = new int[newCapacity];
+      int[] newOffsets = new int[newCapacity];
+      if (this.arrayLengths != null) {
+        System.arraycopy(this.arrayLengths, 0, newLengths, 0, elementsAppended);
+        System.arraycopy(this.arrayOffsets, 0, newOffsets, 0, elementsAppended);
+      }
+      arrayLengths = newLengths;
+      arrayOffsets = newOffsets;
+    } else if (type instanceof ByteType) {
+      byte[] newData = new byte[newCapacity];
+      if (byteData != null) System.arraycopy(byteData, 0, newData, 0, elementsAppended);
+      byteData = newData;
+    } else if (type instanceof IntegerType) {
+      int[] newData = new int[newCapacity];
+      if (intData != null) System.arraycopy(intData, 0, newData, 0, elementsAppended);
+      intData = newData;
+    } else if (type instanceof LongType) {
+      long[] newData = new long[newCapacity];
+      if (longData != null) System.arraycopy(longData, 0, newData, 0, elementsAppended);
+      longData = newData;
+    } else if (type instanceof DoubleType) {
+      double[] newData = new double[newCapacity];
+      if (doubleData != null) System.arraycopy(doubleData, 0, newData, 0, elementsAppended);
+      doubleData = newData;
+    } else if (resultStruct != null) {
+      // Nothing to store.
+    } else {
+      throw new RuntimeException("Unhandled " + type);
+    }
+
+    byte[] newNulls = new byte[newCapacity];
+    if (nulls != null) System.arraycopy(nulls, 0, newNulls, 0, elementsAppended);
+    nulls = newNulls;
+
+    capacity = newCapacity;
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 95c9550aeb..8a95359d9d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -40,8 +40,8 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
   private val rand = new Random(42)
   for (i <- 0 until 6) {
-    val keySchema = RandomDataGenerator.randomSchema(rand.nextInt(10) + 1, keyTypes)
-    val valueSchema = RandomDataGenerator.randomSchema(rand.nextInt(10) + 1, valueTypes)
+    val keySchema = RandomDataGenerator.randomSchema(rand, rand.nextInt(10) + 1, keyTypes)
+    val valueSchema = RandomDataGenerator.randomSchema(rand, rand.nextInt(10) + 1, valueTypes)
     testKVSorter(keySchema, valueSchema, spill = i > 3)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
index bfe944d835..8efdf8adb0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
@@ -18,10 +18,12 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.nio.ByteBuffer
 
+import scala.util.Random
+
 import org.apache.spark.memory.MemoryMode
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.vectorized.ColumnVector
-import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.{BinaryType, IntegerType}
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.Benchmark
 import org.apache.spark.util.collection.BitSet
@@ -239,6 +241,26 @@ object ColumnarBatchBenchmark {
       Platform.freeMemory(buffer)
     }
 
+    // Adding values by appending, instead of putting.
+    val onHeapAppend = { i: Int =>
+      val col = ColumnVector.allocate(count, IntegerType, MemoryMode.ON_HEAP)
+      var sum = 0L
+      for (n <- 0L until iters) {
+        var i = 0
+        while (i < count) {
+          col.appendInt(i)
+          i += 1
+        }
+        i = 0
+        while (i < count) {
+          sum += col.getInt(i)
+          i += 1
+        }
+        col.reset()
+      }
+      col.close
+    }
+
     /*
     Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
     Int Read/Write:                      Avg Time(ms)    Avg Rate(M/s)  Relative Rate
     -------------------------------------------------------------------------------------
     Column(off heap direct)                     237.6          1379.12         1.05 X
     UnsafeRow (on heap)                         414.6           790.35         0.60 X
     UnsafeRow (off heap)                        487.2           672.58         0.51 X
+    Column On Heap Append                       530.1           618.14         0.59 X
     */
     val benchmark = new Benchmark("Int Read/Write", count * iters)
     benchmark.addCase("Java Array")(javaArray)
@@ -265,6 +288,7 @@
     benchmark.addCase("Column(off heap direct)")(columnOffheapDirect)
     benchmark.addCase("UnsafeRow (on heap)")(unsafeRowOnheap)
     benchmark.addCase("UnsafeRow (off heap)")(unsafeRowOffheap)
+    benchmark.addCase("Column On Heap Append")(onHeapAppend)
     benchmark.run()
   }
@@ -314,8 +338,60 @@
     benchmark.run()
   }
 
+  def stringAccess(iters: Long): Unit = {
+    val chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+    val random = new Random(0)
+
+    def randomString(min: Int, max: Int): String = {
+      val len = random.nextInt(max - min) + min
+      val sb = new StringBuilder(len)
+      var i = 0
+      while (i < len) {
+        sb.append(chars.charAt(random.nextInt(chars.length())));
+        i += 1
+      }
+      return sb.toString
+    }
+
+    val minString = 3
+    val maxString = 32
+    val count = 4 * 1000
+
+    val data = Seq.fill(count)(randomString(minString, maxString)).map(_.getBytes).toArray
+
+    def column(memoryMode: MemoryMode) = { i: Int =>
+      val column = ColumnVector.allocate(count, BinaryType, memoryMode)
+      var sum = 0L
+      for (n <- 0L until iters) {
+        var i = 0
+        while (i < count) {
+          column.putByteArray(i, data(i))
+          i += 1
+        }
+        i = 0
+        while (i < count) {
+          sum += column.getByteArray(i).length
+          i += 1
+        }
+        column.reset()
+      }
+    }
+
+    /*
+    String Read/Write:                   Avg Time(ms)    Avg Rate(M/s)  Relative Rate
+    -------------------------------------------------------------------------------------
+    On Heap                                     457.0            35.85         1.00 X
+    Off Heap                                   1206.0            13.59         0.38 X
+    */
+    val benchmark = new Benchmark("String Read/Write", count * iters)
+    benchmark.addCase("On Heap")(column(MemoryMode.ON_HEAP))
+    benchmark.addCase("Off Heap")(column(MemoryMode.OFF_HEAP))
+    benchmark.run
+  }
+
   def main(args: Array[String]): Unit = {
     intAccess(1024 * 40)
     booleanAccess(1024 * 40)
+    stringAccess(1024 * 4)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index d5e517c7f5..215ca9ab6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -17,14 +17,15 @@
 package org.apache.spark.sql.execution.vectorized
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.memory.MemoryMode
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.Platform
 
 class ColumnarBatchSuite extends SparkFunSuite {
@@ -74,6 +75,45 @@
     }}
   }
 
+  test("Byte Apis") {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+      val reference = mutable.ArrayBuffer.empty[Byte]
+
+      val column = ColumnVector.allocate(1024, ByteType, memMode)
+      var idx = 0
+
+      val values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).map(_.toByte).toArray
+      column.putBytes(idx, 2, values, 0)
+      reference += 1
+      reference += 2
+      idx += 2
+
+      column.putBytes(idx, 3, values, 2)
+      reference += 3
+      reference += 4
+      reference += 5
+      idx += 3
+
+      column.putByte(idx, 9)
+      reference += 9
+      idx += 1
+
+      column.putBytes(idx, 3, 4)
+      reference += 4
+      reference += 4
+      reference += 4
+      idx += 3
+
+      reference.zipWithIndex.foreach { v =>
+        assert(v._1 == column.getByte(v._2), "MemoryMode" + memMode)
+        if (memMode == MemoryMode.OFF_HEAP) {
+          val addr = column.valuesNativeAddress()
+          assert(v._1 == Platform.getByte(null, addr + v._2))
+        }
+      }
+    }}
+  }
+
   test("Int Apis") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val seed = System.currentTimeMillis()
@@ -142,6 +182,76 @@
     }}
   }
 
+  test("Long Apis") {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+      val seed = System.currentTimeMillis()
+      val random = new Random(seed)
+      val reference = mutable.ArrayBuffer.empty[Long]
+
+      val column = ColumnVector.allocate(1024, LongType, memMode)
+      var idx = 0
+
+      val values = (1L :: 2L :: 3L :: 4L :: 5L :: Nil).toArray
+      column.putLongs(idx, 2, values, 0)
+      reference += 1
+      reference += 2
+      idx += 2
+
+      column.putLongs(idx, 3, values, 2)
+      reference += 3
+      reference += 4
+      reference += 5
+      idx += 3
+
+      val littleEndian = new Array[Byte](16)
+      littleEndian(0) = 7
+      littleEndian(1) = 1
+      littleEndian(8) = 6
+      littleEndian(10) = 1
+
+      column.putLongsLittleEndian(idx, 1, littleEndian, 8)
+      column.putLongsLittleEndian(idx + 1, 1, littleEndian, 0)
+      reference += 6 + (1 << 16)
+      reference += 7 + (1 << 8)
+      idx += 2
+
+      column.putLongsLittleEndian(idx, 2, littleEndian, 0)
+      reference += 7 + (1 << 8)
+      reference += 6 + (1 << 16)
+      idx += 2
+
+      while (idx < column.capacity) {
+        val single = random.nextBoolean()
+        if (single) {
+          val v = random.nextLong()
+          column.putLong(idx, v)
+          reference += v
+          idx += 1
+        } else {
+
+          val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx)
+          column.putLongs(idx, n, n + 1)
+          var i = 0
+          while (i < n) {
+            reference += (n + 1)
+            i += 1
+          }
+          idx += n
+        }
+      }
+
+
+      reference.zipWithIndex.foreach { v =>
+        assert(v._1 == column.getLong(v._2), "idx=" + v._2 +
+          " Seed = " + seed + " MemMode=" + memMode)
+        if (memMode == MemoryMode.OFF_HEAP) {
+          val addr = column.valuesNativeAddress()
+          assert(v._1 == Platform.getLong(null, addr + 8 * v._2))
+        }
+      }
+    }}
+  }
+
   test("Double APIs") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val seed = System.currentTimeMillis()
@@ -209,15 +319,150 @@
     }}
   }
 
+  test("String APIs") {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+      val reference = mutable.ArrayBuffer.empty[String]
+
+      val column = ColumnVector.allocate(6, BinaryType, memMode)
+      assert(column.arrayData().elementsAppended == 0)
+      var idx = 0
+
+      val values = ("Hello" :: "abc" :: Nil).toArray
+      column.putByteArray(idx, values(0).getBytes, 0, values(0).getBytes().length)
+      reference += values(0)
+      idx += 1
+      assert(column.arrayData().elementsAppended == 5)
+
+      column.putByteArray(idx, values(1).getBytes, 0, values(1).getBytes().length)
+      reference += values(1)
+      idx += 1
+      assert(column.arrayData().elementsAppended == 8)
+
+      // Just put "llo"
+      val offset = column.putByteArray(idx, values(0).getBytes, 2, values(0).getBytes().length - 2)
+      reference += "llo"
+      idx += 1
+      assert(column.arrayData().elementsAppended == 11)
+
+      // Put the same "ll" at offset. This should not allocate more memory in the column.
+      column.putArray(idx, offset, 2)
+      reference += "ll"
+      idx += 1
+      assert(column.arrayData().elementsAppended == 11)
+
+      // Put a long string
+      val s = "abcdefghijklmnopqrstuvwxyz"
+      column.putByteArray(idx, (s + s).getBytes)
+      reference += (s + s)
+      idx += 1
+      assert(column.arrayData().elementsAppended == 11 + (s + s).length)
+
+      reference.zipWithIndex.foreach { v =>
+        assert(v._1.length == column.getArrayLength(v._2), "MemoryMode=" + memMode)
+        assert(v._1 == ColumnVectorUtils.toString(column.getByteArray(v._2)),
+          "MemoryMode" + memMode)
+      }
+
+      column.reset()
+      assert(column.arrayData().elementsAppended == 0)
+    }}
+  }
+
+  test("Int Array") {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+      val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), memMode)
+
+      // Fill the underlying data with all the arrays back to back.
+      val data = column.arrayData();
+      var i = 0
+      while (i < 6) {
+        data.putInt(i, i)
+        i += 1
+      }
+
+      // Populate it with arrays [0], [1, 2], [], [3, 4, 5]
+      column.putArray(0, 0, 1)
+      column.putArray(1, 1, 2)
+      column.putArray(2, 2, 0)
+      column.putArray(3, 3, 3)
+
+      val a1 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
+      val a2 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(1)).asInstanceOf[Array[Int]]
+      val a3 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(2)).asInstanceOf[Array[Int]]
+      val a4 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(3)).asInstanceOf[Array[Int]]
+      assert(a1 === Array(0))
+      assert(a2 === Array(1, 2))
+      assert(a3 === Array.empty[Int])
+      assert(a4 === Array(3, 4, 5))
+
+      // Verify the ArrayData APIs
+      assert(column.getArray(0).length == 1)
+      assert(column.getArray(0).getInt(0) == 0)
+
+      assert(column.getArray(1).length == 2)
+      assert(column.getArray(1).getInt(0) == 1)
+      assert(column.getArray(1).getInt(1) == 2)
+
+      assert(column.getArray(2).length == 0)
+
+      assert(column.getArray(3).length == 3)
+      assert(column.getArray(3).getInt(0) == 3)
+      assert(column.getArray(3).getInt(1) == 4)
+      assert(column.getArray(3).getInt(2) == 5)
+
+      // Add a longer array which requires resizing
+      column.reset
+      val array = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
+      assert(data.capacity == 10)
+      data.reserve(array.length)
+      assert(data.capacity == array.length * 2)
+      data.putInts(0, array.length, array, 0)
+      column.putArray(0, 0, array.length)
+      assert(ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
+        === array)
+    }}
+  }
+
+  test("Struct Column") {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+      val schema = new StructType().add("int", IntegerType).add("double", DoubleType)
+      val column = ColumnVector.allocate(1024, schema, memMode)
+
+      val c1 = column.getChildColumn(0)
+      val c2 = column.getChildColumn(1)
+      assert(c1.dataType() == IntegerType)
+      assert(c2.dataType() == DoubleType)
+
+      c1.putInt(0, 123)
+      c2.putDouble(0, 3.45)
+      c1.putInt(1, 456)
+      c2.putDouble(1, 5.67)
+
+      val s = column.getStruct(0)
+      assert(s.fields(0).getInt(0) == 123)
+      assert(s.fields(0).getInt(1) == 456)
+      assert(s.fields(1).getDouble(0) == 3.45)
+      assert(s.fields(1).getDouble(1) == 5.67)
+
+      assert(s.getInt(0) == 123)
+      assert(s.getDouble(1) == 3.45)
+
+      val s2 = column.getStruct(1)
+      assert(s2.getInt(0) == 456)
+      assert(s2.getDouble(1) == 5.67)
+    }}
+  }
+
   test("ColumnarBatch basic") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val schema = new StructType()
         .add("intCol", IntegerType)
         .add("doubleCol", DoubleType)
         .add("intCol2", IntegerType)
+        .add("string", BinaryType)
 
       val batch = ColumnarBatch.allocate(schema, memMode)
-      assert(batch.numCols() == 3)
+      assert(batch.numCols() == 4)
       assert(batch.numRows() == 0)
       assert(batch.numValidRows() == 0)
       assert(batch.capacity() > 0)
@@ -227,10 +472,11 @@
       batch.column(0).putInt(0, 1)
       batch.column(1).putDouble(0, 1.1)
       batch.column(2).putNull(0)
+      batch.column(3).putByteArray(0, "Hello".getBytes)
       batch.setNumRows(1)
 
       // Verify the results of the row.
-      assert(batch.numCols() == 3)
+      assert(batch.numCols() == 4)
       assert(batch.numRows() == 1)
       assert(batch.numValidRows() == 1)
       assert(batch.rowIterator().hasNext == true)
@@ -241,6 +487,7 @@
       assert(batch.column(1).getDouble(0) == 1.1)
       assert(batch.column(1).getIsNull(0) == false)
       assert(batch.column(2).getIsNull(0) == true)
+      assert(ColumnVectorUtils.toString(batch.column(3).getByteArray(0)) == "Hello")
 
       // Verify the iterator works correctly.
       val it = batch.rowIterator()
@@ -251,6 +498,7 @@
       assert(row.getDouble(1) == 1.1)
       assert(row.isNullAt(1) == false)
       assert(row.isNullAt(2) == true)
+      assert(ColumnVectorUtils.toString(batch.column(3).getByteArray(0)) == "Hello")
       assert(it.hasNext == false)
       assert(it.hasNext == false)
@@ -260,24 +508,27 @@
       assert(batch.numValidRows() == 0)
       assert(batch.rowIterator().hasNext == false)
 
-      // Reset and add 3 throws
+      // Reset and add 3 rows
       batch.reset()
       assert(batch.numRows() == 0)
       assert(batch.numValidRows() == 0)
      assert(batch.rowIterator().hasNext == false)
 
-      // Add rows [NULL, 2.2, 2], [3, NULL, 3], [4, 4.4, 4]
+      // Add rows [NULL, 2.2, 2, "abc"], [3, NULL, 3, ""], [4, 4.4, 4, "world"]
       batch.column(0).putNull(0)
       batch.column(1).putDouble(0, 2.2)
       batch.column(2).putInt(0, 2)
+      batch.column(3).putByteArray(0, "abc".getBytes)
 
       batch.column(0).putInt(1, 3)
       batch.column(1).putNull(1)
       batch.column(2).putInt(1, 3)
+      batch.column(3).putByteArray(1, "".getBytes)
 
       batch.column(0).putInt(2, 4)
       batch.column(1).putDouble(2, 4.4)
       batch.column(2).putInt(2, 4)
+      batch.column(3).putByteArray(2, "world".getBytes)
       batch.setNumRows(3)
 
       def rowEquals(x: InternalRow, y: Row): Unit = {
@@ -289,30 +540,152 @@
         assert(x.isNullAt(2) == y.isNullAt(2))
         if (!x.isNullAt(2)) assert(x.getInt(2) == y.getInt(2))
+
+        assert(x.isNullAt(3) == y.isNullAt(3))
+        if (!x.isNullAt(3)) assert(x.getString(3) == y.getString(3))
       }
+
+      // Verify
       assert(batch.numRows() == 3)
       assert(batch.numValidRows() == 3)
       val it2 = batch.rowIterator()
-      rowEquals(it2.next(), Row(null, 2.2, 2))
-      rowEquals(it2.next(), Row(3, null, 3))
-      rowEquals(it2.next(), Row(4, 4.4, 4))
+      rowEquals(it2.next(), Row(null, 2.2, 2, "abc"))
+      rowEquals(it2.next(), Row(3, null, 3, ""))
+      rowEquals(it2.next(), Row(4, 4.4, 4, "world"))
       assert(!it.hasNext)
 
       // Filter out some rows and verify
       batch.markFiltered(1)
       assert(batch.numValidRows() == 2)
       val it3 = batch.rowIterator()
-      rowEquals(it3.next(), Row(null, 2.2, 2))
-      rowEquals(it3.next(), Row(4, 4.4, 4))
+      rowEquals(it3.next(), Row(null, 2.2, 2, "abc"))
+      rowEquals(it3.next(), Row(4, 4.4, 4, "world"))
       assert(!it.hasNext)
 
       batch.markFiltered(2)
       assert(batch.numValidRows() == 1)
       val it4 = batch.rowIterator()
-      rowEquals(it4.next(), Row(null, 2.2, 2))
+      rowEquals(it4.next(), Row(null, 2.2, 2, "abc"))
 
       batch.close
     }}
   }
+
+
+  private def doubleEquals(d1: Double, d2: Double): Boolean = {
+    if (d1.isNaN && d2.isNaN) {
+      true
+    } else {
+      d1 == d2
+    }
+  }
+
+  private def compareStruct(fields: Seq[StructField], r1: InternalRow, r2: Row, seed: Long) {
+    fields.zipWithIndex.foreach { v => {
+      assert(r1.isNullAt(v._2) == r2.isNullAt(v._2), "Seed = " + seed)
+      if (!r1.isNullAt(v._2)) {
+        v._1.dataType match {
+          case ByteType => assert(r1.getByte(v._2) == r2.getByte(v._2), "Seed = " + seed)
+          case IntegerType => assert(r1.getInt(v._2) == r2.getInt(v._2), "Seed = " + seed)
+          case LongType => assert(r1.getLong(v._2) == r2.getLong(v._2), "Seed = " + seed)
+          case DoubleType => assert(doubleEquals(r1.getDouble(v._2), r2.getDouble(v._2)),
+            "Seed = " + seed)
+          case StringType =>
+            assert(r1.getString(v._2) == r2.getString(v._2), "Seed = " + seed)
+          case ArrayType(childType, n) =>
+            val a1 = r1.getArray(v._2).array
+            val a2 = r2.getList(v._2).toArray
+            assert(a1.length == a2.length, "Seed = " + seed)
+            childType match {
+              case DoubleType => {
+                var i = 0
+                while (i < a1.length) {
+                  assert(doubleEquals(a1(i).asInstanceOf[Double], a2(i).asInstanceOf[Double]),
+                    "Seed = " + seed)
+                  i += 1
+                }
+              }
+              case _ => assert(a1 === a2, "Seed = " + seed)
+            }
+          case StructType(childFields) =>
+            compareStruct(childFields, r1.getStruct(v._2, fields.length), r2.getStruct(v._2), seed)
+          case _ =>
+            throw new NotImplementedError("Not implemented " + v._1.dataType)
+        }
+      }
+    }}
+  }
+
+  test("Convert rows") {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+      val rows = Row(1, 2L, "a", 1.2, 'b'.toByte) :: Row(4, 5L, "cd", 2.3, 'a'.toByte) :: Nil
+      val schema = new StructType()
+        .add("i1", IntegerType)
+        .add("l2", LongType)
+        .add("string", StringType)
+        .add("d", DoubleType)
+        .add("b", ByteType)
+
+      val batch = ColumnVectorUtils.toBatch(schema, memMode, rows.iterator.asJava)
+      assert(batch.numRows() == 2)
+      assert(batch.numCols() == 5)
+
+      val it = batch.rowIterator()
+      val referenceIt = rows.iterator
+      while (it.hasNext) {
+        compareStruct(schema, it.next(), referenceIt.next(), 0)
+      }
+      batch.close()
+    }
+  }}
+
+  /**
+   * This test generates rows with a random schema, serializes them to column batches,
+   * and verifies the results.
+   */
+  def testRandomRows(flatSchema: Boolean, numFields: Int) {
+    // TODO: add remaining types. Figure out why StringType doesn't work on jenkins.
+    val types = Array(ByteType, IntegerType, LongType, DoubleType)
+    val seed = System.nanoTime()
+    val NUM_ROWS = 500
+    val NUM_ITERS = 1000
+    val random = new Random(seed)
+    var i = 0
+    while (i < NUM_ITERS) {
+      val schema = if (flatSchema) {
+        RandomDataGenerator.randomSchema(random, numFields, types)
+      } else {
+        RandomDataGenerator.randomNestedSchema(random, numFields, types)
+      }
+      val rows = mutable.ArrayBuffer.empty[Row]
+      var j = 0
+      while (j < NUM_ROWS) {
+        val row = RandomDataGenerator.randomRow(random, schema)
+        rows += row
+        j += 1
+      }
+      (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+        val batch = ColumnVectorUtils.toBatch(schema, memMode, rows.iterator.asJava)
+        assert(batch.numRows() == NUM_ROWS)
+
+        val it = batch.rowIterator()
+        val referenceIt = rows.iterator
+        var k = 0
+        while (it.hasNext) {
+          compareStruct(schema, it.next(), referenceIt.next(), seed)
+          k += 1
+        }
+        batch.close()
+      }}
+      i += 1
+    }
+  }
+
+  test("Random flat schema") {
+    testRandomRows(true, 10)
+  }
+
+  test("Random nested schema") {
+    testRandomRows(false, 30)
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 76b36aa891..3e4cf3f79e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive.execution
 
 import scala.collection.JavaConverters._
+import scala.util.Random
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -879,7 +880,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
         RandomDataGenerator.forType(
           dataType = schemaForGenerator,
           nullable = true,
-          seed = Some(System.nanoTime()))
+          new Random(System.nanoTime()))
       val dataGenerator = maybeDataGenerator
         .getOrElse(fail(s"Failed to create data generator for schema $schemaForGenerator"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 3f9ecf6965..1a4b3ece72 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.sources
 
 import scala.collection.JavaConverters._
+import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -122,7 +123,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
     val dataGenerator = RandomDataGenerator.forType(
       dataType = dataType,
       nullable = true,
-      seed = Some(System.nanoTime())
+      new Random(System.nanoTime())
     ).getOrElse {
       fail(s"Failed to create data generator for schema $dataType")
     }
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 0d6b215fe5..b29bf6a464 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -105,6 +105,17 @@
     _UNSAFE.freeMemory(address);
   }
 
+  public static long reallocateMemory(long address, long oldSize, long newSize) {
+    long newMemory = _UNSAFE.allocateMemory(newSize);
+    copyMemory(null, address, null, newMemory, oldSize);
+    freeMemory(address);
+    return newMemory;
+  }
+
+  public static void setMemory(long address, byte value, long size) {
+    _UNSAFE.setMemory(address, size, value);
+  }
+
   public static void copyMemory(
       Object src, long srcOffset, Object dst, long dstOffset, long length) {
     // Check if dstOffset is before or after srcOffset to determine if we should copy
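
Usage sketch for the new Platform.reallocateMemory above: it grows a raw off-heap region with the classic allocate-copy-free sequence. The sketch below is illustrative and not part of the patch; the ReallocateDemo name and the doubling growth factor are assumptions, while allocateMemory, freeMemory, putLong and getLong are existing Platform helpers.

import org.apache.spark.unsafe.Platform

object ReallocateDemo {
  def main(args: Array[String]): Unit = {
    // Room for two 8-byte longs to start with.
    var capacity = 16L
    var addr = Platform.allocateMemory(capacity)
    Platform.putLong(null, addr, 1L)
    Platform.putLong(null, addr + 8, 2L)

    // Need a third slot: allocate newSize, copy oldSize bytes, free the old region.
    addr = Platform.reallocateMemory(addr, capacity, capacity * 2)
    capacity *= 2
    Platform.putLong(null, addr + 16, 3L)

    assert((0 until 3).forall(i => Platform.getLong(null, addr + 8 * i) == i + 1))
    Platform.freeMemory(addr)
  }
}

Note that unlike a realloc(3)-style API, the caller must pass oldSize explicitly, since Unsafe does not track allocation sizes.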
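The "String APIs" test earlier in this patch leans on one contract worth spelling out: putByteArray appends the bytes into the column's child arrayData() vector and returns the offset where they were stored, and putArray(rowId, offset, length) can then point another row at an already-appended slice without copying. A self-contained sketch under that assumed contract (ByteSliceDemo is an illustrative name, not part of the patch):

import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql.execution.vectorized.{ColumnVector, ColumnVectorUtils}
import org.apache.spark.sql.types.BinaryType

object ByteSliceDemo {
  def main(args: Array[String]): Unit = {
    val col = ColumnVector.allocate(4, BinaryType, MemoryMode.ON_HEAP)
    val bytes = "Hello".getBytes

    // Row 0 appends all five bytes; the return value is where they landed.
    val offset = col.putByteArray(0, bytes, 0, bytes.length)
    // Row 1 aliases the middle slice "ell" without appending anything new.
    col.putArray(1, offset + 1, 3)

    assert(col.arrayData().elementsAppended == 5)
    assert(ColumnVectorUtils.toString(col.getByteArray(0)) == "Hello")
    assert(ColumnVectorUtils.toString(col.getByteArray(1)) == "ell")
    col.close()
  }
}

The same offset/length indirection is what lets the "Int Array" test lay several arrays back to back in one child vector.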
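Finally, the random round-trip tests above thread one seeded scala.util.Random through schema and row generation and embed the seed in every assertion message, so a failing run can be regenerated exactly. A sketch of such a replay, assuming a hypothetical seed copied from a failure message (ReplayFailure is an illustrative name, not part of the patch):

import scala.util.Random

import org.apache.spark.sql.RandomDataGenerator
import org.apache.spark.sql.types._

object ReplayFailure {
  def main(args: Array[String]): Unit = {
    val seed = 123456789L  // hypothetical: paste the seed printed by the failing assert
    val rand = new Random(seed)
    val types = Seq(ByteType, IntegerType, LongType, DoubleType)

    // The same seed walks Random through the same draws, so the schema and
    // rows come out identical to the failing run.
    val schema = RandomDataGenerator.randomNestedSchema(rand, 30, types)
    val row = RandomDataGenerator.randomRow(rand, schema)
    println(schema.simpleString)
    println(row)
  }
}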