path: root/sql
author    Cheng Lian <lian.cs.zju@gmail.com>    2014-04-07 22:24:12 -0700
committer Reynold Xin <rxin@apache.org>         2014-04-07 22:24:12 -0700
commit    0d0493fcf7fc86d30b0ddd4e2c5a293c5c88eb9d (patch)
tree      e16ef05587f7916f801fb8eacec698ee5837ac2a /sql
parent    f27e56aa612538188a8550fe72ee20b8b13304d7 (diff)
[SPARK-1402] Added 3 more compression schemes
JIRA issue: [SPARK-1402](https://issues.apache.org/jira/browse/SPARK-1402)

This PR adds 3 more compression schemes for Spark SQL in-memory columnar storage:

* `BooleanBitSet`
* `IntDelta`
* `LongDelta`

This brings the total to 6 compression schemes, including the no-op `PassThrough` scheme.

It also fixes a bug introduced in PR #286: not all compression schemes were registered as available schemes when accessing an in-memory column, so `ColumnAccessor` threw an exception whenever a column was compressed with an unrecognised scheme.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #330 from liancheng/moreCompressionSchemes and squashes the following commits:

1d037b8 [Cheng Lian] Fixed SPARK-1436: in-memory column byte buffer must be able to be accessed multiple times
d7c0e8f [Cheng Lian] Added test suite for IntegralDelta (IntDelta & LongDelta)
3c1ad7a [Cheng Lian] Added test suite for BooleanBitSet, refactored other test suites
44fe4b2 [Cheng Lian] Refactored CompressionScheme, added 3 more compression schemes.
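For context, the core idea behind the new `IntDelta`/`LongDelta` schemes (visible in the patch below) is to store each value either as a one-byte delta from the previous value, or, when the delta does not fit in a byte, as an escape marker (`Byte.MinValue`) followed by the full value. The following is a minimal standalone sketch of that idea, not the patch's actual classes; the name `IntDeltaSketch` and its API are illustrative only.

```scala
import java.nio.ByteBuffer

object IntDeltaSketch {
  def compress(values: Seq[Int]): ByteBuffer = {
    // Worst case: 1 escape byte + 4 value bytes per element.
    val buf = ByteBuffer.allocate(values.length * 5)
    var prev = 0
    var first = true
    values.foreach { v =>
      val delta = v - prev
      if (!first && delta > Byte.MinValue && delta <= Byte.MaxValue) {
        buf.put(delta.toByte)              // small delta: one byte
      } else {
        buf.put(Byte.MinValue).putInt(v)   // escape marker + full value
      }
      prev = v
      first = false
    }
    buf.flip()
    buf
  }

  def decompress(buf: ByteBuffer): Seq[Int] = {
    val out = scala.collection.mutable.ArrayBuffer.empty[Int]
    var prev = 0
    while (buf.hasRemaining) {
      val b = buf.get()
      prev = if (b == Byte.MinValue) buf.getInt() else prev + b
      out += prev
    }
    out.toSeq
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(100, 101, 103, 400, 401)
    assert(decompress(compress(input)) == input)
  }
}
```

With mostly small, monotonically close values this stores roughly one byte per element instead of four (or eight for `Long`), which is why the scheme is only chosen when its measured compression ratio beats the others.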
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala                            |  23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala                               |   6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala     |   6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala             |  28
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala            | 266
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala                        |   8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala            |  98
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala       | 122
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala            | 115
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala        |  87
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala |   6
11 files changed, 586 insertions(+), 179 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
index ffd4894b52..3c39e1d350 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
@@ -100,20 +100,21 @@ private[sql] class GenericColumnAccessor(buffer: ByteBuffer)
private[sql] object ColumnAccessor {
def apply(buffer: ByteBuffer): ColumnAccessor = {
+ val dup = buffer.duplicate().order(ByteOrder.nativeOrder)
// The first 4 bytes in the buffer indicate the column type.
- val columnTypeId = buffer.getInt()
+ val columnTypeId = dup.getInt()
columnTypeId match {
- case INT.typeId => new IntColumnAccessor(buffer)
- case LONG.typeId => new LongColumnAccessor(buffer)
- case FLOAT.typeId => new FloatColumnAccessor(buffer)
- case DOUBLE.typeId => new DoubleColumnAccessor(buffer)
- case BOOLEAN.typeId => new BooleanColumnAccessor(buffer)
- case BYTE.typeId => new ByteColumnAccessor(buffer)
- case SHORT.typeId => new ShortColumnAccessor(buffer)
- case STRING.typeId => new StringColumnAccessor(buffer)
- case BINARY.typeId => new BinaryColumnAccessor(buffer)
- case GENERIC.typeId => new GenericColumnAccessor(buffer)
+ case INT.typeId => new IntColumnAccessor(dup)
+ case LONG.typeId => new LongColumnAccessor(dup)
+ case FLOAT.typeId => new FloatColumnAccessor(dup)
+ case DOUBLE.typeId => new DoubleColumnAccessor(dup)
+ case BOOLEAN.typeId => new BooleanColumnAccessor(dup)
+ case BYTE.typeId => new ByteColumnAccessor(dup)
+ case SHORT.typeId => new ShortColumnAccessor(dup)
+ case STRING.typeId => new StringColumnAccessor(dup)
+ case BINARY.typeId => new BinaryColumnAccessor(dup)
+ case GENERIC.typeId => new GenericColumnAccessor(dup)
}
}
}
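The change above is the SPARK-1436 fix: `ColumnAccessor` now reads from a duplicate of the cached byte buffer, so each accessor gets its own position while sharing the underlying bytes, and the cached column can be scanned more than once. A small standalone illustration of the `ByteBuffer.duplicate()` behaviour this relies on (plain JDK API, not Spark code):

```scala
import java.nio.{ByteBuffer, ByteOrder}

object DuplicateDemo extends App {
  val shared = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder)
  shared.putInt(42).putInt(7).rewind()

  // The duplicate shares the underlying bytes but has its own position/limit;
  // note that byte order must be re-applied on the duplicate, as in the patch.
  val dup = shared.duplicate().order(ByteOrder.nativeOrder)
  while (dup.hasRemaining) dup.getInt()   // drains the duplicate only

  assert(shared.remaining == 8)           // original position is untouched
  assert(shared.getInt() == 42)           // the buffer can be read again
}
```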
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index 30c6bdc791..95602d321d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -20,6 +20,12 @@ package org.apache.spark.sql.columnar
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.types._
+/**
+ * Used to collect statistical information when building in-memory columns.
+ *
+ * NOTE: we intentionally avoid using `Ordering[T]` to compare values here because `Ordering[T]`
+ * brings significant performance penalty.
+ */
private[sql] sealed abstract class ColumnStats[T <: DataType, JvmType] extends Serializable {
/**
* Closed lower bound of this column.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
index 3ac4b358dd..fd3b1adf96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
@@ -47,9 +47,9 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
import CompressionScheme._
- val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder)
+ val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder[T])
- protected def isWorthCompressing(encoder: Encoder) = {
+ protected def isWorthCompressing(encoder: Encoder[T]) = {
encoder.compressionRatio < 0.8
}
@@ -70,7 +70,7 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
abstract override def build() = {
val rawBuffer = super.build()
- val encoder = {
+ val encoder: Encoder[T] = {
val candidate = compressionEncoders.minBy(_.compressionRatio)
if (isWorthCompressing(candidate)) candidate else PassThrough.encoder
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
index d3a4ac8df9..c605a8e443 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
@@ -22,10 +22,8 @@ import java.nio.ByteBuffer
import org.apache.spark.sql.catalyst.types.NativeType
import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType}
-private[sql] trait Encoder {
- def gatherCompressibilityStats[T <: NativeType](
- value: T#JvmType,
- columnType: ColumnType[T, T#JvmType]) {}
+private[sql] trait Encoder[T <: NativeType] {
+ def gatherCompressibilityStats(value: T#JvmType, columnType: NativeColumnType[T]) {}
def compressedSize: Int
@@ -35,10 +33,7 @@ private[sql] trait Encoder {
if (uncompressedSize > 0) compressedSize.toDouble / uncompressedSize else 1.0
}
- def compress[T <: NativeType](
- from: ByteBuffer,
- to: ByteBuffer,
- columnType: ColumnType[T, T#JvmType]): ByteBuffer
+ def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[T]): ByteBuffer
}
private[sql] trait Decoder[T <: NativeType] extends Iterator[T#JvmType]
@@ -48,7 +43,7 @@ private[sql] trait CompressionScheme {
def supports(columnType: ColumnType[_, _]): Boolean
- def encoder: Encoder
+ def encoder[T <: NativeType]: Encoder[T]
def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T]
}
@@ -58,15 +53,18 @@ private[sql] trait WithCompressionSchemes {
}
private[sql] trait AllCompressionSchemes extends WithCompressionSchemes {
- override val schemes: Seq[CompressionScheme] = {
- Seq(PassThrough, RunLengthEncoding, DictionaryEncoding)
- }
+ override val schemes: Seq[CompressionScheme] = CompressionScheme.all
}
private[sql] object CompressionScheme {
- def apply(typeId: Int): CompressionScheme = typeId match {
- case PassThrough.typeId => PassThrough
- case _ => throw new UnsupportedOperationException()
+ val all: Seq[CompressionScheme] =
+ Seq(PassThrough, RunLengthEncoding, DictionaryEncoding, BooleanBitSet, IntDelta, LongDelta)
+
+ private val typeIdToScheme = all.map(scheme => scheme.typeId -> scheme).toMap
+
+ def apply(typeId: Int): CompressionScheme = {
+ typeIdToScheme.getOrElse(typeId, throw new UnsupportedOperationException(
+ s"Unrecognized compression scheme type ID: $typeId"))
}
def copyColumnHeader(from: ByteBuffer, to: ByteBuffer) {
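The rewritten `CompressionScheme.apply` above replaces a hard-coded match (which only recognised `PassThrough`) with a map derived from the single `all` list, so every registered scheme is found and unknown type IDs fail with a descriptive error. A hedged sketch of the same lookup pattern, with illustrative names rather than the patch's classes:

```scala
trait Scheme { def typeId: Int }
case object PassThroughLike extends Scheme { val typeId = 0 }
case object RunLengthLike   extends Scheme { val typeId = 1 }

object SchemeRegistry {
  // Every scheme is registered exactly once; the ID map is derived from it.
  val all: Seq[Scheme] = Seq(PassThroughLike, RunLengthLike)

  private val byId: Map[Int, Scheme] = all.map(s => s.typeId -> s).toMap

  def apply(typeId: Int): Scheme =
    byId.getOrElse(typeId, throw new UnsupportedOperationException(
      s"Unrecognized compression scheme type ID: $typeId"))
}

// Usage: SchemeRegistry(1) returns RunLengthLike; SchemeRegistry(42) throws.
```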
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
index dc2c153faf..df8220b556 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
@@ -24,7 +24,7 @@ import scala.reflect.ClassTag
import scala.reflect.runtime.universe.runtimeMirror
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.catalyst.types.NativeType
+import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.columnar._
private[sql] case object PassThrough extends CompressionScheme {
@@ -32,22 +32,18 @@ private[sql] case object PassThrough extends CompressionScheme {
override def supports(columnType: ColumnType[_, _]) = true
- override def encoder = new this.Encoder
+ override def encoder[T <: NativeType] = new this.Encoder[T]
override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
new this.Decoder(buffer, columnType)
}
- class Encoder extends compression.Encoder {
+ class Encoder[T <: NativeType] extends compression.Encoder[T] {
override def uncompressedSize = 0
override def compressedSize = 0
- override def compress[T <: NativeType](
- from: ByteBuffer,
- to: ByteBuffer,
- columnType: ColumnType[T, T#JvmType]) = {
-
+ override def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[T]) = {
// Writes compression type ID and copies raw contents
to.putInt(PassThrough.typeId).put(from).rewind()
to
@@ -64,9 +60,9 @@ private[sql] case object PassThrough extends CompressionScheme {
}
private[sql] case object RunLengthEncoding extends CompressionScheme {
- override def typeId = 1
+ override val typeId = 1
- override def encoder = new this.Encoder
+ override def encoder[T <: NativeType] = new this.Encoder[T]
override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
new this.Decoder(buffer, columnType)
@@ -77,7 +73,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
case _ => false
}
- class Encoder extends compression.Encoder {
+ class Encoder[T <: NativeType] extends compression.Encoder[T] {
private var _uncompressedSize = 0
private var _compressedSize = 0
@@ -89,10 +85,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
override def compressedSize = _compressedSize
- override def gatherCompressibilityStats[T <: NativeType](
- value: T#JvmType,
- columnType: ColumnType[T, T#JvmType]) {
-
+ override def gatherCompressibilityStats(value: T#JvmType, columnType: NativeColumnType[T]) {
val actualSize = columnType.actualSize(value)
_uncompressedSize += actualSize
@@ -111,11 +104,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
}
}
- override def compress[T <: NativeType](
- from: ByteBuffer,
- to: ByteBuffer,
- columnType: ColumnType[T, T#JvmType]) = {
-
+ override def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[T]) = {
to.putInt(RunLengthEncoding.typeId)
if (from.hasRemaining) {
@@ -172,23 +161,23 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
}
private[sql] case object DictionaryEncoding extends CompressionScheme {
- override def typeId: Int = 2
+ override val typeId = 2
// 32K unique values allowed
- private val MAX_DICT_SIZE = Short.MaxValue - 1
+ val MAX_DICT_SIZE = Short.MaxValue
override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
- new this.Decoder[T](buffer, columnType)
+ new this.Decoder(buffer, columnType)
}
- override def encoder = new this.Encoder
+ override def encoder[T <: NativeType] = new this.Encoder[T]
override def supports(columnType: ColumnType[_, _]) = columnType match {
case INT | LONG | STRING => true
case _ => false
}
- class Encoder extends compression.Encoder{
+ class Encoder[T <: NativeType] extends compression.Encoder[T] {
// Size of the input, uncompressed, in bytes. Note that we only count until the dictionary
// overflows.
private var _uncompressedSize = 0
@@ -201,7 +190,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
private var count = 0
// The reverse mapping of _dictionary, i.e. mapping encoded integer to the value itself.
- private var values = new mutable.ArrayBuffer[Any](1024)
+ private var values = new mutable.ArrayBuffer[T#JvmType](1024)
// The dictionary that maps a value to the encoded short integer.
private val dictionary = mutable.HashMap.empty[Any, Short]
@@ -210,10 +199,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
// to store dictionary element count.
private var dictionarySize = 4
- override def gatherCompressibilityStats[T <: NativeType](
- value: T#JvmType,
- columnType: ColumnType[T, T#JvmType]) {
-
+ override def gatherCompressibilityStats(value: T#JvmType, columnType: NativeColumnType[T]) {
if (!overflow) {
val actualSize = columnType.actualSize(value)
count += 1
@@ -234,11 +220,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
}
}
- override def compress[T <: NativeType](
- from: ByteBuffer,
- to: ByteBuffer,
- columnType: ColumnType[T, T#JvmType]) = {
-
+ override def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[T]) = {
if (overflow) {
throw new IllegalStateException(
"Dictionary encoding should not be used because of dictionary overflow.")
@@ -249,7 +231,7 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
var i = 0
while (i < values.length) {
- columnType.append(values(i).asInstanceOf[T#JvmType], to)
+ columnType.append(values(i), to)
i += 1
}
@@ -286,3 +268,215 @@ private[sql] case object DictionaryEncoding extends CompressionScheme {
override def hasNext = buffer.hasRemaining
}
}
+
+private[sql] case object BooleanBitSet extends CompressionScheme {
+ override val typeId = 3
+
+ val BITS_PER_LONG = 64
+
+ override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
+ new this.Decoder(buffer).asInstanceOf[compression.Decoder[T]]
+ }
+
+ override def encoder[T <: NativeType] = (new this.Encoder).asInstanceOf[compression.Encoder[T]]
+
+ override def supports(columnType: ColumnType[_, _]) = columnType == BOOLEAN
+
+ class Encoder extends compression.Encoder[BooleanType.type] {
+ private var _uncompressedSize = 0
+
+ override def gatherCompressibilityStats(
+ value: Boolean,
+ columnType: NativeColumnType[BooleanType.type]) {
+
+ _uncompressedSize += BOOLEAN.defaultSize
+ }
+
+ override def compress(
+ from: ByteBuffer,
+ to: ByteBuffer,
+ columnType: NativeColumnType[BooleanType.type]) = {
+
+ to.putInt(BooleanBitSet.typeId)
+ // Total element count (1 byte per Boolean value)
+ .putInt(from.remaining)
+
+ while (from.remaining >= BITS_PER_LONG) {
+ var word = 0: Long
+ var i = 0
+
+ while (i < BITS_PER_LONG) {
+ if (BOOLEAN.extract(from)) {
+ word |= (1: Long) << i
+ }
+ i += 1
+ }
+
+ to.putLong(word)
+ }
+
+ if (from.hasRemaining) {
+ var word = 0: Long
+ var i = 0
+
+ while (from.hasRemaining) {
+ if (BOOLEAN.extract(from)) {
+ word |= (1: Long) << i
+ }
+ i += 1
+ }
+
+ to.putLong(word)
+ }
+
+ to.rewind()
+ to
+ }
+
+ override def uncompressedSize = _uncompressedSize
+
+ override def compressedSize = {
+ val extra = if (_uncompressedSize % BITS_PER_LONG == 0) 0 else 1
+ (_uncompressedSize / BITS_PER_LONG + extra) * 8 + 4
+ }
+ }
+
+ class Decoder(buffer: ByteBuffer) extends compression.Decoder[BooleanType.type] {
+ private val count = buffer.getInt()
+
+ private var currentWord = 0: Long
+
+ private var visited: Int = 0
+
+ override def next(): Boolean = {
+ val bit = visited % BITS_PER_LONG
+
+ visited += 1
+ if (bit == 0) {
+ currentWord = buffer.getLong()
+ }
+
+ ((currentWord >> bit) & 1) != 0
+ }
+
+ override def hasNext: Boolean = visited < count
+ }
+}
+
+private[sql] sealed abstract class IntegralDelta[I <: IntegralType] extends CompressionScheme {
+ override def decoder[T <: NativeType](buffer: ByteBuffer, columnType: NativeColumnType[T]) = {
+ new this.Decoder(buffer, columnType.asInstanceOf[NativeColumnType[I]])
+ .asInstanceOf[compression.Decoder[T]]
+ }
+
+ override def encoder[T <: NativeType] = (new this.Encoder).asInstanceOf[compression.Encoder[T]]
+
+ /**
+ * Computes `delta = x - y`, returns `(true, delta)` if `delta` can fit into a single byte, or
+ * `(false, 0: Byte)` otherwise.
+ */
+ protected def byteSizedDelta(x: I#JvmType, y: I#JvmType): (Boolean, Byte)
+
+ /**
+ * Simply computes `x + delta`
+ */
+ protected def addDelta(x: I#JvmType, delta: Byte): I#JvmType
+
+ class Encoder extends compression.Encoder[I] {
+ private var _compressedSize: Int = 0
+
+ private var _uncompressedSize: Int = 0
+
+ private var prev: I#JvmType = _
+
+ private var initial = true
+
+ override def gatherCompressibilityStats(value: I#JvmType, columnType: NativeColumnType[I]) {
+ _uncompressedSize += columnType.defaultSize
+
+ if (initial) {
+ initial = false
+ prev = value
+ _compressedSize += 1 + columnType.defaultSize
+ } else {
+ val (smallEnough, _) = byteSizedDelta(value, prev)
+ _compressedSize += (if (smallEnough) 1 else 1 + columnType.defaultSize)
+ }
+ }
+
+ override def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[I]) = {
+ to.putInt(typeId)
+
+ if (from.hasRemaining) {
+ val prev = columnType.extract(from)
+
+ to.put(Byte.MinValue)
+ columnType.append(prev, to)
+
+ while (from.hasRemaining) {
+ val current = columnType.extract(from)
+ val (smallEnough, delta) = byteSizedDelta(current, prev)
+
+ if (smallEnough) {
+ to.put(delta)
+ } else {
+ to.put(Byte.MinValue)
+ columnType.append(current, to)
+ }
+ }
+ }
+
+ to.rewind()
+ to
+ }
+
+ override def uncompressedSize = _uncompressedSize
+
+ override def compressedSize = _compressedSize
+ }
+
+ class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[I])
+ extends compression.Decoder[I] {
+
+ private var prev: I#JvmType = _
+
+ override def next() = {
+ val delta = buffer.get()
+
+ if (delta > Byte.MinValue) {
+ addDelta(prev, delta)
+ } else {
+ prev = columnType.extract(buffer)
+ prev
+ }
+ }
+
+ override def hasNext = buffer.hasRemaining
+ }
+}
+
+private[sql] case object IntDelta extends IntegralDelta[IntegerType.type] {
+ override val typeId = 4
+
+ override def supports(columnType: ColumnType[_, _]) = columnType == INT
+
+ override protected def addDelta(x: Int, delta: Byte) = x + delta
+
+ override protected def byteSizedDelta(x: Int, y: Int): (Boolean, Byte) = {
+ val delta = x - y
+ if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+ }
+}
+
+private[sql] case object LongDelta extends IntegralDelta[LongType.type] {
+ override val typeId = 5
+
+ override def supports(columnType: ColumnType[_, _]) = columnType == LONG
+
+ override protected def addDelta(x: Long, delta: Byte) = x + delta
+
+ override protected def byteSizedDelta(x: Long, y: Long): (Boolean, Byte) = {
+ val delta = x - y
+ if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+ }
+}
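For reference, the `BooleanBitSet` scheme added above packs 64 Boolean values into each `Long` word, writing the element count up front so a trailing, partially filled word can still be decoded. A minimal standalone sketch of that packing idea (illustrative code, not the patch's classes):

```scala
object BooleanBitSetSketch {
  val BitsPerLong = 64

  def pack(values: Seq[Boolean]): Array[Long] = {
    // One bit per value, rounded up to whole 64-bit words.
    val words = new Array[Long]((values.length + BitsPerLong - 1) / BitsPerLong)
    values.zipWithIndex.foreach { case (v, i) =>
      if (v) words(i / BitsPerLong) |= 1L << (i % BitsPerLong)
    }
    words
  }

  def unpack(words: Array[Long], count: Int): Seq[Boolean] =
    (0 until count).map(i => ((words(i / BitsPerLong) >> (i % BitsPerLong)) & 1L) != 0)

  def main(args: Array[String]): Unit = {
    val input = Seq.fill(70)(scala.util.Random.nextBoolean())
    assert(unpack(pack(input), input.length) == input)
  }
}
```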
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
index 70b2e85173..2ed4cf2170 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
@@ -31,4 +31,12 @@ class ColumnarQuerySuite extends QueryTest {
checkAnswer(scan, testData.collect().toSeq)
}
+
+ test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
+ val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
+ val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+
+ checkAnswer(scan, testData.collect().toSeq)
+ checkAnswer(scan, testData.collect().toSeq)
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
new file mode 100644
index 0000000000..a754f98f7f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar.compression
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.columnar.{BOOLEAN, BooleanColumnStats}
+import org.apache.spark.sql.columnar.ColumnarTestUtils._
+
+class BooleanBitSetSuite extends FunSuite {
+ import BooleanBitSet._
+
+ def skeleton(count: Int) {
+ // -------------
+ // Tests encoder
+ // -------------
+
+ val builder = TestCompressibleColumnBuilder(new BooleanColumnStats, BOOLEAN, BooleanBitSet)
+ val rows = Seq.fill[Row](count)(makeRandomRow(BOOLEAN))
+ val values = rows.map(_.head)
+
+ rows.foreach(builder.appendFrom(_, 0))
+ val buffer = builder.build()
+
+ // Column type ID + null count + null positions
+ val headerSize = CompressionScheme.columnHeaderSize(buffer)
+
+ // Compression scheme ID + element count + bitset words
+ val compressedSize = 4 + 4 + {
+ val extra = if (count % BITS_PER_LONG == 0) 0 else 1
+ (count / BITS_PER_LONG + extra) * 8
+ }
+
+ // 4 extra bytes for compression scheme type ID
+ expectResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
+
+ // Skips column header
+ buffer.position(headerSize)
+ expectResult(BooleanBitSet.typeId, "Wrong compression scheme ID")(buffer.getInt())
+ expectResult(count, "Wrong element count")(buffer.getInt())
+
+ var word = 0: Long
+ for (i <- 0 until count) {
+ val bit = i % BITS_PER_LONG
+ word = if (bit == 0) buffer.getLong() else word
+ expectResult(values(i), s"Wrong value in compressed buffer, index=$i") {
+ (word & ((1: Long) << bit)) != 0
+ }
+ }
+
+ // -------------
+ // Tests decoder
+ // -------------
+
+ // Rewinds, skips column header and 4 more bytes for compression scheme ID
+ buffer.rewind().position(headerSize + 4)
+
+ val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
+ values.foreach(expectResult(_, "Wrong decoded value")(decoder.next()))
+ assert(!decoder.hasNext)
+ }
+
+ test(s"$BooleanBitSet: empty") {
+ skeleton(0)
+ }
+
+ test(s"$BooleanBitSet: less than 1 word") {
+ skeleton(BITS_PER_LONG - 1)
+ }
+
+ test(s"$BooleanBitSet: exactly 1 word") {
+ skeleton(BITS_PER_LONG)
+ }
+
+ test(s"$BooleanBitSet: multiple whole words") {
+ skeleton(BITS_PER_LONG * 2)
+ }
+
+ test(s"$BooleanBitSet: multiple words and 1 more bit") {
+ skeleton(BITS_PER_LONG * 2 + 1)
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
index 184691ab5b..eab27987e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
@@ -24,7 +24,6 @@ import org.scalatest.FunSuite
import org.apache.spark.sql.catalyst.types.NativeType
import org.apache.spark.sql.columnar._
import org.apache.spark.sql.columnar.ColumnarTestUtils._
-import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
class DictionaryEncodingSuite extends FunSuite {
testDictionaryEncoding(new IntColumnStats, INT)
@@ -41,73 +40,82 @@ class DictionaryEncodingSuite extends FunSuite {
(0 until buffer.getInt()).map(columnType.extract(buffer) -> _.toShort).toMap
}
- test(s"$DictionaryEncoding with $typeName: simple case") {
+ def stableDistinct(seq: Seq[Int]): Seq[Int] = if (seq.isEmpty) {
+ Seq.empty
+ } else {
+ seq.head +: seq.tail.filterNot(_ == seq.head)
+ }
+
+ def skeleton(uniqueValueCount: Int, inputSeq: Seq[Int]) {
// -------------
// Tests encoder
// -------------
val builder = TestCompressibleColumnBuilder(columnStats, columnType, DictionaryEncoding)
- val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, 2)
-
- builder.initialize(0)
- builder.appendFrom(rows(0), 0)
- builder.appendFrom(rows(1), 0)
- builder.appendFrom(rows(0), 0)
- builder.appendFrom(rows(1), 0)
-
- val buffer = builder.build()
- val headerSize = CompressionScheme.columnHeaderSize(buffer)
- // 4 extra bytes for dictionary size
- val dictionarySize = 4 + values.map(columnType.actualSize).sum
- // 4 `Short`s, 2 bytes each
- val compressedSize = dictionarySize + 2 * 4
- // 4 extra bytes for compression scheme type ID
- expectResult(headerSize + 4 + compressedSize, "Wrong buffer capacity")(buffer.capacity)
-
- // Skips column header
- buffer.position(headerSize)
- expectResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
-
- val dictionary = buildDictionary(buffer)
- Array[Short](0, 1).foreach { i =>
- expectResult(i, "Wrong dictionary entry")(dictionary(values(i)))
- }
-
- Array[Short](0, 1, 0, 1).foreach {
- expectResult(_, "Wrong column element value")(buffer.getShort())
+ val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount)
+ val dictValues = stableDistinct(inputSeq)
+
+ inputSeq.foreach(i => builder.appendFrom(rows(i), 0))
+
+ if (dictValues.length > DictionaryEncoding.MAX_DICT_SIZE) {
+ withClue("Dictionary overflowed, compression should fail") {
+ intercept[Throwable] {
+ builder.build()
+ }
+ }
+ } else {
+ val buffer = builder.build()
+ val headerSize = CompressionScheme.columnHeaderSize(buffer)
+ // 4 extra bytes for dictionary size
+ val dictionarySize = 4 + values.map(columnType.actualSize).sum
+ // 2 bytes for each `Short`
+ val compressedSize = 4 + dictionarySize + 2 * inputSeq.length
+ // 4 extra bytes for compression scheme type ID
+ expectResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
+
+ // Skips column header
+ buffer.position(headerSize)
+ expectResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
+
+ val dictionary = buildDictionary(buffer).toMap
+
+ dictValues.foreach { i =>
+ expectResult(i, "Wrong dictionary entry") {
+ dictionary(values(i))
+ }
+ }
+
+ inputSeq.foreach { i =>
+ expectResult(i.toShort, "Wrong column element value")(buffer.getShort())
+ }
+
+ // -------------
+ // Tests decoder
+ // -------------
+
+ // Rewinds, skips column header and 4 more bytes for compression scheme ID
+ buffer.rewind().position(headerSize + 4)
+
+ val decoder = DictionaryEncoding.decoder(buffer, columnType)
+
+ inputSeq.foreach { i =>
+ expectResult(values(i), "Wrong decoded value")(decoder.next())
+ }
+
+ assert(!decoder.hasNext)
}
-
- // -------------
- // Tests decoder
- // -------------
-
- // Rewinds, skips column header and 4 more bytes for compression scheme ID
- buffer.rewind().position(headerSize + 4)
-
- val decoder = new DictionaryEncoding.Decoder[T](buffer, columnType)
-
- Array[Short](0, 1, 0, 1).foreach { i =>
- expectResult(values(i), "Wrong decoded value")(decoder.next())
- }
-
- assert(!decoder.hasNext)
}
- }
- test(s"$DictionaryEncoding: overflow") {
- val builder = TestCompressibleColumnBuilder(new IntColumnStats, INT, DictionaryEncoding)
- builder.initialize(0)
+ test(s"$DictionaryEncoding with $typeName: empty") {
+ skeleton(0, Seq.empty)
+ }
- (0 to Short.MaxValue).foreach { n =>
- val row = new GenericMutableRow(1)
- row.setInt(0, n)
- builder.appendFrom(row, 0)
+ test(s"$DictionaryEncoding with $typeName: simple case") {
+ skeleton(2, Seq(0, 1, 0, 1))
}
- withClue("Dictionary overflowed, encoding should fail") {
- intercept[Throwable] {
- builder.build()
- }
+ test(s"$DictionaryEncoding with $typeName: dictionary overflow") {
+ skeleton(DictionaryEncoding.MAX_DICT_SIZE + 1, 0 to DictionaryEncoding.MAX_DICT_SIZE)
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
new file mode 100644
index 0000000000..1390e5eef6
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar.compression
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+import org.apache.spark.sql.catalyst.types.IntegralType
+import org.apache.spark.sql.columnar._
+
+class IntegralDeltaSuite extends FunSuite {
+ testIntegralDelta(new IntColumnStats, INT, IntDelta)
+ testIntegralDelta(new LongColumnStats, LONG, LongDelta)
+
+ def testIntegralDelta[I <: IntegralType](
+ columnStats: NativeColumnStats[I],
+ columnType: NativeColumnType[I],
+ scheme: IntegralDelta[I]) {
+
+ def skeleton(input: Seq[I#JvmType]) {
+ // -------------
+ // Tests encoder
+ // -------------
+
+ val builder = TestCompressibleColumnBuilder(columnStats, columnType, scheme)
+ val deltas = if (input.isEmpty) {
+ Seq.empty[Long]
+ } else {
+ (input.tail, input.init).zipped.map {
+ case (x: Int, y: Int) => (x - y).toLong
+ case (x: Long, y: Long) => x - y
+ }
+ }
+
+ input.map { value =>
+ val row = new GenericMutableRow(1)
+ columnType.setField(row, 0, value)
+ builder.appendFrom(row, 0)
+ }
+
+ val buffer = builder.build()
+ // Column type ID + null count + null positions
+ val headerSize = CompressionScheme.columnHeaderSize(buffer)
+
+ // Compression scheme ID + compressed contents
+ val compressedSize = 4 + (if (deltas.isEmpty) {
+ 0
+ } else {
+ val oneBoolean = columnType.defaultSize
+ 1 + oneBoolean + deltas.map {
+ d => if (math.abs(d) < Byte.MaxValue) 1 else 1 + oneBoolean
+ }.sum
+ })
+
+ // 4 extra bytes for compression scheme type ID
+ expectResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
+
+ buffer.position(headerSize)
+ expectResult(scheme.typeId, "Wrong compression scheme ID")(buffer.getInt())
+
+ if (input.nonEmpty) {
+ expectResult(Byte.MinValue, "The first byte should be an escaping mark")(buffer.get())
+ expectResult(input.head, "The first value is wrong")(columnType.extract(buffer))
+
+ (input.tail, deltas).zipped.foreach { (value, delta) =>
+ if (delta < Byte.MaxValue) {
+ expectResult(delta, "Wrong delta")(buffer.get())
+ } else {
+ expectResult(Byte.MinValue, "Expecting escaping mark here")(buffer.get())
+ expectResult(value, "Wrong value")(columnType.extract(buffer))
+ }
+ }
+ }
+
+ // -------------
+ // Tests decoder
+ // -------------
+
+ // Rewinds, skips column header and 4 more bytes for compression scheme ID
+ buffer.rewind().position(headerSize + 4)
+
+ val decoder = scheme.decoder(buffer, columnType)
+ input.foreach(expectResult(_, "Wrong decoded value")(decoder.next()))
+ assert(!decoder.hasNext)
+ }
+
+ test(s"$scheme: empty column") {
+ skeleton(Seq.empty)
+ }
+
+ test(s"$scheme: simple case") {
+ val input = columnType match {
+ case INT => Seq(1: Int, 2: Int, 130: Int)
+ case LONG => Seq(1: Long, 2: Long, 130: Long)
+ }
+
+ skeleton(input.map(_.asInstanceOf[I#JvmType]))
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
index 2089ad120d..89f9b60a43 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
@@ -37,34 +37,39 @@ class RunLengthEncodingSuite extends FunSuite {
val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
- test(s"$RunLengthEncoding with $typeName: simple case") {
+ def skeleton(uniqueValueCount: Int, inputRuns: Seq[(Int, Int)]) {
// -------------
// Tests encoder
// -------------
val builder = TestCompressibleColumnBuilder(columnStats, columnType, RunLengthEncoding)
- val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, 2)
-
- builder.initialize(0)
- builder.appendFrom(rows(0), 0)
- builder.appendFrom(rows(0), 0)
- builder.appendFrom(rows(1), 0)
- builder.appendFrom(rows(1), 0)
+ val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount)
+ val inputSeq = inputRuns.flatMap { case (index, run) =>
+ Seq.fill(run)(index)
+ }
+ inputSeq.foreach(i => builder.appendFrom(rows(i), 0))
val buffer = builder.build()
+
+ // Column type ID + null count + null positions
val headerSize = CompressionScheme.columnHeaderSize(buffer)
- // 4 extra bytes each run for run length
- val compressedSize = values.map(columnType.actualSize(_) + 4).sum
+
+ // Compression scheme ID + compressed contents
+ val compressedSize = 4 + inputRuns.map { case (index, _) =>
+ // 4 extra bytes each run for run length
+ columnType.actualSize(values(index)) + 4
+ }.sum
+
// 4 extra bytes for compression scheme type ID
- expectResult(headerSize + 4 + compressedSize, "Wrong buffer capacity")(buffer.capacity)
+ expectResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity)
// Skips column header
buffer.position(headerSize)
expectResult(RunLengthEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
- Array(0, 1).foreach { i =>
- expectResult(values(i), "Wrong column element value")(columnType.extract(buffer))
- expectResult(2, "Wrong run length")(buffer.getInt())
+ inputRuns.foreach { case (index, run) =>
+ expectResult(values(index), "Wrong column element value")(columnType.extract(buffer))
+ expectResult(run, "Wrong run length")(buffer.getInt())
}
// -------------
@@ -74,57 +79,29 @@ class RunLengthEncodingSuite extends FunSuite {
// Rewinds, skips column header and 4 more bytes for compression scheme ID
buffer.rewind().position(headerSize + 4)
- val decoder = new RunLengthEncoding.Decoder[T](buffer, columnType)
+ val decoder = RunLengthEncoding.decoder(buffer, columnType)
- Array(0, 0, 1, 1).foreach { i =>
+ inputSeq.foreach { i =>
expectResult(values(i), "Wrong decoded value")(decoder.next())
}
assert(!decoder.hasNext)
}
- test(s"$RunLengthEncoding with $typeName: run length == 1") {
- // -------------
- // Tests encoder
- // -------------
-
- val builder = TestCompressibleColumnBuilder(columnStats, columnType, RunLengthEncoding)
- val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, 2)
-
- builder.initialize(0)
- builder.appendFrom(rows(0), 0)
- builder.appendFrom(rows(1), 0)
-
- val buffer = builder.build()
- val headerSize = CompressionScheme.columnHeaderSize(buffer)
- // 4 bytes each run for run length
- val compressedSize = values.map(columnType.actualSize(_) + 4).sum
- // 4 bytes for compression scheme type ID
- expectResult(headerSize + 4 + compressedSize, "Wrong buffer capacity")(buffer.capacity)
-
- // Skips column header
- buffer.position(headerSize)
- expectResult(RunLengthEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt())
-
- Array(0, 1).foreach { i =>
- expectResult(values(i), "Wrong column element value")(columnType.extract(buffer))
- expectResult(1, "Wrong run length")(buffer.getInt())
- }
-
- // -------------
- // Tests decoder
- // -------------
-
- // Rewinds, skips column header and 4 more bytes for compression scheme ID
- buffer.rewind().position(headerSize + 4)
+ test(s"$RunLengthEncoding with $typeName: empty column") {
+ skeleton(0, Seq.empty)
+ }
- val decoder = new RunLengthEncoding.Decoder[T](buffer, columnType)
+ test(s"$RunLengthEncoding with $typeName: simple case") {
+ skeleton(2, Seq(0 -> 2, 1 ->2))
+ }
- Array(0, 1).foreach { i =>
- expectResult(values(i), "Wrong decoded value")(decoder.next())
- }
+ test(s"$RunLengthEncoding with $typeName: run length == 1") {
+ skeleton(2, Seq(0 -> 1, 1 ->1))
+ }
- assert(!decoder.hasNext)
+ test(s"$RunLengthEncoding with $typeName: single long run") {
+ skeleton(1, Seq(0 -> 1000))
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
index e0ec812863..81bf5e99d1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
@@ -28,7 +28,7 @@ class TestCompressibleColumnBuilder[T <: NativeType](
with NullableColumnBuilder
with CompressibleColumnBuilder[T] {
- override protected def isWorthCompressing(encoder: Encoder) = true
+ override protected def isWorthCompressing(encoder: Encoder[T]) = true
}
object TestCompressibleColumnBuilder {
@@ -37,7 +37,9 @@ object TestCompressibleColumnBuilder {
columnType: NativeColumnType[T],
scheme: CompressionScheme) = {
- new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme))
+ val builder = new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme))
+ builder.initialize(0)
+ builder
}
}