author     Cheng Lian <lian@databricks.com>      2014-03-23 12:08:55 -0700
committer  Patrick Wendell <pwendell@gmail.com>  2014-03-23 12:08:55 -0700
commit     57a4379c031e5d5901ba580422207d6aa2f19749 (patch)
tree       bad7341671b0b9adeacc114432befe7f749d8f9d /sql
parent     abf6714e27cf07a13819b35a4ca50ff9bb28b65c (diff)
[SPARK-1292] In-memory columnar representation for Spark SQL
This PR is rebased from the Catalyst repository and contains the first version of the in-memory columnar representation for Spark SQL. Compression support is not included yet and will be added later in a separate PR.
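To make the idea concrete, here is a minimal, self-contained sketch of what "in-memory columnar representation" means (the object and value names below are illustrative, not the PR's actual API): each column's values are packed contiguously into its own `ByteBuffer`, and rows are only reassembled at scan time.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Illustrative two-column schema: (id: Int, score: Double).
object ColumnarSketch extends App {
  val rows = Seq((1, 1.5), (2, 2.5), (3, 3.5))

  // One buffer per column instead of one object graph per row.
  val idCol    = ByteBuffer.allocate(4 * rows.size).order(ByteOrder.nativeOrder())
  val scoreCol = ByteBuffer.allocate(8 * rows.size).order(ByteOrder.nativeOrder())
  rows.foreach { case (id, score) =>
    idCol.putInt(id)
    scoreCol.putDouble(score)
  }
  idCol.flip()
  scoreCol.flip()

  // Scan: read the column buffers in lockstep to rebuild each row.
  while (idCol.hasRemaining) {
    println((idCol.getInt(), scoreCol.getDouble()))
  }
}
```

This layout is also what makes the planned compression work possible: values of a single type sit next to each other in memory.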
Author: Cheng Lian <lian@databricks.com>
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #205 from liancheng/memColumnarSupport and squashes the following commits:
99dba41 [Cheng Lian] Restricted new objects/classes to `private[sql]`
0892ad8 [Cheng Lian] Addressed ScalaStyle issues
af1ad5e [Cheng Lian] Fixed some minor issues introduced during rebasing
0dbf2fb [Cheng Lian] Make necessary renaming due to rebase
a162d4d [Cheng Lian] Removed the unnecessary InMemoryColumnarRelation class
9bcae4b [Cheng Lian] Added Apache license
220ee1e [Cheng Lian] Added table scan operator for in-memory columnar support.
c701c7a [Cheng Lian] Using SparkSqlSerializer for generic object SerDe causes errors; added a workaround
ed8608e [Cheng Lian] Added implicit conversion from DataType to ColumnType
b8a645a [Cheng Lian] Replaced KryoSerializer with an updated SparkSqlSerializer
b6c0a49 [Cheng Lian] Minor test suite refactoring
214be73 [Cheng Lian] Refactored BINARY and GENERIC to reduce duplicate code
da2f4d5 [Cheng Lian] Added Apache license
dbf7a38 [Cheng Lian] Added ColumnAccessor and test suite, refactored ColumnBuilder
c01a177 [Cheng Lian] Added column builder classes and test suite
f18ddc6 [Cheng Lian] Added ColumnTypes and test suite
2d09066 [Cheng Lian] Added KryoSerializer
34f3c19 [Cheng Lian] Added TypeTag field to all NativeTypes
acc5c48 [Cheng Lian] Added Hive test files to .gitignore
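Commit c701c7a above works around a Kryo-related failure by falling back to a plain `KryoSerializer` for generic-object SerDe, and the `GENERIC` column type introduced in the diff stores such values as serialized byte arrays. A rough, standalone sketch of that round trip, using raw Kryo rather than Spark's serializer wrapper (all names here are illustrative):

```scala
import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

// Values with no native column type are serialized to a byte array
// before being appended to the column buffer, and deserialized on read.
object GenericSerDeSketch {
  private val kryo = new Kryo()
  kryo.setRegistrationRequired(false)

  def serialize(o: Any): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new Output(bos)
    kryo.writeClassAndObject(out, o)
    out.close()
    bos.toByteArray
  }

  def deserialize(bytes: Array[Byte]): Any =
    kryo.readClassAndObject(new Input(bytes))
}

// Round trip:
//   GenericSerDeSketch.deserialize(GenericSerDeSketch.serialize(Map(1 -> "a")))
```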
Diffstat (limited to 'sql')
15 files changed, 1315 insertions, 35 deletions
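One mechanism in the diff below is worth isolating before reading it: column builders start from a buffer sized by `initialSize * defaultSize` and grow on demand. The helper below restates `ensureFreeSpace` from the new `ColumnBuilder.scala`, simplified to a standalone object (the original also clears the old buffer before copying):

```scala
import java.nio.{ByteBuffer, ByteOrder}

object BufferGrowth {
  // If `orig` cannot hold `size` more bytes, allocate a larger buffer,
  // growing by whichever is larger: the requested size or capacity/8 + 1.
  // Copy over the bytes written so far; otherwise return `orig` unchanged.
  def ensureFreeSpace(orig: ByteBuffer, size: Int): ByteBuffer = {
    if (orig.remaining >= size) {
      orig
    } else {
      val capacity = orig.capacity()
      val newSize = capacity + size.max(capacity / 8 + 1)
      val pos = orig.position()

      ByteBuffer
        .allocate(newSize)
        .order(ByteOrder.nativeOrder())
        .put(orig.array(), 0, pos)
    }
  }
}
```

The returned buffer's position sits just past the copied bytes, so appends continue where they left off.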
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala index 6eb2b62ecc..90a9f9f7e5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala @@ -19,7 +19,9 @@ package org.apache.spark.sql package catalyst package types -import expressions.Expression +import scala.reflect.runtime.universe.{typeTag, TypeTag} + +import org.apache.spark.sql.catalyst.expressions.Expression abstract class DataType { /** Matches any expression that evaluates to this DataType */ @@ -33,11 +35,13 @@ case object NullType extends DataType abstract class NativeType extends DataType { type JvmType + @transient val tag: TypeTag[JvmType] val ordering: Ordering[JvmType] } case object StringType extends NativeType { type JvmType = String + @transient lazy val tag = typeTag[JvmType] val ordering = implicitly[Ordering[JvmType]] } case object BinaryType extends DataType { @@ -45,6 +49,7 @@ case object BinaryType extends DataType { } case object BooleanType extends NativeType { type JvmType = Boolean + @transient lazy val tag = typeTag[JvmType] val ordering = implicitly[Ordering[JvmType]] } @@ -71,6 +76,7 @@ abstract class IntegralType extends NumericType { case object LongType extends IntegralType { type JvmType = Long + @transient lazy val tag = typeTag[JvmType] val numeric = implicitly[Numeric[Long]] val integral = implicitly[Integral[Long]] val ordering = implicitly[Ordering[JvmType]] @@ -78,6 +84,7 @@ case object LongType extends IntegralType { case object IntegerType extends IntegralType { type JvmType = Int + @transient lazy val tag = typeTag[JvmType] val numeric = implicitly[Numeric[Int]] val integral = implicitly[Integral[Int]] val ordering = implicitly[Ordering[JvmType]] @@ -85,6 +92,7 @@ case object IntegerType extends IntegralType { case object ShortType extends IntegralType { type JvmType = Short + @transient lazy val tag = typeTag[JvmType] val numeric = implicitly[Numeric[Short]] val integral = implicitly[Integral[Short]] val ordering = implicitly[Ordering[JvmType]] @@ -92,6 +100,7 @@ case object ShortType extends IntegralType { case object ByteType extends IntegralType { type JvmType = Byte + @transient lazy val tag = typeTag[JvmType] val numeric = implicitly[Numeric[Byte]] val integral = implicitly[Integral[Byte]] val ordering = implicitly[Ordering[JvmType]] @@ -110,6 +119,7 @@ abstract class FractionalType extends NumericType { case object DecimalType extends FractionalType { type JvmType = BigDecimal + @transient lazy val tag = typeTag[JvmType] val numeric = implicitly[Numeric[BigDecimal]] val fractional = implicitly[Fractional[BigDecimal]] val ordering = implicitly[Ordering[JvmType]] @@ -117,6 +127,7 @@ case object DecimalType extends FractionalType { case object DoubleType extends FractionalType { type JvmType = Double + @transient lazy val tag = typeTag[JvmType] val numeric = implicitly[Numeric[Double]] val fractional = implicitly[Fractional[Double]] val ordering = implicitly[Ordering[JvmType]] @@ -124,6 +135,7 @@ case object DoubleType extends FractionalType { case object FloatType extends FractionalType { type JvmType = Float + @transient lazy val tag = typeTag[JvmType] val numeric = implicitly[Numeric[Float]] val fractional = implicitly[Fractional[Float]] val ordering = implicitly[Ordering[JvmType]] diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala new file mode 100644 index 0000000000..ddbeba6203 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql +package columnar + +import java.nio.{ByteOrder, ByteBuffer} + +import org.apache.spark.sql.catalyst.types.{BinaryType, NativeType, DataType} +import org.apache.spark.sql.catalyst.expressions.MutableRow +import org.apache.spark.sql.execution.SparkSqlSerializer + +/** + * An `Iterator` like trait used to extract values from columnar byte buffer. When a value is + * extracted from the buffer, instead of directly returning it, the value is set into some field of + * a [[MutableRow]]. In this way, boxing cost can be avoided by leveraging the setter methods + * for primitive values provided by [[MutableRow]]. + */ +private[sql] trait ColumnAccessor { + initialize() + + protected def initialize() + + def hasNext: Boolean + + def extractTo(row: MutableRow, ordinal: Int) + + protected def underlyingBuffer: ByteBuffer +} + +private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](buffer: ByteBuffer) + extends ColumnAccessor { + + protected def initialize() {} + + def columnType: ColumnType[T, JvmType] + + def hasNext = buffer.hasRemaining + + def extractTo(row: MutableRow, ordinal: Int) { + doExtractTo(row, ordinal) + } + + protected def doExtractTo(row: MutableRow, ordinal: Int) + + protected def underlyingBuffer = buffer +} + +private[sql] abstract class NativeColumnAccessor[T <: NativeType]( + buffer: ByteBuffer, + val columnType: NativeColumnType[T]) + extends BasicColumnAccessor[T, T#JvmType](buffer) + with NullableColumnAccessor + +private[sql] class BooleanColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, BOOLEAN) { + + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + row.setBoolean(ordinal, columnType.extract(buffer)) + } +} + +private[sql] class IntColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, INT) { + + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + row.setInt(ordinal, columnType.extract(buffer)) + } +} + +private[sql] class ShortColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, SHORT) { + + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + row.setShort(ordinal, columnType.extract(buffer)) + } +} + +private[sql] class LongColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, LONG) { + + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + row.setLong(ordinal, 
columnType.extract(buffer)) + } +} + +private[sql] class ByteColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, BYTE) { + + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + row.setByte(ordinal, columnType.extract(buffer)) + } +} + +private[sql] class DoubleColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, DOUBLE) { + + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + row.setDouble(ordinal, columnType.extract(buffer)) + } +} + +private[sql] class FloatColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, FLOAT) { + + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + row.setFloat(ordinal, columnType.extract(buffer)) + } +} + +private[sql] class StringColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, STRING) { + + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + row.setString(ordinal, columnType.extract(buffer)) + } +} + +private[sql] class BinaryColumnAccessor(buffer: ByteBuffer) + extends BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer) + with NullableColumnAccessor { + + def columnType = BINARY + + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + row(ordinal) = columnType.extract(buffer) + } +} + +private[sql] class GenericColumnAccessor(buffer: ByteBuffer) + extends BasicColumnAccessor[DataType, Array[Byte]](buffer) + with NullableColumnAccessor { + + def columnType = GENERIC + + override protected def doExtractTo(row: MutableRow, ordinal: Int) { + val serialized = columnType.extract(buffer) + row(ordinal) = SparkSqlSerializer.deserialize[Any](serialized) + } +} + +private[sql] object ColumnAccessor { + def apply(b: ByteBuffer): ColumnAccessor = { + // The first 4 bytes in the buffer indicates the column type. + val buffer = b.duplicate().order(ByteOrder.nativeOrder()) + val columnTypeId = buffer.getInt() + + columnTypeId match { + case INT.typeId => new IntColumnAccessor(buffer) + case LONG.typeId => new LongColumnAccessor(buffer) + case FLOAT.typeId => new FloatColumnAccessor(buffer) + case DOUBLE.typeId => new DoubleColumnAccessor(buffer) + case BOOLEAN.typeId => new BooleanColumnAccessor(buffer) + case BYTE.typeId => new ByteColumnAccessor(buffer) + case SHORT.typeId => new ShortColumnAccessor(buffer) + case STRING.typeId => new StringColumnAccessor(buffer) + case BINARY.typeId => new BinaryColumnAccessor(buffer) + case GENERIC.typeId => new GenericColumnAccessor(buffer) + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala new file mode 100644 index 0000000000..6bd1841821 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql +package columnar + +import java.nio.{ByteOrder, ByteBuffer} + +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.execution.SparkSqlSerializer + +private[sql] trait ColumnBuilder { + /** + * Initializes with an approximate lower bound on the expected number of elements in this column. + */ + def initialize(initialSize: Int, columnName: String = "") + + def appendFrom(row: Row, ordinal: Int) + + def build(): ByteBuffer +} + +private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType] extends ColumnBuilder { + import ColumnBuilder._ + + private var columnName: String = _ + protected var buffer: ByteBuffer = _ + + def columnType: ColumnType[T, JvmType] + + override def initialize(initialSize: Int, columnName: String = "") = { + val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize + this.columnName = columnName + buffer = ByteBuffer.allocate(4 + 4 + size * columnType.defaultSize) + buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId) + } + + // Have to give a concrete implementation to make mixin possible + override def appendFrom(row: Row, ordinal: Int) { + doAppendFrom(row, ordinal) + } + + // Concrete `ColumnBuilder`s can override this method to append values + protected def doAppendFrom(row: Row, ordinal: Int) + + // Helper method to append primitive values (to avoid boxing cost) + protected def appendValue(v: JvmType) { + buffer = ensureFreeSpace(buffer, columnType.actualSize(v)) + columnType.append(v, buffer) + } + + override def build() = { + buffer.limit(buffer.position()).rewind() + buffer + } +} + +private[sql] abstract class NativeColumnBuilder[T <: NativeType]( + val columnType: NativeColumnType[T]) + extends BasicColumnBuilder[T, T#JvmType] + with NullableColumnBuilder + +private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(BOOLEAN) { + override def doAppendFrom(row: Row, ordinal: Int) { + appendValue(row.getBoolean(ordinal)) + } +} + +private[sql] class IntColumnBuilder extends NativeColumnBuilder(INT) { + override def doAppendFrom(row: Row, ordinal: Int) { + appendValue(row.getInt(ordinal)) + } +} + +private[sql] class ShortColumnBuilder extends NativeColumnBuilder(SHORT) { + override def doAppendFrom(row: Row, ordinal: Int) { + appendValue(row.getShort(ordinal)) + } +} + +private[sql] class LongColumnBuilder extends NativeColumnBuilder(LONG) { + override def doAppendFrom(row: Row, ordinal: Int) { + appendValue(row.getLong(ordinal)) + } +} + +private[sql] class ByteColumnBuilder extends NativeColumnBuilder(BYTE) { + override def doAppendFrom(row: Row, ordinal: Int) { + appendValue(row.getByte(ordinal)) + } +} + +private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(DOUBLE) { + override def doAppendFrom(row: Row, ordinal: Int) { + appendValue(row.getDouble(ordinal)) + } +} + +private[sql] class FloatColumnBuilder extends NativeColumnBuilder(FLOAT) { + override def doAppendFrom(row: Row, ordinal: Int) { + appendValue(row.getFloat(ordinal)) + } +} + +private[sql] class StringColumnBuilder extends NativeColumnBuilder(STRING) { + 
override def doAppendFrom(row: Row, ordinal: Int) { + appendValue(row.getString(ordinal)) + } +} + +private[sql] class BinaryColumnBuilder + extends BasicColumnBuilder[BinaryType.type, Array[Byte]] + with NullableColumnBuilder { + + def columnType = BINARY + + override def doAppendFrom(row: Row, ordinal: Int) { + appendValue(row(ordinal).asInstanceOf[Array[Byte]]) + } +} + +// TODO (lian) Add support for array, struct and map +private[sql] class GenericColumnBuilder + extends BasicColumnBuilder[DataType, Array[Byte]] + with NullableColumnBuilder { + + def columnType = GENERIC + + override def doAppendFrom(row: Row, ordinal: Int) { + val serialized = SparkSqlSerializer.serialize(row(ordinal)) + buffer = ColumnBuilder.ensureFreeSpace(buffer, columnType.actualSize(serialized)) + columnType.append(serialized, buffer) + } +} + +private[sql] object ColumnBuilder { + val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104 + + private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = { + if (orig.remaining >= size) { + orig + } else { + // grow in steps of initial size + val capacity = orig.capacity() + val newSize = capacity + size.max(capacity / 8 + 1) + val pos = orig.position() + + orig.clear() + ByteBuffer + .allocate(newSize) + .order(ByteOrder.nativeOrder()) + .put(orig.array(), 0, pos) + } + } + + def apply(typeId: Int, initialSize: Int = 0, columnName: String = ""): ColumnBuilder = { + val builder = (typeId match { + case INT.typeId => new IntColumnBuilder + case LONG.typeId => new LongColumnBuilder + case FLOAT.typeId => new FloatColumnBuilder + case DOUBLE.typeId => new DoubleColumnBuilder + case BOOLEAN.typeId => new BooleanColumnBuilder + case BYTE.typeId => new ByteColumnBuilder + case SHORT.typeId => new ShortColumnBuilder + case STRING.typeId => new StringColumnBuilder + case BINARY.typeId => new BinaryColumnBuilder + case GENERIC.typeId => new GenericColumnBuilder + }).asInstanceOf[ColumnBuilder] + + builder.initialize(initialSize, columnName) + builder + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala new file mode 100644 index 0000000000..3b759a51cc --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql +package columnar + +import java.nio.ByteBuffer + +import org.apache.spark.sql.catalyst.types._ + +/** + * An abstract class that represents type of a column. Used to append/extract Java objects into/from + * the underlying [[ByteBuffer]] of a column. + * + * @param typeId A unique ID representing the type. + * @param defaultSize Default size in bytes for one element of type T (e.g. 
4 for `Int`). + * @tparam T Scala data type for the column. + * @tparam JvmType Underlying Java type to represent the elements. + */ +private[sql] sealed abstract class ColumnType[T <: DataType, JvmType]( + val typeId: Int, + val defaultSize: Int) { + + /** + * Extracts a value out of the buffer at the buffer's current position. + */ + def extract(buffer: ByteBuffer): JvmType + + /** + * Appends the given value v of type T into the given ByteBuffer. + */ + def append(v: JvmType, buffer: ByteBuffer) + + /** + * Returns the size of the value. This is used to calculate the size of variable length types + * such as byte arrays and strings. + */ + def actualSize(v: JvmType): Int = defaultSize + + /** + * Creates a duplicated copy of the value. + */ + def clone(v: JvmType): JvmType = v +} + +private[sql] abstract class NativeColumnType[T <: NativeType]( + val dataType: T, + typeId: Int, + defaultSize: Int) + extends ColumnType[T, T#JvmType](typeId, defaultSize) { + + /** + * Scala TypeTag. Can be used to create primitive arrays and hash tables. + */ + def scalaTag = dataType.tag +} + +private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) { + def append(v: Int, buffer: ByteBuffer) { + buffer.putInt(v) + } + + def extract(buffer: ByteBuffer) = { + buffer.getInt() + } +} + +private[sql] object LONG extends NativeColumnType(LongType, 1, 8) { + override def append(v: Long, buffer: ByteBuffer) { + buffer.putLong(v) + } + + override def extract(buffer: ByteBuffer) = { + buffer.getLong() + } +} + +private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) { + override def append(v: Float, buffer: ByteBuffer) { + buffer.putFloat(v) + } + + override def extract(buffer: ByteBuffer) = { + buffer.getFloat() + } +} + +private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) { + override def append(v: Double, buffer: ByteBuffer) { + buffer.putDouble(v) + } + + override def extract(buffer: ByteBuffer) = { + buffer.getDouble() + } +} + +private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) { + override def append(v: Boolean, buffer: ByteBuffer) { + buffer.put(if (v) 1.toByte else 0.toByte) + } + + override def extract(buffer: ByteBuffer) = { + if (buffer.get() == 1) true else false + } +} + +private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) { + override def append(v: Byte, buffer: ByteBuffer) { + buffer.put(v) + } + + override def extract(buffer: ByteBuffer) = { + buffer.get() + } +} + +private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) { + override def append(v: Short, buffer: ByteBuffer) { + buffer.putShort(v) + } + + override def extract(buffer: ByteBuffer) = { + buffer.getShort() + } +} + +private[sql] object STRING extends NativeColumnType(StringType, 7, 8) { + override def actualSize(v: String): Int = v.getBytes.length + 4 + + override def append(v: String, buffer: ByteBuffer) { + val stringBytes = v.getBytes() + buffer.putInt(stringBytes.length).put(stringBytes, 0, stringBytes.length) + } + + override def extract(buffer: ByteBuffer) = { + val length = buffer.getInt() + val stringBytes = new Array[Byte](length) + buffer.get(stringBytes, 0, length) + new String(stringBytes) + } +} + +private[sql] sealed abstract class ByteArrayColumnType[T <: DataType]( + typeId: Int, + defaultSize: Int) + extends ColumnType[T, Array[Byte]](typeId, defaultSize) { + + override def actualSize(v: Array[Byte]) = v.length + 4 + + override def append(v: Array[Byte], buffer: ByteBuffer) { + buffer.putInt(v.length).put(v, 0, v.length) + } + + 
override def extract(buffer: ByteBuffer) = { + val length = buffer.getInt() + val bytes = new Array[Byte](length) + buffer.get(bytes, 0, length) + bytes + } +} + +private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](8, 16) + +// Used to process generic objects (all types other than those listed above). Objects should be +// serialized first before appending to the column `ByteBuffer`, and is also extracted as serialized +// byte array. +private[sql] object GENERIC extends ByteArrayColumnType[DataType](9, 16) + +private[sql] object ColumnType { + implicit def dataTypeToColumnType(dataType: DataType): ColumnType[_, _] = { + dataType match { + case IntegerType => INT + case LongType => LONG + case FloatType => FLOAT + case DoubleType => DOUBLE + case BooleanType => BOOLEAN + case ByteType => BYTE + case ShortType => SHORT + case StringType => STRING + case BinaryType => BINARY + case _ => GENERIC + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala new file mode 100644 index 0000000000..2970c609b9 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.columnar + +import java.nio.{ByteOrder, ByteBuffer} + +import org.apache.spark.sql.catalyst.expressions.MutableRow + +private[sql] trait NullableColumnAccessor extends ColumnAccessor { + private var nullsBuffer: ByteBuffer = _ + private var nullCount: Int = _ + private var seenNulls: Int = 0 + + private var nextNullIndex: Int = _ + private var pos: Int = 0 + + abstract override def initialize() { + nullsBuffer = underlyingBuffer.duplicate().order(ByteOrder.nativeOrder()) + nullCount = nullsBuffer.getInt() + nextNullIndex = if (nullCount > 0) nullsBuffer.getInt() else -1 + pos = 0 + + underlyingBuffer.position(underlyingBuffer.position + 4 + nullCount * 4) + super.initialize() + } + + abstract override def extractTo(row: MutableRow, ordinal: Int) { + if (pos == nextNullIndex) { + seenNulls += 1 + + if (seenNulls < nullCount) { + nextNullIndex = nullsBuffer.getInt() + } + + row.setNullAt(ordinal) + } else { + super.extractTo(row, ordinal) + } + + pos += 1 + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala new file mode 100644 index 0000000000..1661c3f3ff --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql +package columnar + +import java.nio.{ByteOrder, ByteBuffer} + +/** + * Builds a nullable column. The byte buffer of a nullable column contains: + * - 4 bytes for the null count (number of nulls) + * - positions for each null, in ascending order + * - the non-null data (column data type, compression type, data...) 
+ */ +private[sql] trait NullableColumnBuilder extends ColumnBuilder { + private var nulls: ByteBuffer = _ + private var pos: Int = _ + private var nullCount: Int = _ + + abstract override def initialize(initialSize: Int, columnName: String) { + nulls = ByteBuffer.allocate(1024) + nulls.order(ByteOrder.nativeOrder()) + pos = 0 + nullCount = 0 + super.initialize(initialSize, columnName) + } + + abstract override def appendFrom(row: Row, ordinal: Int) { + if (row.isNullAt(ordinal)) { + nulls = ColumnBuilder.ensureFreeSpace(nulls, 4) + nulls.putInt(pos) + nullCount += 1 + } else { + super.appendFrom(row, ordinal) + } + pos += 1 + } + + abstract override def build(): ByteBuffer = { + val nonNulls = super.build() + val typeId = nonNulls.getInt() + val nullDataLen = nulls.position() + + nulls.limit(nullDataLen) + nulls.rewind() + + // Column type ID is moved to the front, follows the null count, then non-null data + // + // +---------+ + // | 4 bytes | Column type ID + // +---------+ + // | 4 bytes | Null count + // +---------+ + // | ... | Null positions (if null count is not zero) + // +---------+ + // | ... | Non-null part (without column type ID) + // +---------+ + val buffer = ByteBuffer + .allocate(4 + nullDataLen + nonNulls.limit) + .order(ByteOrder.nativeOrder()) + .putInt(typeId) + .putInt(nullCount) + .put(nulls) + .put(nonNulls) + + buffer.rewind() + buffer + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala new file mode 100644 index 0000000000..c7efd30e87 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql +package columnar + +import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute} +import org.apache.spark.sql.execution.{SparkPlan, LeafNode} + +private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan) + extends LeafNode { + + // For implicit conversion from `DataType` to `ColumnType` + import ColumnType._ + + override def output: Seq[Attribute] = attributes + + lazy val cachedColumnBuffers = { + val output = child.output + val cached = child.execute().mapPartitions { iterator => + val columnBuilders = output.map { a => + ColumnBuilder(a.dataType.typeId, 0, a.name) + }.toArray + + var row: Row = null + while (iterator.hasNext) { + row = iterator.next() + var i = 0 + while (i < row.length) { + columnBuilders(i).appendFrom(row, i) + i += 1 + } + } + + Iterator.single(columnBuilders.map(_.build())) + }.cache() + + cached.setName(child.toString) + // Force the materialization of the cached RDD. + cached.count() + cached + } + + override def execute() = { + cachedColumnBuffers.mapPartitions { iterator => + val columnBuffers = iterator.next() + assert(!iterator.hasNext) + + new Iterator[Row] { + val columnAccessors = columnBuffers.map(ColumnAccessor(_)) + val nextRow = new GenericMutableRow(columnAccessors.length) + + override def next() = { + var i = 0 + while (i < nextRow.length) { + columnAccessors(i).extractTo(nextRow, i) + i += 1 + } + nextRow + } + + override def hasNext = columnAccessors.head.hasNext + } + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 72dc5ec6ad..e934c4cf69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -18,14 +18,8 @@ package org.apache.spark.sql package execution -import java.nio.ByteBuffer - -import com.esotericsoftware.kryo.{Kryo, Serializer} -import com.esotericsoftware.kryo.io.{Output, Input} - import org.apache.spark.{SparkConf, RangePartitioner, HashPartitioner} import org.apache.spark.rdd.ShuffledRDD -import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.MutablePair import catalyst.rules.Rule @@ -33,33 +27,6 @@ import catalyst.errors._ import catalyst.expressions._ import catalyst.plans.physical._ -private class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) { - override def newKryo(): Kryo = { - val kryo = new Kryo - kryo.setRegistrationRequired(true) - kryo.register(classOf[MutablePair[_,_]]) - kryo.register(classOf[Array[Any]]) - kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow]) - kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow]) - kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]]) - kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer) - kryo.setReferences(false) - kryo.setClassLoader(this.getClass.getClassLoader) - kryo - } -} - -private class BigDecimalSerializer extends Serializer[BigDecimal] { - def write(kryo: Kryo, output: Output, bd: math.BigDecimal) { - // TODO: There are probably more efficient representations than strings... 
- output.writeString(bd.toString) - } - - def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = { - BigDecimal(input.readString()) - } -} - case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode { override def outputPartitioning = newPartitioning diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala new file mode 100644 index 0000000000..ad7cd58b6a --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql +package execution + +import java.nio.ByteBuffer + +import com.esotericsoftware.kryo.io.{Input, Output} +import com.esotericsoftware.kryo.{Serializer, Kryo} + +import org.apache.spark.{SparkEnv, SparkConf} +import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.util.MutablePair + +class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) { + override def newKryo(): Kryo = { + val kryo = new Kryo() + kryo.setRegistrationRequired(false) + kryo.register(classOf[MutablePair[_, _]]) + kryo.register(classOf[Array[Any]]) + kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow]) + kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow]) + kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]]) + kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer) + kryo.setReferences(false) + kryo.setClassLoader(this.getClass.getClassLoader) + kryo + } +} + +object SparkSqlSerializer { + // TODO (lian) Using KryoSerializer here is workaround, needs further investigation + // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization + // related error. + @transient lazy val ser: KryoSerializer = { + val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()) + new KryoSerializer(sparkConf) + } + + def serialize[T](o: T): Array[Byte] = { + ser.newInstance().serialize(o).array() + } + + def deserialize[T](bytes: Array[Byte]): T = { + ser.newInstance().deserialize[T](ByteBuffer.wrap(bytes)) + } +} + +class BigDecimalSerializer extends Serializer[BigDecimal] { + def write(kryo: Kryo, output: Output, bd: math.BigDecimal) { + // TODO: There are probably more efficient representations than strings... 
+ output.writeString(bd.toString()) + } + + def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = { + BigDecimal(input.readString()) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 728feceded..aa84211648 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -33,7 +33,7 @@ import TestSQLContext._ class QueryTest extends FunSuite { /** * Runs the plan and makes sure the answer matches the expected result. - * @param plan the query to be executed + * @param rdd the [[SchemaRDD]] to be executed * @param expectedAnswer the expected result, can either be an Any, Seq[Product], or Seq[ Seq[Any] ]. */ protected def checkAnswer(rdd: SchemaRDD, expectedAnswer: Any): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala new file mode 100644 index 0000000000..c7aaaae94e --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala @@ -0,0 +1,204 @@ +package org.apache.spark.sql +package columnar + +import java.nio.ByteBuffer + +import scala.util.Random + +import org.scalatest.FunSuite + +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.execution.SparkSqlSerializer + +class ColumnTypeSuite extends FunSuite { + val columnTypes = Seq(INT, SHORT, LONG, BYTE, DOUBLE, FLOAT, STRING, BINARY, GENERIC) + + test("defaultSize") { + val defaultSize = Seq(4, 2, 8, 1, 8, 4, 8, 16, 16) + + columnTypes.zip(defaultSize).foreach { case (columnType, size) => + assert(columnType.defaultSize === size) + } + } + + test("actualSize") { + val expectedSizes = Seq(4, 2, 8, 1, 8, 4, 4 + 5, 4 + 4, 4 + 11) + val actualSizes = Seq( + INT.actualSize(Int.MaxValue), + SHORT.actualSize(Short.MaxValue), + LONG.actualSize(Long.MaxValue), + BYTE.actualSize(Byte.MaxValue), + DOUBLE.actualSize(Double.MaxValue), + FLOAT.actualSize(Float.MaxValue), + STRING.actualSize("hello"), + BINARY.actualSize(new Array[Byte](4)), + GENERIC.actualSize(SparkSqlSerializer.serialize(Map(1 -> "a")))) + + expectedSizes.zip(actualSizes).foreach { case (expected, actual) => + assert(expected === actual) + } + } + + testNumericColumnType[BooleanType.type, Boolean]( + BOOLEAN, + Array.fill(4)(Random.nextBoolean()), + ByteBuffer.allocate(32), + (buffer: ByteBuffer, v: Boolean) => { + buffer.put((if (v) 1 else 0).toByte) + }, + (buffer: ByteBuffer) => { + buffer.get() == 1 + }) + + testNumericColumnType[IntegerType.type, Int]( + INT, + Array.fill(4)(Random.nextInt()), + ByteBuffer.allocate(32), + (_: ByteBuffer).putInt(_), + (_: ByteBuffer).getInt) + + testNumericColumnType[ShortType.type, Short]( + SHORT, + Array.fill(4)(Random.nextInt(Short.MaxValue).asInstanceOf[Short]), + ByteBuffer.allocate(32), + (_: ByteBuffer).putShort(_), + (_: ByteBuffer).getShort) + + testNumericColumnType[LongType.type, Long]( + LONG, + Array.fill(4)(Random.nextLong()), + ByteBuffer.allocate(64), + (_: ByteBuffer).putLong(_), + (_: ByteBuffer).getLong) + + testNumericColumnType[ByteType.type, Byte]( + BYTE, + Array.fill(4)(Random.nextInt(Byte.MaxValue).asInstanceOf[Byte]), + ByteBuffer.allocate(64), + (_: ByteBuffer).put(_), + (_: ByteBuffer).get) + + testNumericColumnType[DoubleType.type, Double]( + DOUBLE, + Array.fill(4)(Random.nextDouble()), + ByteBuffer.allocate(64), + (_: ByteBuffer).putDouble(_), 
+ (_: ByteBuffer).getDouble) + + testNumericColumnType[FloatType.type, Float]( + FLOAT, + Array.fill(4)(Random.nextFloat()), + ByteBuffer.allocate(64), + (_: ByteBuffer).putFloat(_), + (_: ByteBuffer).getFloat) + + test("STRING") { + val buffer = ByteBuffer.allocate(128) + val seq = Array("hello", "world", "spark", "sql") + + seq.map(_.getBytes).foreach { bytes: Array[Byte] => + buffer.putInt(bytes.length).put(bytes) + } + + buffer.rewind() + seq.foreach { s => + assert(s === STRING.extract(buffer)) + } + + buffer.rewind() + seq.foreach(STRING.append(_, buffer)) + + buffer.rewind() + seq.foreach { s => + val length = buffer.getInt + assert(length === s.getBytes.length) + + val bytes = new Array[Byte](length) + buffer.get(bytes, 0, length) + assert(s === new String(bytes)) + } + } + + test("BINARY") { + val buffer = ByteBuffer.allocate(128) + val seq = Array.fill(4) { + val bytes = new Array[Byte](4) + Random.nextBytes(bytes) + bytes + } + + seq.foreach { bytes => + buffer.putInt(bytes.length).put(bytes) + } + + buffer.rewind() + seq.foreach { b => + assert(b === BINARY.extract(buffer)) + } + + buffer.rewind() + seq.foreach(BINARY.append(_, buffer)) + + buffer.rewind() + seq.foreach { b => + val length = buffer.getInt + assert(length === b.length) + + val bytes = new Array[Byte](length) + buffer.get(bytes, 0, length) + assert(b === bytes) + } + } + + test("GENERIC") { + val buffer = ByteBuffer.allocate(512) + val obj = Map(1 -> "spark", 2 -> "sql") + val serializedObj = SparkSqlSerializer.serialize(obj) + + GENERIC.append(SparkSqlSerializer.serialize(obj), buffer) + buffer.rewind() + + val length = buffer.getInt() + assert(length === serializedObj.length) + + val bytes = new Array[Byte](length) + buffer.get(bytes, 0, length) + assert(obj === SparkSqlSerializer.deserialize(bytes)) + + buffer.rewind() + buffer.putInt(serializedObj.length).put(serializedObj) + + buffer.rewind() + assert(obj === SparkSqlSerializer.deserialize(GENERIC.extract(buffer))) + } + + def testNumericColumnType[T <: DataType, JvmType]( + columnType: ColumnType[T, JvmType], + seq: Seq[JvmType], + buffer: ByteBuffer, + putter: (ByteBuffer, JvmType) => Unit, + getter: (ByteBuffer) => JvmType) { + + val columnTypeName = columnType.getClass.getSimpleName.stripSuffix("$") + + test(s"$columnTypeName.extract") { + buffer.rewind() + seq.foreach(putter(buffer, _)) + + buffer.rewind() + seq.foreach { i => + assert(i === columnType.extract(buffer)) + } + } + + test(s"$columnTypeName.append") { + buffer.rewind() + seq.foreach(columnType.append(_, buffer)) + + buffer.rewind() + seq.foreach { i => + assert(i === getter(buffer)) + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala new file mode 100644 index 0000000000..928851a385 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.columnar + +import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.{TestData, DslQuerySuite} + +class ColumnarQuerySuite extends DslQuerySuite { + import TestData._ + import TestSQLContext._ + + test("simple columnar query") { + val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan + val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan)) + + checkAnswer(scan, testData.collect().toSeq) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala new file mode 100644 index 0000000000..ddcdede8d1 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.columnar + +import scala.util.Random + +import org.apache.spark.sql.catalyst.expressions.GenericMutableRow + +// TODO Enrich test data +object ColumnarTestData { + object GenericMutableRow { + def apply(values: Any*) = { + val row = new GenericMutableRow(values.length) + row.indices.foreach { i => + row(i) = values(i) + } + row + } + } + + def randomBytes(length: Int) = { + val bytes = new Array[Byte](length) + Random.nextBytes(bytes) + bytes + } + + val nonNullRandomRow = GenericMutableRow( + Random.nextInt(), + Random.nextLong(), + Random.nextFloat(), + Random.nextDouble(), + Random.nextBoolean(), + Random.nextInt(Byte.MaxValue).asInstanceOf[Byte], + Random.nextInt(Short.MaxValue).asInstanceOf[Short], + Random.nextString(Random.nextInt(64)), + randomBytes(Random.nextInt(64)), + Map(Random.nextInt() -> Random.nextString(4))) + + val nullRow = GenericMutableRow(Seq.fill(10)(null): _*) +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala new file mode 100644 index 0000000000..279607ccfa --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql +package columnar + +import org.scalatest.FunSuite +import org.apache.spark.sql.catalyst.types.DataType +import org.apache.spark.sql.catalyst.expressions.GenericMutableRow + +class NullableColumnAccessorSuite extends FunSuite { + import ColumnarTestData._ + + Seq(INT, LONG, SHORT, BOOLEAN, BYTE, STRING, DOUBLE, FLOAT, BINARY, GENERIC).foreach { + testNullableColumnAccessor(_) + } + + def testNullableColumnAccessor[T <: DataType, JvmType](columnType: ColumnType[T, JvmType]) { + val typeName = columnType.getClass.getSimpleName.stripSuffix("$") + + test(s"$typeName accessor: empty column") { + val builder = ColumnBuilder(columnType.typeId, 4) + val accessor = ColumnAccessor(builder.build()) + assert(!accessor.hasNext) + } + + test(s"$typeName accessor: access null values") { + val builder = ColumnBuilder(columnType.typeId, 4) + + (0 until 4).foreach { _ => + builder.appendFrom(nonNullRandomRow, columnType.typeId) + builder.appendFrom(nullRow, columnType.typeId) + } + + val accessor = ColumnAccessor(builder.build()) + val row = new GenericMutableRow(1) + + (0 until 4).foreach { _ => + accessor.extractTo(row, 0) + assert(row(0) === nonNullRandomRow(columnType.typeId)) + + accessor.extractTo(row, 0) + assert(row(0) === null) + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala new file mode 100644 index 0000000000..3354da3fa3 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql +package columnar + +import org.scalatest.FunSuite + +import org.apache.spark.sql.catalyst.types.DataType +import org.apache.spark.sql.execution.SparkSqlSerializer + +class NullableColumnBuilderSuite extends FunSuite { + import ColumnarTestData._ + + Seq(INT, LONG, SHORT, BOOLEAN, BYTE, STRING, DOUBLE, FLOAT, BINARY, GENERIC).foreach { + testNullableColumnBuilder(_) + } + + def testNullableColumnBuilder[T <: DataType, JvmType](columnType: ColumnType[T, JvmType]) { + val columnBuilder = ColumnBuilder(columnType.typeId) + val typeName = columnType.getClass.getSimpleName.stripSuffix("$") + + test(s"$typeName column builder: empty column") { + columnBuilder.initialize(4) + + val buffer = columnBuilder.build() + + // For column type ID + assert(buffer.getInt() === columnType.typeId) + // For null count + assert(buffer.getInt === 0) + assert(!buffer.hasRemaining) + } + + test(s"$typeName column builder: buffer size auto growth") { + columnBuilder.initialize(4) + + (0 until 4) foreach { _ => + columnBuilder.appendFrom(nonNullRandomRow, columnType.typeId) + } + + val buffer = columnBuilder.build() + + // For column type ID + assert(buffer.getInt() === columnType.typeId) + // For null count + assert(buffer.getInt() === 0) + } + + test(s"$typeName column builder: null values") { + columnBuilder.initialize(4) + + (0 until 4) foreach { _ => + columnBuilder.appendFrom(nonNullRandomRow, columnType.typeId) + columnBuilder.appendFrom(nullRow, columnType.typeId) + } + + val buffer = columnBuilder.build() + + // For column type ID + assert(buffer.getInt() === columnType.typeId) + // For null count + assert(buffer.getInt() === 4) + // For null positions + (1 to 7 by 2).foreach(i => assert(buffer.getInt() === i)) + + // For non-null values + (0 until 4).foreach { _ => + val actual = if (columnType == GENERIC) { + SparkSqlSerializer.deserialize[Any](GENERIC.extract(buffer)) + } else { + columnType.extract(buffer) + } + assert(actual === nonNullRandomRow(columnType.typeId)) + } + + assert(!buffer.hasRemaining) + } + } +} |
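Putting the pieces together: after `NullableColumnBuilder.build()`, every column buffer follows the layout documented in the diff above — type ID, null count, null positions, then the non-null data. A hypothetical decoder for that header (the names are mine, not the PR's) shows what `ColumnAccessor.apply` and `NullableColumnAccessor.initialize` cooperate to skip past:

```scala
import java.nio.{ByteBuffer, ByteOrder}

case class ColumnHeader(typeId: Int, nullPositions: Seq[Int])

object ColumnHeader {
  // Layout per the NullableColumnBuilder comment:
  // [type ID: 4 bytes][null count: 4 bytes][one Int per null][non-null data]
  def read(b: ByteBuffer): (ColumnHeader, ByteBuffer) = {
    val buffer = b.duplicate().order(ByteOrder.nativeOrder())
    val typeId = buffer.getInt()
    val nullCount = buffer.getInt()
    val nullPositions = Seq.fill(nullCount)(buffer.getInt())
    // The remainder of the buffer is the non-null data region.
    (ColumnHeader(typeId, nullPositions), buffer.slice().order(ByteOrder.nativeOrder()))
  }
}
```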