author     Cheng Lian <lian.cs.zju@gmail.com>          2014-07-21 00:46:28 -0700
committer  Michael Armbrust <michael@databricks.com>   2014-07-21 00:46:28 -0700
commit     cd273a238144a9a436219cd01250369586f5638b (patch)
tree       ef844cf62d65e0d02a0af940b211cea6244ab7eb /sql/core
parent     db56f2df1b8027171da1b8d2571d1f2ef1e103b6 (diff)
[SPARK-2190][SQL] Specialized ColumnType for Timestamp
JIRA issue: [SPARK-2190](https://issues.apache.org/jira/browse/SPARK-2190)
Added a specialized in-memory column type for `Timestamp`. Whitelisted all timestamp-related Hive tests except `timestamp_udf`, which is timezone-sensitive.
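Under the new representation each `Timestamp` occupies a fixed 12 bytes: the millisecond value as an 8-byte long followed by the nanosecond component as a 4-byte int, mirroring what `TIMESTAMP.append`/`extract` do in `ColumnType.scala` below. A minimal standalone sketch of that byte layout (the demo object is illustrative, not part of the patch):

```scala
import java.nio.ByteBuffer
import java.sql.Timestamp

object TimestampLayoutDemo extends App {
  // 8-byte millisecond value + 4-byte nanosecond component = 12 bytes,
  // the same layout used by the TIMESTAMP column type in this commit.
  val buffer = ByteBuffer.allocate(12)

  val original = new Timestamp(System.currentTimeMillis())
  original.setNanos(123456789)

  buffer.putLong(original.getTime).putInt(original.getNanos)
  buffer.flip()

  val restored = new Timestamp(buffer.getLong())
  restored.setNanos(buffer.getInt())

  assert(restored == original) // the 12-byte encoding round-trips losslessly
}
```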
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #1440 from liancheng/timestamp-column-type and squashes the following commits:
e682175 [Cheng Lian] Enabled more timezone sensitive Hive tests.
53a358f [Cheng Lian] Fixed failed test suites
01b592d [Cheng Lian] Fixed SimpleDateFormat thread safety issue (see the sketch after this list)
2a59343 [Cheng Lian] Removed timezone sensitive Hive timestamp tests
45dd05d [Cheng Lian] Added Timestamp specific in-memory columnar representation
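The `SimpleDateFormat` fix referenced in 01b592d lands outside the `sql/core` tree shown below. For context, `java.text.SimpleDateFormat` carries mutable parse/format state and is not thread-safe; one common remedy, shown here purely as an illustrative sketch rather than the commit's actual change, is to give each thread its own instance via `ThreadLocal`:

```scala
import java.text.SimpleDateFormat
import java.util.Date

object DateFormats {
  // Sharing one SimpleDateFormat across threads can silently corrupt
  // results; ThreadLocal gives each thread its own private copy.
  private val format = new ThreadLocal[SimpleDateFormat] {
    override def initialValue() = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  }

  def formatMillis(millis: Long): String = format.get().format(new Date(millis))
}
```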
Diffstat (limited to 'sql/core')
7 files changed, 123 insertions, 64 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
index 3c39e1d350..42a5a9a84f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
@@ -90,6 +90,9 @@ private[sql] class FloatColumnAccessor(buffer: ByteBuffer)
 private[sql] class StringColumnAccessor(buffer: ByteBuffer)
   extends NativeColumnAccessor(buffer, STRING)
 
+private[sql] class TimestampColumnAccessor(buffer: ByteBuffer)
+  extends NativeColumnAccessor(buffer, TIMESTAMP)
+
 private[sql] class BinaryColumnAccessor(buffer: ByteBuffer)
   extends BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer, BINARY)
   with NullableColumnAccessor
@@ -105,16 +108,17 @@ private[sql] object ColumnAccessor {
     val columnTypeId = dup.getInt()
 
     columnTypeId match {
-      case INT.typeId => new IntColumnAccessor(dup)
-      case LONG.typeId => new LongColumnAccessor(dup)
-      case FLOAT.typeId => new FloatColumnAccessor(dup)
-      case DOUBLE.typeId => new DoubleColumnAccessor(dup)
-      case BOOLEAN.typeId => new BooleanColumnAccessor(dup)
-      case BYTE.typeId => new ByteColumnAccessor(dup)
-      case SHORT.typeId => new ShortColumnAccessor(dup)
-      case STRING.typeId => new StringColumnAccessor(dup)
-      case BINARY.typeId => new BinaryColumnAccessor(dup)
-      case GENERIC.typeId => new GenericColumnAccessor(dup)
+      case INT.typeId       => new IntColumnAccessor(dup)
+      case LONG.typeId      => new LongColumnAccessor(dup)
+      case FLOAT.typeId     => new FloatColumnAccessor(dup)
+      case DOUBLE.typeId    => new DoubleColumnAccessor(dup)
+      case BOOLEAN.typeId   => new BooleanColumnAccessor(dup)
+      case BYTE.typeId      => new ByteColumnAccessor(dup)
+      case SHORT.typeId     => new ShortColumnAccessor(dup)
+      case STRING.typeId    => new StringColumnAccessor(dup)
+      case TIMESTAMP.typeId => new TimestampColumnAccessor(dup)
+      case BINARY.typeId    => new BinaryColumnAccessor(dup)
+      case GENERIC.typeId   => new GenericColumnAccessor(dup)
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index 4be048cd74..74f5630fbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -109,6 +109,9 @@ private[sql] class FloatColumnBuilder extends NativeColumnBuilder(new FloatColum
 private[sql] class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStats, STRING)
 
+private[sql] class TimestampColumnBuilder
+  extends NativeColumnBuilder(new TimestampColumnStats, TIMESTAMP)
+
 private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(BINARY)
 
 // TODO (lian) Add support for array, struct and map
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index 95602d321d..6502110e90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -344,21 +344,52 @@ private[sql] class StringColumnStats extends BasicColumnStats(STRING) {
   }
 
   override def contains(row: Row, ordinal: Int) = {
-    !(upperBound eq null) && {
+    (upperBound ne null) && {
       val field = columnType.getField(row, ordinal)
       lowerBound.compareTo(field) <= 0 && field.compareTo(upperBound) <= 0
     }
   }
 
   override def isAbove(row: Row, ordinal: Int) = {
-    !(upperBound eq null) && {
+    (upperBound ne null) && {
       val field = columnType.getField(row, ordinal)
       field.compareTo(upperBound) < 0
     }
   }
 
   override def isBelow(row: Row, ordinal: Int) = {
-    !(lowerBound eq null) && {
+    (lowerBound ne null) && {
+      val field = columnType.getField(row, ordinal)
+      lowerBound.compareTo(field) < 0
+    }
+  }
+}
+
+private[sql] class TimestampColumnStats extends BasicColumnStats(TIMESTAMP) {
+  override def initialBounds = (null, null)
+
+  override def gatherStats(row: Row, ordinal: Int) {
+    val field = columnType.getField(row, ordinal)
+    if ((upperBound eq null) || field.compareTo(upperBound) > 0) _upper = field
+    if ((lowerBound eq null) || field.compareTo(lowerBound) < 0) _lower = field
+  }
+
+  override def contains(row: Row, ordinal: Int) = {
+    (upperBound ne null) && {
+      val field = columnType.getField(row, ordinal)
+      lowerBound.compareTo(field) <= 0 && field.compareTo(upperBound) <= 0
+    }
+  }
+
+  override def isAbove(row: Row, ordinal: Int) = {
+    (lowerBound ne null) && {
+      val field = columnType.getField(row, ordinal)
+      field.compareTo(upperBound) < 0
+    }
+  }
+
+  override def isBelow(row: Row, ordinal: Int) = {
+    (lowerBound ne null) && {
       val field = columnType.getField(row, ordinal)
       lowerBound.compareTo(field) < 0
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
index 4cd52d8288..794bc60d0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer
 
 import scala.reflect.runtime.universe.TypeTag
 
+import java.sql.Timestamp
+
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.MutableRow
 import org.apache.spark.sql.catalyst.types._
@@ -221,6 +223,26 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
   override def getField(row: Row, ordinal: Int) = row.getString(ordinal)
 }
 
+private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 8, 12) {
+  override def extract(buffer: ByteBuffer) = {
+    val timestamp = new Timestamp(buffer.getLong())
+    timestamp.setNanos(buffer.getInt())
+    timestamp
+  }
+
+  override def append(v: Timestamp, buffer: ByteBuffer) {
+    buffer.putLong(v.getTime).putInt(v.getNanos)
+  }
+
+  override def getField(row: Row, ordinal: Int) = {
+    row(ordinal).asInstanceOf[Timestamp]
+  }
+
+  override def setField(row: MutableRow, ordinal: Int, value: Timestamp) {
+    row(ordinal) = value
+  }
+}
+
 private[sql] sealed abstract class ByteArrayColumnType[T <: DataType](
     typeId: Int,
     defaultSize: Int)
@@ -240,7 +262,7 @@ private[sql] sealed abstract class ByteArrayColumnType[T <: DataType](
   }
 }
 
-private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](8, 16) {
+private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](9, 16) {
   override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]) {
     row(ordinal) = value
   }
@@ -251,7 +273,7 @@ private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](8, 16) {
 // Used to process generic objects (all types other than those listed above). Objects should be
 // serialized first before appending to the column `ByteBuffer`, and is also extracted as serialized
 // byte array.
-private[sql] object GENERIC extends ByteArrayColumnType[DataType](9, 16) {
+private[sql] object GENERIC extends ByteArrayColumnType[DataType](10, 16) {
   override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]) {
     row(ordinal) = SparkSqlSerializer.deserialize[Any](value)
   }
@@ -262,16 +284,17 @@ private[sql] object GENERIC extends ByteArrayColumnType[DataType](9, 16) {
 private[sql] object ColumnType {
   def apply(dataType: DataType): ColumnType[_, _] = {
     dataType match {
-      case IntegerType => INT
-      case LongType => LONG
-      case FloatType => FLOAT
-      case DoubleType => DOUBLE
-      case BooleanType => BOOLEAN
-      case ByteType => BYTE
-      case ShortType => SHORT
-      case StringType => STRING
-      case BinaryType => BINARY
-      case _ => GENERIC
+      case IntegerType   => INT
+      case LongType      => LONG
+      case FloatType     => FLOAT
+      case DoubleType    => DOUBLE
+      case BooleanType   => BOOLEAN
+      case ByteType      => BYTE
+      case ShortType     => SHORT
+      case StringType    => STRING
+      case BinaryType    => BINARY
+      case TimestampType => TIMESTAMP
+      case _             => GENERIC
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
index 6f0d46d816..5f61fb5e16 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
@@ -22,14 +22,15 @@ import org.scalatest.FunSuite
 import org.apache.spark.sql.catalyst.types._
 
 class ColumnStatsSuite extends FunSuite {
-  testColumnStats(classOf[BooleanColumnStats], BOOLEAN)
-  testColumnStats(classOf[ByteColumnStats], BYTE)
-  testColumnStats(classOf[ShortColumnStats], SHORT)
-  testColumnStats(classOf[IntColumnStats], INT)
-  testColumnStats(classOf[LongColumnStats], LONG)
-  testColumnStats(classOf[FloatColumnStats], FLOAT)
-  testColumnStats(classOf[DoubleColumnStats], DOUBLE)
-  testColumnStats(classOf[StringColumnStats], STRING)
+  testColumnStats(classOf[BooleanColumnStats],   BOOLEAN)
+  testColumnStats(classOf[ByteColumnStats],      BYTE)
+  testColumnStats(classOf[ShortColumnStats],     SHORT)
+  testColumnStats(classOf[IntColumnStats],       INT)
+  testColumnStats(classOf[LongColumnStats],      LONG)
+  testColumnStats(classOf[FloatColumnStats],     FLOAT)
+  testColumnStats(classOf[DoubleColumnStats],    DOUBLE)
+  testColumnStats(classOf[StringColumnStats],    STRING)
+  testColumnStats(classOf[TimestampColumnStats], TIMESTAMP)
 
   def testColumnStats[T <: NativeType, U <: NativeColumnStats[T]](
       columnStatsClass: Class[U],
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
index 314b7d317e..829342215e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.columnar
 import java.nio.ByteBuffer
+import java.sql.Timestamp
 
 import org.scalatest.FunSuite
@@ -32,7 +33,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
   test("defaultSize") {
     val checks = Map(
       INT -> 4, SHORT -> 2, LONG -> 8, BYTE -> 1, DOUBLE -> 8, FLOAT -> 4,
-      BOOLEAN -> 1, STRING -> 8, BINARY -> 16, GENERIC -> 16)
+      BOOLEAN -> 1, STRING -> 8, TIMESTAMP -> 12, BINARY -> 16, GENERIC -> 16)
 
     checks.foreach { case (columnType, expectedSize) =>
       assertResult(expectedSize, s"Wrong defaultSize for $columnType") {
@@ -52,14 +53,15 @@ class ColumnTypeSuite extends FunSuite with Logging {
       }
     }
 
-    checkActualSize(INT, Int.MaxValue, 4)
-    checkActualSize(SHORT, Short.MaxValue, 2)
-    checkActualSize(LONG, Long.MaxValue, 8)
-    checkActualSize(BYTE, Byte.MaxValue, 1)
-    checkActualSize(DOUBLE, Double.MaxValue, 8)
-    checkActualSize(FLOAT, Float.MaxValue, 4)
-    checkActualSize(BOOLEAN, true, 1)
-    checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length)
+    checkActualSize(INT,       Int.MaxValue,    4)
+    checkActualSize(SHORT,     Short.MaxValue,  2)
+    checkActualSize(LONG,      Long.MaxValue,   8)
+    checkActualSize(BYTE,      Byte.MaxValue,   1)
+    checkActualSize(DOUBLE,    Double.MaxValue, 8)
+    checkActualSize(FLOAT,     Float.MaxValue,  4)
+    checkActualSize(BOOLEAN,   true,            1)
+    checkActualSize(STRING,    "hello",         4 + "hello".getBytes("utf-8").length)
+    checkActualSize(TIMESTAMP, new Timestamp(0L), 12)
 
     val binary = Array.fill[Byte](4)(0: Byte)
     checkActualSize(BINARY, binary, 4 + 4)
@@ -188,17 +190,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
   }
 
   private def hexDump(value: Any): String = {
-    if (value.isInstanceOf[String]) {
-      val sb = new StringBuilder()
-      for (ch <- value.asInstanceOf[String].toCharArray) {
-        sb.append(Integer.toHexString(ch & 0xffff)).append(' ')
-      }
-      if (! sb.isEmpty) sb.setLength(sb.length - 1)
-      sb.toString()
-    } else {
-      // for now ..
-      hexDump(value.toString)
-    }
+    value.toString.map(ch => Integer.toHexString(ch & 0xffff)).mkString(" ")
   }
 
   private def dumpBuffer(buff: ByteBuffer): Any = {
@@ -207,7 +199,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
       val b = buff.get()
       sb.append(Integer.toHexString(b & 0xff)).append(' ')
     }
-    if (! sb.isEmpty) sb.setLength(sb.length - 1)
+    if (sb.nonEmpty) sb.setLength(sb.length - 1)
     sb.toString()
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
index 04bdc43d95..38b04dd959 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.columnar
 import scala.collection.immutable.HashSet
 import scala.util.Random
 
+import java.sql.Timestamp
+
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
 import org.apache.spark.sql.catalyst.types.{DataType, NativeType}
@@ -39,15 +41,19 @@ object ColumnarTestUtils {
   }
 
     (columnType match {
-      case BYTE => (Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte
-      case SHORT => (Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort
-      case INT => Random.nextInt()
-      case LONG => Random.nextLong()
-      case FLOAT => Random.nextFloat()
-      case DOUBLE => Random.nextDouble()
-      case STRING => Random.nextString(Random.nextInt(32))
-      case BOOLEAN => Random.nextBoolean()
-      case BINARY => randomBytes(Random.nextInt(32))
+      case BYTE    => (Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte
+      case SHORT   => (Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort
+      case INT     => Random.nextInt()
+      case LONG    => Random.nextLong()
+      case FLOAT   => Random.nextFloat()
+      case DOUBLE  => Random.nextDouble()
+      case STRING  => Random.nextString(Random.nextInt(32))
+      case BOOLEAN => Random.nextBoolean()
+      case BINARY  => randomBytes(Random.nextInt(32))
+      case TIMESTAMP =>
+        val timestamp = new Timestamp(Random.nextLong())
+        timestamp.setNanos(Random.nextInt(999999999))
+        timestamp
       case _ =>
         // Using a random one-element map instead of an arbitrary object
         Map(Random.nextInt() -> Random.nextString(Random.nextInt(32)))
@@ -96,5 +102,4 @@ object ColumnarTestUtils {
 
     (values, rows)
   }
-
 }
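Taken together, a cached timestamp column now round-trips through the specialized `TIMESTAMP` column type instead of falling back to the serialized `GENERIC` path. A quick check of that round trip against the code added above (the driver object and its package placement are illustrative, since these objects are `private[sql]`):

```scala
package org.apache.spark.sql.columnar

import java.nio.ByteBuffer
import java.sql.Timestamp

object TimestampColumnTypeCheck extends App {
  val ts = new Timestamp(0L)
  ts.setNanos(42)

  // defaultSize is the 12 bytes declared in NativeColumnType(TimestampType, 8, 12)
  val buffer = ByteBuffer.allocate(TIMESTAMP.defaultSize)
  TIMESTAMP.append(ts, buffer)
  buffer.rewind()

  assert(TIMESTAMP.extract(buffer) == ts) // millis and nanos both survive
}
```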