diff options
author | Cheng Lian <lian.cs.zju@gmail.com> | 2014-07-21 00:46:28 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-07-21 00:46:28 -0700 |
commit | cd273a238144a9a436219cd01250369586f5638b (patch) | |
tree | ef844cf62d65e0d02a0af940b211cea6244ab7eb /sql/catalyst | |
parent | db56f2df1b8027171da1b8d2571d1f2ef1e103b6 (diff) | |
download | spark-cd273a238144a9a436219cd01250369586f5638b.tar.gz spark-cd273a238144a9a436219cd01250369586f5638b.tar.bz2 spark-cd273a238144a9a436219cd01250369586f5638b.zip |
[SPARK-2190][SQL] Specialized ColumnType for Timestamp
JIRA issue: [SPARK-2190](https://issues.apache.org/jira/browse/SPARK-2190)
Added specialized in-memory column type for `Timestamp`. Whitelisted all timestamp related Hive tests except `timestamp_udf`, which is timezone sensitive.
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #1440 from liancheng/timestamp-column-type and squashes the following commits:
e682175 [Cheng Lian] Enabled more timezone sensitive Hive tests.
53a358f [Cheng Lian] Fixed failed test suites
01b592d [Cheng Lian] Fixed SimpleDateFormat thread safety issue
2a59343 [Cheng Lian] Removed timezone sensitive Hive timestamp tests
45dd05d [Cheng Lian] Added Timestamp specific in-memory columnar representation
Diffstat (limited to 'sql/catalyst')
2 files changed, 34 insertions, 11 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 1f9716e385..0ad2b30cf9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.expressions import java.sql.Timestamp +import java.text.{DateFormat, SimpleDateFormat} import org.apache.spark.sql.catalyst.types._ @@ -41,6 +42,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { // UDFToString private[this] def castToString: Any => Any = child.dataType match { case BinaryType => buildCast[Array[Byte]](_, new String(_, "UTF-8")) + case TimestampType => buildCast[Timestamp](_, timestampToString) case _ => buildCast[Any](_, _.toString) } @@ -126,6 +128,18 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { ts.getTime / 1000 + ts.getNanos.toDouble / 1000000000 } + // Converts Timestamp to string according to Hive TimestampWritable convention + private[this] def timestampToString(ts: Timestamp): String = { + val timestampString = ts.toString + val formatted = Cast.threadLocalDateFormat.get.format(ts) + + if (timestampString.length > 19 && timestampString.substring(19) != ".0") { + formatted + timestampString.substring(19) + } else { + formatted + } + } + private[this] def castToLong: Any => Any = child.dataType match { case StringType => buildCast[String](_, s => try s.toLong catch { @@ -249,3 +263,12 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { if (evaluated == null) null else cast(evaluated) } } + +object Cast { + // `SimpleDateFormat` is not thread-safe. + private[sql] val threadLocalDateFormat = new ThreadLocal[DateFormat] { + override def initialValue() = { + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") + } + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala index 73f546455b..db1ae29d40 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala @@ -158,7 +158,7 @@ class ExpressionEvaluationSuite extends FunSuite { checkEvaluation("abc" like regEx, true, new GenericRow(Array[Any]("a%"))) checkEvaluation("abc" like regEx, false, new GenericRow(Array[Any]("b%"))) checkEvaluation("abc" like regEx, false, new GenericRow(Array[Any]("bc%"))) - + checkEvaluation(Literal(null, StringType) like regEx, null, new GenericRow(Array[Any]("bc%"))) } @@ -203,7 +203,7 @@ class ExpressionEvaluationSuite extends FunSuite { test("data type casting") { - val sts = "1970-01-01 00:00:01.0" + val sts = "1970-01-01 00:00:01.1" val ts = Timestamp.valueOf(sts) checkEvaluation("abdef" cast StringType, "abdef") @@ -293,7 +293,7 @@ class ExpressionEvaluationSuite extends FunSuite { // A test for higher precision than millis checkEvaluation(Cast(Cast(0.00000001, TimestampType), DoubleType), 0.00000001) } - + test("null checking") { val row = new GenericRow(Array[Any]("^Ba*n", null, true, null)) val c1 = 'a.string.at(0) @@ -312,7 +312,7 @@ class ExpressionEvaluationSuite extends FunSuite { checkEvaluation(IsNull(Literal(null, ShortType)), true) checkEvaluation(IsNotNull(Literal(null, ShortType)), false) - + checkEvaluation(Coalesce(c1 :: c2 :: Nil), "^Ba*n", row) checkEvaluation(Coalesce(Literal(null, StringType) :: Nil), null, row) checkEvaluation(Coalesce(Literal(null, StringType) :: c1 :: c2 :: Nil), "^Ba*n", row) @@ -323,11 +323,11 @@ class ExpressionEvaluationSuite extends FunSuite { checkEvaluation(If(Literal(null, BooleanType), c2, c1), "^Ba*n", row) checkEvaluation(If(Literal(true, BooleanType), c1, c2), "^Ba*n", row) checkEvaluation(If(Literal(false, BooleanType), c2, c1), "^Ba*n", row) - checkEvaluation(If(Literal(false, BooleanType), + checkEvaluation(If(Literal(false, BooleanType), Literal("a", StringType), Literal("b", StringType)), "b", row) checkEvaluation(In(c1, c1 :: c2 :: Nil), true, row) - checkEvaluation(In(Literal("^Ba*n", StringType), + checkEvaluation(In(Literal("^Ba*n", StringType), Literal("^Ba*n", StringType) :: Nil), true, row) checkEvaluation(In(Literal("^Ba*n", StringType), Literal("^Ba*n", StringType) :: c2 :: Nil), true, row) @@ -378,7 +378,7 @@ class ExpressionEvaluationSuite extends FunSuite { test("complex type") { val row = new GenericRow(Array[Any]( - "^Ba*n", // 0 + "^Ba*n", // 0 null.asInstanceOf[String], // 1 new GenericRow(Array[Any]("aa", "bb")), // 2 Map("aa"->"bb"), // 3 @@ -391,18 +391,18 @@ class ExpressionEvaluationSuite extends FunSuite { val typeMap = MapType(StringType, StringType) val typeArray = ArrayType(StringType) - checkEvaluation(GetItem(BoundReference(3, AttributeReference("c", typeMap)()), + checkEvaluation(GetItem(BoundReference(3, AttributeReference("c", typeMap)()), Literal("aa")), "bb", row) checkEvaluation(GetItem(Literal(null, typeMap), Literal("aa")), null, row) checkEvaluation(GetItem(Literal(null, typeMap), Literal(null, StringType)), null, row) - checkEvaluation(GetItem(BoundReference(3, AttributeReference("c", typeMap)()), + checkEvaluation(GetItem(BoundReference(3, AttributeReference("c", typeMap)()), Literal(null, StringType)), null, row) - checkEvaluation(GetItem(BoundReference(4, AttributeReference("c", typeArray)()), + checkEvaluation(GetItem(BoundReference(4, AttributeReference("c", typeArray)()), Literal(1)), "bb", row) checkEvaluation(GetItem(Literal(null, typeArray), Literal(1)), null, row) checkEvaluation(GetItem(Literal(null, typeArray), Literal(null, IntegerType)), null, row) - checkEvaluation(GetItem(BoundReference(4, AttributeReference("c", typeArray)()), + checkEvaluation(GetItem(BoundReference(4, AttributeReference("c", typeArray)()), Literal(null, IntegerType)), null, row) checkEvaluation(GetField(BoundReference(2, AttributeReference("c", typeS)()), "a"), "aa", row) |