author    Cheng Lian <lian@databricks.com>    2014-03-23 12:08:55 -0700
committer Patrick Wendell <pwendell@gmail.com>    2014-03-23 12:08:55 -0700
commit    57a4379c031e5d5901ba580422207d6aa2f19749 (patch)
tree      bad7341671b0b9adeacc114432befe7f749d8f9d
parent    abf6714e27cf07a13819b35a4ca50ff9bb28b65c (diff)
[SPARK-1292] In-memory columnar representation for Spark SQL
This PR is rebased from the Catalyst repository, and contains the first version of in-memory columnar representation for Spark SQL. Compression support is not included yet and will be added later in a separate PR.

Author: Cheng Lian <lian@databricks.com>
Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #205 from liancheng/memColumnarSupport and squashes the following commits:

99dba41 [Cheng Lian] Restricted new objects/classes to `private[sql]'
0892ad8 [Cheng Lian] Addressed ScalaStyle issues
af1ad5e [Cheng Lian] Fixed some minor issues introduced during rebasing
0dbf2fb [Cheng Lian] Make necessary renaming due to rebase
a162d4d [Cheng Lian] Removed the unnecessary InMemoryColumnarRelation class
9bcae4b [Cheng Lian] Added Apache license
220ee1e [Cheng Lian] Added table scan operator for in-memory columnar support.
c701c7a [Cheng Lian] Using SparkSqlSerializer for generic object SerDe causes error, made a workaround
ed8608e [Cheng Lian] Added implicit conversion from DataType to ColumnType
b8a645a [Cheng Lian] Replaced KryoSerializer with an updated SparkSqlSerializer
b6c0a49 [Cheng Lian] Minor test suite refactoring
214be73 [Cheng Lian] Refactored BINARY and GENERIC to reduce duplicate code
da2f4d5 [Cheng Lian] Added Apache license
dbf7a38 [Cheng Lian] Added ColumnAccessor and test suite, refactored ColumnBuilder
c01a177 [Cheng Lian] Added column builder classes and test suite
f18ddc6 [Cheng Lian] Added ColumnTypes and test suite
2d09066 [Cheng Lian] Added KryoSerializer
34f3c19 [Cheng Lian] Added TypeTag field to all NativeTypes
acc5c48 [Cheng Lian] Added Hive test files to .gitignore
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala14
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala175
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala187
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala198
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala57
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala83
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala80
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala33
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala73
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala204
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala34
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala55
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala61
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala94
15 files changed, 1315 insertions, 35 deletions
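
To make the new API concrete before the diff itself, here is a minimal round-trip sketch, modeled on the test suites added in this patch. It is an editorial illustration, not part of the patch; it assumes the calling code lives under org.apache.spark.sql, since the new columnar classes are restricted to private[sql]:

    package org.apache.spark.sql
    package columnar

    import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

    object ColumnRoundTripExample {
      def main(args: Array[String]) {
        // Build an INT column by appending a few single-field rows.
        val builder = ColumnBuilder(INT.typeId, initialSize = 4)
        (1 to 4).foreach { i =>
          val row = new GenericMutableRow(1)
          row.setInt(0, i)
          builder.appendFrom(row, 0)
        }

        // The resulting ByteBuffer starts with the column type ID, followed by
        // the null section and then the appended values.
        val buffer = builder.build()

        // Read the values back into a mutable row, avoiding boxing.
        val accessor = ColumnAccessor(buffer)
        val out = new GenericMutableRow(1)
        while (accessor.hasNext) {
          accessor.extractTo(out, 0)
          println(out.getInt(0))
        }
      }
    }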
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index 6eb2b62ecc..90a9f9f7e5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql
package catalyst
package types
-import expressions.Expression
+import scala.reflect.runtime.universe.{typeTag, TypeTag}
+
+import org.apache.spark.sql.catalyst.expressions.Expression
abstract class DataType {
/** Matches any expression that evaluates to this DataType */
@@ -33,11 +35,13 @@ case object NullType extends DataType
abstract class NativeType extends DataType {
type JvmType
+ @transient val tag: TypeTag[JvmType]
val ordering: Ordering[JvmType]
}
case object StringType extends NativeType {
type JvmType = String
+ @transient lazy val tag = typeTag[JvmType]
val ordering = implicitly[Ordering[JvmType]]
}
case object BinaryType extends DataType {
@@ -45,6 +49,7 @@ case object BinaryType extends DataType {
}
case object BooleanType extends NativeType {
type JvmType = Boolean
+ @transient lazy val tag = typeTag[JvmType]
val ordering = implicitly[Ordering[JvmType]]
}
@@ -71,6 +76,7 @@ abstract class IntegralType extends NumericType {
case object LongType extends IntegralType {
type JvmType = Long
+ @transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Long]]
val integral = implicitly[Integral[Long]]
val ordering = implicitly[Ordering[JvmType]]
@@ -78,6 +84,7 @@ case object LongType extends IntegralType {
case object IntegerType extends IntegralType {
type JvmType = Int
+ @transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Int]]
val integral = implicitly[Integral[Int]]
val ordering = implicitly[Ordering[JvmType]]
@@ -85,6 +92,7 @@ case object IntegerType extends IntegralType {
case object ShortType extends IntegralType {
type JvmType = Short
+ @transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Short]]
val integral = implicitly[Integral[Short]]
val ordering = implicitly[Ordering[JvmType]]
@@ -92,6 +100,7 @@ case object ShortType extends IntegralType {
case object ByteType extends IntegralType {
type JvmType = Byte
+ @transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Byte]]
val integral = implicitly[Integral[Byte]]
val ordering = implicitly[Ordering[JvmType]]
@@ -110,6 +119,7 @@ abstract class FractionalType extends NumericType {
case object DecimalType extends FractionalType {
type JvmType = BigDecimal
+ @transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[BigDecimal]]
val fractional = implicitly[Fractional[BigDecimal]]
val ordering = implicitly[Ordering[JvmType]]
@@ -117,6 +127,7 @@ case object DecimalType extends FractionalType {
case object DoubleType extends FractionalType {
type JvmType = Double
+ @transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Double]]
val fractional = implicitly[Fractional[Double]]
val ordering = implicitly[Ordering[JvmType]]
@@ -124,6 +135,7 @@ case object DoubleType extends FractionalType {
case object FloatType extends FractionalType {
type JvmType = Float
+ @transient lazy val tag = typeTag[JvmType]
val numeric = implicitly[Numeric[Float]]
val fractional = implicitly[Fractional[Float]]
val ordering = implicitly[Ordering[JvmType]]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
new file mode 100644
index 0000000000..ddbeba6203
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package columnar
+
+import java.nio.{ByteOrder, ByteBuffer}
+
+import org.apache.spark.sql.catalyst.types.{BinaryType, NativeType, DataType}
+import org.apache.spark.sql.catalyst.expressions.MutableRow
+import org.apache.spark.sql.execution.SparkSqlSerializer
+
+/**
+ * An `Iterator`-like trait used to extract values from a columnar byte buffer. When a value is
+ * extracted from the buffer, instead of directly returning it, the value is set into some field of
+ * a [[MutableRow]]. In this way, boxing cost can be avoided by leveraging the primitive value
+ * setter methods provided by [[MutableRow]].
+ */
+private[sql] trait ColumnAccessor {
+ initialize()
+
+ protected def initialize()
+
+ def hasNext: Boolean
+
+ def extractTo(row: MutableRow, ordinal: Int)
+
+ protected def underlyingBuffer: ByteBuffer
+}
+
+private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](buffer: ByteBuffer)
+ extends ColumnAccessor {
+
+ protected def initialize() {}
+
+ def columnType: ColumnType[T, JvmType]
+
+ def hasNext = buffer.hasRemaining
+
+ def extractTo(row: MutableRow, ordinal: Int) {
+ doExtractTo(row, ordinal)
+ }
+
+ protected def doExtractTo(row: MutableRow, ordinal: Int)
+
+ protected def underlyingBuffer = buffer
+}
+
+private[sql] abstract class NativeColumnAccessor[T <: NativeType](
+ buffer: ByteBuffer,
+ val columnType: NativeColumnType[T])
+ extends BasicColumnAccessor[T, T#JvmType](buffer)
+ with NullableColumnAccessor
+
+private[sql] class BooleanColumnAccessor(buffer: ByteBuffer)
+ extends NativeColumnAccessor(buffer, BOOLEAN) {
+
+ override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+ row.setBoolean(ordinal, columnType.extract(buffer))
+ }
+}
+
+private[sql] class IntColumnAccessor(buffer: ByteBuffer)
+ extends NativeColumnAccessor(buffer, INT) {
+
+ override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+ row.setInt(ordinal, columnType.extract(buffer))
+ }
+}
+
+private[sql] class ShortColumnAccessor(buffer: ByteBuffer)
+ extends NativeColumnAccessor(buffer, SHORT) {
+
+ override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+ row.setShort(ordinal, columnType.extract(buffer))
+ }
+}
+
+private[sql] class LongColumnAccessor(buffer: ByteBuffer)
+ extends NativeColumnAccessor(buffer, LONG) {
+
+ override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+ row.setLong(ordinal, columnType.extract(buffer))
+ }
+}
+
+private[sql] class ByteColumnAccessor(buffer: ByteBuffer)
+ extends NativeColumnAccessor(buffer, BYTE) {
+
+ override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+ row.setByte(ordinal, columnType.extract(buffer))
+ }
+}
+
+private[sql] class DoubleColumnAccessor(buffer: ByteBuffer)
+ extends NativeColumnAccessor(buffer, DOUBLE) {
+
+ override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+ row.setDouble(ordinal, columnType.extract(buffer))
+ }
+}
+
+private[sql] class FloatColumnAccessor(buffer: ByteBuffer)
+ extends NativeColumnAccessor(buffer, FLOAT) {
+
+ override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+ row.setFloat(ordinal, columnType.extract(buffer))
+ }
+}
+
+private[sql] class StringColumnAccessor(buffer: ByteBuffer)
+ extends NativeColumnAccessor(buffer, STRING) {
+
+ override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+ row.setString(ordinal, columnType.extract(buffer))
+ }
+}
+
+private[sql] class BinaryColumnAccessor(buffer: ByteBuffer)
+ extends BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer)
+ with NullableColumnAccessor {
+
+ def columnType = BINARY
+
+ override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+ row(ordinal) = columnType.extract(buffer)
+ }
+}
+
+private[sql] class GenericColumnAccessor(buffer: ByteBuffer)
+ extends BasicColumnAccessor[DataType, Array[Byte]](buffer)
+ with NullableColumnAccessor {
+
+ def columnType = GENERIC
+
+ override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+ val serialized = columnType.extract(buffer)
+ row(ordinal) = SparkSqlSerializer.deserialize[Any](serialized)
+ }
+}
+
+private[sql] object ColumnAccessor {
+ def apply(b: ByteBuffer): ColumnAccessor = {
+ // The first 4 bytes in the buffer indicate the column type.
+ val buffer = b.duplicate().order(ByteOrder.nativeOrder())
+ val columnTypeId = buffer.getInt()
+
+ columnTypeId match {
+ case INT.typeId => new IntColumnAccessor(buffer)
+ case LONG.typeId => new LongColumnAccessor(buffer)
+ case FLOAT.typeId => new FloatColumnAccessor(buffer)
+ case DOUBLE.typeId => new DoubleColumnAccessor(buffer)
+ case BOOLEAN.typeId => new BooleanColumnAccessor(buffer)
+ case BYTE.typeId => new ByteColumnAccessor(buffer)
+ case SHORT.typeId => new ShortColumnAccessor(buffer)
+ case STRING.typeId => new StringColumnAccessor(buffer)
+ case BINARY.typeId => new BinaryColumnAccessor(buffer)
+ case GENERIC.typeId => new GenericColumnAccessor(buffer)
+ }
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
new file mode 100644
index 0000000000..6bd1841821
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package columnar
+
+import java.nio.{ByteOrder, ByteBuffer}
+
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.execution.SparkSqlSerializer
+
+private[sql] trait ColumnBuilder {
+ /**
+ * Initializes with an approximate lower bound on the expected number of elements in this column.
+ */
+ def initialize(initialSize: Int, columnName: String = "")
+
+ def appendFrom(row: Row, ordinal: Int)
+
+ def build(): ByteBuffer
+}
+
+private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType] extends ColumnBuilder {
+ import ColumnBuilder._
+
+ private var columnName: String = _
+ protected var buffer: ByteBuffer = _
+
+ def columnType: ColumnType[T, JvmType]
+
+ override def initialize(initialSize: Int, columnName: String = "") = {
+ val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
+ this.columnName = columnName
+ buffer = ByteBuffer.allocate(4 + 4 + size * columnType.defaultSize)
+ buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)
+ }
+
+ // Have to give a concrete implementation to make mixin possible
+ override def appendFrom(row: Row, ordinal: Int) {
+ doAppendFrom(row, ordinal)
+ }
+
+ // Concrete `ColumnBuilder`s can override this method to append values
+ protected def doAppendFrom(row: Row, ordinal: Int)
+
+ // Helper method to append primitive values (to avoid boxing cost)
+ protected def appendValue(v: JvmType) {
+ buffer = ensureFreeSpace(buffer, columnType.actualSize(v))
+ columnType.append(v, buffer)
+ }
+
+ override def build() = {
+ buffer.limit(buffer.position()).rewind()
+ buffer
+ }
+}
+
+private[sql] abstract class NativeColumnBuilder[T <: NativeType](
+ val columnType: NativeColumnType[T])
+ extends BasicColumnBuilder[T, T#JvmType]
+ with NullableColumnBuilder
+
+private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(BOOLEAN) {
+ override def doAppendFrom(row: Row, ordinal: Int) {
+ appendValue(row.getBoolean(ordinal))
+ }
+}
+
+private[sql] class IntColumnBuilder extends NativeColumnBuilder(INT) {
+ override def doAppendFrom(row: Row, ordinal: Int) {
+ appendValue(row.getInt(ordinal))
+ }
+}
+
+private[sql] class ShortColumnBuilder extends NativeColumnBuilder(SHORT) {
+ override def doAppendFrom(row: Row, ordinal: Int) {
+ appendValue(row.getShort(ordinal))
+ }
+}
+
+private[sql] class LongColumnBuilder extends NativeColumnBuilder(LONG) {
+ override def doAppendFrom(row: Row, ordinal: Int) {
+ appendValue(row.getLong(ordinal))
+ }
+}
+
+private[sql] class ByteColumnBuilder extends NativeColumnBuilder(BYTE) {
+ override def doAppendFrom(row: Row, ordinal: Int) {
+ appendValue(row.getByte(ordinal))
+ }
+}
+
+private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(DOUBLE) {
+ override def doAppendFrom(row: Row, ordinal: Int) {
+ appendValue(row.getDouble(ordinal))
+ }
+}
+
+private[sql] class FloatColumnBuilder extends NativeColumnBuilder(FLOAT) {
+ override def doAppendFrom(row: Row, ordinal: Int) {
+ appendValue(row.getFloat(ordinal))
+ }
+}
+
+private[sql] class StringColumnBuilder extends NativeColumnBuilder(STRING) {
+ override def doAppendFrom(row: Row, ordinal: Int) {
+ appendValue(row.getString(ordinal))
+ }
+}
+
+private[sql] class BinaryColumnBuilder
+ extends BasicColumnBuilder[BinaryType.type, Array[Byte]]
+ with NullableColumnBuilder {
+
+ def columnType = BINARY
+
+ override def doAppendFrom(row: Row, ordinal: Int) {
+ appendValue(row(ordinal).asInstanceOf[Array[Byte]])
+ }
+}
+
+// TODO (lian) Add support for array, struct and map
+private[sql] class GenericColumnBuilder
+ extends BasicColumnBuilder[DataType, Array[Byte]]
+ with NullableColumnBuilder {
+
+ def columnType = GENERIC
+
+ override def doAppendFrom(row: Row, ordinal: Int) {
+ val serialized = SparkSqlSerializer.serialize(row(ordinal))
+ buffer = ColumnBuilder.ensureFreeSpace(buffer, columnType.actualSize(serialized))
+ columnType.append(serialized, buffer)
+ }
+}
+
+private[sql] object ColumnBuilder {
+ val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104
+
+ private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = {
+ if (orig.remaining >= size) {
+ orig
+ } else {
+ // Grow by at least `size` bytes, or by 1/8 of the current capacity + 1, whichever is larger
+ val capacity = orig.capacity()
+ val newSize = capacity + size.max(capacity / 8 + 1)
+ val pos = orig.position()
+
+ orig.clear()
+ ByteBuffer
+ .allocate(newSize)
+ .order(ByteOrder.nativeOrder())
+ .put(orig.array(), 0, pos)
+ }
+ }
+
+ def apply(typeId: Int, initialSize: Int = 0, columnName: String = ""): ColumnBuilder = {
+ val builder = (typeId match {
+ case INT.typeId => new IntColumnBuilder
+ case LONG.typeId => new LongColumnBuilder
+ case FLOAT.typeId => new FloatColumnBuilder
+ case DOUBLE.typeId => new DoubleColumnBuilder
+ case BOOLEAN.typeId => new BooleanColumnBuilder
+ case BYTE.typeId => new ByteColumnBuilder
+ case SHORT.typeId => new ShortColumnBuilder
+ case STRING.typeId => new StringColumnBuilder
+ case BINARY.typeId => new BinaryColumnBuilder
+ case GENERIC.typeId => new GenericColumnBuilder
+ }).asInstanceOf[ColumnBuilder]
+
+ builder.initialize(initialSize, columnName)
+ builder
+ }
+}
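
As a worked illustration of the growth policy in ColumnBuilder.ensureFreeSpace (an editorial sketch, not part of the patch; it has to sit in the columnar package because the helper is private[columnar]): a 16-byte buffer with only 4 bytes remaining that needs room for an 8-byte value is reallocated to 16 + max(8, 16/8 + 1) = 24 bytes, with the already-written prefix copied over.

    package org.apache.spark.sql
    package columnar

    import java.nio.{ByteBuffer, ByteOrder}

    object EnsureFreeSpaceExample {
      def main(args: Array[String]) {
        // 16-byte buffer with 12 bytes already written and only 4 remaining.
        val orig = ByteBuffer.allocate(16).order(ByteOrder.nativeOrder())
        orig.putInt(7).putLong(42L)

        // An 8-byte value does not fit, so the buffer grows by
        // max(8, 16 / 8 + 1) = 8 bytes and the written prefix is preserved.
        val grown = ColumnBuilder.ensureFreeSpace(orig, 8)
        assert(grown.capacity() == 24)
        assert(grown.position() == 12)
      }
    }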
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
new file mode 100644
index 0000000000..3b759a51cc
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+package columnar
+
+import java.nio.ByteBuffer
+
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * An abstract class that represents the type of a column. Used to append/extract Java objects into/from
+ * the underlying [[ByteBuffer]] of a column.
+ *
+ * @param typeId A unique ID representing the type.
+ * @param defaultSize Default size in bytes for one element of type T (e.g. 4 for `Int`).
+ * @tparam T Scala data type for the column.
+ * @tparam JvmType Underlying Java type to represent the elements.
+ */
+private[sql] sealed abstract class ColumnType[T <: DataType, JvmType](
+ val typeId: Int,
+ val defaultSize: Int) {
+
+ /**
+ * Extracts a value out of the buffer at the buffer's current position.
+ */
+ def extract(buffer: ByteBuffer): JvmType
+
+ /**
+ * Appends the given value v of type T into the given ByteBuffer.
+ */
+ def append(v: JvmType, buffer: ByteBuffer)
+
+ /**
+ * Returns the size of the value. This is used to calculate the size of variable length types
+ * such as byte arrays and strings.
+ */
+ def actualSize(v: JvmType): Int = defaultSize
+
+ /**
+ * Creates a duplicated copy of the value.
+ */
+ def clone(v: JvmType): JvmType = v
+}
+
+private[sql] abstract class NativeColumnType[T <: NativeType](
+ val dataType: T,
+ typeId: Int,
+ defaultSize: Int)
+ extends ColumnType[T, T#JvmType](typeId, defaultSize) {
+
+ /**
+ * Scala TypeTag. Can be used to create primitive arrays and hash tables.
+ */
+ def scalaTag = dataType.tag
+}
+
+private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) {
+ def append(v: Int, buffer: ByteBuffer) {
+ buffer.putInt(v)
+ }
+
+ def extract(buffer: ByteBuffer) = {
+ buffer.getInt()
+ }
+}
+
+private[sql] object LONG extends NativeColumnType(LongType, 1, 8) {
+ override def append(v: Long, buffer: ByteBuffer) {
+ buffer.putLong(v)
+ }
+
+ override def extract(buffer: ByteBuffer) = {
+ buffer.getLong()
+ }
+}
+
+private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) {
+ override def append(v: Float, buffer: ByteBuffer) {
+ buffer.putFloat(v)
+ }
+
+ override def extract(buffer: ByteBuffer) = {
+ buffer.getFloat()
+ }
+}
+
+private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) {
+ override def append(v: Double, buffer: ByteBuffer) {
+ buffer.putDouble(v)
+ }
+
+ override def extract(buffer: ByteBuffer) = {
+ buffer.getDouble()
+ }
+}
+
+private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) {
+ override def append(v: Boolean, buffer: ByteBuffer) {
+ buffer.put(if (v) 1.toByte else 0.toByte)
+ }
+
+ override def extract(buffer: ByteBuffer) = {
+ if (buffer.get() == 1) true else false
+ }
+}
+
+private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) {
+ override def append(v: Byte, buffer: ByteBuffer) {
+ buffer.put(v)
+ }
+
+ override def extract(buffer: ByteBuffer) = {
+ buffer.get()
+ }
+}
+
+private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) {
+ override def append(v: Short, buffer: ByteBuffer) {
+ buffer.putShort(v)
+ }
+
+ override def extract(buffer: ByteBuffer) = {
+ buffer.getShort()
+ }
+}
+
+private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
+ override def actualSize(v: String): Int = v.getBytes.length + 4
+
+ override def append(v: String, buffer: ByteBuffer) {
+ val stringBytes = v.getBytes()
+ buffer.putInt(stringBytes.length).put(stringBytes, 0, stringBytes.length)
+ }
+
+ override def extract(buffer: ByteBuffer) = {
+ val length = buffer.getInt()
+ val stringBytes = new Array[Byte](length)
+ buffer.get(stringBytes, 0, length)
+ new String(stringBytes)
+ }
+}
+
+private[sql] sealed abstract class ByteArrayColumnType[T <: DataType](
+ typeId: Int,
+ defaultSize: Int)
+ extends ColumnType[T, Array[Byte]](typeId, defaultSize) {
+
+ override def actualSize(v: Array[Byte]) = v.length + 4
+
+ override def append(v: Array[Byte], buffer: ByteBuffer) {
+ buffer.putInt(v.length).put(v, 0, v.length)
+ }
+
+ override def extract(buffer: ByteBuffer) = {
+ val length = buffer.getInt()
+ val bytes = new Array[Byte](length)
+ buffer.get(bytes, 0, length)
+ bytes
+ }
+}
+
+private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](8, 16)
+
+// Used to process generic objects (all types other than those listed above). Objects are
+// serialized before being appended to the column `ByteBuffer`, and are also extracted as
+// serialized byte arrays.
+private[sql] object GENERIC extends ByteArrayColumnType[DataType](9, 16)
+
+private[sql] object ColumnType {
+ implicit def dataTypeToColumnType(dataType: DataType): ColumnType[_, _] = {
+ dataType match {
+ case IntegerType => INT
+ case LongType => LONG
+ case FloatType => FLOAT
+ case DoubleType => DOUBLE
+ case BooleanType => BOOLEAN
+ case ByteType => BYTE
+ case ShortType => SHORT
+ case StringType => STRING
+ case BinaryType => BINARY
+ case _ => GENERIC
+ }
+ }
+}
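
A short sketch (editorial, not part of the patch) of how the implicit dataTypeToColumnType conversion and actualSize are meant to be used; it assumes the caller sits in the columnar package because the ColumnType hierarchy is private[sql]:

    package org.apache.spark.sql
    package columnar

    import org.apache.spark.sql.catalyst.types.{IntegerType, StringType}

    object ColumnTypeExample {
      def main(args: Array[String]) {
        // The implicit view maps a Catalyst DataType to its ColumnType.
        import ColumnType.dataTypeToColumnType
        val intColumn: ColumnType[_, _] = IntegerType   // resolves to INT
        val strColumn: ColumnType[_, _] = StringType    // resolves to STRING
        assert(intColumn.typeId == INT.typeId)
        assert(strColumn.typeId == STRING.typeId)

        // Fixed-size types report their default size; variable-length types
        // account for a 4-byte length prefix plus the payload.
        assert(INT.actualSize(42) == 4)
        assert(STRING.actualSize("hello") == 4 + 5)
      }
    }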
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
new file mode 100644
index 0000000000..2970c609b9
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar
+
+import java.nio.{ByteOrder, ByteBuffer}
+
+import org.apache.spark.sql.catalyst.expressions.MutableRow
+
+private[sql] trait NullableColumnAccessor extends ColumnAccessor {
+ private var nullsBuffer: ByteBuffer = _
+ private var nullCount: Int = _
+ private var seenNulls: Int = 0
+
+ private var nextNullIndex: Int = _
+ private var pos: Int = 0
+
+ abstract override def initialize() {
+ nullsBuffer = underlyingBuffer.duplicate().order(ByteOrder.nativeOrder())
+ nullCount = nullsBuffer.getInt()
+ nextNullIndex = if (nullCount > 0) nullsBuffer.getInt() else -1
+ pos = 0
+
+ underlyingBuffer.position(underlyingBuffer.position + 4 + nullCount * 4)
+ super.initialize()
+ }
+
+ abstract override def extractTo(row: MutableRow, ordinal: Int) {
+ if (pos == nextNullIndex) {
+ seenNulls += 1
+
+ if (seenNulls < nullCount) {
+ nextNullIndex = nullsBuffer.getInt()
+ }
+
+ row.setNullAt(ordinal)
+ } else {
+ super.extractTo(row, ordinal)
+ }
+
+ pos += 1
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
new file mode 100644
index 0000000000..1661c3f3ff
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package columnar
+
+import java.nio.{ByteOrder, ByteBuffer}
+
+/**
+ * Builds a nullable column. The byte buffer of a nullable column contains:
+ * - 4 bytes for the null count (number of nulls)
+ * - positions for each null, in ascending order
+ * - the non-null data (column data type, compression type, data...)
+ */
+private[sql] trait NullableColumnBuilder extends ColumnBuilder {
+ private var nulls: ByteBuffer = _
+ private var pos: Int = _
+ private var nullCount: Int = _
+
+ abstract override def initialize(initialSize: Int, columnName: String) {
+ nulls = ByteBuffer.allocate(1024)
+ nulls.order(ByteOrder.nativeOrder())
+ pos = 0
+ nullCount = 0
+ super.initialize(initialSize, columnName)
+ }
+
+ abstract override def appendFrom(row: Row, ordinal: Int) {
+ if (row.isNullAt(ordinal)) {
+ nulls = ColumnBuilder.ensureFreeSpace(nulls, 4)
+ nulls.putInt(pos)
+ nullCount += 1
+ } else {
+ super.appendFrom(row, ordinal)
+ }
+ pos += 1
+ }
+
+ abstract override def build(): ByteBuffer = {
+ val nonNulls = super.build()
+ val typeId = nonNulls.getInt()
+ val nullDataLen = nulls.position()
+
+ nulls.limit(nullDataLen)
+ nulls.rewind()
+
+ // Column type ID is moved to the front, followed by the null count, then the non-null data
+ //
+ // +---------+
+ // | 4 bytes | Column type ID
+ // +---------+
+ // | 4 bytes | Null count
+ // +---------+
+ // | ... | Null positions (if null count is not zero)
+ // +---------+
+ // | ... | Non-null part (without column type ID)
+ // +---------+
+ val buffer = ByteBuffer
+ .allocate(4 + nullDataLen + nonNulls.limit)
+ .order(ByteOrder.nativeOrder())
+ .putInt(typeId)
+ .putInt(nullCount)
+ .put(nulls)
+ .put(nonNulls)
+
+ buffer.rewind()
+ buffer
+ }
+}
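
To make the buffer layout above concrete, here is a small worked example (an editorial sketch, mirroring NullableColumnBuilderSuite further down): an INT column built from four rows in which rows 1 and 3 are null yields a buffer containing the type ID, the null count 2, the null positions 1 and 3, and then the two non-null values.

    package org.apache.spark.sql
    package columnar

    import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

    object NullableLayoutExample {
      def main(args: Array[String]) {
        val builder = ColumnBuilder(INT.typeId, initialSize = 4)

        val row = new GenericMutableRow(1)
        val nullRow = new GenericMutableRow(1)
        nullRow.setNullAt(0)

        // Append rows 0..3, where rows 1 and 3 are null.
        Seq(10, 20).foreach { v =>
          row.setInt(0, v)
          builder.appendFrom(row, 0)
          builder.appendFrom(nullRow, 0)
        }

        val buffer = builder.build()
        assert(buffer.getInt() == INT.typeId)  // column type ID
        assert(buffer.getInt() == 2)           // null count
        assert(buffer.getInt() == 1)           // first null position
        assert(buffer.getInt() == 3)           // second null position
        assert(buffer.getInt() == 10)          // non-null values follow
        assert(buffer.getInt() == 20)
      }
    }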
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala
new file mode 100644
index 0000000000..c7efd30e87
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package columnar
+
+import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
+import org.apache.spark.sql.execution.{SparkPlan, LeafNode}
+
+private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan)
+ extends LeafNode {
+
+ // For implicit conversion from `DataType` to `ColumnType`
+ import ColumnType._
+
+ override def output: Seq[Attribute] = attributes
+
+ lazy val cachedColumnBuffers = {
+ val output = child.output
+ val cached = child.execute().mapPartitions { iterator =>
+ val columnBuilders = output.map { a =>
+ ColumnBuilder(a.dataType.typeId, 0, a.name)
+ }.toArray
+
+ var row: Row = null
+ while (iterator.hasNext) {
+ row = iterator.next()
+ var i = 0
+ while (i < row.length) {
+ columnBuilders(i).appendFrom(row, i)
+ i += 1
+ }
+ }
+
+ Iterator.single(columnBuilders.map(_.build()))
+ }.cache()
+
+ cached.setName(child.toString)
+ // Force the materialization of the cached RDD.
+ cached.count()
+ cached
+ }
+
+ override def execute() = {
+ cachedColumnBuffers.mapPartitions { iterator =>
+ val columnBuffers = iterator.next()
+ assert(!iterator.hasNext)
+
+ new Iterator[Row] {
+ val columnAccessors = columnBuffers.map(ColumnAccessor(_))
+ val nextRow = new GenericMutableRow(columnAccessors.length)
+
+ override def next() = {
+ var i = 0
+ while (i < nextRow.length) {
+ columnAccessors(i).extractTo(nextRow, i)
+ i += 1
+ }
+ nextRow
+ }
+
+ override def hasNext = columnAccessors.head.hasNext
+ }
+ }
+ }
+}
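
The scan operator above builds its column buffers lazily: the first call to execute() materializes cachedColumnBuffers (one array of column ByteBuffers per partition, cached and force-materialized via count()), and every later execution reconstructs rows from those buffers. A minimal sketch of wrapping an arbitrary physical plan this way (editorial; cacheInColumnarFormat is a hypothetical helper name, not part of the patch):

    package org.apache.spark.sql
    package columnar

    import org.apache.spark.sql.execution.SparkPlan

    object ColumnarCaching {
      // Hypothetical helper: serve `plan`'s output from cached, per-partition
      // column buffers instead of recomputing the child plan on each execution.
      def cacheInColumnarFormat(plan: SparkPlan): SparkPlan =
        InMemoryColumnarTableScan(plan.output, plan)
    }

ColumnarQuerySuite below exercises exactly this pattern against the shared test data.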
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 72dc5ec6ad..e934c4cf69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -18,14 +18,8 @@
package org.apache.spark.sql
package execution
-import java.nio.ByteBuffer
-
-import com.esotericsoftware.kryo.{Kryo, Serializer}
-import com.esotericsoftware.kryo.io.{Output, Input}
-
import org.apache.spark.{SparkConf, RangePartitioner, HashPartitioner}
import org.apache.spark.rdd.ShuffledRDD
-import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.MutablePair
import catalyst.rules.Rule
@@ -33,33 +27,6 @@ import catalyst.errors._
import catalyst.expressions._
import catalyst.plans.physical._
-private class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
- override def newKryo(): Kryo = {
- val kryo = new Kryo
- kryo.setRegistrationRequired(true)
- kryo.register(classOf[MutablePair[_,_]])
- kryo.register(classOf[Array[Any]])
- kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
- kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
- kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
- kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer)
- kryo.setReferences(false)
- kryo.setClassLoader(this.getClass.getClassLoader)
- kryo
- }
-}
-
-private class BigDecimalSerializer extends Serializer[BigDecimal] {
- def write(kryo: Kryo, output: Output, bd: math.BigDecimal) {
- // TODO: There are probably more efficient representations than strings...
- output.writeString(bd.toString)
- }
-
- def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = {
- BigDecimal(input.readString())
- }
-}
-
case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
override def outputPartitioning = newPartitioning
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
new file mode 100644
index 0000000000..ad7cd58b6a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package execution
+
+import java.nio.ByteBuffer
+
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.esotericsoftware.kryo.{Serializer, Kryo}
+
+import org.apache.spark.{SparkEnv, SparkConf}
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.util.MutablePair
+
+class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
+ override def newKryo(): Kryo = {
+ val kryo = new Kryo()
+ kryo.setRegistrationRequired(false)
+ kryo.register(classOf[MutablePair[_, _]])
+ kryo.register(classOf[Array[Any]])
+ kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
+ kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
+ kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
+ kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer)
+ kryo.setReferences(false)
+ kryo.setClassLoader(this.getClass.getClassLoader)
+ kryo
+ }
+}
+
+object SparkSqlSerializer {
+ // TODO (lian) Using KryoSerializer here is a workaround and needs further investigation.
+ // Using SparkSqlSerializer here makes BasicQuerySuite fail because of a Kryo serialization
+ // related error.
+ @transient lazy val ser: KryoSerializer = {
+ val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
+ new KryoSerializer(sparkConf)
+ }
+
+ def serialize[T](o: T): Array[Byte] = {
+ ser.newInstance().serialize(o).array()
+ }
+
+ def deserialize[T](bytes: Array[Byte]): T = {
+ ser.newInstance().deserialize[T](ByteBuffer.wrap(bytes))
+ }
+}
+
+class BigDecimalSerializer extends Serializer[BigDecimal] {
+ def write(kryo: Kryo, output: Output, bd: math.BigDecimal) {
+ // TODO: There are probably more efficient representations than strings...
+ output.writeString(bd.toString())
+ }
+
+ def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = {
+ BigDecimal(input.readString())
+ }
+}
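
For reference, a minimal round trip through the companion object (an editorial sketch; it mirrors the GENERIC test in ColumnTypeSuite below):

    package org.apache.spark.sql
    package execution

    object SparkSqlSerializerExample {
      def main(args: Array[String]) {
        // The GENERIC column type stores values as byte arrays produced here.
        val obj = Map(1 -> "spark", 2 -> "sql")
        val bytes: Array[Byte] = SparkSqlSerializer.serialize(obj)
        val copy = SparkSqlSerializer.deserialize[Map[Int, String]](bytes)
        assert(copy == obj)
      }
    }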
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 728feceded..aa84211648 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -33,7 +33,7 @@ import TestSQLContext._
class QueryTest extends FunSuite {
/**
* Runs the plan and makes sure the answer matches the expected result.
- * @param plan the query to be executed
+ * @param rdd the [[SchemaRDD]] to be executed
* @param expectedAnswer the expected result, can either be an Any, Seq[Product], or Seq[ Seq[Any] ].
*/
protected def checkAnswer(rdd: SchemaRDD, expectedAnswer: Any): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
new file mode 100644
index 0000000000..c7aaaae94e
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
@@ -0,0 +1,204 @@
+package org.apache.spark.sql
+package columnar
+
+import java.nio.ByteBuffer
+
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.execution.SparkSqlSerializer
+
+class ColumnTypeSuite extends FunSuite {
+ val columnTypes = Seq(INT, SHORT, LONG, BYTE, DOUBLE, FLOAT, STRING, BINARY, GENERIC)
+
+ test("defaultSize") {
+ val defaultSize = Seq(4, 2, 8, 1, 8, 4, 8, 16, 16)
+
+ columnTypes.zip(defaultSize).foreach { case (columnType, size) =>
+ assert(columnType.defaultSize === size)
+ }
+ }
+
+ test("actualSize") {
+ val expectedSizes = Seq(4, 2, 8, 1, 8, 4, 4 + 5, 4 + 4, 4 + 11)
+ val actualSizes = Seq(
+ INT.actualSize(Int.MaxValue),
+ SHORT.actualSize(Short.MaxValue),
+ LONG.actualSize(Long.MaxValue),
+ BYTE.actualSize(Byte.MaxValue),
+ DOUBLE.actualSize(Double.MaxValue),
+ FLOAT.actualSize(Float.MaxValue),
+ STRING.actualSize("hello"),
+ BINARY.actualSize(new Array[Byte](4)),
+ GENERIC.actualSize(SparkSqlSerializer.serialize(Map(1 -> "a"))))
+
+ expectedSizes.zip(actualSizes).foreach { case (expected, actual) =>
+ assert(expected === actual)
+ }
+ }
+
+ testNumericColumnType[BooleanType.type, Boolean](
+ BOOLEAN,
+ Array.fill(4)(Random.nextBoolean()),
+ ByteBuffer.allocate(32),
+ (buffer: ByteBuffer, v: Boolean) => {
+ buffer.put((if (v) 1 else 0).toByte)
+ },
+ (buffer: ByteBuffer) => {
+ buffer.get() == 1
+ })
+
+ testNumericColumnType[IntegerType.type, Int](
+ INT,
+ Array.fill(4)(Random.nextInt()),
+ ByteBuffer.allocate(32),
+ (_: ByteBuffer).putInt(_),
+ (_: ByteBuffer).getInt)
+
+ testNumericColumnType[ShortType.type, Short](
+ SHORT,
+ Array.fill(4)(Random.nextInt(Short.MaxValue).asInstanceOf[Short]),
+ ByteBuffer.allocate(32),
+ (_: ByteBuffer).putShort(_),
+ (_: ByteBuffer).getShort)
+
+ testNumericColumnType[LongType.type, Long](
+ LONG,
+ Array.fill(4)(Random.nextLong()),
+ ByteBuffer.allocate(64),
+ (_: ByteBuffer).putLong(_),
+ (_: ByteBuffer).getLong)
+
+ testNumericColumnType[ByteType.type, Byte](
+ BYTE,
+ Array.fill(4)(Random.nextInt(Byte.MaxValue).asInstanceOf[Byte]),
+ ByteBuffer.allocate(64),
+ (_: ByteBuffer).put(_),
+ (_: ByteBuffer).get)
+
+ testNumericColumnType[DoubleType.type, Double](
+ DOUBLE,
+ Array.fill(4)(Random.nextDouble()),
+ ByteBuffer.allocate(64),
+ (_: ByteBuffer).putDouble(_),
+ (_: ByteBuffer).getDouble)
+
+ testNumericColumnType[FloatType.type, Float](
+ FLOAT,
+ Array.fill(4)(Random.nextFloat()),
+ ByteBuffer.allocate(64),
+ (_: ByteBuffer).putFloat(_),
+ (_: ByteBuffer).getFloat)
+
+ test("STRING") {
+ val buffer = ByteBuffer.allocate(128)
+ val seq = Array("hello", "world", "spark", "sql")
+
+ seq.map(_.getBytes).foreach { bytes: Array[Byte] =>
+ buffer.putInt(bytes.length).put(bytes)
+ }
+
+ buffer.rewind()
+ seq.foreach { s =>
+ assert(s === STRING.extract(buffer))
+ }
+
+ buffer.rewind()
+ seq.foreach(STRING.append(_, buffer))
+
+ buffer.rewind()
+ seq.foreach { s =>
+ val length = buffer.getInt
+ assert(length === s.getBytes.length)
+
+ val bytes = new Array[Byte](length)
+ buffer.get(bytes, 0, length)
+ assert(s === new String(bytes))
+ }
+ }
+
+ test("BINARY") {
+ val buffer = ByteBuffer.allocate(128)
+ val seq = Array.fill(4) {
+ val bytes = new Array[Byte](4)
+ Random.nextBytes(bytes)
+ bytes
+ }
+
+ seq.foreach { bytes =>
+ buffer.putInt(bytes.length).put(bytes)
+ }
+
+ buffer.rewind()
+ seq.foreach { b =>
+ assert(b === BINARY.extract(buffer))
+ }
+
+ buffer.rewind()
+ seq.foreach(BINARY.append(_, buffer))
+
+ buffer.rewind()
+ seq.foreach { b =>
+ val length = buffer.getInt
+ assert(length === b.length)
+
+ val bytes = new Array[Byte](length)
+ buffer.get(bytes, 0, length)
+ assert(b === bytes)
+ }
+ }
+
+ test("GENERIC") {
+ val buffer = ByteBuffer.allocate(512)
+ val obj = Map(1 -> "spark", 2 -> "sql")
+ val serializedObj = SparkSqlSerializer.serialize(obj)
+
+ GENERIC.append(SparkSqlSerializer.serialize(obj), buffer)
+ buffer.rewind()
+
+ val length = buffer.getInt()
+ assert(length === serializedObj.length)
+
+ val bytes = new Array[Byte](length)
+ buffer.get(bytes, 0, length)
+ assert(obj === SparkSqlSerializer.deserialize(bytes))
+
+ buffer.rewind()
+ buffer.putInt(serializedObj.length).put(serializedObj)
+
+ buffer.rewind()
+ assert(obj === SparkSqlSerializer.deserialize(GENERIC.extract(buffer)))
+ }
+
+ def testNumericColumnType[T <: DataType, JvmType](
+ columnType: ColumnType[T, JvmType],
+ seq: Seq[JvmType],
+ buffer: ByteBuffer,
+ putter: (ByteBuffer, JvmType) => Unit,
+ getter: (ByteBuffer) => JvmType) {
+
+ val columnTypeName = columnType.getClass.getSimpleName.stripSuffix("$")
+
+ test(s"$columnTypeName.extract") {
+ buffer.rewind()
+ seq.foreach(putter(buffer, _))
+
+ buffer.rewind()
+ seq.foreach { i =>
+ assert(i === columnType.extract(buffer))
+ }
+ }
+
+ test(s"$columnTypeName.append") {
+ buffer.rewind()
+ seq.foreach(columnType.append(_, buffer))
+
+ buffer.rewind()
+ seq.foreach { i =>
+ assert(i === getter(buffer))
+ }
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
new file mode 100644
index 0000000000..928851a385
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar
+
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.{TestData, DslQuerySuite}
+
+class ColumnarQuerySuite extends DslQuerySuite {
+ import TestData._
+ import TestSQLContext._
+
+ test("simple columnar query") {
+ val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
+ val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+
+ checkAnswer(scan, testData.collect().toSeq)
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala
new file mode 100644
index 0000000000..ddcdede8d1
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+
+// TODO Enrich test data
+object ColumnarTestData {
+ object GenericMutableRow {
+ def apply(values: Any*) = {
+ val row = new GenericMutableRow(values.length)
+ row.indices.foreach { i =>
+ row(i) = values(i)
+ }
+ row
+ }
+ }
+
+ def randomBytes(length: Int) = {
+ val bytes = new Array[Byte](length)
+ Random.nextBytes(bytes)
+ bytes
+ }
+
+ val nonNullRandomRow = GenericMutableRow(
+ Random.nextInt(),
+ Random.nextLong(),
+ Random.nextFloat(),
+ Random.nextDouble(),
+ Random.nextBoolean(),
+ Random.nextInt(Byte.MaxValue).asInstanceOf[Byte],
+ Random.nextInt(Short.MaxValue).asInstanceOf[Short],
+ Random.nextString(Random.nextInt(64)),
+ randomBytes(Random.nextInt(64)),
+ Map(Random.nextInt() -> Random.nextString(4)))
+
+ val nullRow = GenericMutableRow(Seq.fill(10)(null): _*)
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
new file mode 100644
index 0000000000..279607ccfa
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package columnar
+
+import org.scalatest.FunSuite
+import org.apache.spark.sql.catalyst.types.DataType
+import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+
+class NullableColumnAccessorSuite extends FunSuite {
+ import ColumnarTestData._
+
+ Seq(INT, LONG, SHORT, BOOLEAN, BYTE, STRING, DOUBLE, FLOAT, BINARY, GENERIC).foreach {
+ testNullableColumnAccessor(_)
+ }
+
+ def testNullableColumnAccessor[T <: DataType, JvmType](columnType: ColumnType[T, JvmType]) {
+ val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
+
+ test(s"$typeName accessor: empty column") {
+ val builder = ColumnBuilder(columnType.typeId, 4)
+ val accessor = ColumnAccessor(builder.build())
+ assert(!accessor.hasNext)
+ }
+
+ test(s"$typeName accessor: access null values") {
+ val builder = ColumnBuilder(columnType.typeId, 4)
+
+ (0 until 4).foreach { _ =>
+ builder.appendFrom(nonNullRandomRow, columnType.typeId)
+ builder.appendFrom(nullRow, columnType.typeId)
+ }
+
+ val accessor = ColumnAccessor(builder.build())
+ val row = new GenericMutableRow(1)
+
+ (0 until 4).foreach { _ =>
+ accessor.extractTo(row, 0)
+ assert(row(0) === nonNullRandomRow(columnType.typeId))
+
+ accessor.extractTo(row, 0)
+ assert(row(0) === null)
+ }
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
new file mode 100644
index 0000000000..3354da3fa3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package columnar
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.types.DataType
+import org.apache.spark.sql.execution.SparkSqlSerializer
+
+class NullableColumnBuilderSuite extends FunSuite {
+ import ColumnarTestData._
+
+ Seq(INT, LONG, SHORT, BOOLEAN, BYTE, STRING, DOUBLE, FLOAT, BINARY, GENERIC).foreach {
+ testNullableColumnBuilder(_)
+ }
+
+ def testNullableColumnBuilder[T <: DataType, JvmType](columnType: ColumnType[T, JvmType]) {
+ val columnBuilder = ColumnBuilder(columnType.typeId)
+ val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
+
+ test(s"$typeName column builder: empty column") {
+ columnBuilder.initialize(4)
+
+ val buffer = columnBuilder.build()
+
+ // For column type ID
+ assert(buffer.getInt() === columnType.typeId)
+ // For null count
+ assert(buffer.getInt === 0)
+ assert(!buffer.hasRemaining)
+ }
+
+ test(s"$typeName column builder: buffer size auto growth") {
+ columnBuilder.initialize(4)
+
+ (0 until 4) foreach { _ =>
+ columnBuilder.appendFrom(nonNullRandomRow, columnType.typeId)
+ }
+
+ val buffer = columnBuilder.build()
+
+ // For column type ID
+ assert(buffer.getInt() === columnType.typeId)
+ // For null count
+ assert(buffer.getInt() === 0)
+ }
+
+ test(s"$typeName column builder: null values") {
+ columnBuilder.initialize(4)
+
+ (0 until 4) foreach { _ =>
+ columnBuilder.appendFrom(nonNullRandomRow, columnType.typeId)
+ columnBuilder.appendFrom(nullRow, columnType.typeId)
+ }
+
+ val buffer = columnBuilder.build()
+
+ // For column type ID
+ assert(buffer.getInt() === columnType.typeId)
+ // For null count
+ assert(buffer.getInt() === 4)
+ // For null positions
+ (1 to 7 by 2).foreach(i => assert(buffer.getInt() === i))
+
+ // For non-null values
+ (0 until 4).foreach { _ =>
+ val actual = if (columnType == GENERIC) {
+ SparkSqlSerializer.deserialize[Any](GENERIC.extract(buffer))
+ } else {
+ columnType.extract(buffer)
+ }
+ assert(actual === nonNullRandomRow(columnType.typeId))
+ }
+
+ assert(!buffer.hasRemaining)
+ }
+ }
+}