From 014c0f7a9dfdb1686fa9aeacaadb2a17a855a943 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Thu, 19 Nov 2015 14:48:18 -0800
Subject: [SPARK-11858][SQL] Move sql.columnar into sql.execution.

In addition, tightened visibility of a lot of classes in the columnar package
from private[sql] to private[columnar].

Author: Reynold Xin

Closes #9842 from rxin/SPARK-11858.
---
 .../apache/spark/sql/columnar/ColumnAccessor.scala | 148 -----
 .../apache/spark/sql/columnar/ColumnBuilder.scala | 189 ------
 .../apache/spark/sql/columnar/ColumnStats.scala | 271 --------
 .../org/apache/spark/sql/columnar/ColumnType.scala | 689 ---------------------
 .../sql/columnar/GenerateColumnAccessor.scala | 195 ------
 .../sql/columnar/InMemoryColumnarTableScan.scala | 345 -----------
 .../sql/columnar/NullableColumnAccessor.scala | 59 --
 .../spark/sql/columnar/NullableColumnBuilder.scala | 88 ---
 .../compression/CompressibleColumnAccessor.scala | 39 --
 .../compression/CompressibleColumnBuilder.scala | 109 ----
 .../columnar/compression/CompressionScheme.scala | 81 ---
 .../columnar/compression/compressionSchemes.scala | 532 ----------------
 .../apache/spark/sql/execution/CacheManager.scala | 2 +-
 .../spark/sql/execution/SparkStrategies.scala | 2 +-
 .../sql/execution/columnar/ColumnAccessor.scala | 148 +++++
 .../sql/execution/columnar/ColumnBuilder.scala | 194 ++++++
 .../spark/sql/execution/columnar/ColumnStats.scala | 271 ++++++++
 .../spark/sql/execution/columnar/ColumnType.scala | 689 +++++++++++++++++++++
 .../columnar/GenerateColumnAccessor.scala | 195 ++++++
 .../columnar/InMemoryColumnarTableScan.scala | 346 +++++++++++
 .../columnar/NullableColumnAccessor.scala | 59 ++
 .../execution/columnar/NullableColumnBuilder.scala | 88 +++
 .../compression/CompressibleColumnAccessor.scala | 39 ++
 .../compression/CompressibleColumnBuilder.scala | 109 ++++
 .../columnar/compression/CompressionScheme.scala | 81 +++
 .../columnar/compression/compressionSchemes.scala | 532 ++++++++++++++++
 .../org/apache/spark/sql/execution/package.scala | 2 +
 .../org/apache/spark/sql/CachedTableSuite.scala | 4 +-
 .../scala/org/apache/spark/sql/QueryTest.scala | 2 +-
 .../spark/sql/columnar/ColumnStatsSuite.scala | 110 ----
 .../spark/sql/columnar/ColumnTypeSuite.scala | 145 -----
 .../spark/sql/columnar/ColumnarTestUtils.scala | 108 ----
 .../sql/columnar/InMemoryColumnarQuerySuite.scala | 222 -------
 .../sql/columnar/NullableColumnAccessorSuite.scala | 92 ---
 .../sql/columnar/NullableColumnBuilderSuite.scala | 107 ----
 .../sql/columnar/PartitionBatchPruningSuite.scala | 127 ----
 .../columnar/compression/BooleanBitSetSuite.scala | 107 ----
 .../compression/DictionaryEncodingSuite.scala | 128 ----
 .../columnar/compression/IntegralDeltaSuite.scala | 131 ----
 .../compression/RunLengthEncodingSuite.scala | 114 ----
 .../TestCompressibleColumnBuilder.scala | 44 --
 .../sql/execution/columnar/ColumnStatsSuite.scala | 110 ++++
 .../sql/execution/columnar/ColumnTypeSuite.scala | 145 +++++
 .../sql/execution/columnar/ColumnarTestUtils.scala | 108 ++++
 .../columnar/InMemoryColumnarQuerySuite.scala | 222 +++++++
 .../columnar/NullableColumnAccessorSuite.scala | 92 +++
 .../columnar/NullableColumnBuilderSuite.scala | 107 ++++
 .../columnar/PartitionBatchPruningSuite.scala | 127 ++++
 .../columnar/compression/BooleanBitSetSuite.scala | 107 ++++
 .../compression/DictionaryEncodingSuite.scala | 128 ++++
 .../columnar/compression/IntegralDeltaSuite.scala | 131 ++++
 .../compression/RunLengthEncodingSuite.scala | 114 ++++
 .../TestCompressibleColumnBuilder.scala | 44 ++
.../apache/spark/sql/hive/CachedTableSuite.scala | 2 +- 54 files changed, 4194 insertions(+), 4186 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBuilder.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilder.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala delete mode 100644 
sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnStatsSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/TestCompressibleColumnBuilder.scala (limited to 'sql') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala deleted file mode 100644 index 42ec4d3433..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar - -import java.nio.{ByteBuffer, ByteOrder} - -import org.apache.spark.sql.catalyst.expressions.{MutableRow, UnsafeArrayData, UnsafeMapData, UnsafeRow} -import org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor -import org.apache.spark.sql.types._ - -/** - * An `Iterator` like trait used to extract values from columnar byte buffer. When a value is - * extracted from the buffer, instead of directly returning it, the value is set into some field of - * a [[MutableRow]]. 
In this way, boxing cost can be avoided by leveraging the setter methods - * for primitive values provided by [[MutableRow]]. - */ -private[sql] trait ColumnAccessor { - initialize() - - protected def initialize() - - def hasNext: Boolean - - def extractTo(row: MutableRow, ordinal: Int) - - protected def underlyingBuffer: ByteBuffer -} - -private[sql] abstract class BasicColumnAccessor[JvmType]( - protected val buffer: ByteBuffer, - protected val columnType: ColumnType[JvmType]) - extends ColumnAccessor { - - protected def initialize() {} - - override def hasNext: Boolean = buffer.hasRemaining - - override def extractTo(row: MutableRow, ordinal: Int): Unit = { - extractSingle(row, ordinal) - } - - def extractSingle(row: MutableRow, ordinal: Int): Unit = { - columnType.extract(buffer, row, ordinal) - } - - protected def underlyingBuffer = buffer -} - -private[sql] class NullColumnAccessor(buffer: ByteBuffer) - extends BasicColumnAccessor[Any](buffer, NULL) - with NullableColumnAccessor - -private[sql] abstract class NativeColumnAccessor[T <: AtomicType]( - override protected val buffer: ByteBuffer, - override protected val columnType: NativeColumnType[T]) - extends BasicColumnAccessor(buffer, columnType) - with NullableColumnAccessor - with CompressibleColumnAccessor[T] - -private[sql] class BooleanColumnAccessor(buffer: ByteBuffer) - extends NativeColumnAccessor(buffer, BOOLEAN) - -private[sql] class ByteColumnAccessor(buffer: ByteBuffer) - extends NativeColumnAccessor(buffer, BYTE) - -private[sql] class ShortColumnAccessor(buffer: ByteBuffer) - extends NativeColumnAccessor(buffer, SHORT) - -private[sql] class IntColumnAccessor(buffer: ByteBuffer) - extends NativeColumnAccessor(buffer, INT) - -private[sql] class LongColumnAccessor(buffer: ByteBuffer) - extends NativeColumnAccessor(buffer, LONG) - -private[sql] class FloatColumnAccessor(buffer: ByteBuffer) - extends NativeColumnAccessor(buffer, FLOAT) - -private[sql] class DoubleColumnAccessor(buffer: ByteBuffer) - extends NativeColumnAccessor(buffer, DOUBLE) - -private[sql] class StringColumnAccessor(buffer: ByteBuffer) - extends NativeColumnAccessor(buffer, STRING) - -private[sql] class BinaryColumnAccessor(buffer: ByteBuffer) - extends BasicColumnAccessor[Array[Byte]](buffer, BINARY) - with NullableColumnAccessor - -private[sql] class CompactDecimalColumnAccessor(buffer: ByteBuffer, dataType: DecimalType) - extends NativeColumnAccessor(buffer, COMPACT_DECIMAL(dataType)) - -private[sql] class DecimalColumnAccessor(buffer: ByteBuffer, dataType: DecimalType) - extends BasicColumnAccessor[Decimal](buffer, LARGE_DECIMAL(dataType)) - with NullableColumnAccessor - -private[sql] class StructColumnAccessor(buffer: ByteBuffer, dataType: StructType) - extends BasicColumnAccessor[UnsafeRow](buffer, STRUCT(dataType)) - with NullableColumnAccessor - -private[sql] class ArrayColumnAccessor(buffer: ByteBuffer, dataType: ArrayType) - extends BasicColumnAccessor[UnsafeArrayData](buffer, ARRAY(dataType)) - with NullableColumnAccessor - -private[sql] class MapColumnAccessor(buffer: ByteBuffer, dataType: MapType) - extends BasicColumnAccessor[UnsafeMapData](buffer, MAP(dataType)) - with NullableColumnAccessor - -private[sql] object ColumnAccessor { - def apply(dataType: DataType, buffer: ByteBuffer): ColumnAccessor = { - val buf = buffer.order(ByteOrder.nativeOrder) - - dataType match { - case NullType => new NullColumnAccessor(buf) - case BooleanType => new BooleanColumnAccessor(buf) - case ByteType => new ByteColumnAccessor(buf) - case ShortType => new 
ShortColumnAccessor(buf) - case IntegerType | DateType => new IntColumnAccessor(buf) - case LongType | TimestampType => new LongColumnAccessor(buf) - case FloatType => new FloatColumnAccessor(buf) - case DoubleType => new DoubleColumnAccessor(buf) - case StringType => new StringColumnAccessor(buf) - case BinaryType => new BinaryColumnAccessor(buf) - case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS => - new CompactDecimalColumnAccessor(buf, dt) - case dt: DecimalType => new DecimalColumnAccessor(buf, dt) - case struct: StructType => new StructColumnAccessor(buf, struct) - case array: ArrayType => new ArrayColumnAccessor(buf, array) - case map: MapType => new MapColumnAccessor(buf, map) - case udt: UserDefinedType[_] => ColumnAccessor(udt.sqlType, buffer) - case other => - throw new Exception(s"not support type: $other") - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala deleted file mode 100644 index 599f30f2d7..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar - -import java.nio.{ByteBuffer, ByteOrder} - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.columnar.ColumnBuilder._ -import org.apache.spark.sql.columnar.compression.{AllCompressionSchemes, CompressibleColumnBuilder} -import org.apache.spark.sql.types._ - -private[sql] trait ColumnBuilder { - /** - * Initializes with an approximate lower bound on the expected number of elements in this column. - */ - def initialize(initialSize: Int, columnName: String = "", useCompression: Boolean = false) - - /** - * Appends `row(ordinal)` to the column builder. - */ - def appendFrom(row: InternalRow, ordinal: Int) - - /** - * Column statistics information - */ - def columnStats: ColumnStats - - /** - * Returns the final columnar byte buffer. 
- */ - def build(): ByteBuffer -} - -private[sql] class BasicColumnBuilder[JvmType]( - val columnStats: ColumnStats, - val columnType: ColumnType[JvmType]) - extends ColumnBuilder { - - protected var columnName: String = _ - - protected var buffer: ByteBuffer = _ - - override def initialize( - initialSize: Int, - columnName: String = "", - useCompression: Boolean = false): Unit = { - - val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize - this.columnName = columnName - - buffer = ByteBuffer.allocate(size * columnType.defaultSize) - buffer.order(ByteOrder.nativeOrder()) - } - - override def appendFrom(row: InternalRow, ordinal: Int): Unit = { - buffer = ensureFreeSpace(buffer, columnType.actualSize(row, ordinal)) - columnType.append(row, ordinal, buffer) - } - - override def build(): ByteBuffer = { - if (buffer.capacity() > buffer.position() * 1.1) { - // trim the buffer - buffer = ByteBuffer - .allocate(buffer.position()) - .order(ByteOrder.nativeOrder()) - .put(buffer.array(), 0, buffer.position()) - } - buffer.flip().asInstanceOf[ByteBuffer] - } -} - -private[sql] class NullColumnBuilder - extends BasicColumnBuilder[Any](new ObjectColumnStats(NullType), NULL) - with NullableColumnBuilder - -private[sql] abstract class ComplexColumnBuilder[JvmType]( - columnStats: ColumnStats, - columnType: ColumnType[JvmType]) - extends BasicColumnBuilder[JvmType](columnStats, columnType) - with NullableColumnBuilder - -private[sql] abstract class NativeColumnBuilder[T <: AtomicType]( - override val columnStats: ColumnStats, - override val columnType: NativeColumnType[T]) - extends BasicColumnBuilder[T#InternalType](columnStats, columnType) - with NullableColumnBuilder - with AllCompressionSchemes - with CompressibleColumnBuilder[T] - -private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN) - -private[sql] class ByteColumnBuilder extends NativeColumnBuilder(new ByteColumnStats, BYTE) - -private[sql] class ShortColumnBuilder extends NativeColumnBuilder(new ShortColumnStats, SHORT) - -private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT) - -private[sql] class LongColumnBuilder extends NativeColumnBuilder(new LongColumnStats, LONG) - -private[sql] class FloatColumnBuilder extends NativeColumnBuilder(new FloatColumnStats, FLOAT) - -private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(new DoubleColumnStats, DOUBLE) - -private[sql] class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStats, STRING) - -private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(new BinaryColumnStats, BINARY) - -private[sql] class CompactDecimalColumnBuilder(dataType: DecimalType) - extends NativeColumnBuilder(new DecimalColumnStats(dataType), COMPACT_DECIMAL(dataType)) - -private[sql] class DecimalColumnBuilder(dataType: DecimalType) - extends ComplexColumnBuilder(new DecimalColumnStats(dataType), LARGE_DECIMAL(dataType)) - -private[sql] class StructColumnBuilder(dataType: StructType) - extends ComplexColumnBuilder(new ObjectColumnStats(dataType), STRUCT(dataType)) - -private[sql] class ArrayColumnBuilder(dataType: ArrayType) - extends ComplexColumnBuilder(new ObjectColumnStats(dataType), ARRAY(dataType)) - -private[sql] class MapColumnBuilder(dataType: MapType) - extends ComplexColumnBuilder(new ObjectColumnStats(dataType), MAP(dataType)) - -private[sql] object ColumnBuilder { - val DEFAULT_INITIAL_BUFFER_SIZE = 128 * 1024 - val MAX_BATCH_SIZE_IN_BYTE = 4 * 1024 * 1024L - - 
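As a usage sketch that is not part of this patch, the builder and accessor pair round-trips values roughly as follows. It assumes the code is compiled inside the new org.apache.spark.sql.execution.columnar package (as the relocated test suites are), since most of these classes are private[sql] or private[columnar]; the row handling via GenericMutableRow is an assumption for illustration.

    import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
    import org.apache.spark.sql.types.IntegerType

    // Build an INT column from a few rows, then read it back.
    val builder = ColumnBuilder(IntegerType, initialSize = 4, useCompression = false)
    val row = new GenericMutableRow(1)
    Seq(1, 2, 3).foreach { v =>
      row.setInt(0, v)             // stage the value in ordinal 0
      builder.appendFrom(row, 0)   // append row(0) to the columnar buffer
    }

    val buffer = builder.build()   // finalized ByteBuffer, trimmed and flipped
    val accessor = ColumnAccessor(IntegerType, buffer)
    val out = new GenericMutableRow(1)
    while (accessor.hasNext) {
      accessor.extractTo(out, 0)   // writes the next decoded value into out(0)
      // out.getInt(0) yields 1, 2, 3 on successive iterations
    }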
private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = { - if (orig.remaining >= size) { - orig - } else { - // grow in steps of initial size - val capacity = orig.capacity() - val newSize = capacity + size.max(capacity) - val pos = orig.position() - - ByteBuffer - .allocate(newSize) - .order(ByteOrder.nativeOrder()) - .put(orig.array(), 0, pos) - } - } - - def apply( - dataType: DataType, - initialSize: Int = 0, - columnName: String = "", - useCompression: Boolean = false): ColumnBuilder = { - val builder: ColumnBuilder = dataType match { - case NullType => new NullColumnBuilder - case BooleanType => new BooleanColumnBuilder - case ByteType => new ByteColumnBuilder - case ShortType => new ShortColumnBuilder - case IntegerType | DateType => new IntColumnBuilder - case LongType | TimestampType => new LongColumnBuilder - case FloatType => new FloatColumnBuilder - case DoubleType => new DoubleColumnBuilder - case StringType => new StringColumnBuilder - case BinaryType => new BinaryColumnBuilder - case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS => - new CompactDecimalColumnBuilder(dt) - case dt: DecimalType => new DecimalColumnBuilder(dt) - case struct: StructType => new StructColumnBuilder(struct) - case array: ArrayType => new ArrayColumnBuilder(array) - case map: MapType => new MapColumnBuilder(map) - case udt: UserDefinedType[_] => - return apply(udt.sqlType, initialSize, columnName, useCompression) - case other => - throw new Exception(s"not suppported type: $other") - } - - builder.initialize(initialSize, columnName, useCompression) - builder - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala deleted file mode 100644 index 91a0565058..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala +++ /dev/null @@ -1,271 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, Attribute, AttributeMap, AttributeReference} -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.types.UTF8String - -private[sql] class ColumnStatisticsSchema(a: Attribute) extends Serializable { - val upperBound = AttributeReference(a.name + ".upperBound", a.dataType, nullable = true)() - val lowerBound = AttributeReference(a.name + ".lowerBound", a.dataType, nullable = true)() - val nullCount = AttributeReference(a.name + ".nullCount", IntegerType, nullable = false)() - val count = AttributeReference(a.name + ".count", IntegerType, nullable = false)() - val sizeInBytes = AttributeReference(a.name + ".sizeInBytes", LongType, nullable = false)() - - val schema = Seq(lowerBound, upperBound, nullCount, count, sizeInBytes) -} - -private[sql] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable { - val (forAttribute, schema) = { - val allStats = tableSchema.map(a => a -> new ColumnStatisticsSchema(a)) - (AttributeMap(allStats), allStats.map(_._2.schema).foldLeft(Seq.empty[Attribute])(_ ++ _)) - } -} - -/** - * Used to collect statistical information when building in-memory columns. - * - * NOTE: we intentionally avoid using `Ordering[T]` to compare values here because `Ordering[T]` - * brings significant performance penalty. - */ -private[sql] sealed trait ColumnStats extends Serializable { - protected var count = 0 - protected var nullCount = 0 - private[sql] var sizeInBytes = 0L - - /** - * Gathers statistics information from `row(ordinal)`. - */ - def gatherStats(row: InternalRow, ordinal: Int): Unit = { - if (row.isNullAt(ordinal)) { - nullCount += 1 - // 4 bytes for null position - sizeInBytes += 4 - } - count += 1 - } - - /** - * Column statistics represented as a single row, currently including closed lower bound, closed - * upper bound and null count. - */ - def collectedStatistics: GenericInternalRow -} - -/** - * A no-op ColumnStats only used for testing purposes. 
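To make the layout of collectedStatistics concrete, here is a small illustrative sketch that is not part of the patch; it relies on the same in-package assumption as the builder example earlier. The five fields come out in the order (lowerBound, upperBound, nullCount, count, sizeInBytes), matching ColumnStatisticsSchema above.

    import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

    val stats = new IntColumnStats
    val row = new GenericMutableRow(1)
    Seq(5, -1, 7).foreach { v => row.setInt(0, v); stats.gatherStats(row, 0) }
    row.setNullAt(0)
    stats.gatherStats(row, 0)

    val s = stats.collectedStatistics
    // s.getInt(0)  == -1   lower bound
    // s.getInt(1)  == 7    upper bound
    // s.getInt(2)  == 1    null count
    // s.getInt(3)  == 4    total row count
    // s.getLong(4) == 16   three INTs (4 bytes each) plus 4 bytes for the null position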
- */ -private[sql] class NoopColumnStats extends ColumnStats { - override def gatherStats(row: InternalRow, ordinal: Int): Unit = super.gatherStats(row, ordinal) - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](null, null, nullCount, count, 0L)) -} - -private[sql] class BooleanColumnStats extends ColumnStats { - protected var upper = false - protected var lower = true - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row.getBoolean(ordinal) - if (value > upper) upper = value - if (value < lower) lower = value - sizeInBytes += BOOLEAN.defaultSize - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) -} - -private[sql] class ByteColumnStats extends ColumnStats { - protected var upper = Byte.MinValue - protected var lower = Byte.MaxValue - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row.getByte(ordinal) - if (value > upper) upper = value - if (value < lower) lower = value - sizeInBytes += BYTE.defaultSize - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) -} - -private[sql] class ShortColumnStats extends ColumnStats { - protected var upper = Short.MinValue - protected var lower = Short.MaxValue - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row.getShort(ordinal) - if (value > upper) upper = value - if (value < lower) lower = value - sizeInBytes += SHORT.defaultSize - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) -} - -private[sql] class IntColumnStats extends ColumnStats { - protected var upper = Int.MinValue - protected var lower = Int.MaxValue - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row.getInt(ordinal) - if (value > upper) upper = value - if (value < lower) lower = value - sizeInBytes += INT.defaultSize - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) -} - -private[sql] class LongColumnStats extends ColumnStats { - protected var upper = Long.MinValue - protected var lower = Long.MaxValue - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row.getLong(ordinal) - if (value > upper) upper = value - if (value < lower) lower = value - sizeInBytes += LONG.defaultSize - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) -} - -private[sql] class FloatColumnStats extends ColumnStats { - protected var upper = Float.MinValue - protected var lower = Float.MaxValue - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row.getFloat(ordinal) - if (value > upper) upper = value - if (value < lower) lower = value - sizeInBytes += FLOAT.defaultSize - } - } - - override def 
collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) -} - -private[sql] class DoubleColumnStats extends ColumnStats { - protected var upper = Double.MinValue - protected var lower = Double.MaxValue - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row.getDouble(ordinal) - if (value > upper) upper = value - if (value < lower) lower = value - sizeInBytes += DOUBLE.defaultSize - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) -} - -private[sql] class StringColumnStats extends ColumnStats { - protected var upper: UTF8String = null - protected var lower: UTF8String = null - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row.getUTF8String(ordinal) - if (upper == null || value.compareTo(upper) > 0) upper = value.clone() - if (lower == null || value.compareTo(lower) < 0) lower = value.clone() - sizeInBytes += STRING.actualSize(row, ordinal) - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) -} - -private[sql] class BinaryColumnStats extends ColumnStats { - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - sizeInBytes += BINARY.actualSize(row, ordinal) - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](null, null, nullCount, count, sizeInBytes)) -} - -private[sql] class DecimalColumnStats(precision: Int, scale: Int) extends ColumnStats { - def this(dt: DecimalType) = this(dt.precision, dt.scale) - - protected var upper: Decimal = null - protected var lower: Decimal = null - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - val value = row.getDecimal(ordinal, precision, scale) - if (upper == null || value.compareTo(upper) > 0) upper = value - if (lower == null || value.compareTo(lower) < 0) lower = value - // TODO: this is not right for DecimalType with precision > 18 - sizeInBytes += 8 - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) -} - -private[sql] class ObjectColumnStats(dataType: DataType) extends ColumnStats { - val columnType = ColumnType(dataType) - - override def gatherStats(row: InternalRow, ordinal: Int): Unit = { - super.gatherStats(row, ordinal) - if (!row.isNullAt(ordinal)) { - sizeInBytes += columnType.actualSize(row, ordinal) - } - } - - override def collectedStatistics: GenericInternalRow = - new GenericInternalRow(Array[Any](null, null, nullCount, count, sizeInBytes)) -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala deleted file mode 100644 index 68e509eb50..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala +++ /dev/null @@ -1,689 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar - -import java.math.{BigDecimal, BigInteger} -import java.nio.ByteBuffer - -import scala.reflect.runtime.universe.TypeTag - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types._ -import org.apache.spark.unsafe.Platform -import org.apache.spark.unsafe.types.UTF8String - - -/** - * A help class for fast reading Int/Long/Float/Double from ByteBuffer in native order. - * - * Note: There is not much difference between ByteBuffer.getByte/getShort and - * Unsafe.getByte/getShort, so we do not have helper methods for them. - * - * The unrolling (building columnar cache) is already slow, putLong/putDouble will not help much, - * so we do not have helper methods for them. - * - * - * WARNNING: This only works with HeapByteBuffer - */ -object ByteBufferHelper { - def getInt(buffer: ByteBuffer): Int = { - val pos = buffer.position() - buffer.position(pos + 4) - Platform.getInt(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos) - } - - def getLong(buffer: ByteBuffer): Long = { - val pos = buffer.position() - buffer.position(pos + 8) - Platform.getLong(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos) - } - - def getFloat(buffer: ByteBuffer): Float = { - val pos = buffer.position() - buffer.position(pos + 4) - Platform.getFloat(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos) - } - - def getDouble(buffer: ByteBuffer): Double = { - val pos = buffer.position() - buffer.position(pos + 8) - Platform.getDouble(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos) - } -} - -/** - * An abstract class that represents type of a column. Used to append/extract Java objects into/from - * the underlying [[ByteBuffer]] of a column. - * - * @tparam JvmType Underlying Java type to represent the elements. - */ -private[sql] sealed abstract class ColumnType[JvmType] { - - // The catalyst data type of this column. - def dataType: DataType - - // Default size in bytes for one element of type T (e.g. 4 for `Int`). - def defaultSize: Int - - /** - * Extracts a value out of the buffer at the buffer's current position. - */ - def extract(buffer: ByteBuffer): JvmType - - /** - * Extracts a value out of the buffer at the buffer's current position and stores in - * `row(ordinal)`. Subclasses should override this method to avoid boxing/unboxing costs whenever - * possible. - */ - def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - setField(row, ordinal, extract(buffer)) - } - - /** - * Appends the given value v of type T into the given ByteBuffer. - */ - def append(v: JvmType, buffer: ByteBuffer): Unit - - /** - * Appends `row(ordinal)` of type T into the given ByteBuffer. Subclasses should override this - * method to avoid boxing/unboxing costs whenever possible. 
- */ - def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - append(getField(row, ordinal), buffer) - } - - /** - * Returns the size of the value `row(ordinal)`. This is used to calculate the size of variable - * length types such as byte arrays and strings. - */ - def actualSize(row: InternalRow, ordinal: Int): Int = defaultSize - - /** - * Returns `row(ordinal)`. Subclasses should override this method to avoid boxing/unboxing costs - * whenever possible. - */ - def getField(row: InternalRow, ordinal: Int): JvmType - - /** - * Sets `row(ordinal)` to `field`. Subclasses should override this method to avoid boxing/unboxing - * costs whenever possible. - */ - def setField(row: MutableRow, ordinal: Int, value: JvmType): Unit - - /** - * Copies `from(fromOrdinal)` to `to(toOrdinal)`. Subclasses should override this method to avoid - * boxing/unboxing costs whenever possible. - */ - def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { - setField(to, toOrdinal, getField(from, fromOrdinal)) - } - - /** - * Creates a duplicated copy of the value. - */ - def clone(v: JvmType): JvmType = v - - override def toString: String = getClass.getSimpleName.stripSuffix("$") -} - -private[sql] object NULL extends ColumnType[Any] { - - override def dataType: DataType = NullType - override def defaultSize: Int = 0 - override def append(v: Any, buffer: ByteBuffer): Unit = {} - override def extract(buffer: ByteBuffer): Any = null - override def setField(row: MutableRow, ordinal: Int, value: Any): Unit = row.setNullAt(ordinal) - override def getField(row: InternalRow, ordinal: Int): Any = null -} - -private[sql] abstract class NativeColumnType[T <: AtomicType]( - val dataType: T, - val defaultSize: Int) - extends ColumnType[T#InternalType] { - - /** - * Scala TypeTag. Can be used to create primitive arrays and hash tables. 
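A short sketch, not from the patch, of the per-type append/extract contract using the INT column type defined just below: each value occupies defaultSize bytes and is written and read in native byte order (ByteBufferHelper only works with heap buffers, which ByteBuffer.allocate provides).

    import java.nio.{ByteBuffer, ByteOrder}

    val buf = ByteBuffer.allocate(2 * INT.defaultSize).order(ByteOrder.nativeOrder())
    INT.append(42, buf)              // buffer.putInt(42)
    INT.append(7, buf)
    buf.rewind()
    assert(INT.extract(buf) == 42)   // reads via ByteBufferHelper.getInt
    assert(INT.extract(buf) == 7)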
- */ - def scalaTag: TypeTag[dataType.InternalType] = dataType.tag -} - -private[sql] object INT extends NativeColumnType(IntegerType, 4) { - override def append(v: Int, buffer: ByteBuffer): Unit = { - buffer.putInt(v) - } - - override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - buffer.putInt(row.getInt(ordinal)) - } - - override def extract(buffer: ByteBuffer): Int = { - ByteBufferHelper.getInt(buffer) - } - - override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - row.setInt(ordinal, ByteBufferHelper.getInt(buffer)) - } - - override def setField(row: MutableRow, ordinal: Int, value: Int): Unit = { - row.setInt(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): Int = row.getInt(ordinal) - - - override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { - to.setInt(toOrdinal, from.getInt(fromOrdinal)) - } -} - -private[sql] object LONG extends NativeColumnType(LongType, 8) { - override def append(v: Long, buffer: ByteBuffer): Unit = { - buffer.putLong(v) - } - - override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - buffer.putLong(row.getLong(ordinal)) - } - - override def extract(buffer: ByteBuffer): Long = { - ByteBufferHelper.getLong(buffer) - } - - override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - row.setLong(ordinal, ByteBufferHelper.getLong(buffer)) - } - - override def setField(row: MutableRow, ordinal: Int, value: Long): Unit = { - row.setLong(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): Long = row.getLong(ordinal) - - override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { - to.setLong(toOrdinal, from.getLong(fromOrdinal)) - } -} - -private[sql] object FLOAT extends NativeColumnType(FloatType, 4) { - override def append(v: Float, buffer: ByteBuffer): Unit = { - buffer.putFloat(v) - } - - override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - buffer.putFloat(row.getFloat(ordinal)) - } - - override def extract(buffer: ByteBuffer): Float = { - ByteBufferHelper.getFloat(buffer) - } - - override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - row.setFloat(ordinal, ByteBufferHelper.getFloat(buffer)) - } - - override def setField(row: MutableRow, ordinal: Int, value: Float): Unit = { - row.setFloat(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): Float = row.getFloat(ordinal) - - override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { - to.setFloat(toOrdinal, from.getFloat(fromOrdinal)) - } -} - -private[sql] object DOUBLE extends NativeColumnType(DoubleType, 8) { - override def append(v: Double, buffer: ByteBuffer): Unit = { - buffer.putDouble(v) - } - - override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - buffer.putDouble(row.getDouble(ordinal)) - } - - override def extract(buffer: ByteBuffer): Double = { - ByteBufferHelper.getDouble(buffer) - } - - override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - row.setDouble(ordinal, ByteBufferHelper.getDouble(buffer)) - } - - override def setField(row: MutableRow, ordinal: Int, value: Double): Unit = { - row.setDouble(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): Double = row.getDouble(ordinal) - - override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, 
toOrdinal: Int) { - to.setDouble(toOrdinal, from.getDouble(fromOrdinal)) - } -} - -private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 1) { - override def append(v: Boolean, buffer: ByteBuffer): Unit = { - buffer.put(if (v) 1: Byte else 0: Byte) - } - - override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - buffer.put(if (row.getBoolean(ordinal)) 1: Byte else 0: Byte) - } - - override def extract(buffer: ByteBuffer): Boolean = buffer.get() == 1 - - override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - row.setBoolean(ordinal, buffer.get() == 1) - } - - override def setField(row: MutableRow, ordinal: Int, value: Boolean): Unit = { - row.setBoolean(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): Boolean = row.getBoolean(ordinal) - - override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { - to.setBoolean(toOrdinal, from.getBoolean(fromOrdinal)) - } -} - -private[sql] object BYTE extends NativeColumnType(ByteType, 1) { - override def append(v: Byte, buffer: ByteBuffer): Unit = { - buffer.put(v) - } - - override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - buffer.put(row.getByte(ordinal)) - } - - override def extract(buffer: ByteBuffer): Byte = { - buffer.get() - } - - override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - row.setByte(ordinal, buffer.get()) - } - - override def setField(row: MutableRow, ordinal: Int, value: Byte): Unit = { - row.setByte(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): Byte = row.getByte(ordinal) - - override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { - to.setByte(toOrdinal, from.getByte(fromOrdinal)) - } -} - -private[sql] object SHORT extends NativeColumnType(ShortType, 2) { - override def append(v: Short, buffer: ByteBuffer): Unit = { - buffer.putShort(v) - } - - override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - buffer.putShort(row.getShort(ordinal)) - } - - override def extract(buffer: ByteBuffer): Short = { - buffer.getShort() - } - - override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - row.setShort(ordinal, buffer.getShort()) - } - - override def setField(row: MutableRow, ordinal: Int, value: Short): Unit = { - row.setShort(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): Short = row.getShort(ordinal) - - override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { - to.setShort(toOrdinal, from.getShort(fromOrdinal)) - } -} - -/** - * A fast path to copy var-length bytes between ByteBuffer and UnsafeRow without creating wrapper - * objects. 
- */ -private[sql] trait DirectCopyColumnType[JvmType] extends ColumnType[JvmType] { - - // copy the bytes from ByteBuffer to UnsafeRow - override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - if (row.isInstanceOf[MutableUnsafeRow]) { - val numBytes = buffer.getInt - val cursor = buffer.position() - buffer.position(cursor + numBytes) - row.asInstanceOf[MutableUnsafeRow].writer.write(ordinal, buffer.array(), - buffer.arrayOffset() + cursor, numBytes) - } else { - setField(row, ordinal, extract(buffer)) - } - } - - // copy the bytes from UnsafeRow to ByteBuffer - override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - if (row.isInstanceOf[UnsafeRow]) { - row.asInstanceOf[UnsafeRow].writeFieldTo(ordinal, buffer) - } else { - super.append(row, ordinal, buffer) - } - } -} - -private[sql] object STRING - extends NativeColumnType(StringType, 8) with DirectCopyColumnType[UTF8String] { - - override def actualSize(row: InternalRow, ordinal: Int): Int = { - row.getUTF8String(ordinal).numBytes() + 4 - } - - override def append(v: UTF8String, buffer: ByteBuffer): Unit = { - buffer.putInt(v.numBytes()) - v.writeTo(buffer) - } - - override def extract(buffer: ByteBuffer): UTF8String = { - val length = buffer.getInt() - val cursor = buffer.position() - buffer.position(cursor + length) - UTF8String.fromBytes(buffer.array(), buffer.arrayOffset() + cursor, length) - } - - override def setField(row: MutableRow, ordinal: Int, value: UTF8String): Unit = { - if (row.isInstanceOf[MutableUnsafeRow]) { - row.asInstanceOf[MutableUnsafeRow].writer.write(ordinal, value) - } else { - row.update(ordinal, value.clone()) - } - } - - override def getField(row: InternalRow, ordinal: Int): UTF8String = { - row.getUTF8String(ordinal) - } - - override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { - setField(to, toOrdinal, getField(from, fromOrdinal)) - } - - override def clone(v: UTF8String): UTF8String = v.clone() -} - -private[sql] case class COMPACT_DECIMAL(precision: Int, scale: Int) - extends NativeColumnType(DecimalType(precision, scale), 8) { - - override def extract(buffer: ByteBuffer): Decimal = { - Decimal(ByteBufferHelper.getLong(buffer), precision, scale) - } - - override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { - if (row.isInstanceOf[MutableUnsafeRow]) { - // copy it as Long - row.setLong(ordinal, ByteBufferHelper.getLong(buffer)) - } else { - setField(row, ordinal, extract(buffer)) - } - } - - override def append(v: Decimal, buffer: ByteBuffer): Unit = { - buffer.putLong(v.toUnscaledLong) - } - - override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { - if (row.isInstanceOf[UnsafeRow]) { - // copy it as Long - buffer.putLong(row.getLong(ordinal)) - } else { - append(getField(row, ordinal), buffer) - } - } - - override def getField(row: InternalRow, ordinal: Int): Decimal = { - row.getDecimal(ordinal, precision, scale) - } - - override def setField(row: MutableRow, ordinal: Int, value: Decimal): Unit = { - row.setDecimal(ordinal, value, precision) - } - - override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { - setField(to, toOrdinal, getField(from, fromOrdinal)) - } -} - -private[sql] object COMPACT_DECIMAL { - def apply(dt: DecimalType): COMPACT_DECIMAL = { - COMPACT_DECIMAL(dt.precision, dt.scale) - } -} - -private[sql] sealed abstract class ByteArrayColumnType[JvmType](val defaultSize: Int) - extends ColumnType[JvmType] 
with DirectCopyColumnType[JvmType] { - - def serialize(value: JvmType): Array[Byte] - def deserialize(bytes: Array[Byte]): JvmType - - override def append(v: JvmType, buffer: ByteBuffer): Unit = { - val bytes = serialize(v) - buffer.putInt(bytes.length).put(bytes, 0, bytes.length) - } - - override def extract(buffer: ByteBuffer): JvmType = { - val length = buffer.getInt() - val bytes = new Array[Byte](length) - buffer.get(bytes, 0, length) - deserialize(bytes) - } -} - -private[sql] object BINARY extends ByteArrayColumnType[Array[Byte]](16) { - - def dataType: DataType = BinaryType - - override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]): Unit = { - row.update(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): Array[Byte] = { - row.getBinary(ordinal) - } - - override def actualSize(row: InternalRow, ordinal: Int): Int = { - row.getBinary(ordinal).length + 4 - } - - def serialize(value: Array[Byte]): Array[Byte] = value - def deserialize(bytes: Array[Byte]): Array[Byte] = bytes -} - -private[sql] case class LARGE_DECIMAL(precision: Int, scale: Int) - extends ByteArrayColumnType[Decimal](12) { - - override val dataType: DataType = DecimalType(precision, scale) - - override def getField(row: InternalRow, ordinal: Int): Decimal = { - row.getDecimal(ordinal, precision, scale) - } - - override def setField(row: MutableRow, ordinal: Int, value: Decimal): Unit = { - row.setDecimal(ordinal, value, precision) - } - - override def actualSize(row: InternalRow, ordinal: Int): Int = { - 4 + getField(row, ordinal).toJavaBigDecimal.unscaledValue().bitLength() / 8 + 1 - } - - override def serialize(value: Decimal): Array[Byte] = { - value.toJavaBigDecimal.unscaledValue().toByteArray - } - - override def deserialize(bytes: Array[Byte]): Decimal = { - val javaDecimal = new BigDecimal(new BigInteger(bytes), scale) - Decimal.apply(javaDecimal, precision, scale) - } -} - -private[sql] object LARGE_DECIMAL { - def apply(dt: DecimalType): LARGE_DECIMAL = { - LARGE_DECIMAL(dt.precision, dt.scale) - } -} - -private[sql] case class STRUCT(dataType: StructType) - extends ColumnType[UnsafeRow] with DirectCopyColumnType[UnsafeRow] { - - private val numOfFields: Int = dataType.fields.size - - override def defaultSize: Int = 20 - - override def setField(row: MutableRow, ordinal: Int, value: UnsafeRow): Unit = { - row.update(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): UnsafeRow = { - row.getStruct(ordinal, numOfFields).asInstanceOf[UnsafeRow] - } - - override def actualSize(row: InternalRow, ordinal: Int): Int = { - 4 + getField(row, ordinal).getSizeInBytes - } - - override def append(value: UnsafeRow, buffer: ByteBuffer): Unit = { - buffer.putInt(value.getSizeInBytes) - value.writeTo(buffer) - } - - override def extract(buffer: ByteBuffer): UnsafeRow = { - val sizeInBytes = ByteBufferHelper.getInt(buffer) - assert(buffer.hasArray) - val cursor = buffer.position() - buffer.position(cursor + sizeInBytes) - val unsafeRow = new UnsafeRow - unsafeRow.pointTo( - buffer.array(), - Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor, - numOfFields, - sizeInBytes) - unsafeRow - } - - override def clone(v: UnsafeRow): UnsafeRow = v.copy() -} - -private[sql] case class ARRAY(dataType: ArrayType) - extends ColumnType[UnsafeArrayData] with DirectCopyColumnType[UnsafeArrayData] { - - override def defaultSize: Int = 16 - - override def setField(row: MutableRow, ordinal: Int, value: UnsafeArrayData): Unit = { - row.update(ordinal, value) - } - - 
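As an aside that is not part of the patch, the length-prefixed framing shared by the ByteArrayColumnType family above can be sketched with LARGE_DECIMAL, which stores the unscaled BigInteger bytes behind a 4-byte length; the specific decimal value is only an example.

    import java.nio.{ByteBuffer, ByteOrder}
    import org.apache.spark.sql.types.Decimal

    val tpe = LARGE_DECIMAL(precision = 20, scale = 2)
    val d = Decimal(BigDecimal("123456789012345678.90"), 20, 2)  // too wide for a compact, Long-backed decimal

    val payload = tpe.serialize(d)   // unscaled BigInteger bytes
    val buf = ByteBuffer.allocate(4 + payload.length).order(ByteOrder.nativeOrder())
    tpe.append(d, buf)               // putInt(length) followed by the bytes
    buf.rewind()
    assert(tpe.extract(buf) == d)    // deserialize rebuilds an equal Decimal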
override def getField(row: InternalRow, ordinal: Int): UnsafeArrayData = { - row.getArray(ordinal).asInstanceOf[UnsafeArrayData] - } - - override def actualSize(row: InternalRow, ordinal: Int): Int = { - val unsafeArray = getField(row, ordinal) - 4 + unsafeArray.getSizeInBytes - } - - override def append(value: UnsafeArrayData, buffer: ByteBuffer): Unit = { - buffer.putInt(value.getSizeInBytes) - value.writeTo(buffer) - } - - override def extract(buffer: ByteBuffer): UnsafeArrayData = { - val numBytes = buffer.getInt - assert(buffer.hasArray) - val cursor = buffer.position() - buffer.position(cursor + numBytes) - val array = new UnsafeArrayData - array.pointTo( - buffer.array(), - Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor, - numBytes) - array - } - - override def clone(v: UnsafeArrayData): UnsafeArrayData = v.copy() -} - -private[sql] case class MAP(dataType: MapType) - extends ColumnType[UnsafeMapData] with DirectCopyColumnType[UnsafeMapData] { - - override def defaultSize: Int = 32 - - override def setField(row: MutableRow, ordinal: Int, value: UnsafeMapData): Unit = { - row.update(ordinal, value) - } - - override def getField(row: InternalRow, ordinal: Int): UnsafeMapData = { - row.getMap(ordinal).asInstanceOf[UnsafeMapData] - } - - override def actualSize(row: InternalRow, ordinal: Int): Int = { - val unsafeMap = getField(row, ordinal) - 4 + unsafeMap.getSizeInBytes - } - - override def append(value: UnsafeMapData, buffer: ByteBuffer): Unit = { - buffer.putInt(value.getSizeInBytes) - value.writeTo(buffer) - } - - override def extract(buffer: ByteBuffer): UnsafeMapData = { - val numBytes = buffer.getInt - val cursor = buffer.position() - buffer.position(cursor + numBytes) - val map = new UnsafeMapData - map.pointTo( - buffer.array(), - Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor, - numBytes) - map - } - - override def clone(v: UnsafeMapData): UnsafeMapData = v.copy() -} - -private[sql] object ColumnType { - def apply(dataType: DataType): ColumnType[_] = { - dataType match { - case NullType => NULL - case BooleanType => BOOLEAN - case ByteType => BYTE - case ShortType => SHORT - case IntegerType | DateType => INT - case LongType | TimestampType => LONG - case FloatType => FLOAT - case DoubleType => DOUBLE - case StringType => STRING - case BinaryType => BINARY - case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS => COMPACT_DECIMAL(dt) - case dt: DecimalType => LARGE_DECIMAL(dt) - case arr: ArrayType => ARRAY(arr) - case map: MapType => MAP(map) - case struct: StructType => STRUCT(struct) - case udt: UserDefinedType[_] => apply(udt.sqlType) - case other => - throw new Exception(s"Unsupported type: $other") - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala deleted file mode 100644 index ff9393b465..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/GenerateColumnAccessor.scala +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar - -import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.{UnsafeRowWriter, CodeFormatter, CodeGenerator} -import org.apache.spark.sql.types._ - -/** - * An Iterator to walk through the InternalRows from a CachedBatch - */ -abstract class ColumnarIterator extends Iterator[InternalRow] { - def initialize(input: Iterator[CachedBatch], columnTypes: Array[DataType], - columnIndexes: Array[Int]): Unit -} - -/** - * An helper class to update the fields of UnsafeRow, used by ColumnAccessor - * - * WARNING: These setter MUST be called in increasing order of ordinals. - */ -class MutableUnsafeRow(val writer: UnsafeRowWriter) extends GenericMutableRow(null) { - - override def isNullAt(i: Int): Boolean = writer.isNullAt(i) - override def setNullAt(i: Int): Unit = writer.setNullAt(i) - - override def setBoolean(i: Int, v: Boolean): Unit = writer.write(i, v) - override def setByte(i: Int, v: Byte): Unit = writer.write(i, v) - override def setShort(i: Int, v: Short): Unit = writer.write(i, v) - override def setInt(i: Int, v: Int): Unit = writer.write(i, v) - override def setLong(i: Int, v: Long): Unit = writer.write(i, v) - override def setFloat(i: Int, v: Float): Unit = writer.write(i, v) - override def setDouble(i: Int, v: Double): Unit = writer.write(i, v) - - // the writer will be used directly to avoid creating wrapper objects - override def setDecimal(i: Int, v: Decimal, precision: Int): Unit = - throw new UnsupportedOperationException - override def update(i: Int, v: Any): Unit = throw new UnsupportedOperationException - - // all other methods inherited from GenericMutableRow are not need -} - -/** - * Generates bytecode for an [[ColumnarIterator]] for columnar cache. 
- */ -object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarIterator] with Logging { - - protected def canonicalize(in: Seq[DataType]): Seq[DataType] = in - protected def bind(in: Seq[DataType], inputSchema: Seq[Attribute]): Seq[DataType] = in - - protected def create(columnTypes: Seq[DataType]): ColumnarIterator = { - val ctx = newCodeGenContext() - val numFields = columnTypes.size - val (initializeAccessors, extractors) = columnTypes.zipWithIndex.map { case (dt, index) => - val accessorName = ctx.freshName("accessor") - val accessorCls = dt match { - case NullType => classOf[NullColumnAccessor].getName - case BooleanType => classOf[BooleanColumnAccessor].getName - case ByteType => classOf[ByteColumnAccessor].getName - case ShortType => classOf[ShortColumnAccessor].getName - case IntegerType | DateType => classOf[IntColumnAccessor].getName - case LongType | TimestampType => classOf[LongColumnAccessor].getName - case FloatType => classOf[FloatColumnAccessor].getName - case DoubleType => classOf[DoubleColumnAccessor].getName - case StringType => classOf[StringColumnAccessor].getName - case BinaryType => classOf[BinaryColumnAccessor].getName - case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS => - classOf[CompactDecimalColumnAccessor].getName - case dt: DecimalType => classOf[DecimalColumnAccessor].getName - case struct: StructType => classOf[StructColumnAccessor].getName - case array: ArrayType => classOf[ArrayColumnAccessor].getName - case t: MapType => classOf[MapColumnAccessor].getName - } - ctx.addMutableState(accessorCls, accessorName, s"$accessorName = null;") - - val createCode = dt match { - case t if ctx.isPrimitiveType(dt) => - s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));" - case NullType | StringType | BinaryType => - s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));" - case other => - s"""$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder), - (${dt.getClass.getName}) columnTypes[$index]);""" - } - - val extract = s"$accessorName.extractTo(mutableRow, $index);" - val patch = dt match { - case DecimalType.Fixed(p, s) if p > Decimal.MAX_LONG_DIGITS => - // For large Decimal, it should have 16 bytes for future update even it's null now. 
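Each accessor class chosen here has a matching builder on the write side. A hedged round-trip sketch for a single Int column, again assuming code inside the (package-private) columnar package:

  // Write three values into a columnar byte buffer...
  val builder = ColumnBuilder(IntegerType, initialSize = 4)
  val in = new SpecificMutableRow(Seq(IntegerType))
  Seq(1, 2, 3).foreach { v =>
    in.setInt(0, v)
    builder.appendFrom(in, 0)
  }
  val buffer = builder.build()

  // ...and read them back through the corresponding accessor.
  val accessor = ColumnAccessor(IntegerType, buffer)
  val out = new SpecificMutableRow(Seq(IntegerType))
  while (accessor.hasNext) {
    accessor.extractTo(out, 0)  // fills out(0) with the next decoded value
  }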
- s""" - if (mutableRow.isNullAt($index)) { - rowWriter.write($index, (Decimal) null, $p, $s); - } - """ - case other => "" - } - (createCode, extract + patch) - }.unzip - - val code = s""" - import java.nio.ByteBuffer; - import java.nio.ByteOrder; - import scala.collection.Iterator; - import org.apache.spark.sql.types.DataType; - import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; - import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; - import org.apache.spark.sql.columnar.MutableUnsafeRow; - - public SpecificColumnarIterator generate($exprType[] expr) { - return new SpecificColumnarIterator(); - } - - class SpecificColumnarIterator extends ${classOf[ColumnarIterator].getName} { - - private ByteOrder nativeOrder = null; - private byte[][] buffers = null; - private UnsafeRow unsafeRow = new UnsafeRow(); - private BufferHolder bufferHolder = new BufferHolder(); - private UnsafeRowWriter rowWriter = new UnsafeRowWriter(); - private MutableUnsafeRow mutableRow = null; - - private int currentRow = 0; - private int numRowsInBatch = 0; - - private scala.collection.Iterator input = null; - private DataType[] columnTypes = null; - private int[] columnIndexes = null; - - ${declareMutableStates(ctx)} - - public SpecificColumnarIterator() { - this.nativeOrder = ByteOrder.nativeOrder(); - this.buffers = new byte[${columnTypes.length}][]; - this.mutableRow = new MutableUnsafeRow(rowWriter); - - ${initMutableStates(ctx)} - } - - public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) { - this.input = input; - this.columnTypes = columnTypes; - this.columnIndexes = columnIndexes; - } - - public boolean hasNext() { - if (currentRow < numRowsInBatch) { - return true; - } - if (!input.hasNext()) { - return false; - } - - ${classOf[CachedBatch].getName} batch = (${classOf[CachedBatch].getName}) input.next(); - currentRow = 0; - numRowsInBatch = batch.numRows(); - for (int i = 0; i < columnIndexes.length; i ++) { - buffers[i] = batch.buffers()[columnIndexes[i]]; - } - ${initializeAccessors.mkString("\n")} - - return hasNext(); - } - - public InternalRow next() { - currentRow += 1; - bufferHolder.reset(); - rowWriter.initialize(bufferHolder, $numFields); - ${extractors.mkString("\n")} - unsafeRow.pointTo(bufferHolder.buffer, $numFields, bufferHolder.totalSize()); - return unsafeRow; - } - }""" - - logDebug(s"Generated ColumnarIterator: ${CodeFormatter.format(code)}") - - compile(code).generate(ctx.references.toArray).asInstanceOf[ColumnarIterator] - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala deleted file mode 100644 index ae77298e6d..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ /dev/null @@ -1,345 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar - -import scala.collection.mutable.ArrayBuffer - -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} -import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.{ConvertToUnsafe, LeafNode, SparkPlan} -import org.apache.spark.sql.types.UserDefinedType -import org.apache.spark.storage.StorageLevel -import org.apache.spark.{Accumulable, Accumulator, Accumulators} - -private[sql] object InMemoryRelation { - def apply( - useCompression: Boolean, - batchSize: Int, - storageLevel: StorageLevel, - child: SparkPlan, - tableName: Option[String]): InMemoryRelation = - new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, - if (child.outputsUnsafeRows) child else ConvertToUnsafe(child), - tableName)() -} - -/** - * CachedBatch is a cached batch of rows. - * - * @param numRows The total number of rows in this batch - * @param buffers The buffers for serialized columns - * @param stats The stat of columns - */ -private[sql] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) - -private[sql] case class InMemoryRelation( - output: Seq[Attribute], - useCompression: Boolean, - batchSize: Int, - storageLevel: StorageLevel, - @transient child: SparkPlan, - tableName: Option[String])( - @transient private var _cachedColumnBuffers: RDD[CachedBatch] = null, - @transient private var _statistics: Statistics = null, - private var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null) - extends LogicalPlan with MultiInstanceRelation { - - private val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = - if (_batchStats == null) { - child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow]) - } else { - _batchStats - } - - @transient val partitionStatistics = new PartitionStatistics(output) - - private def computeSizeInBytes = { - val sizeOfRow: Expression = - BindReferences.bindReference( - output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add), - partitionStatistics.schema) - - batchStats.value.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum - } - - // Statistics propagation contracts: - // 1. Non-null `_statistics` must reflect the actual statistics of the underlying data - // 2. Only propagate statistics when `_statistics` is non-null - private def statisticsToBePropagated = if (_statistics == null) { - val updatedStats = statistics - if (_statistics == null) null else updatedStats - } else { - _statistics - } - - override def statistics: Statistics = { - if (_statistics == null) { - if (batchStats.value.isEmpty) { - // Underlying columnar RDD hasn't been materialized, no useful statistics information - // available, return the default statistics. 
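From the user-facing API this machinery is reached simply by caching a query; a hedged sketch (sqlContext is assumed to be an existing SQLContext, Spark 1.x APIs):

  val df = sqlContext.range(0, 10000).toDF("id")
  df.cache()   // CacheManager wraps the plan in an InMemoryRelation; nothing is built yet
  df.count()   // the first action materializes the CachedBatch RDD and feeds the
               // batchStats accumulator, after which sizeInBytes can be computed
               // from the collected per-column statistics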
- Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes) - } else { - // Underlying columnar RDD has been materialized, required information has also been - // collected via the `batchStats` accumulator, compute the final statistics, - // and update `_statistics`. - _statistics = Statistics(sizeInBytes = computeSizeInBytes) - _statistics - } - } else { - // Pre-computed statistics - _statistics - } - } - - // If the cached column buffers were not passed in, we calculate them in the constructor. - // As in Spark, the actual work of caching is lazy. - if (_cachedColumnBuffers == null) { - buildBuffers() - } - - def recache(): Unit = { - _cachedColumnBuffers.unpersist() - _cachedColumnBuffers = null - buildBuffers() - } - - private def buildBuffers(): Unit = { - val output = child.output - val cached = child.execute().mapPartitionsInternal { rowIterator => - new Iterator[CachedBatch] { - def next(): CachedBatch = { - val columnBuilders = output.map { attribute => - ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression) - }.toArray - - var rowCount = 0 - var totalSize = 0L - while (rowIterator.hasNext && rowCount < batchSize - && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) { - val row = rowIterator.next() - - // Added for SPARK-6082. This assertion can be useful for scenarios when something - // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM - // may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat - // hard to decipher. - assert( - row.numFields == columnBuilders.size, - s"Row column number mismatch, expected ${output.size} columns, " + - s"but got ${row.numFields}." + - s"\nRow content: $row") - - var i = 0 - totalSize = 0 - while (i < row.numFields) { - columnBuilders(i).appendFrom(row, i) - totalSize += columnBuilders(i).columnStats.sizeInBytes - i += 1 - } - rowCount += 1 - } - - val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics) - .flatMap(_.values)) - - batchStats += stats - CachedBatch(rowCount, columnBuilders.map(_.build().array()), stats) - } - - def hasNext: Boolean = rowIterator.hasNext - } - }.persist(storageLevel) - - cached.setName(tableName.map(n => s"In-memory table $n").getOrElse(child.toString)) - _cachedColumnBuffers = cached - } - - def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = { - InMemoryRelation( - newOutput, useCompression, batchSize, storageLevel, child, tableName)( - _cachedColumnBuffers, statisticsToBePropagated, batchStats) - } - - override def children: Seq[LogicalPlan] = Seq.empty - - override def newInstance(): this.type = { - new InMemoryRelation( - output.map(_.newInstance()), - useCompression, - batchSize, - storageLevel, - child, - tableName)( - _cachedColumnBuffers, - statisticsToBePropagated, - batchStats).asInstanceOf[this.type] - } - - def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers - - override protected def otherCopyArgs: Seq[AnyRef] = - Seq(_cachedColumnBuffers, statisticsToBePropagated, batchStats) - - private[sql] def uncache(blocking: Boolean): Unit = { - Accumulators.remove(batchStats.id) - cachedColumnBuffers.unpersist(blocking) - _cachedColumnBuffers = null - } -} - -private[sql] case class InMemoryColumnarTableScan( - attributes: Seq[Attribute], - predicates: Seq[Expression], - @transient relation: InMemoryRelation) - extends LeafNode { - - override def output: Seq[Attribute] = attributes - - // The cached version does not change the outputPartitioning of the original 
SparkPlan. - override def outputPartitioning: Partitioning = relation.child.outputPartitioning - - // The cached version does not change the outputOrdering of the original SparkPlan. - override def outputOrdering: Seq[SortOrder] = relation.child.outputOrdering - - override def outputsUnsafeRows: Boolean = true - - private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a) - - // Returned filter predicate should return false iff it is impossible for the input expression - // to evaluate to `true' based on statistics collected about this partition batch. - @transient val buildFilter: PartialFunction[Expression, Expression] = { - case And(lhs: Expression, rhs: Expression) - if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) => - (buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduce(_ && _) - - case Or(lhs: Expression, rhs: Expression) - if buildFilter.isDefinedAt(lhs) && buildFilter.isDefinedAt(rhs) => - buildFilter(lhs) || buildFilter(rhs) - - case EqualTo(a: AttributeReference, l: Literal) => - statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound - case EqualTo(l: Literal, a: AttributeReference) => - statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound - - case LessThan(a: AttributeReference, l: Literal) => statsFor(a).lowerBound < l - case LessThan(l: Literal, a: AttributeReference) => l < statsFor(a).upperBound - - case LessThanOrEqual(a: AttributeReference, l: Literal) => statsFor(a).lowerBound <= l - case LessThanOrEqual(l: Literal, a: AttributeReference) => l <= statsFor(a).upperBound - - case GreaterThan(a: AttributeReference, l: Literal) => l < statsFor(a).upperBound - case GreaterThan(l: Literal, a: AttributeReference) => statsFor(a).lowerBound < l - - case GreaterThanOrEqual(a: AttributeReference, l: Literal) => l <= statsFor(a).upperBound - case GreaterThanOrEqual(l: Literal, a: AttributeReference) => statsFor(a).lowerBound <= l - - case IsNull(a: Attribute) => statsFor(a).nullCount > 0 - case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0 - } - - val partitionFilters: Seq[Expression] = { - predicates.flatMap { p => - val filter = buildFilter.lift(p) - val boundFilter = - filter.map( - BindReferences.bindReference( - _, - relation.partitionStatistics.schema, - allowFailures = true)) - - boundFilter.foreach(_ => - filter.foreach(f => logInfo(s"Predicate $p generates partition filter: $f"))) - - // If the filter can't be resolved then we are missing required statistics. - boundFilter.filter(_.resolved) - } - } - - lazy val enableAccumulators: Boolean = - sqlContext.getConf("spark.sql.inMemoryTableScanStatistics.enable", "false").toBoolean - - // Accumulators used for testing purposes - lazy val readPartitions: Accumulator[Int] = sparkContext.accumulator(0) - lazy val readBatches: Accumulator[Int] = sparkContext.accumulator(0) - - private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning - - protected override def doExecute(): RDD[InternalRow] = { - if (enableAccumulators) { - readPartitions.setValue(0) - readBatches.setValue(0) - } - - // Using these variables here to avoid serialization of entire objects (if referenced directly) - // within the map Partitions closure. 
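To make the stats-based rewrite concrete, here is a hedged illustration (not the exact expression trees) of what buildFilter produces for predicates on an Int column id:

  // pushed-down predicate        partition filter evaluated on the batch stats row
  // id = 5                  ->   id.lowerBound <= 5 && 5 <= id.upperBound
  // id > 100                ->   100 < id.upperBound
  // id IS NULL              ->   id.nullCount > 0
  // Given a batch whose stats row holds [lowerBound = 0, upperBound = 99, nullCount = 0, ...],
  // the first filter passes and the batch is scanned, while the other two evaluate to
  // false and the batch is skipped without decompressing any of its column buffers.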
- val schema = relation.partitionStatistics.schema - val schemaIndex = schema.zipWithIndex - val relOutput = relation.output - val buffers = relation.cachedColumnBuffers - - buffers.mapPartitionsInternal { cachedBatchIterator => - val partitionFilter = newPredicate( - partitionFilters.reduceOption(And).getOrElse(Literal(true)), - schema) - - // Find the ordinals and data types of the requested columns. - val (requestedColumnIndices, requestedColumnDataTypes) = - attributes.map { a => - relOutput.indexWhere(_.exprId == a.exprId) -> a.dataType - }.unzip - - // Do partition batch pruning if enabled - val cachedBatchesToScan = - if (inMemoryPartitionPruningEnabled) { - cachedBatchIterator.filter { cachedBatch => - if (!partitionFilter(cachedBatch.stats)) { - def statsString: String = schemaIndex.map { - case (a, i) => - val value = cachedBatch.stats.get(i, a.dataType) - s"${a.name}: $value" - }.mkString(", ") - logInfo(s"Skipping partition based on stats $statsString") - false - } else { - if (enableAccumulators) { - readBatches += 1 - } - true - } - } - } else { - cachedBatchIterator - } - - val columnTypes = requestedColumnDataTypes.map { - case udt: UserDefinedType[_] => udt.sqlType - case other => other - }.toArray - val columnarIterator = GenerateColumnAccessor.generate(columnTypes) - columnarIterator.initialize(cachedBatchesToScan, columnTypes, requestedColumnIndices.toArray) - if (enableAccumulators && columnarIterator.hasNext) { - readPartitions += 1 - } - columnarIterator - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala deleted file mode 100644 index 7eaecfe047..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar - -import java.nio.{ByteOrder, ByteBuffer} - -import org.apache.spark.sql.catalyst.expressions.MutableRow - -private[sql] trait NullableColumnAccessor extends ColumnAccessor { - private var nullsBuffer: ByteBuffer = _ - private var nullCount: Int = _ - private var seenNulls: Int = 0 - - private var nextNullIndex: Int = _ - private var pos: Int = 0 - - abstract override protected def initialize(): Unit = { - nullsBuffer = underlyingBuffer.duplicate().order(ByteOrder.nativeOrder()) - nullCount = ByteBufferHelper.getInt(nullsBuffer) - nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1 - pos = 0 - - underlyingBuffer.position(underlyingBuffer.position + 4 + nullCount * 4) - super.initialize() - } - - abstract override def extractTo(row: MutableRow, ordinal: Int): Unit = { - if (pos == nextNullIndex) { - seenNulls += 1 - - if (seenNulls < nullCount) { - nextNullIndex = ByteBufferHelper.getInt(nullsBuffer) - } - - row.setNullAt(ordinal) - } else { - super.extractTo(row, ordinal) - } - - pos += 1 - } - - abstract override def hasNext: Boolean = seenNulls < nullCount || super.hasNext -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala deleted file mode 100644 index 76cfddf1cd..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar - -import java.nio.{ByteBuffer, ByteOrder} - -import org.apache.spark.sql.catalyst.InternalRow - -/** - * A stackable trait used for building byte buffer for a column containing null values. Memory - * layout of the final byte buffer is: - * {{{ - * .------------------- Null count N (4 bytes) - * | .--------------- Null positions (4 x N bytes, empty if null count is zero) - * | | .--------- Non-null elements - * V V V - * +---+-----+---------+ - * | | ... | ... ... 
| - * +---+-----+---------+ - * }}} - */ -private[sql] trait NullableColumnBuilder extends ColumnBuilder { - protected var nulls: ByteBuffer = _ - protected var nullCount: Int = _ - private var pos: Int = _ - - abstract override def initialize( - initialSize: Int, - columnName: String, - useCompression: Boolean): Unit = { - - nulls = ByteBuffer.allocate(1024) - nulls.order(ByteOrder.nativeOrder()) - pos = 0 - nullCount = 0 - super.initialize(initialSize, columnName, useCompression) - } - - abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = { - columnStats.gatherStats(row, ordinal) - if (row.isNullAt(ordinal)) { - nulls = ColumnBuilder.ensureFreeSpace(nulls, 4) - nulls.putInt(pos) - nullCount += 1 - } else { - super.appendFrom(row, ordinal) - } - pos += 1 - } - - abstract override def build(): ByteBuffer = { - val nonNulls = super.build() - val nullDataLen = nulls.position() - - nulls.limit(nullDataLen) - nulls.rewind() - - val buffer = ByteBuffer - .allocate(4 + nullDataLen + nonNulls.remaining()) - .order(ByteOrder.nativeOrder()) - .putInt(nullCount) - .put(nulls) - .put(nonNulls) - - buffer.rewind() - buffer - } - - protected def buildNonNulls(): ByteBuffer = { - nulls.limit(nulls.position()).rewind() - super.build() - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala deleted file mode 100644 index cb205defbb..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar.compression - -import org.apache.spark.sql.catalyst.expressions.MutableRow -import org.apache.spark.sql.columnar.{ColumnAccessor, NativeColumnAccessor} -import org.apache.spark.sql.types.AtomicType - -private[sql] trait CompressibleColumnAccessor[T <: AtomicType] extends ColumnAccessor { - this: NativeColumnAccessor[T] => - - private var decoder: Decoder[T] = _ - - abstract override protected def initialize(): Unit = { - super.initialize() - decoder = CompressionScheme(underlyingBuffer.getInt()).decoder(buffer, columnType) - } - - abstract override def hasNext: Boolean = super.hasNext || decoder.hasNext - - override def extractSingle(row: MutableRow, ordinal: Int): Unit = { - decoder.next(row, ordinal) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala deleted file mode 100644 index 161021ff96..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar.compression - -import java.nio.{ByteBuffer, ByteOrder} - -import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.columnar.{ColumnBuilder, NativeColumnBuilder} -import org.apache.spark.sql.types.AtomicType - -/** - * A stackable trait that builds optionally compressed byte buffer for a column. Memory layout of - * the final byte buffer is: - * {{{ - * .----------------------- Null count N (4 bytes) - * | .------------------- Null positions (4 x N bytes, empty if null count is zero) - * | | .------------- Compression scheme ID (4 bytes) - * | | | .--------- Compressed non-null elements - * V V V V - * +---+-----+---+---------+ - * | | ... | | ... ... 
| - * +---+-----+---+---------+ - * \-------/ \-----------/ - * header body - * }}} - */ -private[sql] trait CompressibleColumnBuilder[T <: AtomicType] - extends ColumnBuilder with Logging { - - this: NativeColumnBuilder[T] with WithCompressionSchemes => - - var compressionEncoders: Seq[Encoder[T]] = _ - - abstract override def initialize( - initialSize: Int, - columnName: String, - useCompression: Boolean): Unit = { - - compressionEncoders = - if (useCompression) { - schemes.filter(_.supports(columnType)).map(_.encoder[T](columnType)) - } else { - Seq(PassThrough.encoder(columnType)) - } - super.initialize(initialSize, columnName, useCompression) - } - - protected def isWorthCompressing(encoder: Encoder[T]) = { - encoder.compressionRatio < 0.8 - } - - private def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { - var i = 0 - while (i < compressionEncoders.length) { - compressionEncoders(i).gatherCompressibilityStats(row, ordinal) - i += 1 - } - } - - abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = { - super.appendFrom(row, ordinal) - if (!row.isNullAt(ordinal)) { - gatherCompressibilityStats(row, ordinal) - } - } - - override def build(): ByteBuffer = { - val nonNullBuffer = buildNonNulls() - val encoder: Encoder[T] = { - val candidate = compressionEncoders.minBy(_.compressionRatio) - if (isWorthCompressing(candidate)) candidate else PassThrough.encoder(columnType) - } - - // Header = null count + null positions - val headerSize = 4 + nulls.limit() - val compressedSize = if (encoder.compressedSize == 0) { - nonNullBuffer.remaining() - } else { - encoder.compressedSize - } - - val compressedBuffer = ByteBuffer - // Reserves 4 bytes for compression scheme ID - .allocate(headerSize + 4 + compressedSize) - .order(ByteOrder.nativeOrder) - // Write the header - .putInt(nullCount) - .put(nulls) - - logDebug(s"Compressor for [$columnName]: $encoder, ratio: ${encoder.compressionRatio}") - encoder.compress(nonNullBuffer, compressedBuffer) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala deleted file mode 100644 index 9322b772fd..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar.compression - -import java.nio.{ByteBuffer, ByteOrder} -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.MutableRow -import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType} -import org.apache.spark.sql.types.AtomicType - -private[sql] trait Encoder[T <: AtomicType] { - def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {} - - def compressedSize: Int - - def uncompressedSize: Int - - def compressionRatio: Double = { - if (uncompressedSize > 0) compressedSize.toDouble / uncompressedSize else 1.0 - } - - def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer -} - -private[sql] trait Decoder[T <: AtomicType] { - def next(row: MutableRow, ordinal: Int): Unit - - def hasNext: Boolean -} - -private[sql] trait CompressionScheme { - def typeId: Int - - def supports(columnType: ColumnType[_]): Boolean - - def encoder[T <: AtomicType](columnType: NativeColumnType[T]): Encoder[T] - - def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T] -} - -private[sql] trait WithCompressionSchemes { - def schemes: Seq[CompressionScheme] -} - -private[sql] trait AllCompressionSchemes extends WithCompressionSchemes { - override val schemes: Seq[CompressionScheme] = CompressionScheme.all -} - -private[sql] object CompressionScheme { - val all: Seq[CompressionScheme] = - Seq(PassThrough, RunLengthEncoding, DictionaryEncoding, BooleanBitSet, IntDelta, LongDelta) - - private val typeIdToScheme = all.map(scheme => scheme.typeId -> scheme).toMap - - def apply(typeId: Int): CompressionScheme = { - typeIdToScheme.getOrElse(typeId, throw new UnsupportedOperationException( - s"Unrecognized compression scheme type ID: $typeId")) - } - - def columnHeaderSize(columnBuffer: ByteBuffer): Int = { - val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder) - val nullCount = header.getInt() - // null count + null positions - 4 + 4 * nullCount - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala deleted file mode 100644 index 41c9a284e3..0000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala +++ /dev/null @@ -1,532 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
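On the read side the dispatch is purely by the 4-byte scheme ID that the builder wrote in front of the compressed body; a hedged sketch of what CompressibleColumnAccessor.initialize effectively does (buffer, columnType, row and ordinal are assumed to be in scope):

  // buffer layout: [null count][null positions][scheme ID][compressed data]
  val decoder = CompressionScheme(buffer.getInt()).decoder(buffer, columnType)
  while (decoder.hasNext) {
    decoder.next(row, ordinal)  // writes the next decoded value into row(ordinal)
  }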
- */ - -package org.apache.spark.sql.columnar.compression - -import java.nio.ByteBuffer - -import scala.collection.mutable - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow} -import org.apache.spark.sql.columnar._ -import org.apache.spark.sql.types._ - - -private[sql] case object PassThrough extends CompressionScheme { - override val typeId = 0 - - override def supports(columnType: ColumnType[_]): Boolean = true - - override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): Encoder[T] = { - new this.Encoder[T](columnType) - } - - override def decoder[T <: AtomicType]( - buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T] = { - new this.Decoder(buffer, columnType) - } - - class Encoder[T <: AtomicType](columnType: NativeColumnType[T]) extends compression.Encoder[T] { - override def uncompressedSize: Int = 0 - - override def compressedSize: Int = 0 - - override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { - // Writes compression type ID and copies raw contents - to.putInt(PassThrough.typeId).put(from).rewind() - to - } - } - - class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) - extends compression.Decoder[T] { - - override def next(row: MutableRow, ordinal: Int): Unit = { - columnType.extract(buffer, row, ordinal) - } - - override def hasNext: Boolean = buffer.hasRemaining - } -} - -private[sql] case object RunLengthEncoding extends CompressionScheme { - override val typeId = 1 - - override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): Encoder[T] = { - new this.Encoder[T](columnType) - } - - override def decoder[T <: AtomicType]( - buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T] = { - new this.Decoder(buffer, columnType) - } - - override def supports(columnType: ColumnType[_]): Boolean = columnType match { - case INT | LONG | SHORT | BYTE | STRING | BOOLEAN => true - case _ => false - } - - class Encoder[T <: AtomicType](columnType: NativeColumnType[T]) extends compression.Encoder[T] { - private var _uncompressedSize = 0 - private var _compressedSize = 0 - - // Using `MutableRow` to store the last value to avoid boxing/unboxing cost. 
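A hedged illustration of the byte layout the run-length encoder produces for an Int column (scheme ID omitted):

  // raw column values:         7, 7, 7, 7, 2, 2, 9
  // run-length encoded body:   [7][4] [2][2] [9][1]
  // Each run is written as <value><run length: Int>, which is why
  // compressedSize only grows by (actualSize + 4) when a new run starts.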
- private val lastValue = new SpecificMutableRow(Seq(columnType.dataType)) - private var lastRun = 0 - - override def uncompressedSize: Int = _uncompressedSize - - override def compressedSize: Int = _compressedSize - - override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { - val value = columnType.getField(row, ordinal) - val actualSize = columnType.actualSize(row, ordinal) - _uncompressedSize += actualSize - - if (lastValue.isNullAt(0)) { - columnType.copyField(row, ordinal, lastValue, 0) - lastRun = 1 - _compressedSize += actualSize + 4 - } else { - if (columnType.getField(lastValue, 0) == value) { - lastRun += 1 - } else { - _compressedSize += actualSize + 4 - columnType.copyField(row, ordinal, lastValue, 0) - lastRun = 1 - } - } - } - - override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { - to.putInt(RunLengthEncoding.typeId) - - if (from.hasRemaining) { - val currentValue = new SpecificMutableRow(Seq(columnType.dataType)) - var currentRun = 1 - val value = new SpecificMutableRow(Seq(columnType.dataType)) - - columnType.extract(from, currentValue, 0) - - while (from.hasRemaining) { - columnType.extract(from, value, 0) - - if (value.get(0, columnType.dataType) == currentValue.get(0, columnType.dataType)) { - currentRun += 1 - } else { - // Writes current run - columnType.append(currentValue, 0, to) - to.putInt(currentRun) - - // Resets current run - columnType.copyField(value, 0, currentValue, 0) - currentRun = 1 - } - } - - // Writes the last run - columnType.append(currentValue, 0, to) - to.putInt(currentRun) - } - - to.rewind() - to - } - } - - class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) - extends compression.Decoder[T] { - - private var run = 0 - private var valueCount = 0 - private var currentValue: T#InternalType = _ - - override def next(row: MutableRow, ordinal: Int): Unit = { - if (valueCount == run) { - currentValue = columnType.extract(buffer) - run = ByteBufferHelper.getInt(buffer) - valueCount = 1 - } else { - valueCount += 1 - } - - columnType.setField(row, ordinal, currentValue) - } - - override def hasNext: Boolean = valueCount < run || buffer.hasRemaining - } -} - -private[sql] case object DictionaryEncoding extends CompressionScheme { - override val typeId = 2 - - // 32K unique values allowed - val MAX_DICT_SIZE = Short.MaxValue - - override def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) - : Decoder[T] = { - new this.Decoder(buffer, columnType) - } - - override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): Encoder[T] = { - new this.Encoder[T](columnType) - } - - override def supports(columnType: ColumnType[_]): Boolean = columnType match { - case INT | LONG | STRING => true - case _ => false - } - - class Encoder[T <: AtomicType](columnType: NativeColumnType[T]) extends compression.Encoder[T] { - // Size of the input, uncompressed, in bytes. Note that we only count until the dictionary - // overflows. - private var _uncompressedSize = 0 - - // If the number of distinct elements is too large, we discard the use of dictionary encoding - // and set the overflow flag to true. - private var overflow = false - - // Total number of elements. - private var count = 0 - - // The reverse mapping of _dictionary, i.e. mapping encoded integer to the value itself. - private var values = new mutable.ArrayBuffer[T#InternalType](1024) - - // The dictionary that maps a value to the encoded short integer. 
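Dictionary encoding instead pays a one-off header to shrink every element to a 2-byte code; a hedged illustration for a STRING column:

  // raw column values:  "ca", "ny", "ca", "ca", "ny"
  // header:             [dictionary size = 2]["ca"]["ny"]
  // body:               [0][1][0][0][1]          (one Short per element)
  // If more than Short.MaxValue (32767) distinct values appear, overflow is
  // flagged and compressedSize reports Int.MaxValue, so the scheme is never
  // selected for that column.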
- private val dictionary = mutable.HashMap.empty[Any, Short] - - // Size of the serialized dictionary in bytes. Initialized to 4 since we need at least an `Int` - // to store dictionary element count. - private var dictionarySize = 4 - - override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { - val value = columnType.getField(row, ordinal) - - if (!overflow) { - val actualSize = columnType.actualSize(row, ordinal) - count += 1 - _uncompressedSize += actualSize - - if (!dictionary.contains(value)) { - if (dictionary.size < MAX_DICT_SIZE) { - val clone = columnType.clone(value) - values += clone - dictionarySize += actualSize - dictionary(clone) = dictionary.size.toShort - } else { - overflow = true - values.clear() - dictionary.clear() - } - } - } - } - - override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { - if (overflow) { - throw new IllegalStateException( - "Dictionary encoding should not be used because of dictionary overflow.") - } - - to.putInt(DictionaryEncoding.typeId) - .putInt(dictionary.size) - - var i = 0 - while (i < values.length) { - columnType.append(values(i), to) - i += 1 - } - - while (from.hasRemaining) { - to.putShort(dictionary(columnType.extract(from))) - } - - to.rewind() - to - } - - override def uncompressedSize: Int = _uncompressedSize - - override def compressedSize: Int = if (overflow) Int.MaxValue else dictionarySize + count * 2 - } - - class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) - extends compression.Decoder[T] { - - private val dictionary: Array[Any] = { - val elementNum = ByteBufferHelper.getInt(buffer) - Array.fill[Any](elementNum)(columnType.extract(buffer).asInstanceOf[Any]) - } - - override def next(row: MutableRow, ordinal: Int): Unit = { - columnType.setField(row, ordinal, dictionary(buffer.getShort()).asInstanceOf[T#InternalType]) - } - - override def hasNext: Boolean = buffer.hasRemaining - } -} - -private[sql] case object BooleanBitSet extends CompressionScheme { - override val typeId = 3 - - val BITS_PER_LONG = 64 - - override def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) - : compression.Decoder[T] = { - new this.Decoder(buffer).asInstanceOf[compression.Decoder[T]] - } - - override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): compression.Encoder[T] = { - (new this.Encoder).asInstanceOf[compression.Encoder[T]] - } - - override def supports(columnType: ColumnType[_]): Boolean = columnType == BOOLEAN - - class Encoder extends compression.Encoder[BooleanType.type] { - private var _uncompressedSize = 0 - - override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { - _uncompressedSize += BOOLEAN.defaultSize - } - - override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { - to.putInt(BooleanBitSet.typeId) - // Total element count (1 byte per Boolean value) - .putInt(from.remaining) - - while (from.remaining >= BITS_PER_LONG) { - var word = 0: Long - var i = 0 - - while (i < BITS_PER_LONG) { - if (BOOLEAN.extract(from)) { - word |= (1: Long) << i - } - i += 1 - } - - to.putLong(word) - } - - if (from.hasRemaining) { - var word = 0: Long - var i = 0 - - while (from.hasRemaining) { - if (BOOLEAN.extract(from)) { - word |= (1: Long) << i - } - i += 1 - } - - to.putLong(word) - } - - to.rewind() - to - } - - override def uncompressedSize: Int = _uncompressedSize - - override def compressedSize: Int = { - val extra = if (_uncompressedSize % BITS_PER_LONG == 0) 0 else 1 - 
(_uncompressedSize / BITS_PER_LONG + extra) * 8 + 4 - } - } - - class Decoder(buffer: ByteBuffer) extends compression.Decoder[BooleanType.type] { - private val count = ByteBufferHelper.getInt(buffer) - - private var currentWord = 0: Long - - private var visited: Int = 0 - - override def next(row: MutableRow, ordinal: Int): Unit = { - val bit = visited % BITS_PER_LONG - - visited += 1 - if (bit == 0) { - currentWord = ByteBufferHelper.getLong(buffer) - } - - row.setBoolean(ordinal, ((currentWord >> bit) & 1) != 0) - } - - override def hasNext: Boolean = visited < count - } -} - -private[sql] case object IntDelta extends CompressionScheme { - override def typeId: Int = 4 - - override def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) - : compression.Decoder[T] = { - new Decoder(buffer, INT).asInstanceOf[compression.Decoder[T]] - } - - override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): compression.Encoder[T] = { - (new Encoder).asInstanceOf[compression.Encoder[T]] - } - - override def supports(columnType: ColumnType[_]): Boolean = columnType == INT - - class Encoder extends compression.Encoder[IntegerType.type] { - protected var _compressedSize: Int = 0 - protected var _uncompressedSize: Int = 0 - - override def compressedSize: Int = _compressedSize - override def uncompressedSize: Int = _uncompressedSize - - private var prevValue: Int = _ - - override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { - val value = row.getInt(ordinal) - val delta = value - prevValue - - _compressedSize += 1 - - // If this is the first integer to be compressed, or the delta is out of byte range, then give - // up compressing this integer. - if (_uncompressedSize == 0 || delta <= Byte.MinValue || delta > Byte.MaxValue) { - _compressedSize += INT.defaultSize - } - - _uncompressedSize += INT.defaultSize - prevValue = value - } - - override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { - to.putInt(typeId) - - if (from.hasRemaining) { - var prev = from.getInt() - to.put(Byte.MinValue) - to.putInt(prev) - - while (from.hasRemaining) { - val current = from.getInt() - val delta = current - prev - prev = current - - if (Byte.MinValue < delta && delta <= Byte.MaxValue) { - to.put(delta.toByte) - } else { - to.put(Byte.MinValue) - to.putInt(current) - } - } - } - - to.rewind().asInstanceOf[ByteBuffer] - } - } - - class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[IntegerType.type]) - extends compression.Decoder[IntegerType.type] { - - private var prev: Int = _ - - override def hasNext: Boolean = buffer.hasRemaining - - override def next(row: MutableRow, ordinal: Int): Unit = { - val delta = buffer.get() - prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getInt(buffer) - row.setInt(ordinal, prev) - } - } -} - -private[sql] case object LongDelta extends CompressionScheme { - override def typeId: Int = 5 - - override def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) - : compression.Decoder[T] = { - new Decoder(buffer, LONG).asInstanceOf[compression.Decoder[T]] - } - - override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): compression.Encoder[T] = { - (new Encoder).asInstanceOf[compression.Encoder[T]] - } - - override def supports(columnType: ColumnType[_]): Boolean = columnType == LONG - - class Encoder extends compression.Encoder[LongType.type] { - protected var _compressedSize: Int = 0 - protected var _uncompressedSize: Int = 0 - - override def 
compressedSize: Int = _compressedSize - override def uncompressedSize: Int = _uncompressedSize - - private var prevValue: Long = _ - - override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { - val value = row.getLong(ordinal) - val delta = value - prevValue - - _compressedSize += 1 - - // If this is the first long integer to be compressed, or the delta is out of byte range, then - // give up compressing this long integer. - if (_uncompressedSize == 0 || delta <= Byte.MinValue || delta > Byte.MaxValue) { - _compressedSize += LONG.defaultSize - } - - _uncompressedSize += LONG.defaultSize - prevValue = value - } - - override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { - to.putInt(typeId) - - if (from.hasRemaining) { - var prev = from.getLong() - to.put(Byte.MinValue) - to.putLong(prev) - - while (from.hasRemaining) { - val current = from.getLong() - val delta = current - prev - prev = current - - if (Byte.MinValue < delta && delta <= Byte.MaxValue) { - to.put(delta.toByte) - } else { - to.put(Byte.MinValue) - to.putLong(current) - } - } - } - - to.rewind().asInstanceOf[ByteBuffer] - } - } - - class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[LongType.type]) - extends compression.Decoder[LongType.type] { - - private var prev: Long = _ - - override def hasNext: Boolean = buffer.hasRemaining - - override def next(row: MutableRow, ordinal: Int): Unit = { - val delta = buffer.get() - prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getLong(buffer) - row.setLong(ordinal, prev) - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index f85aeb1b02..293fcfe96e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import org.apache.spark.Logging import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.columnar.InMemoryRelation +import org.apache.spark.sql.execution.columnar.InMemoryRelation import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 3d4ce633c0..f67c951bc0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan} import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} +import org.apache.spark.sql.execution.columnar.{InMemoryColumnarTableScan, InMemoryRelation} import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _} import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand} import org.apache.spark.sql.{Strategy, execution} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala new file mode 100644 index 0000000000..fee36f6023 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnAccessor.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import java.nio.{ByteBuffer, ByteOrder} + +import org.apache.spark.sql.catalyst.expressions.{MutableRow, UnsafeArrayData, UnsafeMapData, UnsafeRow} +import org.apache.spark.sql.execution.columnar.compression.CompressibleColumnAccessor +import org.apache.spark.sql.types._ + +/** + * An `Iterator` like trait used to extract values from columnar byte buffer. When a value is + * extracted from the buffer, instead of directly returning it, the value is set into some field of + * a [[MutableRow]]. In this way, boxing cost can be avoided by leveraging the setter methods + * for primitive values provided by [[MutableRow]]. + */ +private[columnar] trait ColumnAccessor { + initialize() + + protected def initialize() + + def hasNext: Boolean + + def extractTo(row: MutableRow, ordinal: Int) + + protected def underlyingBuffer: ByteBuffer +} + +private[columnar] abstract class BasicColumnAccessor[JvmType]( + protected val buffer: ByteBuffer, + protected val columnType: ColumnType[JvmType]) + extends ColumnAccessor { + + protected def initialize() {} + + override def hasNext: Boolean = buffer.hasRemaining + + override def extractTo(row: MutableRow, ordinal: Int): Unit = { + extractSingle(row, ordinal) + } + + def extractSingle(row: MutableRow, ordinal: Int): Unit = { + columnType.extract(buffer, row, ordinal) + } + + protected def underlyingBuffer = buffer +} + +private[columnar] class NullColumnAccessor(buffer: ByteBuffer) + extends BasicColumnAccessor[Any](buffer, NULL) + with NullableColumnAccessor + +private[columnar] abstract class NativeColumnAccessor[T <: AtomicType]( + override protected val buffer: ByteBuffer, + override protected val columnType: NativeColumnType[T]) + extends BasicColumnAccessor(buffer, columnType) + with NullableColumnAccessor + with CompressibleColumnAccessor[T] + +private[columnar] class BooleanColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, BOOLEAN) + +private[columnar] class ByteColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, BYTE) + +private[columnar] class ShortColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, SHORT) + +private[columnar] class IntColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, INT) + +private[columnar] class LongColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, LONG) + +private[columnar] class FloatColumnAccessor(buffer: 
ByteBuffer) + extends NativeColumnAccessor(buffer, FLOAT) + +private[columnar] class DoubleColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, DOUBLE) + +private[columnar] class StringColumnAccessor(buffer: ByteBuffer) + extends NativeColumnAccessor(buffer, STRING) + +private[columnar] class BinaryColumnAccessor(buffer: ByteBuffer) + extends BasicColumnAccessor[Array[Byte]](buffer, BINARY) + with NullableColumnAccessor + +private[columnar] class CompactDecimalColumnAccessor(buffer: ByteBuffer, dataType: DecimalType) + extends NativeColumnAccessor(buffer, COMPACT_DECIMAL(dataType)) + +private[columnar] class DecimalColumnAccessor(buffer: ByteBuffer, dataType: DecimalType) + extends BasicColumnAccessor[Decimal](buffer, LARGE_DECIMAL(dataType)) + with NullableColumnAccessor + +private[columnar] class StructColumnAccessor(buffer: ByteBuffer, dataType: StructType) + extends BasicColumnAccessor[UnsafeRow](buffer, STRUCT(dataType)) + with NullableColumnAccessor + +private[columnar] class ArrayColumnAccessor(buffer: ByteBuffer, dataType: ArrayType) + extends BasicColumnAccessor[UnsafeArrayData](buffer, ARRAY(dataType)) + with NullableColumnAccessor + +private[columnar] class MapColumnAccessor(buffer: ByteBuffer, dataType: MapType) + extends BasicColumnAccessor[UnsafeMapData](buffer, MAP(dataType)) + with NullableColumnAccessor + +private[columnar] object ColumnAccessor { + def apply(dataType: DataType, buffer: ByteBuffer): ColumnAccessor = { + val buf = buffer.order(ByteOrder.nativeOrder) + + dataType match { + case NullType => new NullColumnAccessor(buf) + case BooleanType => new BooleanColumnAccessor(buf) + case ByteType => new ByteColumnAccessor(buf) + case ShortType => new ShortColumnAccessor(buf) + case IntegerType | DateType => new IntColumnAccessor(buf) + case LongType | TimestampType => new LongColumnAccessor(buf) + case FloatType => new FloatColumnAccessor(buf) + case DoubleType => new DoubleColumnAccessor(buf) + case StringType => new StringColumnAccessor(buf) + case BinaryType => new BinaryColumnAccessor(buf) + case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS => + new CompactDecimalColumnAccessor(buf, dt) + case dt: DecimalType => new DecimalColumnAccessor(buf, dt) + case struct: StructType => new StructColumnAccessor(buf, struct) + case array: ArrayType => new ArrayColumnAccessor(buf, array) + case map: MapType => new MapColumnAccessor(buf, map) + case udt: UserDefinedType[_] => ColumnAccessor(udt.sqlType, buffer) + case other => + throw new Exception(s"not support type: $other") + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBuilder.scala new file mode 100644 index 0000000000..7e26f19bb7 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnBuilder.scala @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import java.nio.{ByteBuffer, ByteOrder} + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.columnar.ColumnBuilder._ +import org.apache.spark.sql.execution.columnar.compression.{AllCompressionSchemes, CompressibleColumnBuilder} +import org.apache.spark.sql.types._ + +private[columnar] trait ColumnBuilder { + /** + * Initializes with an approximate lower bound on the expected number of elements in this column. + */ + def initialize(initialSize: Int, columnName: String = "", useCompression: Boolean = false) + + /** + * Appends `row(ordinal)` to the column builder. + */ + def appendFrom(row: InternalRow, ordinal: Int) + + /** + * Column statistics information + */ + def columnStats: ColumnStats + + /** + * Returns the final columnar byte buffer. + */ + def build(): ByteBuffer +} + +private[columnar] class BasicColumnBuilder[JvmType]( + val columnStats: ColumnStats, + val columnType: ColumnType[JvmType]) + extends ColumnBuilder { + + protected var columnName: String = _ + + protected var buffer: ByteBuffer = _ + + override def initialize( + initialSize: Int, + columnName: String = "", + useCompression: Boolean = false): Unit = { + + val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize + this.columnName = columnName + + buffer = ByteBuffer.allocate(size * columnType.defaultSize) + buffer.order(ByteOrder.nativeOrder()) + } + + override def appendFrom(row: InternalRow, ordinal: Int): Unit = { + buffer = ensureFreeSpace(buffer, columnType.actualSize(row, ordinal)) + columnType.append(row, ordinal, buffer) + } + + override def build(): ByteBuffer = { + if (buffer.capacity() > buffer.position() * 1.1) { + // trim the buffer + buffer = ByteBuffer + .allocate(buffer.position()) + .order(ByteOrder.nativeOrder()) + .put(buffer.array(), 0, buffer.position()) + } + buffer.flip().asInstanceOf[ByteBuffer] + } +} + +private[columnar] class NullColumnBuilder + extends BasicColumnBuilder[Any](new ObjectColumnStats(NullType), NULL) + with NullableColumnBuilder + +private[columnar] abstract class ComplexColumnBuilder[JvmType]( + columnStats: ColumnStats, + columnType: ColumnType[JvmType]) + extends BasicColumnBuilder[JvmType](columnStats, columnType) + with NullableColumnBuilder + +private[columnar] abstract class NativeColumnBuilder[T <: AtomicType]( + override val columnStats: ColumnStats, + override val columnType: NativeColumnType[T]) + extends BasicColumnBuilder[T#InternalType](columnStats, columnType) + with NullableColumnBuilder + with AllCompressionSchemes + with CompressibleColumnBuilder[T] + +private[columnar] +class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN) + +private[columnar] +class ByteColumnBuilder extends NativeColumnBuilder(new ByteColumnStats, BYTE) + +private[columnar] class ShortColumnBuilder extends NativeColumnBuilder(new ShortColumnStats, SHORT) + +private[columnar] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT) + +private[columnar] class LongColumnBuilder extends NativeColumnBuilder(new 
LongColumnStats, LONG) + +private[columnar] class FloatColumnBuilder extends NativeColumnBuilder(new FloatColumnStats, FLOAT) + +private[columnar] +class DoubleColumnBuilder extends NativeColumnBuilder(new DoubleColumnStats, DOUBLE) + +private[columnar] +class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStats, STRING) + +private[columnar] +class BinaryColumnBuilder extends ComplexColumnBuilder(new BinaryColumnStats, BINARY) + +private[columnar] class CompactDecimalColumnBuilder(dataType: DecimalType) + extends NativeColumnBuilder(new DecimalColumnStats(dataType), COMPACT_DECIMAL(dataType)) + +private[columnar] class DecimalColumnBuilder(dataType: DecimalType) + extends ComplexColumnBuilder(new DecimalColumnStats(dataType), LARGE_DECIMAL(dataType)) + +private[columnar] class StructColumnBuilder(dataType: StructType) + extends ComplexColumnBuilder(new ObjectColumnStats(dataType), STRUCT(dataType)) + +private[columnar] class ArrayColumnBuilder(dataType: ArrayType) + extends ComplexColumnBuilder(new ObjectColumnStats(dataType), ARRAY(dataType)) + +private[columnar] class MapColumnBuilder(dataType: MapType) + extends ComplexColumnBuilder(new ObjectColumnStats(dataType), MAP(dataType)) + +private[columnar] object ColumnBuilder { + val DEFAULT_INITIAL_BUFFER_SIZE = 128 * 1024 + val MAX_BATCH_SIZE_IN_BYTE = 4 * 1024 * 1024L + + private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = { + if (orig.remaining >= size) { + orig + } else { + // grow in steps of initial size + val capacity = orig.capacity() + val newSize = capacity + size.max(capacity) + val pos = orig.position() + + ByteBuffer + .allocate(newSize) + .order(ByteOrder.nativeOrder()) + .put(orig.array(), 0, pos) + } + } + + def apply( + dataType: DataType, + initialSize: Int = 0, + columnName: String = "", + useCompression: Boolean = false): ColumnBuilder = { + val builder: ColumnBuilder = dataType match { + case NullType => new NullColumnBuilder + case BooleanType => new BooleanColumnBuilder + case ByteType => new ByteColumnBuilder + case ShortType => new ShortColumnBuilder + case IntegerType | DateType => new IntColumnBuilder + case LongType | TimestampType => new LongColumnBuilder + case FloatType => new FloatColumnBuilder + case DoubleType => new DoubleColumnBuilder + case StringType => new StringColumnBuilder + case BinaryType => new BinaryColumnBuilder + case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS => + new CompactDecimalColumnBuilder(dt) + case dt: DecimalType => new DecimalColumnBuilder(dt) + case struct: StructType => new StructColumnBuilder(struct) + case array: ArrayType => new ArrayColumnBuilder(array) + case map: MapType => new MapColumnBuilder(map) + case udt: UserDefinedType[_] => + return apply(udt.sqlType, initialSize, columnName, useCompression) + case other => + throw new Exception(s"not supported type: $other") + } + + builder.initialize(initialSize, columnName, useCompression) + builder + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala new file mode 100644 index 0000000000..c52ee9ffd6 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, Attribute, AttributeMap, AttributeReference} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +private[columnar] class ColumnStatisticsSchema(a: Attribute) extends Serializable { + val upperBound = AttributeReference(a.name + ".upperBound", a.dataType, nullable = true)() + val lowerBound = AttributeReference(a.name + ".lowerBound", a.dataType, nullable = true)() + val nullCount = AttributeReference(a.name + ".nullCount", IntegerType, nullable = false)() + val count = AttributeReference(a.name + ".count", IntegerType, nullable = false)() + val sizeInBytes = AttributeReference(a.name + ".sizeInBytes", LongType, nullable = false)() + + val schema = Seq(lowerBound, upperBound, nullCount, count, sizeInBytes) +} + +private[columnar] class PartitionStatistics(tableSchema: Seq[Attribute]) extends Serializable { + val (forAttribute, schema) = { + val allStats = tableSchema.map(a => a -> new ColumnStatisticsSchema(a)) + (AttributeMap(allStats), allStats.map(_._2.schema).foldLeft(Seq.empty[Attribute])(_ ++ _)) + } +} + +/** + * Used to collect statistical information when building in-memory columns. + * + * NOTE: we intentionally avoid using `Ordering[T]` to compare values here because `Ordering[T]` + * brings significant performance penalty. + */ +private[columnar] sealed trait ColumnStats extends Serializable { + protected var count = 0 + protected var nullCount = 0 + private[columnar] var sizeInBytes = 0L + + /** + * Gathers statistics information from `row(ordinal)`. + */ + def gatherStats(row: InternalRow, ordinal: Int): Unit = { + if (row.isNullAt(ordinal)) { + nullCount += 1 + // 4 bytes for null position + sizeInBytes += 4 + } + count += 1 + } + + /** + * Column statistics represented as a single row, currently including closed lower bound, closed + * upper bound and null count. + */ + def collectedStatistics: GenericInternalRow +} + +/** + * A no-op ColumnStats only used for testing purposes. 
+ */ +private[columnar] class NoopColumnStats extends ColumnStats { + override def gatherStats(row: InternalRow, ordinal: Int): Unit = super.gatherStats(row, ordinal) + + override def collectedStatistics: GenericInternalRow = + new GenericInternalRow(Array[Any](null, null, nullCount, count, 0L)) +} + +private[columnar] class BooleanColumnStats extends ColumnStats { + protected var upper = false + protected var lower = true + + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { + super.gatherStats(row, ordinal) + if (!row.isNullAt(ordinal)) { + val value = row.getBoolean(ordinal) + if (value > upper) upper = value + if (value < lower) lower = value + sizeInBytes += BOOLEAN.defaultSize + } + } + + override def collectedStatistics: GenericInternalRow = + new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) +} + +private[columnar] class ByteColumnStats extends ColumnStats { + protected var upper = Byte.MinValue + protected var lower = Byte.MaxValue + + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { + super.gatherStats(row, ordinal) + if (!row.isNullAt(ordinal)) { + val value = row.getByte(ordinal) + if (value > upper) upper = value + if (value < lower) lower = value + sizeInBytes += BYTE.defaultSize + } + } + + override def collectedStatistics: GenericInternalRow = + new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) +} + +private[columnar] class ShortColumnStats extends ColumnStats { + protected var upper = Short.MinValue + protected var lower = Short.MaxValue + + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { + super.gatherStats(row, ordinal) + if (!row.isNullAt(ordinal)) { + val value = row.getShort(ordinal) + if (value > upper) upper = value + if (value < lower) lower = value + sizeInBytes += SHORT.defaultSize + } + } + + override def collectedStatistics: GenericInternalRow = + new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) +} + +private[columnar] class IntColumnStats extends ColumnStats { + protected var upper = Int.MinValue + protected var lower = Int.MaxValue + + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { + super.gatherStats(row, ordinal) + if (!row.isNullAt(ordinal)) { + val value = row.getInt(ordinal) + if (value > upper) upper = value + if (value < lower) lower = value + sizeInBytes += INT.defaultSize + } + } + + override def collectedStatistics: GenericInternalRow = + new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) +} + +private[columnar] class LongColumnStats extends ColumnStats { + protected var upper = Long.MinValue + protected var lower = Long.MaxValue + + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { + super.gatherStats(row, ordinal) + if (!row.isNullAt(ordinal)) { + val value = row.getLong(ordinal) + if (value > upper) upper = value + if (value < lower) lower = value + sizeInBytes += LONG.defaultSize + } + } + + override def collectedStatistics: GenericInternalRow = + new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) +} + +private[columnar] class FloatColumnStats extends ColumnStats { + protected var upper = Float.MinValue + protected var lower = Float.MaxValue + + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { + super.gatherStats(row, ordinal) + if (!row.isNullAt(ordinal)) { + val value = row.getFloat(ordinal) + if (value > upper) upper = value + if (value < lower) lower = value + sizeInBytes += 
FLOAT.defaultSize + } + } + + override def collectedStatistics: GenericInternalRow = + new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) +} + +private[columnar] class DoubleColumnStats extends ColumnStats { + protected var upper = Double.MinValue + protected var lower = Double.MaxValue + + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { + super.gatherStats(row, ordinal) + if (!row.isNullAt(ordinal)) { + val value = row.getDouble(ordinal) + if (value > upper) upper = value + if (value < lower) lower = value + sizeInBytes += DOUBLE.defaultSize + } + } + + override def collectedStatistics: GenericInternalRow = + new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) +} + +private[columnar] class StringColumnStats extends ColumnStats { + protected var upper: UTF8String = null + protected var lower: UTF8String = null + + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { + super.gatherStats(row, ordinal) + if (!row.isNullAt(ordinal)) { + val value = row.getUTF8String(ordinal) + if (upper == null || value.compareTo(upper) > 0) upper = value.clone() + if (lower == null || value.compareTo(lower) < 0) lower = value.clone() + sizeInBytes += STRING.actualSize(row, ordinal) + } + } + + override def collectedStatistics: GenericInternalRow = + new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) +} + +private[columnar] class BinaryColumnStats extends ColumnStats { + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { + super.gatherStats(row, ordinal) + if (!row.isNullAt(ordinal)) { + sizeInBytes += BINARY.actualSize(row, ordinal) + } + } + + override def collectedStatistics: GenericInternalRow = + new GenericInternalRow(Array[Any](null, null, nullCount, count, sizeInBytes)) +} + +private[columnar] class DecimalColumnStats(precision: Int, scale: Int) extends ColumnStats { + def this(dt: DecimalType) = this(dt.precision, dt.scale) + + protected var upper: Decimal = null + protected var lower: Decimal = null + + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { + super.gatherStats(row, ordinal) + if (!row.isNullAt(ordinal)) { + val value = row.getDecimal(ordinal, precision, scale) + if (upper == null || value.compareTo(upper) > 0) upper = value + if (lower == null || value.compareTo(lower) < 0) lower = value + // TODO: this is not right for DecimalType with precision > 18 + sizeInBytes += 8 + } + } + + override def collectedStatistics: GenericInternalRow = + new GenericInternalRow(Array[Any](lower, upper, nullCount, count, sizeInBytes)) +} + +private[columnar] class ObjectColumnStats(dataType: DataType) extends ColumnStats { + val columnType = ColumnType(dataType) + + override def gatherStats(row: InternalRow, ordinal: Int): Unit = { + super.gatherStats(row, ordinal) + if (!row.isNullAt(ordinal)) { + sizeInBytes += columnType.actualSize(row, ordinal) + } + } + + override def collectedStatistics: GenericInternalRow = + new GenericInternalRow(Array[Any](null, null, nullCount, count, sizeInBytes)) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala new file mode 100644 index 0000000000..c9f2329db4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala @@ -0,0 +1,689 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import java.math.{BigDecimal, BigInteger} +import java.nio.ByteBuffer + +import scala.reflect.runtime.universe.TypeTag + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.Platform +import org.apache.spark.unsafe.types.UTF8String + + +/** + * A helper class for fast reading Int/Long/Float/Double from ByteBuffer in native order. + * + * Note: There is not much difference between ByteBuffer.getByte/getShort and + * Unsafe.getByte/getShort, so we do not have helper methods for them. + * + * The unrolling (building the columnar cache) is already slow, so putLong/putDouble will not help + * much, and we do not have helper methods for them. + * + * + * WARNING: This only works with HeapByteBuffer + */ +private[columnar] object ByteBufferHelper { + def getInt(buffer: ByteBuffer): Int = { + val pos = buffer.position() + buffer.position(pos + 4) + Platform.getInt(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos) + } + + def getLong(buffer: ByteBuffer): Long = { + val pos = buffer.position() + buffer.position(pos + 8) + Platform.getLong(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos) + } + + def getFloat(buffer: ByteBuffer): Float = { + val pos = buffer.position() + buffer.position(pos + 4) + Platform.getFloat(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos) + } + + def getDouble(buffer: ByteBuffer): Double = { + val pos = buffer.position() + buffer.position(pos + 8) + Platform.getDouble(buffer.array(), Platform.BYTE_ARRAY_OFFSET + pos) + } +} + +/** + * An abstract class that represents the type of a column. Used to append/extract Java objects into/from + * the underlying [[ByteBuffer]] of a column. + * + * @tparam JvmType Underlying Java type to represent the elements. + */ +private[columnar] sealed abstract class ColumnType[JvmType] { + + // The catalyst data type of this column. + def dataType: DataType + + // Default size in bytes for one element of type T (e.g. 4 for `Int`). + def defaultSize: Int + + /** + * Extracts a value out of the buffer at the buffer's current position. + */ + def extract(buffer: ByteBuffer): JvmType + + /** + * Extracts a value out of the buffer at the buffer's current position and stores it in + * `row(ordinal)`. Subclasses should override this method to avoid boxing/unboxing costs whenever + * possible. + */ + def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { + setField(row, ordinal, extract(buffer)) + } + + /** + * Appends the given value v of type T into the given ByteBuffer. + */ + def append(v: JvmType, buffer: ByteBuffer): Unit + + /** + * Appends `row(ordinal)` of type T into the given ByteBuffer. Subclasses should override this + * method to avoid boxing/unboxing costs whenever possible.
+ */ + def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { + append(getField(row, ordinal), buffer) + } + + /** + * Returns the size of the value `row(ordinal)`. This is used to calculate the size of variable + * length types such as byte arrays and strings. + */ + def actualSize(row: InternalRow, ordinal: Int): Int = defaultSize + + /** + * Returns `row(ordinal)`. Subclasses should override this method to avoid boxing/unboxing costs + * whenever possible. + */ + def getField(row: InternalRow, ordinal: Int): JvmType + + /** + * Sets `row(ordinal)` to `field`. Subclasses should override this method to avoid boxing/unboxing + * costs whenever possible. + */ + def setField(row: MutableRow, ordinal: Int, value: JvmType): Unit + + /** + * Copies `from(fromOrdinal)` to `to(toOrdinal)`. Subclasses should override this method to avoid + * boxing/unboxing costs whenever possible. + */ + def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int): Unit = { + setField(to, toOrdinal, getField(from, fromOrdinal)) + } + + /** + * Creates a duplicated copy of the value. + */ + def clone(v: JvmType): JvmType = v + + override def toString: String = getClass.getSimpleName.stripSuffix("$") +} + +private[columnar] object NULL extends ColumnType[Any] { + + override def dataType: DataType = NullType + override def defaultSize: Int = 0 + override def append(v: Any, buffer: ByteBuffer): Unit = {} + override def extract(buffer: ByteBuffer): Any = null + override def setField(row: MutableRow, ordinal: Int, value: Any): Unit = row.setNullAt(ordinal) + override def getField(row: InternalRow, ordinal: Int): Any = null +} + +private[columnar] abstract class NativeColumnType[T <: AtomicType]( + val dataType: T, + val defaultSize: Int) + extends ColumnType[T#InternalType] { + + /** + * Scala TypeTag. Can be used to create primitive arrays and hash tables. 
+ */ + def scalaTag: TypeTag[dataType.InternalType] = dataType.tag +} + +private[columnar] object INT extends NativeColumnType(IntegerType, 4) { + override def append(v: Int, buffer: ByteBuffer): Unit = { + buffer.putInt(v) + } + + override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { + buffer.putInt(row.getInt(ordinal)) + } + + override def extract(buffer: ByteBuffer): Int = { + ByteBufferHelper.getInt(buffer) + } + + override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { + row.setInt(ordinal, ByteBufferHelper.getInt(buffer)) + } + + override def setField(row: MutableRow, ordinal: Int, value: Int): Unit = { + row.setInt(ordinal, value) + } + + override def getField(row: InternalRow, ordinal: Int): Int = row.getInt(ordinal) + + + override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { + to.setInt(toOrdinal, from.getInt(fromOrdinal)) + } +} + +private[columnar] object LONG extends NativeColumnType(LongType, 8) { + override def append(v: Long, buffer: ByteBuffer): Unit = { + buffer.putLong(v) + } + + override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { + buffer.putLong(row.getLong(ordinal)) + } + + override def extract(buffer: ByteBuffer): Long = { + ByteBufferHelper.getLong(buffer) + } + + override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { + row.setLong(ordinal, ByteBufferHelper.getLong(buffer)) + } + + override def setField(row: MutableRow, ordinal: Int, value: Long): Unit = { + row.setLong(ordinal, value) + } + + override def getField(row: InternalRow, ordinal: Int): Long = row.getLong(ordinal) + + override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { + to.setLong(toOrdinal, from.getLong(fromOrdinal)) + } +} + +private[columnar] object FLOAT extends NativeColumnType(FloatType, 4) { + override def append(v: Float, buffer: ByteBuffer): Unit = { + buffer.putFloat(v) + } + + override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { + buffer.putFloat(row.getFloat(ordinal)) + } + + override def extract(buffer: ByteBuffer): Float = { + ByteBufferHelper.getFloat(buffer) + } + + override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { + row.setFloat(ordinal, ByteBufferHelper.getFloat(buffer)) + } + + override def setField(row: MutableRow, ordinal: Int, value: Float): Unit = { + row.setFloat(ordinal, value) + } + + override def getField(row: InternalRow, ordinal: Int): Float = row.getFloat(ordinal) + + override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { + to.setFloat(toOrdinal, from.getFloat(fromOrdinal)) + } +} + +private[columnar] object DOUBLE extends NativeColumnType(DoubleType, 8) { + override def append(v: Double, buffer: ByteBuffer): Unit = { + buffer.putDouble(v) + } + + override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { + buffer.putDouble(row.getDouble(ordinal)) + } + + override def extract(buffer: ByteBuffer): Double = { + ByteBufferHelper.getDouble(buffer) + } + + override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { + row.setDouble(ordinal, ByteBufferHelper.getDouble(buffer)) + } + + override def setField(row: MutableRow, ordinal: Int, value: Double): Unit = { + row.setDouble(ordinal, value) + } + + override def getField(row: InternalRow, ordinal: Int): Double = row.getDouble(ordinal) + + override def copyField(from: InternalRow, fromOrdinal: Int, 
to: MutableRow, toOrdinal: Int) { + to.setDouble(toOrdinal, from.getDouble(fromOrdinal)) + } +} + +private[columnar] object BOOLEAN extends NativeColumnType(BooleanType, 1) { + override def append(v: Boolean, buffer: ByteBuffer): Unit = { + buffer.put(if (v) 1: Byte else 0: Byte) + } + + override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { + buffer.put(if (row.getBoolean(ordinal)) 1: Byte else 0: Byte) + } + + override def extract(buffer: ByteBuffer): Boolean = buffer.get() == 1 + + override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { + row.setBoolean(ordinal, buffer.get() == 1) + } + + override def setField(row: MutableRow, ordinal: Int, value: Boolean): Unit = { + row.setBoolean(ordinal, value) + } + + override def getField(row: InternalRow, ordinal: Int): Boolean = row.getBoolean(ordinal) + + override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { + to.setBoolean(toOrdinal, from.getBoolean(fromOrdinal)) + } +} + +private[columnar] object BYTE extends NativeColumnType(ByteType, 1) { + override def append(v: Byte, buffer: ByteBuffer): Unit = { + buffer.put(v) + } + + override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { + buffer.put(row.getByte(ordinal)) + } + + override def extract(buffer: ByteBuffer): Byte = { + buffer.get() + } + + override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { + row.setByte(ordinal, buffer.get()) + } + + override def setField(row: MutableRow, ordinal: Int, value: Byte): Unit = { + row.setByte(ordinal, value) + } + + override def getField(row: InternalRow, ordinal: Int): Byte = row.getByte(ordinal) + + override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { + to.setByte(toOrdinal, from.getByte(fromOrdinal)) + } +} + +private[columnar] object SHORT extends NativeColumnType(ShortType, 2) { + override def append(v: Short, buffer: ByteBuffer): Unit = { + buffer.putShort(v) + } + + override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { + buffer.putShort(row.getShort(ordinal)) + } + + override def extract(buffer: ByteBuffer): Short = { + buffer.getShort() + } + + override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { + row.setShort(ordinal, buffer.getShort()) + } + + override def setField(row: MutableRow, ordinal: Int, value: Short): Unit = { + row.setShort(ordinal, value) + } + + override def getField(row: InternalRow, ordinal: Int): Short = row.getShort(ordinal) + + override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { + to.setShort(toOrdinal, from.getShort(fromOrdinal)) + } +} + +/** + * A fast path to copy var-length bytes between ByteBuffer and UnsafeRow without creating wrapper + * objects. 
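+ *
+ * When the target row is a MutableUnsafeRow (or the source row is an UnsafeRow), the serialized
+ * bytes are copied directly between the buffer and the row, bypassing the generic
+ * extract/setField and getField/append paths.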
+ */ +private[columnar] trait DirectCopyColumnType[JvmType] extends ColumnType[JvmType] { + + // copy the bytes from ByteBuffer to UnsafeRow + override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { + if (row.isInstanceOf[MutableUnsafeRow]) { + val numBytes = buffer.getInt + val cursor = buffer.position() + buffer.position(cursor + numBytes) + row.asInstanceOf[MutableUnsafeRow].writer.write(ordinal, buffer.array(), + buffer.arrayOffset() + cursor, numBytes) + } else { + setField(row, ordinal, extract(buffer)) + } + } + + // copy the bytes from UnsafeRow to ByteBuffer + override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { + if (row.isInstanceOf[UnsafeRow]) { + row.asInstanceOf[UnsafeRow].writeFieldTo(ordinal, buffer) + } else { + super.append(row, ordinal, buffer) + } + } +} + +private[columnar] object STRING + extends NativeColumnType(StringType, 8) with DirectCopyColumnType[UTF8String] { + + override def actualSize(row: InternalRow, ordinal: Int): Int = { + row.getUTF8String(ordinal).numBytes() + 4 + } + + override def append(v: UTF8String, buffer: ByteBuffer): Unit = { + buffer.putInt(v.numBytes()) + v.writeTo(buffer) + } + + override def extract(buffer: ByteBuffer): UTF8String = { + val length = buffer.getInt() + val cursor = buffer.position() + buffer.position(cursor + length) + UTF8String.fromBytes(buffer.array(), buffer.arrayOffset() + cursor, length) + } + + override def setField(row: MutableRow, ordinal: Int, value: UTF8String): Unit = { + if (row.isInstanceOf[MutableUnsafeRow]) { + row.asInstanceOf[MutableUnsafeRow].writer.write(ordinal, value) + } else { + row.update(ordinal, value.clone()) + } + } + + override def getField(row: InternalRow, ordinal: Int): UTF8String = { + row.getUTF8String(ordinal) + } + + override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { + setField(to, toOrdinal, getField(from, fromOrdinal)) + } + + override def clone(v: UTF8String): UTF8String = v.clone() +} + +private[columnar] case class COMPACT_DECIMAL(precision: Int, scale: Int) + extends NativeColumnType(DecimalType(precision, scale), 8) { + + override def extract(buffer: ByteBuffer): Decimal = { + Decimal(ByteBufferHelper.getLong(buffer), precision, scale) + } + + override def extract(buffer: ByteBuffer, row: MutableRow, ordinal: Int): Unit = { + if (row.isInstanceOf[MutableUnsafeRow]) { + // copy it as Long + row.setLong(ordinal, ByteBufferHelper.getLong(buffer)) + } else { + setField(row, ordinal, extract(buffer)) + } + } + + override def append(v: Decimal, buffer: ByteBuffer): Unit = { + buffer.putLong(v.toUnscaledLong) + } + + override def append(row: InternalRow, ordinal: Int, buffer: ByteBuffer): Unit = { + if (row.isInstanceOf[UnsafeRow]) { + // copy it as Long + buffer.putLong(row.getLong(ordinal)) + } else { + append(getField(row, ordinal), buffer) + } + } + + override def getField(row: InternalRow, ordinal: Int): Decimal = { + row.getDecimal(ordinal, precision, scale) + } + + override def setField(row: MutableRow, ordinal: Int, value: Decimal): Unit = { + row.setDecimal(ordinal, value, precision) + } + + override def copyField(from: InternalRow, fromOrdinal: Int, to: MutableRow, toOrdinal: Int) { + setField(to, toOrdinal, getField(from, fromOrdinal)) + } +} + +private[columnar] object COMPACT_DECIMAL { + def apply(dt: DecimalType): COMPACT_DECIMAL = { + COMPACT_DECIMAL(dt.precision, dt.scale) + } +} + +private[columnar] sealed abstract class ByteArrayColumnType[JvmType](val defaultSize: Int) + 
extends ColumnType[JvmType] with DirectCopyColumnType[JvmType] { + + def serialize(value: JvmType): Array[Byte] + def deserialize(bytes: Array[Byte]): JvmType + + override def append(v: JvmType, buffer: ByteBuffer): Unit = { + val bytes = serialize(v) + buffer.putInt(bytes.length).put(bytes, 0, bytes.length) + } + + override def extract(buffer: ByteBuffer): JvmType = { + val length = buffer.getInt() + val bytes = new Array[Byte](length) + buffer.get(bytes, 0, length) + deserialize(bytes) + } +} + +private[columnar] object BINARY extends ByteArrayColumnType[Array[Byte]](16) { + + def dataType: DataType = BinaryType + + override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]): Unit = { + row.update(ordinal, value) + } + + override def getField(row: InternalRow, ordinal: Int): Array[Byte] = { + row.getBinary(ordinal) + } + + override def actualSize(row: InternalRow, ordinal: Int): Int = { + row.getBinary(ordinal).length + 4 + } + + def serialize(value: Array[Byte]): Array[Byte] = value + def deserialize(bytes: Array[Byte]): Array[Byte] = bytes +} + +private[columnar] case class LARGE_DECIMAL(precision: Int, scale: Int) + extends ByteArrayColumnType[Decimal](12) { + + override val dataType: DataType = DecimalType(precision, scale) + + override def getField(row: InternalRow, ordinal: Int): Decimal = { + row.getDecimal(ordinal, precision, scale) + } + + override def setField(row: MutableRow, ordinal: Int, value: Decimal): Unit = { + row.setDecimal(ordinal, value, precision) + } + + override def actualSize(row: InternalRow, ordinal: Int): Int = { + 4 + getField(row, ordinal).toJavaBigDecimal.unscaledValue().bitLength() / 8 + 1 + } + + override def serialize(value: Decimal): Array[Byte] = { + value.toJavaBigDecimal.unscaledValue().toByteArray + } + + override def deserialize(bytes: Array[Byte]): Decimal = { + val javaDecimal = new BigDecimal(new BigInteger(bytes), scale) + Decimal.apply(javaDecimal, precision, scale) + } +} + +private[columnar] object LARGE_DECIMAL { + def apply(dt: DecimalType): LARGE_DECIMAL = { + LARGE_DECIMAL(dt.precision, dt.scale) + } +} + +private[columnar] case class STRUCT(dataType: StructType) + extends ColumnType[UnsafeRow] with DirectCopyColumnType[UnsafeRow] { + + private val numOfFields: Int = dataType.fields.size + + override def defaultSize: Int = 20 + + override def setField(row: MutableRow, ordinal: Int, value: UnsafeRow): Unit = { + row.update(ordinal, value) + } + + override def getField(row: InternalRow, ordinal: Int): UnsafeRow = { + row.getStruct(ordinal, numOfFields).asInstanceOf[UnsafeRow] + } + + override def actualSize(row: InternalRow, ordinal: Int): Int = { + 4 + getField(row, ordinal).getSizeInBytes + } + + override def append(value: UnsafeRow, buffer: ByteBuffer): Unit = { + buffer.putInt(value.getSizeInBytes) + value.writeTo(buffer) + } + + override def extract(buffer: ByteBuffer): UnsafeRow = { + val sizeInBytes = ByteBufferHelper.getInt(buffer) + assert(buffer.hasArray) + val cursor = buffer.position() + buffer.position(cursor + sizeInBytes) + val unsafeRow = new UnsafeRow + unsafeRow.pointTo( + buffer.array(), + Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor, + numOfFields, + sizeInBytes) + unsafeRow + } + + override def clone(v: UnsafeRow): UnsafeRow = v.copy() +} + +private[columnar] case class ARRAY(dataType: ArrayType) + extends ColumnType[UnsafeArrayData] with DirectCopyColumnType[UnsafeArrayData] { + + override def defaultSize: Int = 16 + + override def setField(row: MutableRow, ordinal: Int, value: 
UnsafeArrayData): Unit = { + row.update(ordinal, value) + } + + override def getField(row: InternalRow, ordinal: Int): UnsafeArrayData = { + row.getArray(ordinal).asInstanceOf[UnsafeArrayData] + } + + override def actualSize(row: InternalRow, ordinal: Int): Int = { + val unsafeArray = getField(row, ordinal) + 4 + unsafeArray.getSizeInBytes + } + + override def append(value: UnsafeArrayData, buffer: ByteBuffer): Unit = { + buffer.putInt(value.getSizeInBytes) + value.writeTo(buffer) + } + + override def extract(buffer: ByteBuffer): UnsafeArrayData = { + val numBytes = buffer.getInt + assert(buffer.hasArray) + val cursor = buffer.position() + buffer.position(cursor + numBytes) + val array = new UnsafeArrayData + array.pointTo( + buffer.array(), + Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor, + numBytes) + array + } + + override def clone(v: UnsafeArrayData): UnsafeArrayData = v.copy() +} + +private[columnar] case class MAP(dataType: MapType) + extends ColumnType[UnsafeMapData] with DirectCopyColumnType[UnsafeMapData] { + + override def defaultSize: Int = 32 + + override def setField(row: MutableRow, ordinal: Int, value: UnsafeMapData): Unit = { + row.update(ordinal, value) + } + + override def getField(row: InternalRow, ordinal: Int): UnsafeMapData = { + row.getMap(ordinal).asInstanceOf[UnsafeMapData] + } + + override def actualSize(row: InternalRow, ordinal: Int): Int = { + val unsafeMap = getField(row, ordinal) + 4 + unsafeMap.getSizeInBytes + } + + override def append(value: UnsafeMapData, buffer: ByteBuffer): Unit = { + buffer.putInt(value.getSizeInBytes) + value.writeTo(buffer) + } + + override def extract(buffer: ByteBuffer): UnsafeMapData = { + val numBytes = buffer.getInt + val cursor = buffer.position() + buffer.position(cursor + numBytes) + val map = new UnsafeMapData + map.pointTo( + buffer.array(), + Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + cursor, + numBytes) + map + } + + override def clone(v: UnsafeMapData): UnsafeMapData = v.copy() +} + +private[columnar] object ColumnType { + def apply(dataType: DataType): ColumnType[_] = { + dataType match { + case NullType => NULL + case BooleanType => BOOLEAN + case ByteType => BYTE + case ShortType => SHORT + case IntegerType | DateType => INT + case LongType | TimestampType => LONG + case FloatType => FLOAT + case DoubleType => DOUBLE + case StringType => STRING + case BinaryType => BINARY + case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS => COMPACT_DECIMAL(dt) + case dt: DecimalType => LARGE_DECIMAL(dt) + case arr: ArrayType => ARRAY(arr) + case map: MapType => MAP(map) + case struct: StructType => STRUCT(struct) + case udt: UserDefinedType[_] => apply(udt.sqlType) + case other => + throw new Exception(s"Unsupported type: $other") + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala new file mode 100644 index 0000000000..eaafc96e4d --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.{UnsafeRowWriter, CodeFormatter, CodeGenerator} +import org.apache.spark.sql.types._ + +/** + * An Iterator to walk through the InternalRows from a CachedBatch + */ +abstract class ColumnarIterator extends Iterator[InternalRow] { + def initialize(input: Iterator[CachedBatch], columnTypes: Array[DataType], + columnIndexes: Array[Int]): Unit +} + +/** + * A helper class to update the fields of an UnsafeRow, used by ColumnAccessor + * + * WARNING: These setters MUST be called in increasing order of ordinals. + */ +class MutableUnsafeRow(val writer: UnsafeRowWriter) extends GenericMutableRow(null) { + + override def isNullAt(i: Int): Boolean = writer.isNullAt(i) + override def setNullAt(i: Int): Unit = writer.setNullAt(i) + + override def setBoolean(i: Int, v: Boolean): Unit = writer.write(i, v) + override def setByte(i: Int, v: Byte): Unit = writer.write(i, v) + override def setShort(i: Int, v: Short): Unit = writer.write(i, v) + override def setInt(i: Int, v: Int): Unit = writer.write(i, v) + override def setLong(i: Int, v: Long): Unit = writer.write(i, v) + override def setFloat(i: Int, v: Float): Unit = writer.write(i, v) + override def setDouble(i: Int, v: Double): Unit = writer.write(i, v) + + // the writer will be used directly to avoid creating wrapper objects + override def setDecimal(i: Int, v: Decimal, precision: Int): Unit = + throw new UnsupportedOperationException + override def update(i: Int, v: Any): Unit = throw new UnsupportedOperationException + + // all other methods inherited from GenericMutableRow are not needed +} + +/** + * Generates bytecode for a [[ColumnarIterator]] for the columnar cache.
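+ *
+ * A minimal usage sketch (illustrative only; `cachedBatches` and the column types below are
+ * placeholders, not part of this patch):
+ * {{{
+ *   val types = Seq[DataType](IntegerType, StringType)
+ *   val iter: ColumnarIterator = GenerateColumnAccessor.generate(types)
+ *   iter.initialize(cachedBatches, types.toArray, Array(0, 1))
+ *   while (iter.hasNext) { val unsafeRow = iter.next() }
+ * }}}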
+ */ +object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarIterator] with Logging { + + protected def canonicalize(in: Seq[DataType]): Seq[DataType] = in + protected def bind(in: Seq[DataType], inputSchema: Seq[Attribute]): Seq[DataType] = in + + protected def create(columnTypes: Seq[DataType]): ColumnarIterator = { + val ctx = newCodeGenContext() + val numFields = columnTypes.size + val (initializeAccessors, extractors) = columnTypes.zipWithIndex.map { case (dt, index) => + val accessorName = ctx.freshName("accessor") + val accessorCls = dt match { + case NullType => classOf[NullColumnAccessor].getName + case BooleanType => classOf[BooleanColumnAccessor].getName + case ByteType => classOf[ByteColumnAccessor].getName + case ShortType => classOf[ShortColumnAccessor].getName + case IntegerType | DateType => classOf[IntColumnAccessor].getName + case LongType | TimestampType => classOf[LongColumnAccessor].getName + case FloatType => classOf[FloatColumnAccessor].getName + case DoubleType => classOf[DoubleColumnAccessor].getName + case StringType => classOf[StringColumnAccessor].getName + case BinaryType => classOf[BinaryColumnAccessor].getName + case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS => + classOf[CompactDecimalColumnAccessor].getName + case dt: DecimalType => classOf[DecimalColumnAccessor].getName + case struct: StructType => classOf[StructColumnAccessor].getName + case array: ArrayType => classOf[ArrayColumnAccessor].getName + case t: MapType => classOf[MapColumnAccessor].getName + } + ctx.addMutableState(accessorCls, accessorName, s"$accessorName = null;") + + val createCode = dt match { + case t if ctx.isPrimitiveType(dt) => + s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));" + case NullType | StringType | BinaryType => + s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));" + case other => + s"""$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder), + (${dt.getClass.getName}) columnTypes[$index]);""" + } + + val extract = s"$accessorName.extractTo(mutableRow, $index);" + val patch = dt match { + case DecimalType.Fixed(p, s) if p > Decimal.MAX_LONG_DIGITS => + // For large Decimal, it should have 16 bytes reserved for future updates even if it's null now.
+ s""" + if (mutableRow.isNullAt($index)) { + rowWriter.write($index, (Decimal) null, $p, $s); + } + """ + case other => "" + } + (createCode, extract + patch) + }.unzip + + val code = s""" + import java.nio.ByteBuffer; + import java.nio.ByteOrder; + import scala.collection.Iterator; + import org.apache.spark.sql.types.DataType; + import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder; + import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter; + import org.apache.spark.sql.execution.columnar.MutableUnsafeRow; + + public SpecificColumnarIterator generate($exprType[] expr) { + return new SpecificColumnarIterator(); + } + + class SpecificColumnarIterator extends ${classOf[ColumnarIterator].getName} { + + private ByteOrder nativeOrder = null; + private byte[][] buffers = null; + private UnsafeRow unsafeRow = new UnsafeRow(); + private BufferHolder bufferHolder = new BufferHolder(); + private UnsafeRowWriter rowWriter = new UnsafeRowWriter(); + private MutableUnsafeRow mutableRow = null; + + private int currentRow = 0; + private int numRowsInBatch = 0; + + private scala.collection.Iterator input = null; + private DataType[] columnTypes = null; + private int[] columnIndexes = null; + + ${declareMutableStates(ctx)} + + public SpecificColumnarIterator() { + this.nativeOrder = ByteOrder.nativeOrder(); + this.buffers = new byte[${columnTypes.length}][]; + this.mutableRow = new MutableUnsafeRow(rowWriter); + + ${initMutableStates(ctx)} + } + + public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) { + this.input = input; + this.columnTypes = columnTypes; + this.columnIndexes = columnIndexes; + } + + public boolean hasNext() { + if (currentRow < numRowsInBatch) { + return true; + } + if (!input.hasNext()) { + return false; + } + + ${classOf[CachedBatch].getName} batch = (${classOf[CachedBatch].getName}) input.next(); + currentRow = 0; + numRowsInBatch = batch.numRows(); + for (int i = 0; i < columnIndexes.length; i ++) { + buffers[i] = batch.buffers()[columnIndexes[i]]; + } + ${initializeAccessors.mkString("\n")} + + return hasNext(); + } + + public InternalRow next() { + currentRow += 1; + bufferHolder.reset(); + rowWriter.initialize(bufferHolder, $numFields); + ${extractors.mkString("\n")} + unsafeRow.pointTo(bufferHolder.buffer, $numFields, bufferHolder.totalSize()); + return unsafeRow; + } + }""" + + logDebug(s"Generated ColumnarIterator: ${CodeFormatter.format(code)}") + + compile(code).generate(ctx.references.toArray).asInstanceOf[ColumnarIterator] + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala new file mode 100644 index 0000000000..ce701fb3a7 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarTableScan.scala @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution.{ConvertToUnsafe, LeafNode, SparkPlan} +import org.apache.spark.sql.types.UserDefinedType +import org.apache.spark.storage.StorageLevel +import org.apache.spark.{Accumulable, Accumulator, Accumulators} + +private[sql] object InMemoryRelation { + def apply( + useCompression: Boolean, + batchSize: Int, + storageLevel: StorageLevel, + child: SparkPlan, + tableName: Option[String]): InMemoryRelation = + new InMemoryRelation(child.output, useCompression, batchSize, storageLevel, + if (child.outputsUnsafeRows) child else ConvertToUnsafe(child), + tableName)() +} + +/** + * CachedBatch is a cached batch of rows. + * + * @param numRows The total number of rows in this batch + * @param buffers The buffers for serialized columns + * @param stats The stat of columns + */ +private[columnar] +case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) + +private[sql] case class InMemoryRelation( + output: Seq[Attribute], + useCompression: Boolean, + batchSize: Int, + storageLevel: StorageLevel, + @transient child: SparkPlan, + tableName: Option[String])( + @transient private var _cachedColumnBuffers: RDD[CachedBatch] = null, + @transient private var _statistics: Statistics = null, + private var _batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = null) + extends LogicalPlan with MultiInstanceRelation { + + private val batchStats: Accumulable[ArrayBuffer[InternalRow], InternalRow] = + if (_batchStats == null) { + child.sqlContext.sparkContext.accumulableCollection(ArrayBuffer.empty[InternalRow]) + } else { + _batchStats + } + + @transient val partitionStatistics = new PartitionStatistics(output) + + private def computeSizeInBytes = { + val sizeOfRow: Expression = + BindReferences.bindReference( + output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add), + partitionStatistics.schema) + + batchStats.value.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum + } + + // Statistics propagation contracts: + // 1. Non-null `_statistics` must reflect the actual statistics of the underlying data + // 2. Only propagate statistics when `_statistics` is non-null + private def statisticsToBePropagated = if (_statistics == null) { + val updatedStats = statistics + if (_statistics == null) null else updatedStats + } else { + _statistics + } + + override def statistics: Statistics = { + if (_statistics == null) { + if (batchStats.value.isEmpty) { + // Underlying columnar RDD hasn't been materialized, no useful statistics information + // available, return the default statistics. 
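+ // (`defaultSizeInBytes` is deliberately conservative, larger than the broadcast threshold,
+ // so that relations of unknown size are not chosen for broadcast joins by default.)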
Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes) + } else { + // Underlying columnar RDD has been materialized and the required information has also been + // collected via the `batchStats` accumulator, so compute the final statistics + // and update `_statistics`. + _statistics = Statistics(sizeInBytes = computeSizeInBytes) + _statistics + } + } else { + // Pre-computed statistics + _statistics + } + } + + // If the cached column buffers were not passed in, we calculate them in the constructor. + // As in Spark, the actual work of caching is lazy. + if (_cachedColumnBuffers == null) { + buildBuffers() + } + + def recache(): Unit = { + _cachedColumnBuffers.unpersist() + _cachedColumnBuffers = null + buildBuffers() + } + + private def buildBuffers(): Unit = { + val output = child.output + val cached = child.execute().mapPartitionsInternal { rowIterator => + new Iterator[CachedBatch] { + def next(): CachedBatch = { + val columnBuilders = output.map { attribute => + ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression) + }.toArray + + var rowCount = 0 + var totalSize = 0L + while (rowIterator.hasNext && rowCount < batchSize + && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) { + val row = rowIterator.next() + + // Added for SPARK-6082. This assertion can be useful for scenarios when something + // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM + // may result in malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat + // hard to decipher. + assert( + row.numFields == columnBuilders.size, + s"Row column number mismatch, expected ${output.size} columns, " + + s"but got ${row.numFields}." + + s"\nRow content: $row") + + var i = 0 + totalSize = 0 + while (i < row.numFields) { + columnBuilders(i).appendFrom(row, i) + totalSize += columnBuilders(i).columnStats.sizeInBytes + i += 1 + } + rowCount += 1 + } + + val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics) + .flatMap(_.values)) + + batchStats += stats + CachedBatch(rowCount, columnBuilders.map(_.build().array()), stats) + } + + def hasNext: Boolean = rowIterator.hasNext + } + }.persist(storageLevel) + + cached.setName(tableName.map(n => s"In-memory table $n").getOrElse(child.toString)) + _cachedColumnBuffers = cached + } + + def withOutput(newOutput: Seq[Attribute]): InMemoryRelation = { + InMemoryRelation( + newOutput, useCompression, batchSize, storageLevel, child, tableName)( + _cachedColumnBuffers, statisticsToBePropagated, batchStats) + } + + override def children: Seq[LogicalPlan] = Seq.empty + + override def newInstance(): this.type = { + new InMemoryRelation( + output.map(_.newInstance()), + useCompression, + batchSize, + storageLevel, + child, + tableName)( + _cachedColumnBuffers, + statisticsToBePropagated, + batchStats).asInstanceOf[this.type] + } + + def cachedColumnBuffers: RDD[CachedBatch] = _cachedColumnBuffers + + override protected def otherCopyArgs: Seq[AnyRef] = + Seq(_cachedColumnBuffers, statisticsToBePropagated, batchStats) + + private[sql] def uncache(blocking: Boolean): Unit = { + Accumulators.remove(batchStats.id) + cachedColumnBuffers.unpersist(blocking) + _cachedColumnBuffers = null + } +} + +private[sql] case class InMemoryColumnarTableScan( + attributes: Seq[Attribute], + predicates: Seq[Expression], + @transient relation: InMemoryRelation) + extends LeafNode { + + override def output: Seq[Attribute] = attributes + + // The cached version does not change the outputPartitioning of the original
SparkPlan. + override def outputPartitioning: Partitioning = relation.child.outputPartitioning + + // The cached version does not change the outputOrdering of the original SparkPlan. + override def outputOrdering: Seq[SortOrder] = relation.child.outputOrdering + + override def outputsUnsafeRows: Boolean = true + + private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute(a) + + // Returned filter predicate should return false iff it is impossible for the input expression + // to evaluate to `true' based on statistics collected about this partition batch. + @transient val buildFilter: PartialFunction[Expression, Expression] = { + case And(lhs: Expression, rhs: Expression) + if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) => + (buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduce(_ && _) + + case Or(lhs: Expression, rhs: Expression) + if buildFilter.isDefinedAt(lhs) && buildFilter.isDefinedAt(rhs) => + buildFilter(lhs) || buildFilter(rhs) + + case EqualTo(a: AttributeReference, l: Literal) => + statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound + case EqualTo(l: Literal, a: AttributeReference) => + statsFor(a).lowerBound <= l && l <= statsFor(a).upperBound + + case LessThan(a: AttributeReference, l: Literal) => statsFor(a).lowerBound < l + case LessThan(l: Literal, a: AttributeReference) => l < statsFor(a).upperBound + + case LessThanOrEqual(a: AttributeReference, l: Literal) => statsFor(a).lowerBound <= l + case LessThanOrEqual(l: Literal, a: AttributeReference) => l <= statsFor(a).upperBound + + case GreaterThan(a: AttributeReference, l: Literal) => l < statsFor(a).upperBound + case GreaterThan(l: Literal, a: AttributeReference) => statsFor(a).lowerBound < l + + case GreaterThanOrEqual(a: AttributeReference, l: Literal) => l <= statsFor(a).upperBound + case GreaterThanOrEqual(l: Literal, a: AttributeReference) => statsFor(a).lowerBound <= l + + case IsNull(a: Attribute) => statsFor(a).nullCount > 0 + case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0 + } + + val partitionFilters: Seq[Expression] = { + predicates.flatMap { p => + val filter = buildFilter.lift(p) + val boundFilter = + filter.map( + BindReferences.bindReference( + _, + relation.partitionStatistics.schema, + allowFailures = true)) + + boundFilter.foreach(_ => + filter.foreach(f => logInfo(s"Predicate $p generates partition filter: $f"))) + + // If the filter can't be resolved then we are missing required statistics. + boundFilter.filter(_.resolved) + } + } + + lazy val enableAccumulators: Boolean = + sqlContext.getConf("spark.sql.inMemoryTableScanStatistics.enable", "false").toBoolean + + // Accumulators used for testing purposes + lazy val readPartitions: Accumulator[Int] = sparkContext.accumulator(0) + lazy val readBatches: Accumulator[Int] = sparkContext.accumulator(0) + + private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning + + protected override def doExecute(): RDD[InternalRow] = { + if (enableAccumulators) { + readPartitions.setValue(0) + readBatches.setValue(0) + } + + // Using these variables here to avoid serialization of entire objects (if referenced directly) + // within the map Partitions closure. 
+ val schema = relation.partitionStatistics.schema + val schemaIndex = schema.zipWithIndex + val relOutput = relation.output + val buffers = relation.cachedColumnBuffers + + buffers.mapPartitionsInternal { cachedBatchIterator => + val partitionFilter = newPredicate( + partitionFilters.reduceOption(And).getOrElse(Literal(true)), + schema) + + // Find the ordinals and data types of the requested columns. + val (requestedColumnIndices, requestedColumnDataTypes) = + attributes.map { a => + relOutput.indexWhere(_.exprId == a.exprId) -> a.dataType + }.unzip + + // Do partition batch pruning if enabled + val cachedBatchesToScan = + if (inMemoryPartitionPruningEnabled) { + cachedBatchIterator.filter { cachedBatch => + if (!partitionFilter(cachedBatch.stats)) { + def statsString: String = schemaIndex.map { + case (a, i) => + val value = cachedBatch.stats.get(i, a.dataType) + s"${a.name}: $value" + }.mkString(", ") + logInfo(s"Skipping partition based on stats $statsString") + false + } else { + if (enableAccumulators) { + readBatches += 1 + } + true + } + } + } else { + cachedBatchIterator + } + + val columnTypes = requestedColumnDataTypes.map { + case udt: UserDefinedType[_] => udt.sqlType + case other => other + }.toArray + val columnarIterator = GenerateColumnAccessor.generate(columnTypes) + columnarIterator.initialize(cachedBatchesToScan, columnTypes, requestedColumnIndices.toArray) + if (enableAccumulators && columnarIterator.hasNext) { + readPartitions += 1 + } + columnarIterator + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala new file mode 100644 index 0000000000..8d99546924 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessor.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.columnar + +import java.nio.{ByteOrder, ByteBuffer} + +import org.apache.spark.sql.catalyst.expressions.MutableRow + +private[columnar] trait NullableColumnAccessor extends ColumnAccessor { + private var nullsBuffer: ByteBuffer = _ + private var nullCount: Int = _ + private var seenNulls: Int = 0 + + private var nextNullIndex: Int = _ + private var pos: Int = 0 + + abstract override protected def initialize(): Unit = { + nullsBuffer = underlyingBuffer.duplicate().order(ByteOrder.nativeOrder()) + nullCount = ByteBufferHelper.getInt(nullsBuffer) + nextNullIndex = if (nullCount > 0) ByteBufferHelper.getInt(nullsBuffer) else -1 + pos = 0 + + underlyingBuffer.position(underlyingBuffer.position + 4 + nullCount * 4) + super.initialize() + } + + abstract override def extractTo(row: MutableRow, ordinal: Int): Unit = { + if (pos == nextNullIndex) { + seenNulls += 1 + + if (seenNulls < nullCount) { + nextNullIndex = ByteBufferHelper.getInt(nullsBuffer) + } + + row.setNullAt(ordinal) + } else { + super.extractTo(row, ordinal) + } + + pos += 1 + } + + abstract override def hasNext: Boolean = seenNulls < nullCount || super.hasNext +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilder.scala new file mode 100644 index 0000000000..3a1931bfb5 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilder.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import java.nio.{ByteBuffer, ByteOrder} + +import org.apache.spark.sql.catalyst.InternalRow + +/** + * A stackable trait used for building byte buffer for a column containing null values. Memory + * layout of the final byte buffer is: + * {{{ + * .------------------- Null count N (4 bytes) + * | .--------------- Null positions (4 x N bytes, empty if null count is zero) + * | | .--------- Non-null elements + * V V V + * +---+-----+---------+ + * | | ... | ... ... 
| + * +---+-----+---------+ + * }}} + */ +private[columnar] trait NullableColumnBuilder extends ColumnBuilder { + protected var nulls: ByteBuffer = _ + protected var nullCount: Int = _ + private var pos: Int = _ + + abstract override def initialize( + initialSize: Int, + columnName: String, + useCompression: Boolean): Unit = { + + nulls = ByteBuffer.allocate(1024) + nulls.order(ByteOrder.nativeOrder()) + pos = 0 + nullCount = 0 + super.initialize(initialSize, columnName, useCompression) + } + + abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = { + columnStats.gatherStats(row, ordinal) + if (row.isNullAt(ordinal)) { + nulls = ColumnBuilder.ensureFreeSpace(nulls, 4) + nulls.putInt(pos) + nullCount += 1 + } else { + super.appendFrom(row, ordinal) + } + pos += 1 + } + + abstract override def build(): ByteBuffer = { + val nonNulls = super.build() + val nullDataLen = nulls.position() + + nulls.limit(nullDataLen) + nulls.rewind() + + val buffer = ByteBuffer + .allocate(4 + nullDataLen + nonNulls.remaining()) + .order(ByteOrder.nativeOrder()) + .putInt(nullCount) + .put(nulls) + .put(nonNulls) + + buffer.rewind() + buffer + } + + protected def buildNonNulls(): ByteBuffer = { + nulls.limit(nulls.position()).rewind() + super.build() + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala new file mode 100644 index 0000000000..6579b5068e --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnAccessor.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.columnar.compression + +import org.apache.spark.sql.catalyst.expressions.MutableRow +import org.apache.spark.sql.execution.columnar.{ColumnAccessor, NativeColumnAccessor} +import org.apache.spark.sql.types.AtomicType + +private[columnar] trait CompressibleColumnAccessor[T <: AtomicType] extends ColumnAccessor { + this: NativeColumnAccessor[T] => + + private var decoder: Decoder[T] = _ + + abstract override protected def initialize(): Unit = { + super.initialize() + decoder = CompressionScheme(underlyingBuffer.getInt()).decoder(buffer, columnType) + } + + abstract override def hasNext: Boolean = super.hasNext || decoder.hasNext + + override def extractSingle(row: MutableRow, ordinal: Int): Unit = { + decoder.next(row, ordinal) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala new file mode 100644 index 0000000000..b0e216feb5 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressibleColumnBuilder.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar.compression + +import java.nio.{ByteBuffer, ByteOrder} + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.columnar.{ColumnBuilder, NativeColumnBuilder} +import org.apache.spark.sql.types.AtomicType + +/** + * A stackable trait that builds optionally compressed byte buffer for a column. Memory layout of + * the final byte buffer is: + * {{{ + * .----------------------- Null count N (4 bytes) + * | .------------------- Null positions (4 x N bytes, empty if null count is zero) + * | | .------------- Compression scheme ID (4 bytes) + * | | | .--------- Compressed non-null elements + * V V V V + * +---+-----+---+---------+ + * | | ... | | ... ... 
| + * +---+-----+---+---------+ + * \-------/ \-----------/ + * header body + * }}} + */ +private[columnar] trait CompressibleColumnBuilder[T <: AtomicType] + extends ColumnBuilder with Logging { + + this: NativeColumnBuilder[T] with WithCompressionSchemes => + + var compressionEncoders: Seq[Encoder[T]] = _ + + abstract override def initialize( + initialSize: Int, + columnName: String, + useCompression: Boolean): Unit = { + + compressionEncoders = + if (useCompression) { + schemes.filter(_.supports(columnType)).map(_.encoder[T](columnType)) + } else { + Seq(PassThrough.encoder(columnType)) + } + super.initialize(initialSize, columnName, useCompression) + } + + protected def isWorthCompressing(encoder: Encoder[T]) = { + encoder.compressionRatio < 0.8 + } + + private def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { + var i = 0 + while (i < compressionEncoders.length) { + compressionEncoders(i).gatherCompressibilityStats(row, ordinal) + i += 1 + } + } + + abstract override def appendFrom(row: InternalRow, ordinal: Int): Unit = { + super.appendFrom(row, ordinal) + if (!row.isNullAt(ordinal)) { + gatherCompressibilityStats(row, ordinal) + } + } + + override def build(): ByteBuffer = { + val nonNullBuffer = buildNonNulls() + val encoder: Encoder[T] = { + val candidate = compressionEncoders.minBy(_.compressionRatio) + if (isWorthCompressing(candidate)) candidate else PassThrough.encoder(columnType) + } + + // Header = null count + null positions + val headerSize = 4 + nulls.limit() + val compressedSize = if (encoder.compressedSize == 0) { + nonNullBuffer.remaining() + } else { + encoder.compressedSize + } + + val compressedBuffer = ByteBuffer + // Reserves 4 bytes for compression scheme ID + .allocate(headerSize + 4 + compressedSize) + .order(ByteOrder.nativeOrder) + // Write the header + .putInt(nullCount) + .put(nulls) + + logDebug(s"Compressor for [$columnName]: $encoder, ratio: ${encoder.compressionRatio}") + encoder.compress(nonNullBuffer, compressedBuffer) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala new file mode 100644 index 0000000000..920381f9c6 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/CompressionScheme.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.columnar.compression + +import java.nio.{ByteBuffer, ByteOrder} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.MutableRow +import org.apache.spark.sql.execution.columnar.{ColumnType, NativeColumnType} +import org.apache.spark.sql.types.AtomicType + +private[columnar] trait Encoder[T <: AtomicType] { + def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = {} + + def compressedSize: Int + + def uncompressedSize: Int + + def compressionRatio: Double = { + if (uncompressedSize > 0) compressedSize.toDouble / uncompressedSize else 1.0 + } + + def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer +} + +private[columnar] trait Decoder[T <: AtomicType] { + def next(row: MutableRow, ordinal: Int): Unit + + def hasNext: Boolean +} + +private[columnar] trait CompressionScheme { + def typeId: Int + + def supports(columnType: ColumnType[_]): Boolean + + def encoder[T <: AtomicType](columnType: NativeColumnType[T]): Encoder[T] + + def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T] +} + +private[columnar] trait WithCompressionSchemes { + def schemes: Seq[CompressionScheme] +} + +private[columnar] trait AllCompressionSchemes extends WithCompressionSchemes { + override val schemes: Seq[CompressionScheme] = CompressionScheme.all +} + +private[columnar] object CompressionScheme { + val all: Seq[CompressionScheme] = + Seq(PassThrough, RunLengthEncoding, DictionaryEncoding, BooleanBitSet, IntDelta, LongDelta) + + private val typeIdToScheme = all.map(scheme => scheme.typeId -> scheme).toMap + + def apply(typeId: Int): CompressionScheme = { + typeIdToScheme.getOrElse(typeId, throw new UnsupportedOperationException( + s"Unrecognized compression scheme type ID: $typeId")) + } + + def columnHeaderSize(columnBuffer: ByteBuffer): Int = { + val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder) + val nullCount = header.getInt() + // null count + null positions + 4 + 4 * nullCount + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala new file mode 100644 index 0000000000..941f03b745 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/compression/compressionSchemes.scala @@ -0,0 +1,532 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.columnar.compression + +import java.nio.ByteBuffer + +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{MutableRow, SpecificMutableRow} +import org.apache.spark.sql.execution.columnar._ +import org.apache.spark.sql.types._ + + +private[columnar] case object PassThrough extends CompressionScheme { + override val typeId = 0 + + override def supports(columnType: ColumnType[_]): Boolean = true + + override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): Encoder[T] = { + new this.Encoder[T](columnType) + } + + override def decoder[T <: AtomicType]( + buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T] = { + new this.Decoder(buffer, columnType) + } + + class Encoder[T <: AtomicType](columnType: NativeColumnType[T]) extends compression.Encoder[T] { + override def uncompressedSize: Int = 0 + + override def compressedSize: Int = 0 + + override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { + // Writes compression type ID and copies raw contents + to.putInt(PassThrough.typeId).put(from).rewind() + to + } + } + + class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) + extends compression.Decoder[T] { + + override def next(row: MutableRow, ordinal: Int): Unit = { + columnType.extract(buffer, row, ordinal) + } + + override def hasNext: Boolean = buffer.hasRemaining + } +} + +private[columnar] case object RunLengthEncoding extends CompressionScheme { + override val typeId = 1 + + override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): Encoder[T] = { + new this.Encoder[T](columnType) + } + + override def decoder[T <: AtomicType]( + buffer: ByteBuffer, columnType: NativeColumnType[T]): Decoder[T] = { + new this.Decoder(buffer, columnType) + } + + override def supports(columnType: ColumnType[_]): Boolean = columnType match { + case INT | LONG | SHORT | BYTE | STRING | BOOLEAN => true + case _ => false + } + + class Encoder[T <: AtomicType](columnType: NativeColumnType[T]) extends compression.Encoder[T] { + private var _uncompressedSize = 0 + private var _compressedSize = 0 + + // Using `MutableRow` to store the last value to avoid boxing/unboxing cost. 
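    // Illustrative aside (informal sketch): with the INT column type, compress() below writes each
    // run as the run's value followed by a 4-byte run length. Ten values
    // [1, 1, 1, 1, 1, 1, 2, 2, 2, 2] (10 * 4 = 40 bytes raw) therefore become two runs,
    // (1, 6) and (2, 4), taking 2 * (4 + 4) = 16 bytes, which is why the encoder only needs to
    // remember the last value seen and the length of the current run while gathering stats.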
+ private val lastValue = new SpecificMutableRow(Seq(columnType.dataType)) + private var lastRun = 0 + + override def uncompressedSize: Int = _uncompressedSize + + override def compressedSize: Int = _compressedSize + + override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { + val value = columnType.getField(row, ordinal) + val actualSize = columnType.actualSize(row, ordinal) + _uncompressedSize += actualSize + + if (lastValue.isNullAt(0)) { + columnType.copyField(row, ordinal, lastValue, 0) + lastRun = 1 + _compressedSize += actualSize + 4 + } else { + if (columnType.getField(lastValue, 0) == value) { + lastRun += 1 + } else { + _compressedSize += actualSize + 4 + columnType.copyField(row, ordinal, lastValue, 0) + lastRun = 1 + } + } + } + + override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { + to.putInt(RunLengthEncoding.typeId) + + if (from.hasRemaining) { + val currentValue = new SpecificMutableRow(Seq(columnType.dataType)) + var currentRun = 1 + val value = new SpecificMutableRow(Seq(columnType.dataType)) + + columnType.extract(from, currentValue, 0) + + while (from.hasRemaining) { + columnType.extract(from, value, 0) + + if (value.get(0, columnType.dataType) == currentValue.get(0, columnType.dataType)) { + currentRun += 1 + } else { + // Writes current run + columnType.append(currentValue, 0, to) + to.putInt(currentRun) + + // Resets current run + columnType.copyField(value, 0, currentValue, 0) + currentRun = 1 + } + } + + // Writes the last run + columnType.append(currentValue, 0, to) + to.putInt(currentRun) + } + + to.rewind() + to + } + } + + class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) + extends compression.Decoder[T] { + + private var run = 0 + private var valueCount = 0 + private var currentValue: T#InternalType = _ + + override def next(row: MutableRow, ordinal: Int): Unit = { + if (valueCount == run) { + currentValue = columnType.extract(buffer) + run = ByteBufferHelper.getInt(buffer) + valueCount = 1 + } else { + valueCount += 1 + } + + columnType.setField(row, ordinal, currentValue) + } + + override def hasNext: Boolean = valueCount < run || buffer.hasRemaining + } +} + +private[columnar] case object DictionaryEncoding extends CompressionScheme { + override val typeId = 2 + + // 32K unique values allowed + val MAX_DICT_SIZE = Short.MaxValue + + override def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) + : Decoder[T] = { + new this.Decoder(buffer, columnType) + } + + override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): Encoder[T] = { + new this.Encoder[T](columnType) + } + + override def supports(columnType: ColumnType[_]): Boolean = columnType match { + case INT | LONG | STRING => true + case _ => false + } + + class Encoder[T <: AtomicType](columnType: NativeColumnType[T]) extends compression.Encoder[T] { + // Size of the input, uncompressed, in bytes. Note that we only count until the dictionary + // overflows. + private var _uncompressedSize = 0 + + // If the number of distinct elements is too large, we discard the use of dictionary encoding + // and set the overflow flag to true. + private var overflow = false + + // Total number of elements. + private var count = 0 + + // The reverse mapping of _dictionary, i.e. mapping encoded integer to the value itself. + private var values = new mutable.ArrayBuffer[T#InternalType](1024) + + // The dictionary that maps a value to the encoded short integer. 
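    // Illustrative aside (informal sketch): for a STRING column holding 100 values drawn from
    // {"US", "UK"}, the dictionary assigns "US" -> 0 and "UK" -> 1 in insertion order; compress()
    // below serializes the dictionary once (4 + 6 + 6 = 16 bytes) and then one 2-byte code per
    // value (200 bytes), so compressedSize is about 216 bytes versus roughly 600 bytes raw.
    // If more than MAX_DICT_SIZE distinct values appear, the overflow flag above disables the
    // scheme instead.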
+ private val dictionary = mutable.HashMap.empty[Any, Short] + + // Size of the serialized dictionary in bytes. Initialized to 4 since we need at least an `Int` + // to store dictionary element count. + private var dictionarySize = 4 + + override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { + val value = columnType.getField(row, ordinal) + + if (!overflow) { + val actualSize = columnType.actualSize(row, ordinal) + count += 1 + _uncompressedSize += actualSize + + if (!dictionary.contains(value)) { + if (dictionary.size < MAX_DICT_SIZE) { + val clone = columnType.clone(value) + values += clone + dictionarySize += actualSize + dictionary(clone) = dictionary.size.toShort + } else { + overflow = true + values.clear() + dictionary.clear() + } + } + } + } + + override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { + if (overflow) { + throw new IllegalStateException( + "Dictionary encoding should not be used because of dictionary overflow.") + } + + to.putInt(DictionaryEncoding.typeId) + .putInt(dictionary.size) + + var i = 0 + while (i < values.length) { + columnType.append(values(i), to) + i += 1 + } + + while (from.hasRemaining) { + to.putShort(dictionary(columnType.extract(from))) + } + + to.rewind() + to + } + + override def uncompressedSize: Int = _uncompressedSize + + override def compressedSize: Int = if (overflow) Int.MaxValue else dictionarySize + count * 2 + } + + class Decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) + extends compression.Decoder[T] { + + private val dictionary: Array[Any] = { + val elementNum = ByteBufferHelper.getInt(buffer) + Array.fill[Any](elementNum)(columnType.extract(buffer).asInstanceOf[Any]) + } + + override def next(row: MutableRow, ordinal: Int): Unit = { + columnType.setField(row, ordinal, dictionary(buffer.getShort()).asInstanceOf[T#InternalType]) + } + + override def hasNext: Boolean = buffer.hasRemaining + } +} + +private[columnar] case object BooleanBitSet extends CompressionScheme { + override val typeId = 3 + + val BITS_PER_LONG = 64 + + override def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) + : compression.Decoder[T] = { + new this.Decoder(buffer).asInstanceOf[compression.Decoder[T]] + } + + override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): compression.Encoder[T] = { + (new this.Encoder).asInstanceOf[compression.Encoder[T]] + } + + override def supports(columnType: ColumnType[_]): Boolean = columnType == BOOLEAN + + class Encoder extends compression.Encoder[BooleanType.type] { + private var _uncompressedSize = 0 + + override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { + _uncompressedSize += BOOLEAN.defaultSize + } + + override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { + to.putInt(BooleanBitSet.typeId) + // Total element count (1 byte per Boolean value) + .putInt(from.remaining) + + while (from.remaining >= BITS_PER_LONG) { + var word = 0: Long + var i = 0 + + while (i < BITS_PER_LONG) { + if (BOOLEAN.extract(from)) { + word |= (1: Long) << i + } + i += 1 + } + + to.putLong(word) + } + + if (from.hasRemaining) { + var word = 0: Long + var i = 0 + + while (from.hasRemaining) { + if (BOOLEAN.extract(from)) { + word |= (1: Long) << i + } + i += 1 + } + + to.putLong(word) + } + + to.rewind() + to + } + + override def uncompressedSize: Int = _uncompressedSize + + override def compressedSize: Int = { + val extra = if (_uncompressedSize % BITS_PER_LONG == 0) 0 else 1 + 
(_uncompressedSize / BITS_PER_LONG + extra) * 8 + 4 + } + } + + class Decoder(buffer: ByteBuffer) extends compression.Decoder[BooleanType.type] { + private val count = ByteBufferHelper.getInt(buffer) + + private var currentWord = 0: Long + + private var visited: Int = 0 + + override def next(row: MutableRow, ordinal: Int): Unit = { + val bit = visited % BITS_PER_LONG + + visited += 1 + if (bit == 0) { + currentWord = ByteBufferHelper.getLong(buffer) + } + + row.setBoolean(ordinal, ((currentWord >> bit) & 1) != 0) + } + + override def hasNext: Boolean = visited < count + } +} + +private[columnar] case object IntDelta extends CompressionScheme { + override def typeId: Int = 4 + + override def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) + : compression.Decoder[T] = { + new Decoder(buffer, INT).asInstanceOf[compression.Decoder[T]] + } + + override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): compression.Encoder[T] = { + (new Encoder).asInstanceOf[compression.Encoder[T]] + } + + override def supports(columnType: ColumnType[_]): Boolean = columnType == INT + + class Encoder extends compression.Encoder[IntegerType.type] { + protected var _compressedSize: Int = 0 + protected var _uncompressedSize: Int = 0 + + override def compressedSize: Int = _compressedSize + override def uncompressedSize: Int = _uncompressedSize + + private var prevValue: Int = _ + + override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { + val value = row.getInt(ordinal) + val delta = value - prevValue + + _compressedSize += 1 + + // If this is the first integer to be compressed, or the delta is out of byte range, then give + // up compressing this integer. + if (_uncompressedSize == 0 || delta <= Byte.MinValue || delta > Byte.MaxValue) { + _compressedSize += INT.defaultSize + } + + _uncompressedSize += INT.defaultSize + prevValue = value + } + + override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { + to.putInt(typeId) + + if (from.hasRemaining) { + var prev = from.getInt() + to.put(Byte.MinValue) + to.putInt(prev) + + while (from.hasRemaining) { + val current = from.getInt() + val delta = current - prev + prev = current + + if (Byte.MinValue < delta && delta <= Byte.MaxValue) { + to.put(delta.toByte) + } else { + to.put(Byte.MinValue) + to.putInt(current) + } + } + } + + to.rewind().asInstanceOf[ByteBuffer] + } + } + + class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[IntegerType.type]) + extends compression.Decoder[IntegerType.type] { + + private var prev: Int = _ + + override def hasNext: Boolean = buffer.hasRemaining + + override def next(row: MutableRow, ordinal: Int): Unit = { + val delta = buffer.get() + prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getInt(buffer) + row.setInt(ordinal, prev) + } + } +} + +private[columnar] case object LongDelta extends CompressionScheme { + override def typeId: Int = 5 + + override def decoder[T <: AtomicType](buffer: ByteBuffer, columnType: NativeColumnType[T]) + : compression.Decoder[T] = { + new Decoder(buffer, LONG).asInstanceOf[compression.Decoder[T]] + } + + override def encoder[T <: AtomicType](columnType: NativeColumnType[T]): compression.Encoder[T] = { + (new Encoder).asInstanceOf[compression.Encoder[T]] + } + + override def supports(columnType: ColumnType[_]): Boolean = columnType == LONG + + class Encoder extends compression.Encoder[LongType.type] { + protected var _compressedSize: Int = 0 + protected var _uncompressedSize: Int = 0 + + override 
def compressedSize: Int = _compressedSize + override def uncompressedSize: Int = _uncompressedSize + + private var prevValue: Long = _ + + override def gatherCompressibilityStats(row: InternalRow, ordinal: Int): Unit = { + val value = row.getLong(ordinal) + val delta = value - prevValue + + _compressedSize += 1 + + // If this is the first long integer to be compressed, or the delta is out of byte range, then + // give up compressing this long integer. + if (_uncompressedSize == 0 || delta <= Byte.MinValue || delta > Byte.MaxValue) { + _compressedSize += LONG.defaultSize + } + + _uncompressedSize += LONG.defaultSize + prevValue = value + } + + override def compress(from: ByteBuffer, to: ByteBuffer): ByteBuffer = { + to.putInt(typeId) + + if (from.hasRemaining) { + var prev = from.getLong() + to.put(Byte.MinValue) + to.putLong(prev) + + while (from.hasRemaining) { + val current = from.getLong() + val delta = current - prev + prev = current + + if (Byte.MinValue < delta && delta <= Byte.MaxValue) { + to.put(delta.toByte) + } else { + to.put(Byte.MinValue) + to.putLong(current) + } + } + } + + to.rewind().asInstanceOf[ByteBuffer] + } + } + + class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[LongType.type]) + extends compression.Decoder[LongType.type] { + + private var prev: Long = _ + + override def hasNext: Boolean = buffer.hasRemaining + + override def next(row: MutableRow, ordinal: Int): Unit = { + val delta = buffer.get() + prev = if (delta > Byte.MinValue) prev + delta else ByteBufferHelper.getLong(buffer) + row.setLong(ordinal, prev) + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala index 28fa231e72..c912734bba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/package.scala @@ -19,5 +19,7 @@ package org.apache.spark.sql /** * The physical execution component of Spark SQL. Note that this is a private package. + * All classes in catalyst are considered an internal API to Spark SQL and are subject + * to change between minor releases. 
*/ package object execution diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index bce94dafad..d86df4cfb9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -27,7 +27,7 @@ import scala.language.postfixOps import org.scalatest.concurrent.Eventually._ import org.apache.spark.Accumulators -import org.apache.spark.sql.columnar._ +import org.apache.spark.sql.execution.columnar._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.{SQLTestUtils, SharedSQLContext} import org.apache.spark.storage.{StorageLevel, RDDBlockId} @@ -280,7 +280,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext sql("CACHE TABLE testData") sqlContext.table("testData").queryExecution.withCachedData.collect { case cached: InMemoryRelation => - val actualSizeInBytes = (1 to 100).map(i => INT.defaultSize + i.toString.length + 4).sum + val actualSizeInBytes = (1 to 100).map(i => 4 + i.toString.length + 4).sum assert(cached.statistics.sizeInBytes === actualSizeInBytes) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index b5417b195f..6ea1fe4ccf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.columnar.InMemoryRelation +import org.apache.spark.sql.execution.columnar.InMemoryRelation abstract class QueryTest extends PlanTest { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala deleted file mode 100644 index 89a664001b..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow -import org.apache.spark.sql.types._ - -class ColumnStatsSuite extends SparkFunSuite { - testColumnStats(classOf[BooleanColumnStats], BOOLEAN, createRow(true, false, 0)) - testColumnStats(classOf[ByteColumnStats], BYTE, createRow(Byte.MaxValue, Byte.MinValue, 0)) - testColumnStats(classOf[ShortColumnStats], SHORT, createRow(Short.MaxValue, Short.MinValue, 0)) - testColumnStats(classOf[IntColumnStats], INT, createRow(Int.MaxValue, Int.MinValue, 0)) - testColumnStats(classOf[LongColumnStats], LONG, createRow(Long.MaxValue, Long.MinValue, 0)) - testColumnStats(classOf[FloatColumnStats], FLOAT, createRow(Float.MaxValue, Float.MinValue, 0)) - testColumnStats(classOf[DoubleColumnStats], DOUBLE, - createRow(Double.MaxValue, Double.MinValue, 0)) - testColumnStats(classOf[StringColumnStats], STRING, createRow(null, null, 0)) - testDecimalColumnStats(createRow(null, null, 0)) - - def createRow(values: Any*): GenericInternalRow = new GenericInternalRow(values.toArray) - - def testColumnStats[T <: AtomicType, U <: ColumnStats]( - columnStatsClass: Class[U], - columnType: NativeColumnType[T], - initialStatistics: GenericInternalRow): Unit = { - - val columnStatsName = columnStatsClass.getSimpleName - - test(s"$columnStatsName: empty") { - val columnStats = columnStatsClass.newInstance() - columnStats.collectedStatistics.values.zip(initialStatistics.values).foreach { - case (actual, expected) => assert(actual === expected) - } - } - - test(s"$columnStatsName: non-empty") { - import org.apache.spark.sql.columnar.ColumnarTestUtils._ - - val columnStats = columnStatsClass.newInstance() - val rows = Seq.fill(10)(makeRandomRow(columnType)) ++ Seq.fill(10)(makeNullRow(1)) - rows.foreach(columnStats.gatherStats(_, 0)) - - val values = rows.take(10).map(_.get(0, columnType.dataType).asInstanceOf[T#InternalType]) - val ordering = columnType.dataType.ordering.asInstanceOf[Ordering[T#InternalType]] - val stats = columnStats.collectedStatistics - - assertResult(values.min(ordering), "Wrong lower bound")(stats.values(0)) - assertResult(values.max(ordering), "Wrong upper bound")(stats.values(1)) - assertResult(10, "Wrong null count")(stats.values(2)) - assertResult(20, "Wrong row count")(stats.values(3)) - assertResult(stats.values(4), "Wrong size in bytes") { - rows.map { row => - if (row.isNullAt(0)) 4 else columnType.actualSize(row, 0) - }.sum - } - } - } - - def testDecimalColumnStats[T <: AtomicType, U <: ColumnStats]( - initialStatistics: GenericInternalRow): Unit = { - - val columnStatsName = classOf[DecimalColumnStats].getSimpleName - val columnType = COMPACT_DECIMAL(15, 10) - - test(s"$columnStatsName: empty") { - val columnStats = new DecimalColumnStats(15, 10) - columnStats.collectedStatistics.values.zip(initialStatistics.values).foreach { - case (actual, expected) => assert(actual === expected) - } - } - - test(s"$columnStatsName: non-empty") { - import org.apache.spark.sql.columnar.ColumnarTestUtils._ - - val columnStats = new DecimalColumnStats(15, 10) - val rows = Seq.fill(10)(makeRandomRow(columnType)) ++ Seq.fill(10)(makeNullRow(1)) - rows.foreach(columnStats.gatherStats(_, 0)) - - val values = rows.take(10).map(_.get(0, columnType.dataType).asInstanceOf[T#InternalType]) - val ordering = columnType.dataType.ordering.asInstanceOf[Ordering[T#InternalType]] - val stats = columnStats.collectedStatistics - - assertResult(values.min(ordering), "Wrong 
lower bound")(stats.values(0)) - assertResult(values.max(ordering), "Wrong upper bound")(stats.values(1)) - assertResult(10, "Wrong null count")(stats.values(2)) - assertResult(20, "Wrong row count")(stats.values(3)) - assertResult(stats.values(4), "Wrong size in bytes") { - rows.map { row => - if (row.isNullAt(0)) 4 else columnType.actualSize(row, 0) - }.sum - } - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala deleted file mode 100644 index 63bc39bfa0..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar - -import java.nio.{ByteOrder, ByteBuffer} - -import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, GenericMutableRow} -import org.apache.spark.sql.columnar.ColumnarTestUtils._ -import org.apache.spark.sql.types._ -import org.apache.spark.{Logging, SparkFunSuite} - - -class ColumnTypeSuite extends SparkFunSuite with Logging { - private val DEFAULT_BUFFER_SIZE = 512 - private val MAP_TYPE = MAP(MapType(IntegerType, StringType)) - private val ARRAY_TYPE = ARRAY(ArrayType(IntegerType)) - private val STRUCT_TYPE = STRUCT(StructType(StructField("a", StringType) :: Nil)) - - test("defaultSize") { - val checks = Map( - NULL-> 0, BOOLEAN -> 1, BYTE -> 1, SHORT -> 2, INT -> 4, LONG -> 8, - FLOAT -> 4, DOUBLE -> 8, COMPACT_DECIMAL(15, 10) -> 8, LARGE_DECIMAL(20, 10) -> 12, - STRING -> 8, BINARY -> 16, STRUCT_TYPE -> 20, ARRAY_TYPE -> 16, MAP_TYPE -> 32) - - checks.foreach { case (columnType, expectedSize) => - assertResult(expectedSize, s"Wrong defaultSize for $columnType") { - columnType.defaultSize - } - } - } - - test("actualSize") { - def checkActualSize( - columnType: ColumnType[_], - value: Any, - expected: Int): Unit = { - - assertResult(expected, s"Wrong actualSize for $columnType") { - val row = new GenericMutableRow(1) - row.update(0, CatalystTypeConverters.convertToCatalyst(value)) - val proj = UnsafeProjection.create(Array[DataType](columnType.dataType)) - columnType.actualSize(proj(row), 0) - } - } - - checkActualSize(NULL, null, 0) - checkActualSize(BOOLEAN, true, 1) - checkActualSize(BYTE, Byte.MaxValue, 1) - checkActualSize(SHORT, Short.MaxValue, 2) - checkActualSize(INT, Int.MaxValue, 4) - checkActualSize(LONG, Long.MaxValue, 8) - checkActualSize(FLOAT, Float.MaxValue, 4) - checkActualSize(DOUBLE, Double.MaxValue, 8) - checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length) - checkActualSize(BINARY, 
Array.fill[Byte](4)(0.toByte), 4 + 4) - checkActualSize(COMPACT_DECIMAL(15, 10), Decimal(0, 15, 10), 8) - checkActualSize(LARGE_DECIMAL(20, 10), Decimal(0, 20, 10), 5) - checkActualSize(ARRAY_TYPE, Array[Any](1), 16) - checkActualSize(MAP_TYPE, Map(1 -> "a"), 29) - checkActualSize(STRUCT_TYPE, Row("hello"), 28) - } - - testNativeColumnType(BOOLEAN) - testNativeColumnType(BYTE) - testNativeColumnType(SHORT) - testNativeColumnType(INT) - testNativeColumnType(LONG) - testNativeColumnType(FLOAT) - testNativeColumnType(DOUBLE) - testNativeColumnType(COMPACT_DECIMAL(15, 10)) - testNativeColumnType(STRING) - - testColumnType(NULL) - testColumnType(BINARY) - testColumnType(LARGE_DECIMAL(20, 10)) - testColumnType(STRUCT_TYPE) - testColumnType(ARRAY_TYPE) - testColumnType(MAP_TYPE) - - def testNativeColumnType[T <: AtomicType](columnType: NativeColumnType[T]): Unit = { - testColumnType[T#InternalType](columnType) - } - - def testColumnType[JvmType](columnType: ColumnType[JvmType]): Unit = { - - val buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE).order(ByteOrder.nativeOrder()) - val proj = UnsafeProjection.create(Array[DataType](columnType.dataType)) - val converter = CatalystTypeConverters.createToScalaConverter(columnType.dataType) - val seq = (0 until 4).map(_ => proj(makeRandomRow(columnType)).copy()) - - test(s"$columnType append/extract") { - buffer.rewind() - seq.foreach(columnType.append(_, 0, buffer)) - - buffer.rewind() - seq.foreach { row => - logInfo("buffer = " + buffer + ", expected = " + row) - val expected = converter(row.get(0, columnType.dataType)) - val extracted = converter(columnType.extract(buffer)) - assert(expected === extracted, - s"Extracted value didn't equal to the original one. $expected != $extracted, buffer =" + - dumpBuffer(buffer.duplicate().rewind().asInstanceOf[ByteBuffer])) - } - } - } - - private def dumpBuffer(buff: ByteBuffer): Any = { - val sb = new StringBuilder() - while (buff.hasRemaining) { - val b = buff.get() - sb.append(Integer.toHexString(b & 0xff)).append(' ') - } - if (sb.nonEmpty) sb.setLength(sb.length - 1) - sb.toString() - } - - test("column type for decimal types with different precision") { - (1 to 18).foreach { i => - assertResult(COMPACT_DECIMAL(i, 0)) { - ColumnType(DecimalType(i, 0)) - } - } - - assertResult(LARGE_DECIMAL(19, 0)) { - ColumnType(DecimalType(19, 0)) - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala deleted file mode 100644 index a5882f7870..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar - -import scala.collection.immutable.HashSet -import scala.util.Random - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow} -import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayBasedMapData} -import org.apache.spark.sql.types.{AtomicType, Decimal} -import org.apache.spark.unsafe.types.UTF8String - -object ColumnarTestUtils { - def makeNullRow(length: Int): GenericMutableRow = { - val row = new GenericMutableRow(length) - (0 until length).foreach(row.setNullAt) - row - } - - def makeRandomValue[JvmType](columnType: ColumnType[JvmType]): JvmType = { - def randomBytes(length: Int) = { - val bytes = new Array[Byte](length) - Random.nextBytes(bytes) - bytes - } - - (columnType match { - case NULL => null - case BOOLEAN => Random.nextBoolean() - case BYTE => (Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte - case SHORT => (Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort - case INT => Random.nextInt() - case LONG => Random.nextLong() - case FLOAT => Random.nextFloat() - case DOUBLE => Random.nextDouble() - case STRING => UTF8String.fromString(Random.nextString(Random.nextInt(32))) - case BINARY => randomBytes(Random.nextInt(32)) - case COMPACT_DECIMAL(precision, scale) => Decimal(Random.nextLong() % 100, precision, scale) - case LARGE_DECIMAL(precision, scale) => Decimal(Random.nextLong(), precision, scale) - case STRUCT(_) => - new GenericInternalRow(Array[Any](UTF8String.fromString(Random.nextString(10)))) - case ARRAY(_) => - new GenericArrayData(Array[Any](Random.nextInt(), Random.nextInt())) - case MAP(_) => - ArrayBasedMapData( - Map(Random.nextInt() -> UTF8String.fromString(Random.nextString(Random.nextInt(32))))) - }).asInstanceOf[JvmType] - } - - def makeRandomValues( - head: ColumnType[_], - tail: ColumnType[_]*): Seq[Any] = makeRandomValues(Seq(head) ++ tail) - - def makeRandomValues(columnTypes: Seq[ColumnType[_]]): Seq[Any] = { - columnTypes.map(makeRandomValue(_)) - } - - def makeUniqueRandomValues[JvmType]( - columnType: ColumnType[JvmType], - count: Int): Seq[JvmType] = { - - Iterator.iterate(HashSet.empty[JvmType]) { set => - set + Iterator.continually(makeRandomValue(columnType)).filterNot(set.contains).next() - }.drop(count).next().toSeq - } - - def makeRandomRow( - head: ColumnType[_], - tail: ColumnType[_]*): InternalRow = makeRandomRow(Seq(head) ++ tail) - - def makeRandomRow(columnTypes: Seq[ColumnType[_]]): InternalRow = { - val row = new GenericMutableRow(columnTypes.length) - makeRandomValues(columnTypes).zipWithIndex.foreach { case (value, index) => - row(index) = value - } - row - } - - def makeUniqueValuesAndSingleValueRows[T <: AtomicType]( - columnType: NativeColumnType[T], - count: Int): (Seq[T#InternalType], Seq[GenericMutableRow]) = { - - val values = makeUniqueRandomValues(columnType, count) - val rows = values.map { value => - val row = new GenericMutableRow(1) - row(0) = value - row - } - - (values, rows) - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala deleted file mode 100644 index 6265e40a0a..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar - -import java.sql.{Date, Timestamp} - -import org.apache.spark.sql.{QueryTest, Row} -import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.test.SQLTestData._ -import org.apache.spark.sql.types._ -import org.apache.spark.storage.StorageLevel.MEMORY_ONLY - -class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { - import testImplicits._ - - setupTestData() - - test("simple columnar query") { - val plan = sqlContext.executePlan(testData.logicalPlan).executedPlan - val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) - - checkAnswer(scan, testData.collect().toSeq) - } - - test("default size avoids broadcast") { - // TODO: Improve this test when we have better statistics - sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)) - .toDF().registerTempTable("sizeTst") - sqlContext.cacheTable("sizeTst") - assert( - sqlContext.table("sizeTst").queryExecution.analyzed.statistics.sizeInBytes > - sqlContext.conf.autoBroadcastJoinThreshold) - } - - test("projection") { - val plan = sqlContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan - val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) - - checkAnswer(scan, testData.collect().map { - case Row(key: Int, value: String) => value -> key - }.map(Row.fromTuple)) - } - - test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") { - val plan = sqlContext.executePlan(testData.logicalPlan).executedPlan - val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) - - checkAnswer(scan, testData.collect().toSeq) - checkAnswer(scan, testData.collect().toSeq) - } - - test("SPARK-1678 regression: compression must not lose repeated values") { - checkAnswer( - sql("SELECT * FROM repeatedData"), - repeatedData.collect().toSeq.map(Row.fromTuple)) - - sqlContext.cacheTable("repeatedData") - - checkAnswer( - sql("SELECT * FROM repeatedData"), - repeatedData.collect().toSeq.map(Row.fromTuple)) - } - - test("with null values") { - checkAnswer( - sql("SELECT * FROM nullableRepeatedData"), - nullableRepeatedData.collect().toSeq.map(Row.fromTuple)) - - sqlContext.cacheTable("nullableRepeatedData") - - checkAnswer( - sql("SELECT * FROM nullableRepeatedData"), - nullableRepeatedData.collect().toSeq.map(Row.fromTuple)) - } - - test("SPARK-2729 regression: timestamp data type") { - val timestamps = (0 to 3).map(i => Tuple1(new Timestamp(i))).toDF("time") - timestamps.registerTempTable("timestamps") - - checkAnswer( - sql("SELECT time FROM timestamps"), - timestamps.collect().toSeq) - - sqlContext.cacheTable("timestamps") - - checkAnswer( - sql("SELECT time FROM timestamps"), - timestamps.collect().toSeq) - } - - test("SPARK-3320 regression: batched column buffer 
building should work with empty partitions") { - checkAnswer( - sql("SELECT * FROM withEmptyParts"), - withEmptyParts.collect().toSeq.map(Row.fromTuple)) - - sqlContext.cacheTable("withEmptyParts") - - checkAnswer( - sql("SELECT * FROM withEmptyParts"), - withEmptyParts.collect().toSeq.map(Row.fromTuple)) - } - - test("SPARK-4182 Caching complex types") { - complexData.cache().count() - // Shouldn't throw - complexData.count() - complexData.unpersist() - } - - test("decimal type") { - // Casting is required here because ScalaReflection can't capture decimal precision information. - val df = (1 to 10) - .map(i => Tuple1(Decimal(i, 15, 10))) - .toDF("dec") - .select($"dec" cast DecimalType(15, 10)) - - assert(df.schema.head.dataType === DecimalType(15, 10)) - - df.cache().registerTempTable("test_fixed_decimal") - checkAnswer( - sql("SELECT * FROM test_fixed_decimal"), - (1 to 10).map(i => Row(Decimal(i, 15, 10).toJavaBigDecimal))) - } - - test("test different data types") { - // Create the schema. - val struct = - StructType( - StructField("f1", FloatType, true) :: - StructField("f2", ArrayType(BooleanType), true) :: Nil) - val dataTypes = - Seq(StringType, BinaryType, NullType, BooleanType, - ByteType, ShortType, IntegerType, LongType, - FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5), - DateType, TimestampType, - ArrayType(IntegerType), MapType(StringType, LongType), struct) - val fields = dataTypes.zipWithIndex.map { case (dataType, index) => - StructField(s"col$index", dataType, true) - } - val allColumns = fields.map(_.name).mkString(",") - val schema = StructType(fields) - - // Create a RDD for the schema - val rdd = - sparkContext.parallelize((1 to 10000), 10).map { i => - Row( - s"str${i}: test cache.", - s"binary${i}: test cache.".getBytes("UTF-8"), - null, - i % 2 == 0, - i.toByte, - i.toShort, - i, - Long.MaxValue - i.toLong, - (i + 0.25).toFloat, - (i + 0.75), - BigDecimal(Long.MaxValue.toString + ".12345"), - new java.math.BigDecimal(s"${i % 9 + 1}" + ".23456"), - new Date(i), - new Timestamp(i * 1000000L), - (i to i + 10).toSeq, - (i to i + 10).map(j => s"map_key_$j" -> (Long.MaxValue - j)).toMap, - Row((i - 0.25).toFloat, Seq(true, false, null))) - } - sqlContext.createDataFrame(rdd, schema).registerTempTable("InMemoryCache_different_data_types") - // Cache the table. - sql("cache table InMemoryCache_different_data_types") - // Make sure the table is indeed cached. - sqlContext.table("InMemoryCache_different_data_types").queryExecution.executedPlan - assert( - sqlContext.isCached("InMemoryCache_different_data_types"), - "InMemoryCache_different_data_types should be cached.") - // Issue a query and check the results. - checkAnswer( - sql(s"SELECT DISTINCT ${allColumns} FROM InMemoryCache_different_data_types"), - sqlContext.table("InMemoryCache_different_data_types").collect()) - sqlContext.dropTempTable("InMemoryCache_different_data_types") - } - - test("SPARK-10422: String column in InMemoryColumnarCache needs to override clone method") { - val df = sqlContext.range(1, 100).selectExpr("id % 10 as id") - .rdd.map(id => Tuple1(s"str_$id")).toDF("i") - val cached = df.cache() - // count triggers the caching action. It should not throw. - cached.count() - - // Make sure, the DataFrame is indeed cached. - assert(sqlContext.cacheManager.lookupCachedData(cached).nonEmpty) - - // Check result. - checkAnswer( - cached, - sqlContext.range(1, 100).selectExpr("id % 10 as id") - .rdd.map(id => Tuple1(s"str_$id")).toDF("i") - ) - - // Drop the cache. 
- cached.unpersist() - } - - test("SPARK-10859: Predicates pushed to InMemoryColumnarTableScan are not evaluated correctly") { - val data = sqlContext.range(10).selectExpr("id", "cast(id as string) as s") - data.cache() - assert(data.count() === 10) - assert(data.filter($"s" === "3").count() === 1) - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala deleted file mode 100644 index aa1605fee8..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar - -import java.nio.ByteBuffer - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, GenericMutableRow} -import org.apache.spark.sql.types._ - -class TestNullableColumnAccessor[JvmType]( - buffer: ByteBuffer, - columnType: ColumnType[JvmType]) - extends BasicColumnAccessor(buffer, columnType) - with NullableColumnAccessor - -object TestNullableColumnAccessor { - def apply[JvmType](buffer: ByteBuffer, columnType: ColumnType[JvmType]) - : TestNullableColumnAccessor[JvmType] = { - new TestNullableColumnAccessor(buffer, columnType) - } -} - -class NullableColumnAccessorSuite extends SparkFunSuite { - import org.apache.spark.sql.columnar.ColumnarTestUtils._ - - Seq( - NULL, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, - STRING, BINARY, COMPACT_DECIMAL(15, 10), LARGE_DECIMAL(20, 10), - STRUCT(StructType(StructField("a", StringType) :: Nil)), - ARRAY(ArrayType(IntegerType)), MAP(MapType(IntegerType, StringType))) - .foreach { - testNullableColumnAccessor(_) - } - - def testNullableColumnAccessor[JvmType]( - columnType: ColumnType[JvmType]): Unit = { - - val typeName = columnType.getClass.getSimpleName.stripSuffix("$") - val nullRow = makeNullRow(1) - - test(s"Nullable $typeName column accessor: empty column") { - val builder = TestNullableColumnBuilder(columnType) - val accessor = TestNullableColumnAccessor(builder.build(), columnType) - assert(!accessor.hasNext) - } - - test(s"Nullable $typeName column accessor: access null values") { - val builder = TestNullableColumnBuilder(columnType) - val randomRow = makeRandomRow(columnType) - val proj = UnsafeProjection.create(Array[DataType](columnType.dataType)) - - (0 until 4).foreach { _ => - builder.appendFrom(proj(randomRow), 0) - builder.appendFrom(proj(nullRow), 0) - } - - val accessor = TestNullableColumnAccessor(builder.build(), columnType) - val row = new GenericMutableRow(1) - val converter = 
CatalystTypeConverters.createToScalaConverter(columnType.dataType) - - (0 until 4).foreach { _ => - assert(accessor.hasNext) - accessor.extractTo(row, 0) - assert(converter(row.get(0, columnType.dataType)) - === converter(randomRow.get(0, columnType.dataType))) - - assert(accessor.hasNext) - accessor.extractTo(row, 0) - assert(row.isNullAt(0)) - } - - assert(!accessor.hasNext) - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala deleted file mode 100644 index 9140457783..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, GenericMutableRow} -import org.apache.spark.sql.types._ - -class TestNullableColumnBuilder[JvmType](columnType: ColumnType[JvmType]) - extends BasicColumnBuilder[JvmType](new NoopColumnStats, columnType) - with NullableColumnBuilder - -object TestNullableColumnBuilder { - def apply[JvmType](columnType: ColumnType[JvmType], initialSize: Int = 0) - : TestNullableColumnBuilder[JvmType] = { - val builder = new TestNullableColumnBuilder(columnType) - builder.initialize(initialSize) - builder - } -} - -class NullableColumnBuilderSuite extends SparkFunSuite { - import org.apache.spark.sql.columnar.ColumnarTestUtils._ - - Seq( - BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, - STRING, BINARY, COMPACT_DECIMAL(15, 10), LARGE_DECIMAL(20, 10), - STRUCT(StructType(StructField("a", StringType) :: Nil)), - ARRAY(ArrayType(IntegerType)), MAP(MapType(IntegerType, StringType))) - .foreach { - testNullableColumnBuilder(_) - } - - def testNullableColumnBuilder[JvmType]( - columnType: ColumnType[JvmType]): Unit = { - - val typeName = columnType.getClass.getSimpleName.stripSuffix("$") - val dataType = columnType.dataType - val proj = UnsafeProjection.create(Array[DataType](dataType)) - val converter = CatalystTypeConverters.createToScalaConverter(dataType) - - test(s"$typeName column builder: empty column") { - val columnBuilder = TestNullableColumnBuilder(columnType) - val buffer = columnBuilder.build() - - assertResult(0, "Wrong null count")(buffer.getInt()) - assert(!buffer.hasRemaining) - } - - test(s"$typeName column builder: buffer size auto growth") { - val columnBuilder = TestNullableColumnBuilder(columnType) - val randomRow = makeRandomRow(columnType) - - (0 until 4).foreach { _ => - columnBuilder.appendFrom(proj(randomRow), 0) - } - - val buffer = 
columnBuilder.build() - - assertResult(0, "Wrong null count")(buffer.getInt()) - } - - test(s"$typeName column builder: null values") { - val columnBuilder = TestNullableColumnBuilder(columnType) - val randomRow = makeRandomRow(columnType) - val nullRow = makeNullRow(1) - - (0 until 4).foreach { _ => - columnBuilder.appendFrom(proj(randomRow), 0) - columnBuilder.appendFrom(proj(nullRow), 0) - } - - val buffer = columnBuilder.build() - - assertResult(4, "Wrong null count")(buffer.getInt()) - - // For null positions - (1 to 7 by 2).foreach(assertResult(_, "Wrong null position")(buffer.getInt())) - - // For non-null values - val actual = new GenericMutableRow(new Array[Any](1)) - (0 until 4).foreach { _ => - columnType.extract(buffer, actual, 0) - assert(converter(actual.get(0, dataType)) === converter(randomRow.get(0, dataType)), - "Extracted value didn't equal to the original one") - } - - assert(!buffer.hasRemaining) - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala deleted file mode 100644 index 6b7401464f..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
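The null-count and null-position assertions above amount to a length-prefixed nulls header. Below is a minimal standalone sketch of that assumed layout (hypothetical `NullHeaderSketch`, Int values only, not part of this patch or of Spark's builder): a 4-byte null count, then the ordinals of the null rows, then only the non-null values.

import java.nio.{ByteBuffer, ByteOrder}

object NullHeaderSketch {
  // Assumed layout: [null count: Int][null ordinals: Int*][non-null values: Int*]
  def encode(values: Seq[Option[Int]]): ByteBuffer = {
    val nullPositions = values.zipWithIndex.collect { case (None, i) => i }
    val nonNulls = values.flatten
    val buffer = ByteBuffer
      .allocate(4 + 4 * nullPositions.size + 4 * nonNulls.size)
      .order(ByteOrder.nativeOrder())
    buffer.putInt(nullPositions.size)             // null count comes first
    nullPositions.foreach(p => buffer.putInt(p))  // ordinals of the null rows
    nonNulls.foreach(v => buffer.putInt(v))       // only non-null values are serialized
    buffer.rewind()
    buffer
  }

  def main(args: Array[String]): Unit = {
    // Alternate value/null four times, the same pattern the suites above append.
    val buf = encode(Seq(Some(10), None, Some(20), None, Some(30), None, Some(40), None))
    assert(buf.getInt() == 4)                                     // null count
    assert((0 until 4).map(_ => buf.getInt()) == Seq(1, 3, 5, 7)) // cf. the (1 to 7 by 2) check
    assert((0 until 4).map(_ => buf.getInt()) == Seq(10, 20, 30, 40))
  }
}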
- */ - -package org.apache.spark.sql.columnar - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql._ -import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.test.SQLTestData._ - -class PartitionBatchPruningSuite extends SparkFunSuite with SharedSQLContext { - import testImplicits._ - - private lazy val originalColumnBatchSize = sqlContext.conf.columnBatchSize - private lazy val originalInMemoryPartitionPruning = sqlContext.conf.inMemoryPartitionPruning - - override protected def beforeAll(): Unit = { - super.beforeAll() - // Make a table with 5 partitions, 2 batches per partition, 10 elements per batch - sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, 10) - - val pruningData = sparkContext.makeRDD((1 to 100).map { key => - val string = if (((key - 1) / 10) % 2 == 0) null else key.toString - TestData(key, string) - }, 5).toDF() - pruningData.registerTempTable("pruningData") - - // Enable in-memory partition pruning - sqlContext.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true) - // Enable in-memory table scan accumulators - sqlContext.setConf("spark.sql.inMemoryTableScanStatistics.enable", "true") - sqlContext.cacheTable("pruningData") - } - - override protected def afterAll(): Unit = { - try { - sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize) - sqlContext.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning) - sqlContext.uncacheTable("pruningData") - } finally { - super.afterAll() - } - } - - // Comparisons - checkBatchPruning("SELECT key FROM pruningData WHERE key = 1", 1, 1)(Seq(1)) - checkBatchPruning("SELECT key FROM pruningData WHERE 1 = key", 1, 1)(Seq(1)) - checkBatchPruning("SELECT key FROM pruningData WHERE key < 12", 1, 2)(1 to 11) - checkBatchPruning("SELECT key FROM pruningData WHERE key <= 11", 1, 2)(1 to 11) - checkBatchPruning("SELECT key FROM pruningData WHERE key > 88", 1, 2)(89 to 100) - checkBatchPruning("SELECT key FROM pruningData WHERE key >= 89", 1, 2)(89 to 100) - checkBatchPruning("SELECT key FROM pruningData WHERE 12 > key", 1, 2)(1 to 11) - checkBatchPruning("SELECT key FROM pruningData WHERE 11 >= key", 1, 2)(1 to 11) - checkBatchPruning("SELECT key FROM pruningData WHERE 88 < key", 1, 2)(89 to 100) - checkBatchPruning("SELECT key FROM pruningData WHERE 89 <= key", 1, 2)(89 to 100) - - // IS NULL - checkBatchPruning("SELECT key FROM pruningData WHERE value IS NULL", 5, 5) { - (1 to 10) ++ (21 to 30) ++ (41 to 50) ++ (61 to 70) ++ (81 to 90) - } - - // IS NOT NULL - checkBatchPruning("SELECT key FROM pruningData WHERE value IS NOT NULL", 5, 5) { - (11 to 20) ++ (31 to 40) ++ (51 to 60) ++ (71 to 80) ++ (91 to 100) - } - - // Conjunction and disjunction - checkBatchPruning("SELECT key FROM pruningData WHERE key > 8 AND key <= 21", 2, 3)(9 to 21) - checkBatchPruning("SELECT key FROM pruningData WHERE key < 2 OR key > 99", 2, 2)(Seq(1, 100)) - checkBatchPruning("SELECT key FROM pruningData WHERE key < 12 AND key IS NOT NULL", 1, 2)(1 to 11) - checkBatchPruning("SELECT key FROM pruningData WHERE key < 2 OR (key > 78 AND key < 92)", 3, 4) { - Seq(1) ++ (79 to 91) - } - checkBatchPruning("SELECT key FROM pruningData WHERE NOT (key < 88)", 1, 2) { - // Although the `NOT` operator isn't supported directly, the optimizer can transform - // `NOT (a < b)` to `b >= a` - 88 to 100 - } - - // With unsupported predicate - { - val seq = (1 to 30).mkString(", ") - checkBatchPruning(s"SELECT key FROM pruningData WHERE NOT (key IN ($seq))", 5, 10)(31 to 100) - checkBatchPruning(s"SELECT key FROM 
pruningData WHERE NOT (key IN ($seq)) AND key > 88", 1, 2) { - 89 to 100 - } - } - - def checkBatchPruning( - query: String, - expectedReadPartitions: Int, - expectedReadBatches: Int)( - expectedQueryResult: => Seq[Int]): Unit = { - - test(query) { - val df = sql(query) - val queryExecution = df.queryExecution - - assertResult(expectedQueryResult.toArray, s"Wrong query result: $queryExecution") { - df.collect().map(_(0)).toArray - } - - val (readPartitions, readBatches) = df.queryExecution.executedPlan.collect { - case in: InMemoryColumnarTableScan => (in.readPartitions.value, in.readBatches.value) - }.head - - assert(readBatches === expectedReadBatches, s"Wrong number of read batches: $queryExecution") - assert( - readPartitions === expectedReadPartitions, - s"Wrong number of read partitions: $queryExecution") - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala deleted file mode 100644 index 9a2948c59b..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
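The pruning checks above rely on per-batch statistics: each batch keeps a lower bound, an upper bound and a null count, and a predicate such as `key < 12` only reads batches whose value range can overlap it. A self-contained sketch of that idea under simplified assumptions (hypothetical `BatchPruningSketch`, Int keys, no nulls; not Spark's implementation):

case class BatchStats(lower: Int, upper: Int, nullCount: Int)

object BatchPruningSketch {
  // 10 batches of 10 keys each: [1..10], [11..20], ...
  val batches: Seq[(BatchStats, Seq[Int])] =
    (0 until 10).map { b =>
      val keys = (b * 10 + 1) to (b * 10 + 10)
      (BatchStats(keys.min, keys.max, 0), keys)
    }

  // Keep a batch only if its [lower, upper] range may contain a matching key,
  // then apply the predicate row by row inside the surviving batches.
  def scanLessThan(bound: Int): Seq[Int] =
    batches
      .filter { case (stats, _) => stats.lower < bound }
      .flatMap { case (_, keys) => keys.filter(_ < bound) }

  def main(args: Array[String]): Unit = {
    // Mirrors "SELECT key FROM pruningData WHERE key < 12": only 2 of 10 batches are read.
    assert(batches.count { case (stats, _) => stats.lower < 12 } == 2)
    assert(scanLessThan(12) == (1 to 11))
  }
}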
- */ - -package org.apache.spark.sql.columnar.compression - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.GenericMutableRow -import org.apache.spark.sql.columnar.ColumnarTestUtils._ -import org.apache.spark.sql.columnar.{BOOLEAN, NoopColumnStats} - -class BooleanBitSetSuite extends SparkFunSuite { - import BooleanBitSet._ - - def skeleton(count: Int) { - // ------------- - // Tests encoder - // ------------- - - val builder = TestCompressibleColumnBuilder(new NoopColumnStats, BOOLEAN, BooleanBitSet) - val rows = Seq.fill[InternalRow](count)(makeRandomRow(BOOLEAN)) - val values = rows.map(_.getBoolean(0)) - - rows.foreach(builder.appendFrom(_, 0)) - val buffer = builder.build() - - // Column type ID + null count + null positions - val headerSize = CompressionScheme.columnHeaderSize(buffer) - - // Compression scheme ID + element count + bitset words - val compressedSize = 4 + 4 + { - val extra = if (count % BITS_PER_LONG == 0) 0 else 1 - (count / BITS_PER_LONG + extra) * 8 - } - - // 4 extra bytes for compression scheme type ID - assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity) - - // Skips column header - buffer.position(headerSize) - assertResult(BooleanBitSet.typeId, "Wrong compression scheme ID")(buffer.getInt()) - assertResult(count, "Wrong element count")(buffer.getInt()) - - var word = 0: Long - for (i <- 0 until count) { - val bit = i % BITS_PER_LONG - word = if (bit == 0) buffer.getLong() else word - assertResult(values(i), s"Wrong value in compressed buffer, index=$i") { - (word & ((1: Long) << bit)) != 0 - } - } - - // ------------- - // Tests decoder - // ------------- - - // Rewinds, skips column header and 4 more bytes for compression scheme ID - buffer.rewind().position(headerSize + 4) - - val decoder = BooleanBitSet.decoder(buffer, BOOLEAN) - val mutableRow = new GenericMutableRow(1) - if (values.nonEmpty) { - values.foreach { - assert(decoder.hasNext) - assertResult(_, "Wrong decoded value") { - decoder.next(mutableRow, 0) - mutableRow.getBoolean(0) - } - } - } - assert(!decoder.hasNext) - } - - test(s"$BooleanBitSet: empty") { - skeleton(0) - } - - test(s"$BooleanBitSet: less than 1 word") { - skeleton(BITS_PER_LONG - 1) - } - - test(s"$BooleanBitSet: exactly 1 word") { - skeleton(BITS_PER_LONG) - } - - test(s"$BooleanBitSet: multiple whole words") { - skeleton(BITS_PER_LONG * 2) - } - - test(s"$BooleanBitSet: multiple words and 1 more bit") { - skeleton(BITS_PER_LONG * 2 + 1) - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala deleted file mode 100644 index acfab6586c..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
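The size arithmetic in the suite above assumes booleans are packed 64 per word after a 4-byte element count, so `count` values need `ceil(count / 64)` long words. A plain-Scala sketch of that packing (hypothetical `BooleanBitSetSketch`; the real encoder additionally writes the scheme ID and column header):

object BooleanBitSetSketch {
  val BitsPerLong = 64

  def pack(values: Seq[Boolean]): Array[Long] = {
    val words = new Array[Long]((values.length + BitsPerLong - 1) / BitsPerLong)
    values.zipWithIndex.foreach { case (v, i) =>
      if (v) words(i / BitsPerLong) |= 1L << (i % BitsPerLong)  // set bit i
    }
    words
  }

  def unpack(words: Array[Long], count: Int): Seq[Boolean] =
    (0 until count).map(i => (words(i / BitsPerLong) & (1L << (i % BitsPerLong))) != 0)

  def main(args: Array[String]): Unit = {
    val values = Seq.fill(130)(scala.util.Random.nextBoolean())
    val words = pack(values)
    assert(words.length == 3)                 // 130 bits need 3 words, matching the size formula above
    assert(unpack(words, values.length) == values)
  }
}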
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar.compression - -import java.nio.ByteBuffer - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.GenericMutableRow -import org.apache.spark.sql.columnar._ -import org.apache.spark.sql.columnar.ColumnarTestUtils._ -import org.apache.spark.sql.types.AtomicType - -class DictionaryEncodingSuite extends SparkFunSuite { - testDictionaryEncoding(new IntColumnStats, INT) - testDictionaryEncoding(new LongColumnStats, LONG) - testDictionaryEncoding(new StringColumnStats, STRING) - - def testDictionaryEncoding[T <: AtomicType]( - columnStats: ColumnStats, - columnType: NativeColumnType[T]) { - - val typeName = columnType.getClass.getSimpleName.stripSuffix("$") - - def buildDictionary(buffer: ByteBuffer) = { - (0 until buffer.getInt()).map(columnType.extract(buffer) -> _.toShort).toMap - } - - def stableDistinct(seq: Seq[Int]): Seq[Int] = if (seq.isEmpty) { - Seq.empty - } else { - seq.head +: seq.tail.filterNot(_ == seq.head) - } - - def skeleton(uniqueValueCount: Int, inputSeq: Seq[Int]) { - // ------------- - // Tests encoder - // ------------- - - val builder = TestCompressibleColumnBuilder(columnStats, columnType, DictionaryEncoding) - val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount) - val dictValues = stableDistinct(inputSeq) - - inputSeq.foreach(i => builder.appendFrom(rows(i), 0)) - - if (dictValues.length > DictionaryEncoding.MAX_DICT_SIZE) { - withClue("Dictionary overflowed, compression should fail") { - intercept[Throwable] { - builder.build() - } - } - } else { - val buffer = builder.build() - val headerSize = CompressionScheme.columnHeaderSize(buffer) - // 4 extra bytes for dictionary size - val dictionarySize = 4 + rows.map(columnType.actualSize(_, 0)).sum - // 2 bytes for each `Short` - val compressedSize = 4 + dictionarySize + 2 * inputSeq.length - // 4 extra bytes for compression scheme type ID - assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity) - - // Skips column header - buffer.position(headerSize) - assertResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt()) - - val dictionary = buildDictionary(buffer).toMap - - dictValues.foreach { i => - assertResult(i, "Wrong dictionary entry") { - dictionary(values(i)) - } - } - - inputSeq.foreach { i => - assertResult(i.toShort, "Wrong column element value")(buffer.getShort()) - } - - // ------------- - // Tests decoder - // ------------- - - // Rewinds, skips column header and 4 more bytes for compression scheme ID - buffer.rewind().position(headerSize + 4) - - val decoder = DictionaryEncoding.decoder(buffer, columnType) - val mutableRow = new GenericMutableRow(1) - - if (inputSeq.nonEmpty) { - inputSeq.foreach { i => - assert(decoder.hasNext) - assertResult(values(i), "Wrong decoded value") { - decoder.next(mutableRow, 0) - columnType.getField(mutableRow, 0) - } - } - } - - assert(!decoder.hasNext) - } - } - - test(s"$DictionaryEncoding with $typeName: empty") { - skeleton(0, Seq.empty) - } - - test(s"$DictionaryEncoding with $typeName: simple 
case") { - skeleton(2, Seq(0, 1, 0, 1)) - } - - test(s"$DictionaryEncoding with $typeName: dictionary overflow") { - skeleton(DictionaryEncoding.MAX_DICT_SIZE + 1, 0 to DictionaryEncoding.MAX_DICT_SIZE) - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala deleted file mode 100644 index 2111e9fbe6..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar.compression - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.GenericMutableRow -import org.apache.spark.sql.columnar._ -import org.apache.spark.sql.columnar.ColumnarTestUtils._ -import org.apache.spark.sql.types.IntegralType - -class IntegralDeltaSuite extends SparkFunSuite { - testIntegralDelta(new IntColumnStats, INT, IntDelta) - testIntegralDelta(new LongColumnStats, LONG, LongDelta) - - def testIntegralDelta[I <: IntegralType]( - columnStats: ColumnStats, - columnType: NativeColumnType[I], - scheme: CompressionScheme) { - - def skeleton(input: Seq[I#InternalType]) { - // ------------- - // Tests encoder - // ------------- - - val builder = TestCompressibleColumnBuilder(columnStats, columnType, scheme) - val deltas = if (input.isEmpty) { - Seq.empty[Long] - } else { - (input.tail, input.init).zipped.map { - case (x: Int, y: Int) => (x - y).toLong - case (x: Long, y: Long) => x - y - } - } - - input.map { value => - val row = new GenericMutableRow(1) - columnType.setField(row, 0, value) - builder.appendFrom(row, 0) - } - - val buffer = builder.build() - // Column type ID + null count + null positions - val headerSize = CompressionScheme.columnHeaderSize(buffer) - - // Compression scheme ID + compressed contents - val compressedSize = 4 + (if (deltas.isEmpty) { - 0 - } else { - val oneBoolean = columnType.defaultSize - 1 + oneBoolean + deltas.map { - d => if (math.abs(d) <= Byte.MaxValue) 1 else 1 + oneBoolean - }.sum - }) - - // 4 extra bytes for compression scheme type ID - assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity) - - buffer.position(headerSize) - assertResult(scheme.typeId, "Wrong compression scheme ID")(buffer.getInt()) - - if (input.nonEmpty) { - assertResult(Byte.MinValue, "The first byte should be an escaping mark")(buffer.get()) - assertResult(input.head, "The first value is wrong")(columnType.extract(buffer)) - - (input.tail, deltas).zipped.foreach { (value, delta) => - if (math.abs(delta) <= Byte.MaxValue) { - assertResult(delta, "Wrong delta")(buffer.get()) - } else { - 
assertResult(Byte.MinValue, "Expecting escaping mark here")(buffer.get()) - assertResult(value, "Wrong value")(columnType.extract(buffer)) - } - } - } - - // ------------- - // Tests decoder - // ------------- - - // Rewinds, skips column header and 4 more bytes for compression scheme ID - buffer.rewind().position(headerSize + 4) - - val decoder = scheme.decoder(buffer, columnType) - val mutableRow = new GenericMutableRow(1) - - if (input.nonEmpty) { - input.foreach{ - assert(decoder.hasNext) - assertResult(_, "Wrong decoded value") { - decoder.next(mutableRow, 0) - columnType.getField(mutableRow, 0) - } - } - } - assert(!decoder.hasNext) - } - - test(s"$scheme: empty column") { - skeleton(Seq.empty) - } - - test(s"$scheme: simple case") { - val input = columnType match { - case INT => Seq(2: Int, 1: Int, 2: Int, 130: Int) - case LONG => Seq(2: Long, 1: Long, 2: Long, 130: Long) - } - - skeleton(input.map(_.asInstanceOf[I#InternalType])) - } - - test(s"$scheme: long random series") { - // Have to workaround with `Any` since no `ClassTag[I#JvmType]` available here. - val input = Array.fill[Any](10000)(makeRandomValue(columnType)) - skeleton(input.map(_.asInstanceOf[I#InternalType])) - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala deleted file mode 100644 index 67ec08f594..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
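The escaping checks above describe the delta layout: the first value is always written in full after a `Byte.MinValue` escape mark, and each later value is stored as a single signed-byte delta unless the delta overflows a byte, in which case it is escaped as well. A self-contained sketch of that layout for Int columns (hypothetical `IntDeltaSketch`, omitting the column and scheme headers):

import java.nio.{ByteBuffer, ByteOrder}

object IntDeltaSketch {
  def encode(input: Seq[Int]): ByteBuffer = {
    val buffer = ByteBuffer.allocate(input.length * 5 + 5).order(ByteOrder.nativeOrder())
    var prev = 0
    input.zipWithIndex.foreach { case (value, i) =>
      val delta = value.toLong - prev
      if (i > 0 && math.abs(delta) <= Byte.MaxValue) {
        buffer.put(delta.toByte)              // small delta: a single byte
      } else {
        buffer.put(Byte.MinValue)             // escape mark
        buffer.putInt(value)                  // full 4-byte value follows
      }
      prev = value
    }
    buffer.flip()
    buffer
  }

  def decode(buffer: ByteBuffer, count: Int): Seq[Int] = {
    var prev = 0
    (0 until count).map { _ =>
      val b = buffer.get()
      prev = if (b == Byte.MinValue) buffer.getInt() else prev + b
      prev
    }
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(2, 1, 2, 130)             // the same "simple case" the suite uses
    assert(decode(encode(input), input.length) == input)
  }
}

Always escaping the first value keeps the decoder stateless at the start of a buffer: it never has to guess a base value before the first delta.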
- */ - -package org.apache.spark.sql.columnar.compression - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.GenericMutableRow -import org.apache.spark.sql.columnar._ -import org.apache.spark.sql.columnar.ColumnarTestUtils._ -import org.apache.spark.sql.types.AtomicType - -class RunLengthEncodingSuite extends SparkFunSuite { - testRunLengthEncoding(new NoopColumnStats, BOOLEAN) - testRunLengthEncoding(new ByteColumnStats, BYTE) - testRunLengthEncoding(new ShortColumnStats, SHORT) - testRunLengthEncoding(new IntColumnStats, INT) - testRunLengthEncoding(new LongColumnStats, LONG) - testRunLengthEncoding(new StringColumnStats, STRING) - - def testRunLengthEncoding[T <: AtomicType]( - columnStats: ColumnStats, - columnType: NativeColumnType[T]) { - - val typeName = columnType.getClass.getSimpleName.stripSuffix("$") - - def skeleton(uniqueValueCount: Int, inputRuns: Seq[(Int, Int)]) { - // ------------- - // Tests encoder - // ------------- - - val builder = TestCompressibleColumnBuilder(columnStats, columnType, RunLengthEncoding) - val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount) - val inputSeq = inputRuns.flatMap { case (index, run) => - Seq.fill(run)(index) - } - - inputSeq.foreach(i => builder.appendFrom(rows(i), 0)) - val buffer = builder.build() - - // Column type ID + null count + null positions - val headerSize = CompressionScheme.columnHeaderSize(buffer) - - // Compression scheme ID + compressed contents - val compressedSize = 4 + inputRuns.map { case (index, _) => - // 4 extra bytes each run for run length - columnType.actualSize(rows(index), 0) + 4 - }.sum - - // 4 extra bytes for compression scheme type ID - assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity) - - // Skips column header - buffer.position(headerSize) - assertResult(RunLengthEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt()) - - inputRuns.foreach { case (index, run) => - assertResult(values(index), "Wrong column element value")(columnType.extract(buffer)) - assertResult(run, "Wrong run length")(buffer.getInt()) - } - - // ------------- - // Tests decoder - // ------------- - - // Rewinds, skips column header and 4 more bytes for compression scheme ID - buffer.rewind().position(headerSize + 4) - - val decoder = RunLengthEncoding.decoder(buffer, columnType) - val mutableRow = new GenericMutableRow(1) - - if (inputSeq.nonEmpty) { - inputSeq.foreach { i => - assert(decoder.hasNext) - assertResult(values(i), "Wrong decoded value") { - decoder.next(mutableRow, 0) - columnType.getField(mutableRow, 0) - } - } - } - - assert(!decoder.hasNext) - } - - test(s"$RunLengthEncoding with $typeName: empty column") { - skeleton(0, Seq.empty) - } - - test(s"$RunLengthEncoding with $typeName: simple case") { - skeleton(2, Seq(0 -> 2, 1 ->2)) - } - - test(s"$RunLengthEncoding with $typeName: run length == 1") { - skeleton(2, Seq(0 -> 1, 1 ->1)) - } - - test(s"$RunLengthEncoding with $typeName: single long run") { - skeleton(1, Seq(0 -> 1000)) - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala deleted file mode 100644 index 5268dfe0aa..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under 
one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.columnar.compression - -import org.apache.spark.sql.columnar._ -import org.apache.spark.sql.types.AtomicType - -class TestCompressibleColumnBuilder[T <: AtomicType]( - override val columnStats: ColumnStats, - override val columnType: NativeColumnType[T], - override val schemes: Seq[CompressionScheme]) - extends NativeColumnBuilder(columnStats, columnType) - with NullableColumnBuilder - with CompressibleColumnBuilder[T] { - - override protected def isWorthCompressing(encoder: Encoder[T]) = true -} - -object TestCompressibleColumnBuilder { - def apply[T <: AtomicType]( - columnStats: ColumnStats, - columnType: NativeColumnType[T], - scheme: CompressionScheme): TestCompressibleColumnBuilder[T] = { - - val builder = new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme)) - builder.initialize(0, "", useCompression = true) - builder - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnStatsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnStatsSuite.scala new file mode 100644 index 0000000000..b2d04f7c5a --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnStatsSuite.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
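For reference, the run-length assertions in RunLengthEncodingSuite above reduce to storing each run as a value plus a run length. A small generic sketch of that encode/decode round trip (hypothetical `RunLengthSketch`, ignoring the on-buffer byte layout and headers):

object RunLengthSketch {
  // Collapse consecutive equal values into (value, runLength) pairs.
  def encode[A](input: Seq[A]): Seq[(A, Int)] =
    input.foldLeft(List.empty[(A, Int)]) {
      case ((value, run) :: rest, a) if value == a => (value, run + 1) :: rest
      case (acc, a) => (a, 1) :: acc
    }.reverse

  // Expand the runs back into the original sequence.
  def decode[A](runs: Seq[(A, Int)]): Seq[A] =
    runs.flatMap { case (value, run) => Seq.fill(run)(value) }

  def main(args: Array[String]): Unit = {
    val input = Seq(0, 0, 1, 1)               // mirrors the "simple case": runs (0 -> 2, 1 -> 2)
    assert(encode(input) == Seq(0 -> 2, 1 -> 2))
    assert(decode(encode(input)) == input)
  }
}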
+ */ + +package org.apache.spark.sql.execution.columnar + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.types._ + +class ColumnStatsSuite extends SparkFunSuite { + testColumnStats(classOf[BooleanColumnStats], BOOLEAN, createRow(true, false, 0)) + testColumnStats(classOf[ByteColumnStats], BYTE, createRow(Byte.MaxValue, Byte.MinValue, 0)) + testColumnStats(classOf[ShortColumnStats], SHORT, createRow(Short.MaxValue, Short.MinValue, 0)) + testColumnStats(classOf[IntColumnStats], INT, createRow(Int.MaxValue, Int.MinValue, 0)) + testColumnStats(classOf[LongColumnStats], LONG, createRow(Long.MaxValue, Long.MinValue, 0)) + testColumnStats(classOf[FloatColumnStats], FLOAT, createRow(Float.MaxValue, Float.MinValue, 0)) + testColumnStats(classOf[DoubleColumnStats], DOUBLE, + createRow(Double.MaxValue, Double.MinValue, 0)) + testColumnStats(classOf[StringColumnStats], STRING, createRow(null, null, 0)) + testDecimalColumnStats(createRow(null, null, 0)) + + def createRow(values: Any*): GenericInternalRow = new GenericInternalRow(values.toArray) + + def testColumnStats[T <: AtomicType, U <: ColumnStats]( + columnStatsClass: Class[U], + columnType: NativeColumnType[T], + initialStatistics: GenericInternalRow): Unit = { + + val columnStatsName = columnStatsClass.getSimpleName + + test(s"$columnStatsName: empty") { + val columnStats = columnStatsClass.newInstance() + columnStats.collectedStatistics.values.zip(initialStatistics.values).foreach { + case (actual, expected) => assert(actual === expected) + } + } + + test(s"$columnStatsName: non-empty") { + import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ + + val columnStats = columnStatsClass.newInstance() + val rows = Seq.fill(10)(makeRandomRow(columnType)) ++ Seq.fill(10)(makeNullRow(1)) + rows.foreach(columnStats.gatherStats(_, 0)) + + val values = rows.take(10).map(_.get(0, columnType.dataType).asInstanceOf[T#InternalType]) + val ordering = columnType.dataType.ordering.asInstanceOf[Ordering[T#InternalType]] + val stats = columnStats.collectedStatistics + + assertResult(values.min(ordering), "Wrong lower bound")(stats.values(0)) + assertResult(values.max(ordering), "Wrong upper bound")(stats.values(1)) + assertResult(10, "Wrong null count")(stats.values(2)) + assertResult(20, "Wrong row count")(stats.values(3)) + assertResult(stats.values(4), "Wrong size in bytes") { + rows.map { row => + if (row.isNullAt(0)) 4 else columnType.actualSize(row, 0) + }.sum + } + } + } + + def testDecimalColumnStats[T <: AtomicType, U <: ColumnStats]( + initialStatistics: GenericInternalRow): Unit = { + + val columnStatsName = classOf[DecimalColumnStats].getSimpleName + val columnType = COMPACT_DECIMAL(15, 10) + + test(s"$columnStatsName: empty") { + val columnStats = new DecimalColumnStats(15, 10) + columnStats.collectedStatistics.values.zip(initialStatistics.values).foreach { + case (actual, expected) => assert(actual === expected) + } + } + + test(s"$columnStatsName: non-empty") { + import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ + + val columnStats = new DecimalColumnStats(15, 10) + val rows = Seq.fill(10)(makeRandomRow(columnType)) ++ Seq.fill(10)(makeNullRow(1)) + rows.foreach(columnStats.gatherStats(_, 0)) + + val values = rows.take(10).map(_.get(0, columnType.dataType).asInstanceOf[T#InternalType]) + val ordering = columnType.dataType.ordering.asInstanceOf[Ordering[T#InternalType]] + val stats = columnStats.collectedStatistics + + 
assertResult(values.min(ordering), "Wrong lower bound")(stats.values(0)) + assertResult(values.max(ordering), "Wrong upper bound")(stats.values(1)) + assertResult(10, "Wrong null count")(stats.values(2)) + assertResult(20, "Wrong row count")(stats.values(3)) + assertResult(stats.values(4), "Wrong size in bytes") { + rows.map { row => + if (row.isNullAt(0)) 4 else columnType.actualSize(row, 0) + }.sum + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala new file mode 100644 index 0000000000..34dd96929e --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnTypeSuite.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import java.nio.{ByteOrder, ByteBuffer} + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.CatalystTypeConverters +import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, GenericMutableRow} +import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ +import org.apache.spark.sql.types._ +import org.apache.spark.{Logging, SparkFunSuite} + + +class ColumnTypeSuite extends SparkFunSuite with Logging { + private val DEFAULT_BUFFER_SIZE = 512 + private val MAP_TYPE = MAP(MapType(IntegerType, StringType)) + private val ARRAY_TYPE = ARRAY(ArrayType(IntegerType)) + private val STRUCT_TYPE = STRUCT(StructType(StructField("a", StringType) :: Nil)) + + test("defaultSize") { + val checks = Map( + NULL-> 0, BOOLEAN -> 1, BYTE -> 1, SHORT -> 2, INT -> 4, LONG -> 8, + FLOAT -> 4, DOUBLE -> 8, COMPACT_DECIMAL(15, 10) -> 8, LARGE_DECIMAL(20, 10) -> 12, + STRING -> 8, BINARY -> 16, STRUCT_TYPE -> 20, ARRAY_TYPE -> 16, MAP_TYPE -> 32) + + checks.foreach { case (columnType, expectedSize) => + assertResult(expectedSize, s"Wrong defaultSize for $columnType") { + columnType.defaultSize + } + } + } + + test("actualSize") { + def checkActualSize( + columnType: ColumnType[_], + value: Any, + expected: Int): Unit = { + + assertResult(expected, s"Wrong actualSize for $columnType") { + val row = new GenericMutableRow(1) + row.update(0, CatalystTypeConverters.convertToCatalyst(value)) + val proj = UnsafeProjection.create(Array[DataType](columnType.dataType)) + columnType.actualSize(proj(row), 0) + } + } + + checkActualSize(NULL, null, 0) + checkActualSize(BOOLEAN, true, 1) + checkActualSize(BYTE, Byte.MaxValue, 1) + checkActualSize(SHORT, Short.MaxValue, 2) + checkActualSize(INT, Int.MaxValue, 4) + checkActualSize(LONG, Long.MaxValue, 8) + checkActualSize(FLOAT, Float.MaxValue, 4) + checkActualSize(DOUBLE, Double.MaxValue, 8) + checkActualSize(STRING, "hello", 4 + 
"hello".getBytes("utf-8").length) + checkActualSize(BINARY, Array.fill[Byte](4)(0.toByte), 4 + 4) + checkActualSize(COMPACT_DECIMAL(15, 10), Decimal(0, 15, 10), 8) + checkActualSize(LARGE_DECIMAL(20, 10), Decimal(0, 20, 10), 5) + checkActualSize(ARRAY_TYPE, Array[Any](1), 16) + checkActualSize(MAP_TYPE, Map(1 -> "a"), 29) + checkActualSize(STRUCT_TYPE, Row("hello"), 28) + } + + testNativeColumnType(BOOLEAN) + testNativeColumnType(BYTE) + testNativeColumnType(SHORT) + testNativeColumnType(INT) + testNativeColumnType(LONG) + testNativeColumnType(FLOAT) + testNativeColumnType(DOUBLE) + testNativeColumnType(COMPACT_DECIMAL(15, 10)) + testNativeColumnType(STRING) + + testColumnType(NULL) + testColumnType(BINARY) + testColumnType(LARGE_DECIMAL(20, 10)) + testColumnType(STRUCT_TYPE) + testColumnType(ARRAY_TYPE) + testColumnType(MAP_TYPE) + + def testNativeColumnType[T <: AtomicType](columnType: NativeColumnType[T]): Unit = { + testColumnType[T#InternalType](columnType) + } + + def testColumnType[JvmType](columnType: ColumnType[JvmType]): Unit = { + + val buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE).order(ByteOrder.nativeOrder()) + val proj = UnsafeProjection.create(Array[DataType](columnType.dataType)) + val converter = CatalystTypeConverters.createToScalaConverter(columnType.dataType) + val seq = (0 until 4).map(_ => proj(makeRandomRow(columnType)).copy()) + + test(s"$columnType append/extract") { + buffer.rewind() + seq.foreach(columnType.append(_, 0, buffer)) + + buffer.rewind() + seq.foreach { row => + logInfo("buffer = " + buffer + ", expected = " + row) + val expected = converter(row.get(0, columnType.dataType)) + val extracted = converter(columnType.extract(buffer)) + assert(expected === extracted, + s"Extracted value didn't equal to the original one. $expected != $extracted, buffer =" + + dumpBuffer(buffer.duplicate().rewind().asInstanceOf[ByteBuffer])) + } + } + } + + private def dumpBuffer(buff: ByteBuffer): Any = { + val sb = new StringBuilder() + while (buff.hasRemaining) { + val b = buff.get() + sb.append(Integer.toHexString(b & 0xff)).append(' ') + } + if (sb.nonEmpty) sb.setLength(sb.length - 1) + sb.toString() + } + + test("column type for decimal types with different precision") { + (1 to 18).foreach { i => + assertResult(COMPACT_DECIMAL(i, 0)) { + ColumnType(DecimalType(i, 0)) + } + } + + assertResult(LARGE_DECIMAL(19, 0)) { + ColumnType(DecimalType(19, 0)) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala new file mode 100644 index 0000000000..9cae65ef6f --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/ColumnarTestUtils.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import scala.collection.immutable.HashSet +import scala.util.Random + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow} +import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayBasedMapData} +import org.apache.spark.sql.types.{AtomicType, Decimal} +import org.apache.spark.unsafe.types.UTF8String + +object ColumnarTestUtils { + def makeNullRow(length: Int): GenericMutableRow = { + val row = new GenericMutableRow(length) + (0 until length).foreach(row.setNullAt) + row + } + + def makeRandomValue[JvmType](columnType: ColumnType[JvmType]): JvmType = { + def randomBytes(length: Int) = { + val bytes = new Array[Byte](length) + Random.nextBytes(bytes) + bytes + } + + (columnType match { + case NULL => null + case BOOLEAN => Random.nextBoolean() + case BYTE => (Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte + case SHORT => (Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort + case INT => Random.nextInt() + case LONG => Random.nextLong() + case FLOAT => Random.nextFloat() + case DOUBLE => Random.nextDouble() + case STRING => UTF8String.fromString(Random.nextString(Random.nextInt(32))) + case BINARY => randomBytes(Random.nextInt(32)) + case COMPACT_DECIMAL(precision, scale) => Decimal(Random.nextLong() % 100, precision, scale) + case LARGE_DECIMAL(precision, scale) => Decimal(Random.nextLong(), precision, scale) + case STRUCT(_) => + new GenericInternalRow(Array[Any](UTF8String.fromString(Random.nextString(10)))) + case ARRAY(_) => + new GenericArrayData(Array[Any](Random.nextInt(), Random.nextInt())) + case MAP(_) => + ArrayBasedMapData( + Map(Random.nextInt() -> UTF8String.fromString(Random.nextString(Random.nextInt(32))))) + }).asInstanceOf[JvmType] + } + + def makeRandomValues( + head: ColumnType[_], + tail: ColumnType[_]*): Seq[Any] = makeRandomValues(Seq(head) ++ tail) + + def makeRandomValues(columnTypes: Seq[ColumnType[_]]): Seq[Any] = { + columnTypes.map(makeRandomValue(_)) + } + + def makeUniqueRandomValues[JvmType]( + columnType: ColumnType[JvmType], + count: Int): Seq[JvmType] = { + + Iterator.iterate(HashSet.empty[JvmType]) { set => + set + Iterator.continually(makeRandomValue(columnType)).filterNot(set.contains).next() + }.drop(count).next().toSeq + } + + def makeRandomRow( + head: ColumnType[_], + tail: ColumnType[_]*): InternalRow = makeRandomRow(Seq(head) ++ tail) + + def makeRandomRow(columnTypes: Seq[ColumnType[_]]): InternalRow = { + val row = new GenericMutableRow(columnTypes.length) + makeRandomValues(columnTypes).zipWithIndex.foreach { case (value, index) => + row(index) = value + } + row + } + + def makeUniqueValuesAndSingleValueRows[T <: AtomicType]( + columnType: NativeColumnType[T], + count: Int): (Seq[T#InternalType], Seq[GenericMutableRow]) = { + + val values = makeUniqueRandomValues(columnType, count) + val rows = values.map { value => + val row = new GenericMutableRow(1) + row(0) = value + row + } + + (values, rows) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala new file mode 100644 index 0000000000..25afed25c8 --- /dev/null +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.test.SQLTestData._ +import org.apache.spark.sql.types._ +import org.apache.spark.storage.StorageLevel.MEMORY_ONLY + +class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { + import testImplicits._ + + setupTestData() + + test("simple columnar query") { + val plan = sqlContext.executePlan(testData.logicalPlan).executedPlan + val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) + + checkAnswer(scan, testData.collect().toSeq) + } + + test("default size avoids broadcast") { + // TODO: Improve this test when we have better statistics + sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)) + .toDF().registerTempTable("sizeTst") + sqlContext.cacheTable("sizeTst") + assert( + sqlContext.table("sizeTst").queryExecution.analyzed.statistics.sizeInBytes > + sqlContext.conf.autoBroadcastJoinThreshold) + } + + test("projection") { + val plan = sqlContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan + val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) + + checkAnswer(scan, testData.collect().map { + case Row(key: Int, value: String) => value -> key + }.map(Row.fromTuple)) + } + + test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") { + val plan = sqlContext.executePlan(testData.logicalPlan).executedPlan + val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None) + + checkAnswer(scan, testData.collect().toSeq) + checkAnswer(scan, testData.collect().toSeq) + } + + test("SPARK-1678 regression: compression must not lose repeated values") { + checkAnswer( + sql("SELECT * FROM repeatedData"), + repeatedData.collect().toSeq.map(Row.fromTuple)) + + sqlContext.cacheTable("repeatedData") + + checkAnswer( + sql("SELECT * FROM repeatedData"), + repeatedData.collect().toSeq.map(Row.fromTuple)) + } + + test("with null values") { + checkAnswer( + sql("SELECT * FROM nullableRepeatedData"), + nullableRepeatedData.collect().toSeq.map(Row.fromTuple)) + + sqlContext.cacheTable("nullableRepeatedData") + + checkAnswer( + sql("SELECT * FROM nullableRepeatedData"), + nullableRepeatedData.collect().toSeq.map(Row.fromTuple)) + } + + test("SPARK-2729 regression: timestamp data type") { + val timestamps = (0 to 3).map(i => Tuple1(new Timestamp(i))).toDF("time") + timestamps.registerTempTable("timestamps") + + checkAnswer( + sql("SELECT time 
FROM timestamps"), + timestamps.collect().toSeq) + + sqlContext.cacheTable("timestamps") + + checkAnswer( + sql("SELECT time FROM timestamps"), + timestamps.collect().toSeq) + } + + test("SPARK-3320 regression: batched column buffer building should work with empty partitions") { + checkAnswer( + sql("SELECT * FROM withEmptyParts"), + withEmptyParts.collect().toSeq.map(Row.fromTuple)) + + sqlContext.cacheTable("withEmptyParts") + + checkAnswer( + sql("SELECT * FROM withEmptyParts"), + withEmptyParts.collect().toSeq.map(Row.fromTuple)) + } + + test("SPARK-4182 Caching complex types") { + complexData.cache().count() + // Shouldn't throw + complexData.count() + complexData.unpersist() + } + + test("decimal type") { + // Casting is required here because ScalaReflection can't capture decimal precision information. + val df = (1 to 10) + .map(i => Tuple1(Decimal(i, 15, 10))) + .toDF("dec") + .select($"dec" cast DecimalType(15, 10)) + + assert(df.schema.head.dataType === DecimalType(15, 10)) + + df.cache().registerTempTable("test_fixed_decimal") + checkAnswer( + sql("SELECT * FROM test_fixed_decimal"), + (1 to 10).map(i => Row(Decimal(i, 15, 10).toJavaBigDecimal))) + } + + test("test different data types") { + // Create the schema. + val struct = + StructType( + StructField("f1", FloatType, true) :: + StructField("f2", ArrayType(BooleanType), true) :: Nil) + val dataTypes = + Seq(StringType, BinaryType, NullType, BooleanType, + ByteType, ShortType, IntegerType, LongType, + FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5), + DateType, TimestampType, + ArrayType(IntegerType), MapType(StringType, LongType), struct) + val fields = dataTypes.zipWithIndex.map { case (dataType, index) => + StructField(s"col$index", dataType, true) + } + val allColumns = fields.map(_.name).mkString(",") + val schema = StructType(fields) + + // Create a RDD for the schema + val rdd = + sparkContext.parallelize((1 to 10000), 10).map { i => + Row( + s"str${i}: test cache.", + s"binary${i}: test cache.".getBytes("UTF-8"), + null, + i % 2 == 0, + i.toByte, + i.toShort, + i, + Long.MaxValue - i.toLong, + (i + 0.25).toFloat, + (i + 0.75), + BigDecimal(Long.MaxValue.toString + ".12345"), + new java.math.BigDecimal(s"${i % 9 + 1}" + ".23456"), + new Date(i), + new Timestamp(i * 1000000L), + (i to i + 10).toSeq, + (i to i + 10).map(j => s"map_key_$j" -> (Long.MaxValue - j)).toMap, + Row((i - 0.25).toFloat, Seq(true, false, null))) + } + sqlContext.createDataFrame(rdd, schema).registerTempTable("InMemoryCache_different_data_types") + // Cache the table. + sql("cache table InMemoryCache_different_data_types") + // Make sure the table is indeed cached. + sqlContext.table("InMemoryCache_different_data_types").queryExecution.executedPlan + assert( + sqlContext.isCached("InMemoryCache_different_data_types"), + "InMemoryCache_different_data_types should be cached.") + // Issue a query and check the results. + checkAnswer( + sql(s"SELECT DISTINCT ${allColumns} FROM InMemoryCache_different_data_types"), + sqlContext.table("InMemoryCache_different_data_types").collect()) + sqlContext.dropTempTable("InMemoryCache_different_data_types") + } + + test("SPARK-10422: String column in InMemoryColumnarCache needs to override clone method") { + val df = sqlContext.range(1, 100).selectExpr("id % 10 as id") + .rdd.map(id => Tuple1(s"str_$id")).toDF("i") + val cached = df.cache() + // count triggers the caching action. It should not throw. + cached.count() + + // Make sure, the DataFrame is indeed cached. 
+ assert(sqlContext.cacheManager.lookupCachedData(cached).nonEmpty) + + // Check result. + checkAnswer( + cached, + sqlContext.range(1, 100).selectExpr("id % 10 as id") + .rdd.map(id => Tuple1(s"str_$id")).toDF("i") + ) + + // Drop the cache. + cached.unpersist() + } + + test("SPARK-10859: Predicates pushed to InMemoryColumnarTableScan are not evaluated correctly") { + val data = sqlContext.range(10).selectExpr("id", "cast(id as string) as s") + data.cache() + assert(data.count() === 10) + assert(data.filter($"s" === "3").count() === 1) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala new file mode 100644 index 0000000000..35dc9a276c --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnAccessorSuite.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import java.nio.ByteBuffer + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.CatalystTypeConverters +import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, GenericMutableRow} +import org.apache.spark.sql.types._ + +class TestNullableColumnAccessor[JvmType]( + buffer: ByteBuffer, + columnType: ColumnType[JvmType]) + extends BasicColumnAccessor(buffer, columnType) + with NullableColumnAccessor + +object TestNullableColumnAccessor { + def apply[JvmType](buffer: ByteBuffer, columnType: ColumnType[JvmType]) + : TestNullableColumnAccessor[JvmType] = { + new TestNullableColumnAccessor(buffer, columnType) + } +} + +class NullableColumnAccessorSuite extends SparkFunSuite { + import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ + + Seq( + NULL, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, + STRING, BINARY, COMPACT_DECIMAL(15, 10), LARGE_DECIMAL(20, 10), + STRUCT(StructType(StructField("a", StringType) :: Nil)), + ARRAY(ArrayType(IntegerType)), MAP(MapType(IntegerType, StringType))) + .foreach { + testNullableColumnAccessor(_) + } + + def testNullableColumnAccessor[JvmType]( + columnType: ColumnType[JvmType]): Unit = { + + val typeName = columnType.getClass.getSimpleName.stripSuffix("$") + val nullRow = makeNullRow(1) + + test(s"Nullable $typeName column accessor: empty column") { + val builder = TestNullableColumnBuilder(columnType) + val accessor = TestNullableColumnAccessor(builder.build(), columnType) + assert(!accessor.hasNext) + } + + test(s"Nullable $typeName column accessor: access null values") { + val builder = TestNullableColumnBuilder(columnType) + val randomRow = makeRandomRow(columnType) + val proj = 
UnsafeProjection.create(Array[DataType](columnType.dataType)) + + (0 until 4).foreach { _ => + builder.appendFrom(proj(randomRow), 0) + builder.appendFrom(proj(nullRow), 0) + } + + val accessor = TestNullableColumnAccessor(builder.build(), columnType) + val row = new GenericMutableRow(1) + val converter = CatalystTypeConverters.createToScalaConverter(columnType.dataType) + + (0 until 4).foreach { _ => + assert(accessor.hasNext) + accessor.extractTo(row, 0) + assert(converter(row.get(0, columnType.dataType)) + === converter(randomRow.get(0, columnType.dataType))) + + assert(accessor.hasNext) + accessor.extractTo(row, 0) + assert(row.isNullAt(0)) + } + + assert(!accessor.hasNext) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala new file mode 100644 index 0000000000..93be3e16a5 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/NullableColumnBuilderSuite.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.columnar + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.CatalystTypeConverters +import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, GenericMutableRow} +import org.apache.spark.sql.types._ + +class TestNullableColumnBuilder[JvmType](columnType: ColumnType[JvmType]) + extends BasicColumnBuilder[JvmType](new NoopColumnStats, columnType) + with NullableColumnBuilder + +object TestNullableColumnBuilder { + def apply[JvmType](columnType: ColumnType[JvmType], initialSize: Int = 0) + : TestNullableColumnBuilder[JvmType] = { + val builder = new TestNullableColumnBuilder(columnType) + builder.initialize(initialSize) + builder + } +} + +class NullableColumnBuilderSuite extends SparkFunSuite { + import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ + + Seq( + BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, + STRING, BINARY, COMPACT_DECIMAL(15, 10), LARGE_DECIMAL(20, 10), + STRUCT(StructType(StructField("a", StringType) :: Nil)), + ARRAY(ArrayType(IntegerType)), MAP(MapType(IntegerType, StringType))) + .foreach { + testNullableColumnBuilder(_) + } + + def testNullableColumnBuilder[JvmType]( + columnType: ColumnType[JvmType]): Unit = { + + val typeName = columnType.getClass.getSimpleName.stripSuffix("$") + val dataType = columnType.dataType + val proj = UnsafeProjection.create(Array[DataType](dataType)) + val converter = CatalystTypeConverters.createToScalaConverter(dataType) + + test(s"$typeName column builder: empty column") { + val columnBuilder = TestNullableColumnBuilder(columnType) + val buffer = columnBuilder.build() + + assertResult(0, "Wrong null count")(buffer.getInt()) + assert(!buffer.hasRemaining) + } + + test(s"$typeName column builder: buffer size auto growth") { + val columnBuilder = TestNullableColumnBuilder(columnType) + val randomRow = makeRandomRow(columnType) + + (0 until 4).foreach { _ => + columnBuilder.appendFrom(proj(randomRow), 0) + } + + val buffer = columnBuilder.build() + + assertResult(0, "Wrong null count")(buffer.getInt()) + } + + test(s"$typeName column builder: null values") { + val columnBuilder = TestNullableColumnBuilder(columnType) + val randomRow = makeRandomRow(columnType) + val nullRow = makeNullRow(1) + + (0 until 4).foreach { _ => + columnBuilder.appendFrom(proj(randomRow), 0) + columnBuilder.appendFrom(proj(nullRow), 0) + } + + val buffer = columnBuilder.build() + + assertResult(4, "Wrong null count")(buffer.getInt()) + + // For null positions + (1 to 7 by 2).foreach(assertResult(_, "Wrong null position")(buffer.getInt())) + + // For non-null values + val actual = new GenericMutableRow(new Array[Any](1)) + (0 until 4).foreach { _ => + columnType.extract(buffer, actual, 0) + assert(converter(actual.get(0, dataType)) === converter(randomRow.get(0, dataType)), + "Extracted value didn't equal to the original one") + } + + assert(!buffer.hasRemaining) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala new file mode 100644 index 0000000000..d762f7bfe9 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
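The "buffer size auto growth" test above only checks that appends beyond the initial capacity still succeed. One plausible way to get that behavior, sketched here with hypothetical names rather than Spark's actual ColumnBuilder internals, is to copy into a larger buffer whenever the next append would not fit:

import java.nio.{ByteBuffer, ByteOrder}

object GrowableBufferSketch {
  // Grow to at least `size` more free bytes, roughly doubling and copying what
  // was already written; otherwise return the original buffer untouched.
  def ensureFreeSpace(orig: ByteBuffer, size: Int): ByteBuffer = {
    if (orig.remaining >= size) {
      orig
    } else {
      val needed = orig.capacity - orig.remaining + size
      val grown = ByteBuffer.allocate(math.max(orig.capacity * 2, needed)).order(ByteOrder.nativeOrder())
      orig.flip()
      grown.put(orig)                         // keep the bytes appended so far
      grown
    }
  }

  def main(args: Array[String]): Unit = {
    var buffer = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder())
    (1 to 4).foreach { i =>
      buffer = ensureFreeSpace(buffer, 4)     // grows transparently on the third append
      buffer.putInt(i)
    }
    buffer.flip()
    assert((1 to 4).map(_ => buffer.getInt()) == Seq(1, 2, 3, 4))
  }
}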
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql._ +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.test.SQLTestData._ + +class PartitionBatchPruningSuite extends SparkFunSuite with SharedSQLContext { + import testImplicits._ + + private lazy val originalColumnBatchSize = sqlContext.conf.columnBatchSize + private lazy val originalInMemoryPartitionPruning = sqlContext.conf.inMemoryPartitionPruning + + override protected def beforeAll(): Unit = { + super.beforeAll() + // Make a table with 5 partitions, 2 batches per partition, 10 elements per batch + sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, 10) + + val pruningData = sparkContext.makeRDD((1 to 100).map { key => + val string = if (((key - 1) / 10) % 2 == 0) null else key.toString + TestData(key, string) + }, 5).toDF() + pruningData.registerTempTable("pruningData") + + // Enable in-memory partition pruning + sqlContext.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true) + // Enable in-memory table scan accumulators + sqlContext.setConf("spark.sql.inMemoryTableScanStatistics.enable", "true") + sqlContext.cacheTable("pruningData") + } + + override protected def afterAll(): Unit = { + try { + sqlContext.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize) + sqlContext.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning) + sqlContext.uncacheTable("pruningData") + } finally { + super.afterAll() + } + } + + // Comparisons + checkBatchPruning("SELECT key FROM pruningData WHERE key = 1", 1, 1)(Seq(1)) + checkBatchPruning("SELECT key FROM pruningData WHERE 1 = key", 1, 1)(Seq(1)) + checkBatchPruning("SELECT key FROM pruningData WHERE key < 12", 1, 2)(1 to 11) + checkBatchPruning("SELECT key FROM pruningData WHERE key <= 11", 1, 2)(1 to 11) + checkBatchPruning("SELECT key FROM pruningData WHERE key > 88", 1, 2)(89 to 100) + checkBatchPruning("SELECT key FROM pruningData WHERE key >= 89", 1, 2)(89 to 100) + checkBatchPruning("SELECT key FROM pruningData WHERE 12 > key", 1, 2)(1 to 11) + checkBatchPruning("SELECT key FROM pruningData WHERE 11 >= key", 1, 2)(1 to 11) + checkBatchPruning("SELECT key FROM pruningData WHERE 88 < key", 1, 2)(89 to 100) + checkBatchPruning("SELECT key FROM pruningData WHERE 89 <= key", 1, 2)(89 to 100) + + // IS NULL + checkBatchPruning("SELECT key FROM pruningData WHERE value IS NULL", 5, 5) { + (1 to 10) ++ (21 to 30) ++ (41 to 50) ++ (61 to 70) ++ (81 to 90) + } + + // IS NOT NULL + checkBatchPruning("SELECT key FROM pruningData WHERE value IS NOT NULL", 5, 5) { + (11 to 20) ++ (31 to 40) ++ (51 to 60) ++ (71 to 80) ++ (91 to 100) + } + + // Conjunction and disjunction + checkBatchPruning("SELECT key FROM pruningData WHERE key > 8 AND key <= 21", 2, 3)(9 to 21) + checkBatchPruning("SELECT key FROM 
pruningData WHERE key < 2 OR key > 99", 2, 2)(Seq(1, 100)) + checkBatchPruning("SELECT key FROM pruningData WHERE key < 12 AND key IS NOT NULL", 1, 2)(1 to 11) + checkBatchPruning("SELECT key FROM pruningData WHERE key < 2 OR (key > 78 AND key < 92)", 3, 4) { + Seq(1) ++ (79 to 91) + } + checkBatchPruning("SELECT key FROM pruningData WHERE NOT (key < 88)", 1, 2) { + // Although the `NOT` operator isn't supported directly, the optimizer can transform + // `NOT (a < b)` to `b >= a` + 88 to 100 + } + + // With unsupported predicate + { + val seq = (1 to 30).mkString(", ") + checkBatchPruning(s"SELECT key FROM pruningData WHERE NOT (key IN ($seq))", 5, 10)(31 to 100) + checkBatchPruning(s"SELECT key FROM pruningData WHERE NOT (key IN ($seq)) AND key > 88", 1, 2) { + 89 to 100 + } + } + + def checkBatchPruning( + query: String, + expectedReadPartitions: Int, + expectedReadBatches: Int)( + expectedQueryResult: => Seq[Int]): Unit = { + + test(query) { + val df = sql(query) + val queryExecution = df.queryExecution + + assertResult(expectedQueryResult.toArray, s"Wrong query result: $queryExecution") { + df.collect().map(_(0)).toArray + } + + val (readPartitions, readBatches) = df.queryExecution.executedPlan.collect { + case in: InMemoryColumnarTableScan => (in.readPartitions.value, in.readBatches.value) + }.head + + assert(readBatches === expectedReadBatches, s"Wrong number of read batches: $queryExecution") + assert( + readPartitions === expectedReadPartitions, + s"Wrong number of read partitions: $queryExecution") + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala new file mode 100644 index 0000000000..ccbddef0fa --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/BooleanBitSetSuite.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.columnar.compression + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericMutableRow +import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ +import org.apache.spark.sql.execution.columnar.{BOOLEAN, NoopColumnStats} + +class BooleanBitSetSuite extends SparkFunSuite { + import BooleanBitSet._ + + def skeleton(count: Int) { + // ------------- + // Tests encoder + // ------------- + + val builder = TestCompressibleColumnBuilder(new NoopColumnStats, BOOLEAN, BooleanBitSet) + val rows = Seq.fill[InternalRow](count)(makeRandomRow(BOOLEAN)) + val values = rows.map(_.getBoolean(0)) + + rows.foreach(builder.appendFrom(_, 0)) + val buffer = builder.build() + + // Column type ID + null count + null positions + val headerSize = CompressionScheme.columnHeaderSize(buffer) + + // Compression scheme ID + element count + bitset words + val compressedSize = 4 + 4 + { + val extra = if (count % BITS_PER_LONG == 0) 0 else 1 + (count / BITS_PER_LONG + extra) * 8 + } + + // 4 extra bytes for compression scheme type ID + assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity) + + // Skips column header + buffer.position(headerSize) + assertResult(BooleanBitSet.typeId, "Wrong compression scheme ID")(buffer.getInt()) + assertResult(count, "Wrong element count")(buffer.getInt()) + + var word = 0: Long + for (i <- 0 until count) { + val bit = i % BITS_PER_LONG + word = if (bit == 0) buffer.getLong() else word + assertResult(values(i), s"Wrong value in compressed buffer, index=$i") { + (word & ((1: Long) << bit)) != 0 + } + } + + // ------------- + // Tests decoder + // ------------- + + // Rewinds, skips column header and 4 more bytes for compression scheme ID + buffer.rewind().position(headerSize + 4) + + val decoder = BooleanBitSet.decoder(buffer, BOOLEAN) + val mutableRow = new GenericMutableRow(1) + if (values.nonEmpty) { + values.foreach { + assert(decoder.hasNext) + assertResult(_, "Wrong decoded value") { + decoder.next(mutableRow, 0) + mutableRow.getBoolean(0) + } + } + } + assert(!decoder.hasNext) + } + + test(s"$BooleanBitSet: empty") { + skeleton(0) + } + + test(s"$BooleanBitSet: less than 1 word") { + skeleton(BITS_PER_LONG - 1) + } + + test(s"$BooleanBitSet: exactly 1 word") { + skeleton(BITS_PER_LONG) + } + + test(s"$BooleanBitSet: multiple whole words") { + skeleton(BITS_PER_LONG * 2) + } + + test(s"$BooleanBitSet: multiple words and 1 more bit") { + skeleton(BITS_PER_LONG * 2 + 1) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala new file mode 100644 index 0000000000..830ca0294e --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/DictionaryEncodingSuite.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar.compression + +import java.nio.ByteBuffer + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.expressions.GenericMutableRow +import org.apache.spark.sql.execution.columnar._ +import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ +import org.apache.spark.sql.types.AtomicType + +class DictionaryEncodingSuite extends SparkFunSuite { + testDictionaryEncoding(new IntColumnStats, INT) + testDictionaryEncoding(new LongColumnStats, LONG) + testDictionaryEncoding(new StringColumnStats, STRING) + + def testDictionaryEncoding[T <: AtomicType]( + columnStats: ColumnStats, + columnType: NativeColumnType[T]) { + + val typeName = columnType.getClass.getSimpleName.stripSuffix("$") + + def buildDictionary(buffer: ByteBuffer) = { + (0 until buffer.getInt()).map(columnType.extract(buffer) -> _.toShort).toMap + } + + def stableDistinct(seq: Seq[Int]): Seq[Int] = if (seq.isEmpty) { + Seq.empty + } else { + seq.head +: seq.tail.filterNot(_ == seq.head) + } + + def skeleton(uniqueValueCount: Int, inputSeq: Seq[Int]) { + // ------------- + // Tests encoder + // ------------- + + val builder = TestCompressibleColumnBuilder(columnStats, columnType, DictionaryEncoding) + val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount) + val dictValues = stableDistinct(inputSeq) + + inputSeq.foreach(i => builder.appendFrom(rows(i), 0)) + + if (dictValues.length > DictionaryEncoding.MAX_DICT_SIZE) { + withClue("Dictionary overflowed, compression should fail") { + intercept[Throwable] { + builder.build() + } + } + } else { + val buffer = builder.build() + val headerSize = CompressionScheme.columnHeaderSize(buffer) + // 4 extra bytes for dictionary size + val dictionarySize = 4 + rows.map(columnType.actualSize(_, 0)).sum + // 2 bytes for each `Short` + val compressedSize = 4 + dictionarySize + 2 * inputSeq.length + // 4 extra bytes for compression scheme type ID + assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity) + + // Skips column header + buffer.position(headerSize) + assertResult(DictionaryEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt()) + + val dictionary = buildDictionary(buffer).toMap + + dictValues.foreach { i => + assertResult(i, "Wrong dictionary entry") { + dictionary(values(i)) + } + } + + inputSeq.foreach { i => + assertResult(i.toShort, "Wrong column element value")(buffer.getShort()) + } + + // ------------- + // Tests decoder + // ------------- + + // Rewinds, skips column header and 4 more bytes for compression scheme ID + buffer.rewind().position(headerSize + 4) + + val decoder = DictionaryEncoding.decoder(buffer, columnType) + val mutableRow = new GenericMutableRow(1) + + if (inputSeq.nonEmpty) { + inputSeq.foreach { i => + assert(decoder.hasNext) + assertResult(values(i), "Wrong decoded value") { + decoder.next(mutableRow, 0) + columnType.getField(mutableRow, 0) + } + } + } + + assert(!decoder.hasNext) + } + } + + test(s"$DictionaryEncoding with $typeName: empty") { + skeleton(0, Seq.empty) + } + + 
test(s"$DictionaryEncoding with $typeName: simple case") { + skeleton(2, Seq(0, 1, 0, 1)) + } + + test(s"$DictionaryEncoding with $typeName: dictionary overflow") { + skeleton(DictionaryEncoding.MAX_DICT_SIZE + 1, 0 to DictionaryEncoding.MAX_DICT_SIZE) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala new file mode 100644 index 0000000000..988a577a7b --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/IntegralDeltaSuite.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar.compression + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.expressions.GenericMutableRow +import org.apache.spark.sql.execution.columnar._ +import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ +import org.apache.spark.sql.types.IntegralType + +class IntegralDeltaSuite extends SparkFunSuite { + testIntegralDelta(new IntColumnStats, INT, IntDelta) + testIntegralDelta(new LongColumnStats, LONG, LongDelta) + + def testIntegralDelta[I <: IntegralType]( + columnStats: ColumnStats, + columnType: NativeColumnType[I], + scheme: CompressionScheme) { + + def skeleton(input: Seq[I#InternalType]) { + // ------------- + // Tests encoder + // ------------- + + val builder = TestCompressibleColumnBuilder(columnStats, columnType, scheme) + val deltas = if (input.isEmpty) { + Seq.empty[Long] + } else { + (input.tail, input.init).zipped.map { + case (x: Int, y: Int) => (x - y).toLong + case (x: Long, y: Long) => x - y + } + } + + input.map { value => + val row = new GenericMutableRow(1) + columnType.setField(row, 0, value) + builder.appendFrom(row, 0) + } + + val buffer = builder.build() + // Column type ID + null count + null positions + val headerSize = CompressionScheme.columnHeaderSize(buffer) + + // Compression scheme ID + compressed contents + val compressedSize = 4 + (if (deltas.isEmpty) { + 0 + } else { + val oneBoolean = columnType.defaultSize + 1 + oneBoolean + deltas.map { + d => if (math.abs(d) <= Byte.MaxValue) 1 else 1 + oneBoolean + }.sum + }) + + // 4 extra bytes for compression scheme type ID + assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity) + + buffer.position(headerSize) + assertResult(scheme.typeId, "Wrong compression scheme ID")(buffer.getInt()) + + if (input.nonEmpty) { + assertResult(Byte.MinValue, "The first byte should be an escaping mark")(buffer.get()) + assertResult(input.head, "The first value is wrong")(columnType.extract(buffer)) + + (input.tail, deltas).zipped.foreach { (value, delta) => + if 
(math.abs(delta) <= Byte.MaxValue) { + assertResult(delta, "Wrong delta")(buffer.get()) + } else { + assertResult(Byte.MinValue, "Expecting escaping mark here")(buffer.get()) + assertResult(value, "Wrong value")(columnType.extract(buffer)) + } + } + } + + // ------------- + // Tests decoder + // ------------- + + // Rewinds, skips column header and 4 more bytes for compression scheme ID + buffer.rewind().position(headerSize + 4) + + val decoder = scheme.decoder(buffer, columnType) + val mutableRow = new GenericMutableRow(1) + + if (input.nonEmpty) { + input.foreach { + assert(decoder.hasNext) + assertResult(_, "Wrong decoded value") { + decoder.next(mutableRow, 0) + columnType.getField(mutableRow, 0) + } + } + } + assert(!decoder.hasNext) + } + + test(s"$scheme: empty column") { + skeleton(Seq.empty) + } + + test(s"$scheme: simple case") { + val input = columnType match { + case INT => Seq(2: Int, 1: Int, 2: Int, 130: Int) + case LONG => Seq(2: Long, 1: Long, 2: Long, 130: Long) + } + + skeleton(input.map(_.asInstanceOf[I#InternalType])) + } + + test(s"$scheme: long random series") { + // Use `Any` as a workaround since no `ClassTag[I#InternalType]` is available here. + val input = Array.fill[Any](10000)(makeRandomValue(columnType)) + skeleton(input.map(_.asInstanceOf[I#InternalType])) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala new file mode 100644 index 0000000000..ce3affba55 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/RunLengthEncodingSuite.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.execution.columnar.compression + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.expressions.GenericMutableRow +import org.apache.spark.sql.execution.columnar._ +import org.apache.spark.sql.execution.columnar.ColumnarTestUtils._ +import org.apache.spark.sql.types.AtomicType + +class RunLengthEncodingSuite extends SparkFunSuite { + testRunLengthEncoding(new NoopColumnStats, BOOLEAN) + testRunLengthEncoding(new ByteColumnStats, BYTE) + testRunLengthEncoding(new ShortColumnStats, SHORT) + testRunLengthEncoding(new IntColumnStats, INT) + testRunLengthEncoding(new LongColumnStats, LONG) + testRunLengthEncoding(new StringColumnStats, STRING) + + def testRunLengthEncoding[T <: AtomicType]( + columnStats: ColumnStats, + columnType: NativeColumnType[T]) { + + val typeName = columnType.getClass.getSimpleName.stripSuffix("$") + + def skeleton(uniqueValueCount: Int, inputRuns: Seq[(Int, Int)]) { + // ------------- + // Tests encoder + // ------------- + + val builder = TestCompressibleColumnBuilder(columnStats, columnType, RunLengthEncoding) + val (values, rows) = makeUniqueValuesAndSingleValueRows(columnType, uniqueValueCount) + val inputSeq = inputRuns.flatMap { case (index, run) => + Seq.fill(run)(index) + } + + inputSeq.foreach(i => builder.appendFrom(rows(i), 0)) + val buffer = builder.build() + + // Column type ID + null count + null positions + val headerSize = CompressionScheme.columnHeaderSize(buffer) + + // Compression scheme ID + compressed contents + val compressedSize = 4 + inputRuns.map { case (index, _) => + // 4 extra bytes each run for run length + columnType.actualSize(rows(index), 0) + 4 + }.sum + + // 4 extra bytes for compression scheme type ID + assertResult(headerSize + compressedSize, "Wrong buffer capacity")(buffer.capacity) + + // Skips column header + buffer.position(headerSize) + assertResult(RunLengthEncoding.typeId, "Wrong compression scheme ID")(buffer.getInt()) + + inputRuns.foreach { case (index, run) => + assertResult(values(index), "Wrong column element value")(columnType.extract(buffer)) + assertResult(run, "Wrong run length")(buffer.getInt()) + } + + // ------------- + // Tests decoder + // ------------- + + // Rewinds, skips column header and 4 more bytes for compression scheme ID + buffer.rewind().position(headerSize + 4) + + val decoder = RunLengthEncoding.decoder(buffer, columnType) + val mutableRow = new GenericMutableRow(1) + + if (inputSeq.nonEmpty) { + inputSeq.foreach { i => + assert(decoder.hasNext) + assertResult(values(i), "Wrong decoded value") { + decoder.next(mutableRow, 0) + columnType.getField(mutableRow, 0) + } + } + } + + assert(!decoder.hasNext) + } + + test(s"$RunLengthEncoding with $typeName: empty column") { + skeleton(0, Seq.empty) + } + + test(s"$RunLengthEncoding with $typeName: simple case") { + skeleton(2, Seq(0 -> 2, 1 -> 2)) + } + + test(s"$RunLengthEncoding with $typeName: run length == 1") { + skeleton(2, Seq(0 -> 1, 1 -> 1)) + } + + test(s"$RunLengthEncoding with $typeName: single long run") { + skeleton(1, Seq(0 -> 1000)) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/TestCompressibleColumnBuilder.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/TestCompressibleColumnBuilder.scala new file mode 100644 index 0000000000..5e078f2513 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/TestCompressibleColumnBuilder.scala @@ -0,0 +1,44 @@ +/* +
* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar.compression + +import org.apache.spark.sql.execution.columnar._ +import org.apache.spark.sql.types.AtomicType + +class TestCompressibleColumnBuilder[T <: AtomicType]( + override val columnStats: ColumnStats, + override val columnType: NativeColumnType[T], + override val schemes: Seq[CompressionScheme]) + extends NativeColumnBuilder(columnStats, columnType) + with NullableColumnBuilder + with CompressibleColumnBuilder[T] { + + override protected def isWorthCompressing(encoder: Encoder[T]) = true +} + +object TestCompressibleColumnBuilder { + def apply[T <: AtomicType]( + columnStats: ColumnStats, + columnType: NativeColumnType[T], + scheme: CompressionScheme): TestCompressibleColumnBuilder[T] = { + + val builder = new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme)) + builder.initialize(0, "", useCompression = true) + builder + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 5c2fc7d82f..99478e82d4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive import java.io.File -import org.apache.spark.sql.columnar.InMemoryColumnarTableScan +import org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode} -- cgit v1.2.3
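For callers that only need to detect whether a cached relation is served by the in-memory columnar scan, the practical effect of this move is the new import path shown in the CachedTableSuite hunk above. Below is a minimal sketch of that detection pattern against the relocated class, assuming a Spark 1.6-era SQLContext; the temp table name, column name, and query are illustrative assumptions, not taken from this patch.

// Sketch only: cache a temp table and confirm the executed plan contains the
// relocated InMemoryColumnarTableScan operator (same pattern as the moved
// PartitionBatchPruningSuite, minus its accumulator checks).
import org.apache.spark.sql.execution.columnar.InMemoryColumnarTableScan

val df = sqlContext.range(0, 100).toDF("key")
df.registerTempTable("pruningExample")
sqlContext.cacheTable("pruningExample")

val cached = sqlContext.sql("SELECT key FROM pruningExample WHERE key > 88")
cached.collect()  // the first action materializes the cached column batches

// Collect the in-memory scan nodes from the physical plan.
val scans = cached.queryExecution.executedPlan.collect {
  case scan: InMemoryColumnarTableScan => scan
}
assert(scans.nonEmpty, "Expected the cached relation to be read via the columnar scan")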