author     Davies Liu <davies@databricks.com>   2015-10-06 08:45:31 -0700
committer  Davies Liu <davies.liu@gmail.com>    2015-10-06 08:45:31 -0700
commit     27ecfe61f07c8413a7b8b9fbdf36ed99cf05227d (patch)
tree       1e434854f6fc2056800db14bbf2a703450c2b710
parent     4e0027feaee7c028741da88d8fbc26a45fc4a268 (diff)
[SPARK-10938] [SQL] remove typeId in columnar cache
This PR removes the typeId from the columnar cache; it is no longer needed. It also removes the dedicated DATE and TIMESTAMP column types (INT and LONG are used instead).

Author: Davies Liu <davies@databricks.com>

Closes #8989 from davies/refactor_cache.
-rw-r--r--  project/MimaExcludes.scala                                                                    |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala                    | 36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala                     | 16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala                       |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala                        | 71
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala             | 19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala | 27
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala     |  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala                  |  3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala                   | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala                 |  5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala       |  8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala        |  5

13 files changed, 63 insertions(+), 151 deletions(-)
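
For context (a sketch, not part of the patch): the dedicated DATE and TIMESTAMP column types can be dropped because Catalyst already represents DateType as an Int (days since the Unix epoch) and TimestampType as a Long (microseconds since the Unix epoch). A minimal illustration of that physical representation:

import java.nio.{ByteBuffer, ByteOrder}

// A date and a timestamp round-trip through a buffer as a plain Int/Long,
// which is why the INT and LONG column types can serve both logical types.
val buf = ByteBuffer.allocate(12).order(ByteOrder.nativeOrder())
buf.putInt(16714)               // DateType value: 2015-10-06 as epoch days
buf.putLong(1444146331000000L)  // TimestampType value: epoch microseconds
buf.flip()
val days = buf.getInt()
val micros = buf.getLong()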
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index b2e6be7066..2d4d146f51 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -42,7 +42,9 @@ object MimaExcludes {
excludePackage("org.spark-project.jetty"),
MimaBuild.excludeSparkPackage("unused"),
// SQL execution is considered private.
- excludePackage("org.apache.spark.sql.execution")
+ excludePackage("org.apache.spark.sql.execution"),
+ // SQL columnar is considered private.
+ excludePackage("org.apache.spark.sql.columnar")
) ++
MimaBuild.excludeSparkClass("streaming.flume.FlumeTestUtils") ++
MimaBuild.excludeSparkClass("streaming.flume.PollingFlumeTestUtils") ++
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
index 4c29a09321..2b1d700987 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
@@ -103,35 +103,23 @@ private[sql] class GenericColumnAccessor(buffer: ByteBuffer, dataType: DataType)
extends BasicColumnAccessor[Array[Byte]](buffer, GENERIC(dataType))
with NullableColumnAccessor
-private[sql] class DateColumnAccessor(buffer: ByteBuffer)
- extends NativeColumnAccessor(buffer, DATE)
-
-private[sql] class TimestampColumnAccessor(buffer: ByteBuffer)
- extends NativeColumnAccessor(buffer, TIMESTAMP)
-
private[sql] object ColumnAccessor {
def apply(dataType: DataType, buffer: ByteBuffer): ColumnAccessor = {
- val dup = buffer.duplicate().order(ByteOrder.nativeOrder)
-
- // The first 4 bytes in the buffer indicate the column type. This field is not used now,
- // because we always know the data type of the column ahead of time.
- dup.getInt()
+ val buf = buffer.order(ByteOrder.nativeOrder)
dataType match {
- case BooleanType => new BooleanColumnAccessor(dup)
- case ByteType => new ByteColumnAccessor(dup)
- case ShortType => new ShortColumnAccessor(dup)
- case IntegerType => new IntColumnAccessor(dup)
- case DateType => new DateColumnAccessor(dup)
- case LongType => new LongColumnAccessor(dup)
- case TimestampType => new TimestampColumnAccessor(dup)
- case FloatType => new FloatColumnAccessor(dup)
- case DoubleType => new DoubleColumnAccessor(dup)
- case StringType => new StringColumnAccessor(dup)
- case BinaryType => new BinaryColumnAccessor(dup)
+ case BooleanType => new BooleanColumnAccessor(buf)
+ case ByteType => new ByteColumnAccessor(buf)
+ case ShortType => new ShortColumnAccessor(buf)
+ case IntegerType | DateType => new IntColumnAccessor(buf)
+ case LongType | TimestampType => new LongColumnAccessor(buf)
+ case FloatType => new FloatColumnAccessor(buf)
+ case DoubleType => new DoubleColumnAccessor(buf)
+ case StringType => new StringColumnAccessor(buf)
+ case BinaryType => new BinaryColumnAccessor(buf)
case DecimalType.Fixed(precision, scale) if precision < 19 =>
- new FixedDecimalColumnAccessor(dup, precision, scale)
- case other => new GenericColumnAccessor(dup, other)
+ new FixedDecimalColumnAccessor(buf, precision, scale)
+ case other => new GenericColumnAccessor(buf, other)
}
}
}
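
A hedged usage sketch of the new dispatch (the `buffer` value and the private[sql] call site are assumed): DateType now resolves to the plain Int accessor, and the buffer is consumed from position zero with no leading type ID to skip.

// Sketch only, not part of the patch.
val accessor = ColumnAccessor(DateType, buffer)
// `accessor` is an IntColumnAccessor; it reads epoch-day Ints directly.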
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index 1620fc401b..2e60564f7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -63,9 +63,8 @@ private[sql] class BasicColumnBuilder[JvmType](
val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
this.columnName = columnName
- // Reserves 4 bytes for column type ID
- buffer = ByteBuffer.allocate(4 + size * columnType.defaultSize)
- buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)
+ buffer = ByteBuffer.allocate(size * columnType.defaultSize)
+ buffer.order(ByteOrder.nativeOrder())
}
override def appendFrom(row: InternalRow, ordinal: Int): Unit = {
@@ -121,11 +120,6 @@ private[sql] class FixedDecimalColumnBuilder(
private[sql] class GenericColumnBuilder(dataType: DataType)
extends ComplexColumnBuilder(new GenericColumnStats(dataType), GENERIC(dataType))
-private[sql] class DateColumnBuilder extends NativeColumnBuilder(new DateColumnStats, DATE)
-
-private[sql] class TimestampColumnBuilder
- extends NativeColumnBuilder(new TimestampColumnStats, TIMESTAMP)
-
private[sql] object ColumnBuilder {
val DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024
@@ -154,10 +148,8 @@ private[sql] object ColumnBuilder {
case BooleanType => new BooleanColumnBuilder
case ByteType => new ByteColumnBuilder
case ShortType => new ShortColumnBuilder
- case IntegerType => new IntColumnBuilder
- case DateType => new DateColumnBuilder
- case LongType => new LongColumnBuilder
- case TimestampType => new TimestampColumnBuilder
+ case IntegerType | DateType => new IntColumnBuilder
+ case LongType | TimestampType => new LongColumnBuilder
case FloatType => new FloatColumnBuilder
case DoubleType => new DoubleColumnBuilder
case StringType => new StringColumnBuilder
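
A minimal round-trip sketch of the builder-side effect (assuming the factory keeps its existing (dataType, initialSize, columnName, useCompression) parameters, which this hunk does not show): the built buffer now begins directly at the null-count header.

// Sketch only, not part of the patch.
val builder = ColumnBuilder(IntegerType, initialSize = 16)
// ... builder.appendFrom(row, ordinal) calls elided ...
val built = builder.build()
val nullCount = built.getInt()  // before this patch, a type ID came first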
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index fbd51b7c34..3b5052b754 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -266,7 +266,3 @@ private[sql] class GenericColumnStats(dataType: DataType) extends ColumnStats {
override def collectedStatistics: GenericInternalRow =
new GenericInternalRow(Array[Any](null, null, nullCount, count, sizeInBytes))
}
-
-private[sql] class DateColumnStats extends IntColumnStats
-
-private[sql] class TimestampColumnStats extends LongColumnStats
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
index ab482a3636..3a0cea8750 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
@@ -38,9 +38,6 @@ private[sql] sealed abstract class ColumnType[JvmType] {
// The catalyst data type of this column.
def dataType: DataType
- // A unique ID representing the type.
- def typeId: Int
-
// Default size in bytes for one element of type T (e.g. 4 for `Int`).
def defaultSize: Int
@@ -107,7 +104,6 @@ private[sql] sealed abstract class ColumnType[JvmType] {
private[sql] abstract class NativeColumnType[T <: AtomicType](
val dataType: T,
- val typeId: Int,
val defaultSize: Int)
extends ColumnType[T#InternalType] {
@@ -117,7 +113,7 @@ private[sql] abstract class NativeColumnType[T <: AtomicType](
def scalaTag: TypeTag[dataType.InternalType] = dataType.tag
}
-private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) {
+private[sql] object INT extends NativeColumnType(IntegerType, 4) {
override def append(v: Int, buffer: ByteBuffer): Unit = {
buffer.putInt(v)
}
@@ -145,7 +141,7 @@ private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) {
}
}
-private[sql] object LONG extends NativeColumnType(LongType, 1, 8) {
+private[sql] object LONG extends NativeColumnType(LongType, 8) {
override def append(v: Long, buffer: ByteBuffer): Unit = {
buffer.putLong(v)
}
@@ -173,7 +169,7 @@ private[sql] object LONG extends NativeColumnType(LongType, 1, 8) {
}
}
-private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) {
+private[sql] object FLOAT extends NativeColumnType(FloatType, 4) {
override def append(v: Float, buffer: ByteBuffer): Unit = {
buffer.putFloat(v)
}
@@ -201,7 +197,7 @@ private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) {
}
}
-private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) {
+private[sql] object DOUBLE extends NativeColumnType(DoubleType, 8) {
override def append(v: Double, buffer: ByteBuffer): Unit = {
buffer.putDouble(v)
}
@@ -229,7 +225,7 @@ private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) {
}
}
-private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) {
+private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 1) {
override def append(v: Boolean, buffer: ByteBuffer): Unit = {
buffer.put(if (v) 1: Byte else 0: Byte)
}
@@ -255,7 +251,7 @@ private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) {
}
}
-private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) {
+private[sql] object BYTE extends NativeColumnType(ByteType, 1) {
override def append(v: Byte, buffer: ByteBuffer): Unit = {
buffer.put(v)
}
@@ -283,7 +279,7 @@ private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) {
}
}
-private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) {
+private[sql] object SHORT extends NativeColumnType(ShortType, 2) {
override def append(v: Short, buffer: ByteBuffer): Unit = {
buffer.putShort(v)
}
@@ -311,7 +307,7 @@ private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) {
}
}
-private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
+private[sql] object STRING extends NativeColumnType(StringType, 8) {
override def actualSize(row: InternalRow, ordinal: Int): Int = {
row.getUTF8String(ordinal).numBytes() + 4
}
@@ -343,46 +339,9 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
override def clone(v: UTF8String): UTF8String = v.clone()
}
-private[sql] object DATE extends NativeColumnType(DateType, 8, 4) {
- override def extract(buffer: ByteBuffer): Int = {
- buffer.getInt
- }
-
- override def append(v: Int, buffer: ByteBuffer): Unit = {
- buffer.putInt(v)
- }
-
- override def getField(row: InternalRow, ordinal: Int): Int = {
- row.getInt(ordinal)
- }
-
- def setField(row: MutableRow, ordinal: Int, value: Int): Unit = {
- row(ordinal) = value
- }
-}
-
-private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 9, 8) {
- override def extract(buffer: ByteBuffer): Long = {
- buffer.getLong
- }
-
- override def append(v: Long, buffer: ByteBuffer): Unit = {
- buffer.putLong(v)
- }
-
- override def getField(row: InternalRow, ordinal: Int): Long = {
- row.getLong(ordinal)
- }
-
- override def setField(row: MutableRow, ordinal: Int, value: Long): Unit = {
- row(ordinal) = value
- }
-}
-
private[sql] case class FIXED_DECIMAL(precision: Int, scale: Int)
extends NativeColumnType(
DecimalType(precision, scale),
- 10,
FIXED_DECIMAL.defaultSize) {
override def extract(buffer: ByteBuffer): Decimal = {
@@ -410,9 +369,7 @@ private[sql] object FIXED_DECIMAL {
val defaultSize = 8
}
-private[sql] sealed abstract class ByteArrayColumnType(
- val typeId: Int,
- val defaultSize: Int)
+private[sql] sealed abstract class ByteArrayColumnType(val defaultSize: Int)
extends ColumnType[Array[Byte]] {
override def actualSize(row: InternalRow, ordinal: Int): Int = {
@@ -431,7 +388,7 @@ private[sql] sealed abstract class ByteArrayColumnType(
}
}
-private[sql] object BINARY extends ByteArrayColumnType(11, 16) {
+private[sql] object BINARY extends ByteArrayColumnType(16) {
def dataType: DataType = BooleanType
@@ -447,7 +404,7 @@ private[sql] object BINARY extends ByteArrayColumnType(11, 16) {
// Used to process generic objects (all types other than those listed above). Objects should be
// serialized first before appending to the column `ByteBuffer`, and is also extracted as serialized
// byte array.
-private[sql] case class GENERIC(dataType: DataType) extends ByteArrayColumnType(12, 16) {
+private[sql] case class GENERIC(dataType: DataType) extends ByteArrayColumnType(16) {
override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]): Unit = {
row.update(ordinal, SparkSqlSerializer.deserialize[Any](value))
}
@@ -463,10 +420,8 @@ private[sql] object ColumnType {
case BooleanType => BOOLEAN
case ByteType => BYTE
case ShortType => SHORT
- case IntegerType => INT
- case DateType => DATE
- case LongType => LONG
- case TimestampType => TIMESTAMP
+ case IntegerType | DateType => INT
+ case LongType | TimestampType => LONG
case FloatType => FLOAT
case DoubleType => DOUBLE
case StringType => STRING
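
The same collapsing is visible when resolving a ColumnType directly; a hedged check, assuming a private[sql] caller:

// Sketch only, not part of the patch.
assert(ColumnType(DateType) == INT)        // previously DATE
assert(ColumnType(TimestampType) == LONG)  // previously TIMESTAMP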
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
index ba47bc783f..76cfddf1cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
@@ -25,14 +25,13 @@ import org.apache.spark.sql.catalyst.InternalRow
* A stackable trait used for building byte buffer for a column containing null values. Memory
* layout of the final byte buffer is:
* {{{
- * .----------------------- Column type ID (4 bytes)
- * | .------------------- Null count N (4 bytes)
- * | | .--------------- Null positions (4 x N bytes, empty if null count is zero)
- * | | | .--------- Non-null elements
- * V V V V
- * +---+---+-----+---------+
- * | | | ... | ... ... |
- * +---+---+-----+---------+
+ * .------------------- Null count N (4 bytes)
+ * | .--------------- Null positions (4 x N bytes, empty if null count is zero)
+ * | | .--------- Non-null elements
+ * V V V
+ * +---+-----+---------+
+ * | | ... | ... ... |
+ * +---+-----+---------+
* }}}
*/
private[sql] trait NullableColumnBuilder extends ColumnBuilder {
@@ -66,16 +65,14 @@ private[sql] trait NullableColumnBuilder extends ColumnBuilder {
abstract override def build(): ByteBuffer = {
val nonNulls = super.build()
- val typeId = nonNulls.getInt()
val nullDataLen = nulls.position()
nulls.limit(nullDataLen)
nulls.rewind()
val buffer = ByteBuffer
- .allocate(4 + 4 + nullDataLen + nonNulls.remaining())
+ .allocate(4 + nullDataLen + nonNulls.remaining())
.order(ByteOrder.nativeOrder())
- .putInt(typeId)
.putInt(nullCount)
.put(nulls)
.put(nonNulls)
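
A reader-side sketch of the revised nullable layout (`nullableBuilder` is a hypothetical, already-populated builder):

// Sketch only, not part of the patch.
val buf = nullableBuilder.build()
val nullCount = buf.getInt()                             // null count N
val nullPositions = Array.fill(nullCount)(buf.getInt())  // 4 x N bytes
// everything remaining in `buf` is the non-null elements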
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
index 39b21ddb47..161021ff96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
@@ -28,17 +28,16 @@ import org.apache.spark.sql.types.AtomicType
* A stackable trait that builds optionally compressed byte buffer for a column. Memory layout of
* the final byte buffer is:
* {{{
- * .--------------------------- Column type ID (4 bytes)
- * | .----------------------- Null count N (4 bytes)
- * | | .------------------- Null positions (4 x N bytes, empty if null count is zero)
- * | | | .------------- Compression scheme ID (4 bytes)
- * | | | | .--------- Compressed non-null elements
- * V V V V V
- * +---+---+-----+---+---------+
- * | | | ... | | ... ... |
- * +---+---+-----+---+---------+
- * \-----------/ \-----------/
- * header body
+ * .----------------------- Null count N (4 bytes)
+ * | .------------------- Null positions (4 x N bytes, empty if null count is zero)
+ * | | .------------- Compression scheme ID (4 bytes)
+ * | | | .--------- Compressed non-null elements
+ * V V V V
+ * +---+-----+---+---------+
+ * | | ... | | ... ... |
+ * +---+-----+---+---------+
+ * \-------/ \-----------/
+ * header body
* }}}
*/
private[sql] trait CompressibleColumnBuilder[T <: AtomicType]
@@ -83,14 +82,13 @@ private[sql] trait CompressibleColumnBuilder[T <: AtomicType]
override def build(): ByteBuffer = {
val nonNullBuffer = buildNonNulls()
- val typeId = nonNullBuffer.getInt()
val encoder: Encoder[T] = {
val candidate = compressionEncoders.minBy(_.compressionRatio)
if (isWorthCompressing(candidate)) candidate else PassThrough.encoder(columnType)
}
- // Header = column type ID + null count + null positions
- val headerSize = 4 + 4 + nulls.limit()
+ // Header = null count + null positions
+ val headerSize = 4 + nulls.limit()
val compressedSize = if (encoder.compressedSize == 0) {
nonNullBuffer.remaining()
} else {
@@ -102,7 +100,6 @@ private[sql] trait CompressibleColumnBuilder[T <: AtomicType]
.allocate(headerSize + 4 + compressedSize)
.order(ByteOrder.nativeOrder)
// Write the header
- .putInt(typeId)
.putInt(nullCount)
.put(nulls)
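
A matching decode sketch for the compressed layout (`compressedBuilder` is hypothetical): the header is now just the null count and null positions, followed by the scheme ID and the compressed body.

// Sketch only, not part of the patch.
val buf = compressedBuilder.build()
val nullCount = buf.getInt()                    // header: null count
(0 until nullCount).foreach(_ => buf.getInt())  // header: null positions
val schemeId = buf.getInt()                     // body: compression scheme ID
// remaining bytes are the compressed non-null elements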
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
index b1ef9b2ef7..9322b772fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
@@ -74,8 +74,8 @@ private[sql] object CompressionScheme {
def columnHeaderSize(columnBuffer: ByteBuffer): Int = {
val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder)
- val nullCount = header.getInt(4)
- // Column type ID + null count + null positions
- 4 + 4 + 4 * nullCount
+ val nullCount = header.getInt()
+ // null count + null positions
+ 4 + 4 * nullCount
}
}
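
A worked example of the new header arithmetic: with the type ID gone, a column containing three nulls has a 4 + 4 * 3 = 16 byte header. Sketch only, not part of the patch:

import java.nio.{ByteBuffer, ByteOrder}

val header = ByteBuffer.allocate(16).order(ByteOrder.nativeOrder())
header.putInt(3)                      // null count
header.putInt(0).putInt(2).putInt(5)  // null positions 0, 2 and 5
header.flip()
assert(CompressionScheme.columnHeaderSize(header) == 16)  // 4 + 4 * 3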
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
index d0430d2a60..708fb4cf79 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
@@ -27,10 +27,7 @@ class ColumnStatsSuite extends SparkFunSuite {
testColumnStats(classOf[ByteColumnStats], BYTE, createRow(Byte.MaxValue, Byte.MinValue, 0))
testColumnStats(classOf[ShortColumnStats], SHORT, createRow(Short.MaxValue, Short.MinValue, 0))
testColumnStats(classOf[IntColumnStats], INT, createRow(Int.MaxValue, Int.MinValue, 0))
- testColumnStats(classOf[DateColumnStats], DATE, createRow(Int.MaxValue, Int.MinValue, 0))
testColumnStats(classOf[LongColumnStats], LONG, createRow(Long.MaxValue, Long.MinValue, 0))
- testColumnStats(classOf[TimestampColumnStats], TIMESTAMP,
- createRow(Long.MaxValue, Long.MinValue, 0))
testColumnStats(classOf[FloatColumnStats], FLOAT, createRow(Float.MaxValue, Float.MinValue, 0))
testColumnStats(classOf[DoubleColumnStats], DOUBLE,
createRow(Double.MaxValue, Double.MinValue, 0))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
index 8f024690ef..a4cbe3525e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
@@ -37,8 +37,8 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
test("defaultSize") {
val checks = Map(
- BOOLEAN -> 1, BYTE -> 1, SHORT -> 2, INT -> 4, DATE -> 4,
- LONG -> 8, TIMESTAMP -> 8, FLOAT -> 4, DOUBLE -> 8,
+ BOOLEAN -> 1, BYTE -> 1, SHORT -> 2, INT -> 4,
+ LONG -> 8, FLOAT -> 4, DOUBLE -> 8,
STRING -> 8, BINARY -> 16, FIXED_DECIMAL(15, 10) -> 8,
MAP_GENERIC -> 16)
@@ -66,9 +66,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
checkActualSize(BYTE, Byte.MaxValue, 1)
checkActualSize(SHORT, Short.MaxValue, 2)
checkActualSize(INT, Int.MaxValue, 4)
- checkActualSize(DATE, Int.MaxValue, 4)
checkActualSize(LONG, Long.MaxValue, 8)
- checkActualSize(TIMESTAMP, Long.MaxValue, 8)
checkActualSize(FLOAT, Float.MaxValue, 4)
checkActualSize(DOUBLE, Double.MaxValue, 8)
checkActualSize(STRING, UTF8String.fromString("hello"), 4 + "hello".getBytes("utf-8").length)
@@ -93,12 +91,8 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
testNativeColumnType(INT)(_.putInt(_), _.getInt)
- testNativeColumnType(DATE)(_.putInt(_), _.getInt)
-
testNativeColumnType(LONG)(_.putLong(_), _.getLong)
- testNativeColumnType(TIMESTAMP)(_.putLong(_), _.getLong)
-
testNativeColumnType(FLOAT)(_.putFloat(_), _.getFloat)
testNativeColumnType(DOUBLE)(_.putDouble(_), _.getDouble)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
index 79bb7d072f..123a7053c0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
@@ -19,9 +19,10 @@ package org.apache.spark.sql.columnar
import scala.collection.immutable.HashSet
import scala.util.Random
+
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.types.{DataType, Decimal, AtomicType}
+import org.apache.spark.sql.types.{AtomicType, Decimal}
import org.apache.spark.unsafe.types.UTF8String
object ColumnarTestUtils {
@@ -43,9 +44,7 @@ object ColumnarTestUtils {
case BYTE => (Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte
case SHORT => (Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort
case INT => Random.nextInt()
- case DATE => Random.nextInt()
case LONG => Random.nextLong()
- case TIMESTAMP => Random.nextLong()
case FLOAT => Random.nextFloat()
case DOUBLE => Random.nextDouble()
case STRING => UTF8String.fromString(Random.nextString(Random.nextInt(32)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
index f4f6c7649b..a3a23d37d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.types.{StringType, ArrayType, DataType}
+import org.apache.spark.sql.types.{ArrayType, StringType}
class TestNullableColumnAccessor[JvmType](
buffer: ByteBuffer,
@@ -32,17 +32,15 @@ class TestNullableColumnAccessor[JvmType](
object TestNullableColumnAccessor {
def apply[JvmType](buffer: ByteBuffer, columnType: ColumnType[JvmType])
: TestNullableColumnAccessor[JvmType] = {
- // Skips the column type ID
- buffer.getInt()
new TestNullableColumnAccessor(buffer, columnType)
}
}
class NullableColumnAccessorSuite extends SparkFunSuite {
- import ColumnarTestUtils._
+ import org.apache.spark.sql.columnar.ColumnarTestUtils._
Seq(
- BOOLEAN, BYTE, SHORT, INT, DATE, LONG, TIMESTAMP, FLOAT, DOUBLE,
+ BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE,
STRING, BINARY, FIXED_DECIMAL(15, 10), GENERIC(ArrayType(StringType)))
.foreach {
testNullableColumnAccessor(_)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
index 241d09ea20..9557eead27 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
@@ -38,7 +38,7 @@ class NullableColumnBuilderSuite extends SparkFunSuite {
import ColumnarTestUtils._
Seq(
- BOOLEAN, BYTE, SHORT, INT, DATE, LONG, TIMESTAMP, FLOAT, DOUBLE,
+ BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE,
STRING, BINARY, FIXED_DECIMAL(15, 10), GENERIC(ArrayType(StringType)))
.foreach {
testNullableColumnBuilder(_)
@@ -53,7 +53,6 @@ class NullableColumnBuilderSuite extends SparkFunSuite {
val columnBuilder = TestNullableColumnBuilder(columnType)
val buffer = columnBuilder.build()
- assertResult(columnType.typeId, "Wrong column type ID")(buffer.getInt())
assertResult(0, "Wrong null count")(buffer.getInt())
assert(!buffer.hasRemaining)
}
@@ -68,7 +67,6 @@ class NullableColumnBuilderSuite extends SparkFunSuite {
val buffer = columnBuilder.build()
- assertResult(columnType.typeId, "Wrong column type ID")(buffer.getInt())
assertResult(0, "Wrong null count")(buffer.getInt())
}
@@ -84,7 +82,6 @@ class NullableColumnBuilderSuite extends SparkFunSuite {
val buffer = columnBuilder.build()
- assertResult(columnType.typeId, "Wrong column type ID")(buffer.getInt())
assertResult(4, "Wrong null count")(buffer.getInt())
// For null positions