author     Cheng Lian <lian.cs.zju@gmail.com>          2014-07-21 00:46:28 -0700
committer  Michael Armbrust <michael@databricks.com>   2014-07-21 00:46:28 -0700
commit     cd273a238144a9a436219cd01250369586f5638b (patch)
tree       ef844cf62d65e0d02a0af940b211cea6244ab7eb /sql/core
parent     db56f2df1b8027171da1b8d2571d1f2ef1e103b6 (diff)
[SPARK-2190][SQL] Specialized ColumnType for Timestamp
JIRA issue: [SPARK-2190](https://issues.apache.org/jira/browse/SPARK-2190)

Added specialized in-memory column type for `Timestamp`. Whitelisted all timestamp related Hive tests except `timestamp_udf`, which is timezone sensitive.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1440 from liancheng/timestamp-column-type and squashes the following commits:

e682175 [Cheng Lian] Enabled more timezone sensitive Hive tests.
53a358f [Cheng Lian] Fixed failed test suites
01b592d [Cheng Lian] Fixed SimpleDateFormat thread safety issue
2a59343 [Cheng Lian] Removed timezone sensitive Hive timestamp tests
45dd05d [Cheng Lian] Added Timestamp specific in-memory columnar representation
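For context, the new TIMESTAMP column type introduced in ColumnType.scala below stores each value in a fixed 12-byte layout: 8 bytes of epoch milliseconds followed by 4 bytes of nanoseconds. The following standalone sketch mirrors the extract/append pair from the patch to show how that layout round-trips a java.sql.Timestamp; the object name TimestampCodecSketch is illustrative only and is not part of the patch.

    import java.nio.ByteBuffer
    import java.sql.Timestamp

    // Illustrative round trip of the 12-byte layout used by the TIMESTAMP column
    // type: putLong(epoch millis) followed by putInt(nanos).
    object TimestampCodecSketch {
      def append(v: Timestamp, buffer: ByteBuffer): Unit = {
        buffer.putLong(v.getTime).putInt(v.getNanos)
      }

      def extract(buffer: ByteBuffer): Timestamp = {
        val timestamp = new Timestamp(buffer.getLong())
        timestamp.setNanos(buffer.getInt())
        timestamp
      }

      def main(args: Array[String]): Unit = {
        val buffer = ByteBuffer.allocate(12)
        val original = new Timestamp(0L)
        original.setNanos(123456789)

        append(original, buffer)
        buffer.flip()

        // Both the millisecond and nanosecond components survive the round trip.
        assert(extract(buffer) == original)
      }
    }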
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala    | 24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala     |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala       | 37
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala        | 47
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala  | 17
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala   | 34
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala | 25
7 files changed, 123 insertions(+), 64 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
index 3c39e1d350..42a5a9a84f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
@@ -90,6 +90,9 @@ private[sql] class FloatColumnAccessor(buffer: ByteBuffer)
private[sql] class StringColumnAccessor(buffer: ByteBuffer)
extends NativeColumnAccessor(buffer, STRING)
+private[sql] class TimestampColumnAccessor(buffer: ByteBuffer)
+ extends NativeColumnAccessor(buffer, TIMESTAMP)
+
private[sql] class BinaryColumnAccessor(buffer: ByteBuffer)
extends BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer, BINARY)
with NullableColumnAccessor
@@ -105,16 +108,17 @@ private[sql] object ColumnAccessor {
val columnTypeId = dup.getInt()
columnTypeId match {
- case INT.typeId => new IntColumnAccessor(dup)
- case LONG.typeId => new LongColumnAccessor(dup)
- case FLOAT.typeId => new FloatColumnAccessor(dup)
- case DOUBLE.typeId => new DoubleColumnAccessor(dup)
- case BOOLEAN.typeId => new BooleanColumnAccessor(dup)
- case BYTE.typeId => new ByteColumnAccessor(dup)
- case SHORT.typeId => new ShortColumnAccessor(dup)
- case STRING.typeId => new StringColumnAccessor(dup)
- case BINARY.typeId => new BinaryColumnAccessor(dup)
- case GENERIC.typeId => new GenericColumnAccessor(dup)
+ case INT.typeId => new IntColumnAccessor(dup)
+ case LONG.typeId => new LongColumnAccessor(dup)
+ case FLOAT.typeId => new FloatColumnAccessor(dup)
+ case DOUBLE.typeId => new DoubleColumnAccessor(dup)
+ case BOOLEAN.typeId => new BooleanColumnAccessor(dup)
+ case BYTE.typeId => new ByteColumnAccessor(dup)
+ case SHORT.typeId => new ShortColumnAccessor(dup)
+ case STRING.typeId => new StringColumnAccessor(dup)
+ case TIMESTAMP.typeId => new TimestampColumnAccessor(dup)
+ case BINARY.typeId => new BinaryColumnAccessor(dup)
+ case GENERIC.typeId => new GenericColumnAccessor(dup)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index 4be048cd74..74f5630fbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -109,6 +109,9 @@ private[sql] class FloatColumnBuilder extends NativeColumnBuilder(new FloatColum
private[sql] class StringColumnBuilder extends NativeColumnBuilder(new StringColumnStats, STRING)
+private[sql] class TimestampColumnBuilder
+ extends NativeColumnBuilder(new TimestampColumnStats, TIMESTAMP)
+
private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(BINARY)
// TODO (lian) Add support for array, struct and map
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index 95602d321d..6502110e90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -344,21 +344,52 @@ private[sql] class StringColumnStats extends BasicColumnStats(STRING) {
}
override def contains(row: Row, ordinal: Int) = {
- !(upperBound eq null) && {
+ (upperBound ne null) && {
val field = columnType.getField(row, ordinal)
lowerBound.compareTo(field) <= 0 && field.compareTo(upperBound) <= 0
}
}
override def isAbove(row: Row, ordinal: Int) = {
- !(upperBound eq null) && {
+ (upperBound ne null) && {
val field = columnType.getField(row, ordinal)
field.compareTo(upperBound) < 0
}
}
override def isBelow(row: Row, ordinal: Int) = {
- !(lowerBound eq null) && {
+ (lowerBound ne null) && {
+ val field = columnType.getField(row, ordinal)
+ lowerBound.compareTo(field) < 0
+ }
+ }
+}
+
+private[sql] class TimestampColumnStats extends BasicColumnStats(TIMESTAMP) {
+ override def initialBounds = (null, null)
+
+ override def gatherStats(row: Row, ordinal: Int) {
+ val field = columnType.getField(row, ordinal)
+ if ((upperBound eq null) || field.compareTo(upperBound) > 0) _upper = field
+ if ((lowerBound eq null) || field.compareTo(lowerBound) < 0) _lower = field
+ }
+
+ override def contains(row: Row, ordinal: Int) = {
+ (upperBound ne null) && {
+ val field = columnType.getField(row, ordinal)
+ lowerBound.compareTo(field) <= 0 && field.compareTo(upperBound) <= 0
+ }
+ }
+
+ override def isAbove(row: Row, ordinal: Int) = {
+ (lowerBound ne null) && {
+ val field = columnType.getField(row, ordinal)
+ field.compareTo(upperBound) < 0
+ }
+ }
+
+ override def isBelow(row: Row, ordinal: Int) = {
+ (lowerBound ne null) && {
val field = columnType.getField(row, ordinal)
lowerBound.compareTo(field) < 0
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
index 4cd52d8288..794bc60d0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer
import scala.reflect.runtime.universe.TypeTag
+import java.sql.Timestamp
+
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.MutableRow
import org.apache.spark.sql.catalyst.types._
@@ -221,6 +223,26 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
override def getField(row: Row, ordinal: Int) = row.getString(ordinal)
}
+private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 8, 12) {
+ override def extract(buffer: ByteBuffer) = {
+ val timestamp = new Timestamp(buffer.getLong())
+ timestamp.setNanos(buffer.getInt())
+ timestamp
+ }
+
+ override def append(v: Timestamp, buffer: ByteBuffer) {
+ buffer.putLong(v.getTime).putInt(v.getNanos)
+ }
+
+ override def getField(row: Row, ordinal: Int) = {
+ row(ordinal).asInstanceOf[Timestamp]
+ }
+
+ override def setField(row: MutableRow, ordinal: Int, value: Timestamp) {
+ row(ordinal) = value
+ }
+}
+
private[sql] sealed abstract class ByteArrayColumnType[T <: DataType](
typeId: Int,
defaultSize: Int)
@@ -240,7 +262,7 @@ private[sql] sealed abstract class ByteArrayColumnType[T <: DataType](
}
}
-private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](8, 16) {
+private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](9, 16) {
override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]) {
row(ordinal) = value
}
@@ -251,7 +273,7 @@ private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](8, 16) {
// Used to process generic objects (all types other than those listed above). Objects should be
// serialized first before appending to the column `ByteBuffer`, and is also extracted as serialized
// byte array.
-private[sql] object GENERIC extends ByteArrayColumnType[DataType](9, 16) {
+private[sql] object GENERIC extends ByteArrayColumnType[DataType](10, 16) {
override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]) {
row(ordinal) = SparkSqlSerializer.deserialize[Any](value)
}
@@ -262,16 +284,17 @@ private[sql] object GENERIC extends ByteArrayColumnType[DataType](9, 16) {
private[sql] object ColumnType {
def apply(dataType: DataType): ColumnType[_, _] = {
dataType match {
- case IntegerType => INT
- case LongType => LONG
- case FloatType => FLOAT
- case DoubleType => DOUBLE
- case BooleanType => BOOLEAN
- case ByteType => BYTE
- case ShortType => SHORT
- case StringType => STRING
- case BinaryType => BINARY
- case _ => GENERIC
+ case IntegerType => INT
+ case LongType => LONG
+ case FloatType => FLOAT
+ case DoubleType => DOUBLE
+ case BooleanType => BOOLEAN
+ case ByteType => BYTE
+ case ShortType => SHORT
+ case StringType => STRING
+ case BinaryType => BINARY
+ case TimestampType => TIMESTAMP
+ case _ => GENERIC
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
index 6f0d46d816..5f61fb5e16 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
@@ -22,14 +22,15 @@ import org.scalatest.FunSuite
import org.apache.spark.sql.catalyst.types._
class ColumnStatsSuite extends FunSuite {
- testColumnStats(classOf[BooleanColumnStats], BOOLEAN)
- testColumnStats(classOf[ByteColumnStats], BYTE)
- testColumnStats(classOf[ShortColumnStats], SHORT)
- testColumnStats(classOf[IntColumnStats], INT)
- testColumnStats(classOf[LongColumnStats], LONG)
- testColumnStats(classOf[FloatColumnStats], FLOAT)
- testColumnStats(classOf[DoubleColumnStats], DOUBLE)
- testColumnStats(classOf[StringColumnStats], STRING)
+ testColumnStats(classOf[BooleanColumnStats], BOOLEAN)
+ testColumnStats(classOf[ByteColumnStats], BYTE)
+ testColumnStats(classOf[ShortColumnStats], SHORT)
+ testColumnStats(classOf[IntColumnStats], INT)
+ testColumnStats(classOf[LongColumnStats], LONG)
+ testColumnStats(classOf[FloatColumnStats], FLOAT)
+ testColumnStats(classOf[DoubleColumnStats], DOUBLE)
+ testColumnStats(classOf[StringColumnStats], STRING)
+ testColumnStats(classOf[TimestampColumnStats], TIMESTAMP)
def testColumnStats[T <: NativeType, U <: NativeColumnStats[T]](
columnStatsClass: Class[U],
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
index 314b7d317e..829342215e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.columnar
import java.nio.ByteBuffer
+import java.sql.Timestamp
import org.scalatest.FunSuite
@@ -32,7 +33,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
test("defaultSize") {
val checks = Map(
INT -> 4, SHORT -> 2, LONG -> 8, BYTE -> 1, DOUBLE -> 8, FLOAT -> 4,
- BOOLEAN -> 1, STRING -> 8, BINARY -> 16, GENERIC -> 16)
+ BOOLEAN -> 1, STRING -> 8, TIMESTAMP -> 12, BINARY -> 16, GENERIC -> 16)
checks.foreach { case (columnType, expectedSize) =>
assertResult(expectedSize, s"Wrong defaultSize for $columnType") {
@@ -52,14 +53,15 @@ class ColumnTypeSuite extends FunSuite with Logging {
}
}
- checkActualSize(INT, Int.MaxValue, 4)
- checkActualSize(SHORT, Short.MaxValue, 2)
- checkActualSize(LONG, Long.MaxValue, 8)
- checkActualSize(BYTE, Byte.MaxValue, 1)
- checkActualSize(DOUBLE, Double.MaxValue, 8)
- checkActualSize(FLOAT, Float.MaxValue, 4)
- checkActualSize(BOOLEAN, true, 1)
- checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length)
+ checkActualSize(INT, Int.MaxValue, 4)
+ checkActualSize(SHORT, Short.MaxValue, 2)
+ checkActualSize(LONG, Long.MaxValue, 8)
+ checkActualSize(BYTE, Byte.MaxValue, 1)
+ checkActualSize(DOUBLE, Double.MaxValue, 8)
+ checkActualSize(FLOAT, Float.MaxValue, 4)
+ checkActualSize(BOOLEAN, true, 1)
+ checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length)
+ checkActualSize(TIMESTAMP, new Timestamp(0L), 12)
val binary = Array.fill[Byte](4)(0: Byte)
checkActualSize(BINARY, binary, 4 + 4)
@@ -188,17 +190,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
}
private def hexDump(value: Any): String = {
- if (value.isInstanceOf[String]) {
- val sb = new StringBuilder()
- for (ch <- value.asInstanceOf[String].toCharArray) {
- sb.append(Integer.toHexString(ch & 0xffff)).append(' ')
- }
- if (! sb.isEmpty) sb.setLength(sb.length - 1)
- sb.toString()
- } else {
- // for now ..
- hexDump(value.toString)
- }
+ value.toString.map(ch => Integer.toHexString(ch & 0xffff)).mkString(" ")
}
private def dumpBuffer(buff: ByteBuffer): Any = {
@@ -207,7 +199,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
val b = buff.get()
sb.append(Integer.toHexString(b & 0xff)).append(' ')
}
- if (! sb.isEmpty) sb.setLength(sb.length - 1)
+ if (sb.nonEmpty) sb.setLength(sb.length - 1)
sb.toString()
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
index 04bdc43d95..38b04dd959 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.columnar
import scala.collection.immutable.HashSet
import scala.util.Random
+import java.sql.Timestamp
+
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.types.{DataType, NativeType}
@@ -39,15 +41,19 @@ object ColumnarTestUtils {
}
(columnType match {
- case BYTE => (Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte
- case SHORT => (Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort
- case INT => Random.nextInt()
- case LONG => Random.nextLong()
- case FLOAT => Random.nextFloat()
- case DOUBLE => Random.nextDouble()
- case STRING => Random.nextString(Random.nextInt(32))
- case BOOLEAN => Random.nextBoolean()
- case BINARY => randomBytes(Random.nextInt(32))
+ case BYTE => (Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte
+ case SHORT => (Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort
+ case INT => Random.nextInt()
+ case LONG => Random.nextLong()
+ case FLOAT => Random.nextFloat()
+ case DOUBLE => Random.nextDouble()
+ case STRING => Random.nextString(Random.nextInt(32))
+ case BOOLEAN => Random.nextBoolean()
+ case BINARY => randomBytes(Random.nextInt(32))
+ case TIMESTAMP =>
+ val timestamp = new Timestamp(Random.nextLong())
+ timestamp.setNanos(Random.nextInt(999999999))
+ timestamp
case _ =>
// Using a random one-element map instead of an arbitrary object
Map(Random.nextInt() -> Random.nextString(Random.nextInt(32)))
@@ -96,5 +102,4 @@ object ColumnarTestUtils {
(values, rows)
}
-
}