author     Daoyuan Wang <daoyuan.wang@intel.com>  2015-02-03 12:21:45 -0800
committer  Michael Armbrust <michael@databricks.com>  2015-02-03 12:21:45 -0800
commit  db821ed2ededf6ce79b838c77a9c10bed2ce555a (patch)
tree    d34b200923ca35a67163d179f16521ff03865648 /sql/core
parent  5adbb39482631998dbfe4a1da88f6e75b30fb5ac (diff)
[SPARK-4508] [SQL] build native date type to conform behavior to Hive
The previously merged #3732 was reverted due to a test failure, which has now been fixed.

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes #4325 from adrian-wang/datenative and squashes the following commits:

096e20d [Daoyuan Wang] fix for mixed timezone
0ed0fdc [Daoyuan Wang] fix test data
a2fdd4e [Daoyuan Wang] getDate
c37832b [Daoyuan Wang] row to catalyst
f0005b1 [Daoyuan Wang] add date in sql parser and java type conversion
024c9a6 [Daoyuan Wang] clean some import order
d6715fc [Daoyuan Wang] refactoring Date as Primitive Int internally
374abd5 [Daoyuan Wang] spark native date type support
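At its core, the change stores a DateType value as a primitive Int counting days since the Unix epoch (1970-01-01), converting to and from java.sql.Date only at the boundaries. The helper names below mirror the DateUtils calls that appear in the diff (toJavaDate, fromJavaDate, millisToDays); the bodies are a minimal illustrative sketch of the day-number representation, not the committed implementation:

    import java.sql.Date
    import java.util.TimeZone

    object DateSketch {
      private val MillisPerDay = 86400000L

      // Day number for a java.sql.Date, compensating for the default zone's
      // offset so the local calendar day is what gets counted.
      def fromJavaDate(date: Date): Int = millisToDays(date.getTime)

      def millisToDays(millis: Long): Int =
        ((millis + TimeZone.getDefault.getOffset(millis)) / MillisPerDay).toInt

      // Reconstruct a java.sql.Date at local midnight of the given day number.
      def toJavaDate(days: Int): Date = {
        val utcMillis = days * MillisPerDay
        new Date(utcMillis - TimeZone.getDefault.getOffset(utcMillis))
      }
    }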
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala             19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala              13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala              4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala                      6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala                    33
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala      3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala         2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala          6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala        6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala                    7
10 files changed, 48 insertions, 51 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index 391b3dae5c..cad0667b46 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.columnar
-import java.sql.{Date, Timestamp}
+import java.sql.Timestamp
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, Attribute, AttributeReference}
@@ -215,22 +215,7 @@ private[sql] class StringColumnStats extends ColumnStats {
def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
}
-private[sql] class DateColumnStats extends ColumnStats {
- protected var upper: Date = null
- protected var lower: Date = null
-
- override def gatherStats(row: Row, ordinal: Int) {
- super.gatherStats(row, ordinal)
- if (!row.isNullAt(ordinal)) {
- val value = row(ordinal).asInstanceOf[Date]
- if (upper == null || value.compareTo(upper) > 0) upper = value
- if (lower == null || value.compareTo(lower) < 0) lower = value
- sizeInBytes += DATE.defaultSize
- }
- }
-
- def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
-}
+private[sql] class DateColumnStats extends IntColumnStats
private[sql] class TimestampColumnStats extends ColumnStats {
protected var upper: Timestamp = null
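Because dates are now plain Ints, DateColumnStats can simply inherit the integer min/max collector instead of duplicating it. A sketch of what such a collector does (the shape is assumed from the Row(Int.MaxValue, Int.MinValue, 0) initial statistics asserted in ColumnStatsSuite further down; it is not the exact Spark class):

    class IntStatsSketch {
      protected var upper = Int.MinValue  // running max: any first value replaces it
      protected var lower = Int.MaxValue  // running min: any first value replaces it
      protected var nullCount = 0
      protected var count = 0
      protected var sizeInBytes = 0L

      def gatherStats(value: Option[Int]): Unit = {
        count += 1
        value match {
          case Some(v) =>
            if (v > upper) upper = v
            if (v < lower) lower = v
            sizeInBytes += 4            // DATE.defaultSize after this patch
          case None => nullCount += 1
        }
      }
    }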
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
index fcf2faa091..db5bc0de36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
@@ -335,21 +335,20 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
}
}
-private[sql] object DATE extends NativeColumnType(DateType, 8, 8) {
+private[sql] object DATE extends NativeColumnType(DateType, 8, 4) {
override def extract(buffer: ByteBuffer) = {
- val date = new Date(buffer.getLong())
- date
+ buffer.getInt
}
- override def append(v: Date, buffer: ByteBuffer): Unit = {
- buffer.putLong(v.getTime)
+ override def append(v: Int, buffer: ByteBuffer): Unit = {
+ buffer.putInt(v)
}
override def getField(row: Row, ordinal: Int) = {
- row(ordinal).asInstanceOf[Date]
+ row(ordinal).asInstanceOf[Int]
}
- override def setField(row: MutableRow, ordinal: Int, value: Date): Unit = {
+ def setField(row: MutableRow, ordinal: Int, value: Int): Unit = {
row(ordinal) = value
}
}
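With the type id unchanged (8) but defaultSize dropping from 8 to 4, a date now occupies a single 4-byte slot in the column buffer. A quick round-trip of the new append/extract pair against plain java.nio (a hypothetical check, not part of the patch; 16463 is, if the arithmetic holds, the day number for 2015-01-28, the literal used in the new SQLQuerySuite test):

    import java.nio.ByteBuffer

    val buffer = ByteBuffer.allocate(4)
    val days = 16463              // 2015-01-28 as days since 1970-01-01
    buffer.putInt(days)           // what DATE.append now writes
    buffer.flip()
    assert(buffer.getInt == days) // what DATE.extract now reads back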
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index b85021acc9..3a2f8d75da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -135,6 +135,8 @@ object EvaluatePython {
case (ud, udt: UserDefinedType[_]) => toJava(udt.serialize(ud), udt.sqlType)
+ case (date: Int, DateType) => DateUtils.toJavaDate(date)
+
// Pyrolite can handle Timestamp and Decimal
case (other, _) => other
}
@@ -171,7 +173,7 @@ object EvaluatePython {
}): Row
case (c: java.util.Calendar, DateType) =>
- new java.sql.Date(c.getTime().getTime())
+ DateUtils.fromJavaDate(new java.sql.Date(c.getTime().getTime()))
case (c: java.util.Calendar, TimestampType) =>
new java.sql.Timestamp(c.getTime().getTime())
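Rows crossing the Python boundary now carry the Int day number, so EvaluatePython converts in both directions: DateUtils.toJavaDate on the way out, and Calendar -> java.sql.Date -> day number on the way in. A hedged sketch of the inbound branch, reusing the hypothetical DateSketch helper from above:

    import java.util.Calendar

    def calendarToDays(c: Calendar): Int = {
      val sqlDate = new java.sql.Date(c.getTime().getTime())
      DateSketch.fromJavaDate(sqlDate)  // stands in for DateUtils.fromJavaDate
    }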
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 9171939f7e..33ce71b51b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -377,10 +377,12 @@ private[sql] object JsonRDD extends Logging {
}
}
- private def toDate(value: Any): Date = {
+ private def toDate(value: Any): Int = {
value match {
// only support string as date
- case value: java.lang.String => new Date(DataTypeConversions.stringToTime(value).getTime)
+ case value: java.lang.String =>
+ DateUtils.millisToDays(DataTypeConversions.stringToTime(value).getTime)
+ case value: java.sql.Date => DateUtils.fromJavaDate(value)
}
}
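JSON ingestion follows the same rule: a date string is first parsed to epoch millis (stringToTime) and then collapsed to a day number, while a ready-made java.sql.Date goes through fromJavaDate. A worked example of the collapse, assuming a UTC default zone:

    // "2014-10-15" parses to 1413331200000L (midnight UTC);
    // dividing by the 86400000 ms in a day yields the stored Int.
    assert(1413331200000L / 86400000L == 16358L)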
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index e18ba287e8..0501b47f08 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -38,7 +38,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
var origZone: TimeZone = _
override protected def beforeAll() {
origZone = TimeZone.getDefault
- TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
+ TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
}
override protected def afterAll() {
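Switching the suite's default zone from UTC to America/Los_Angeles is why every timestamp literal below shifts by eight hours: epoch millisecond 1 renders as 1969-12-31 16:00:00.001 in Pacific Standard Time. A one-line confirmation of the shift:

    import java.util.TimeZone

    TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
    // Timestamp.valueOf interprets its argument in the default zone, so the
    // instant 1 ms after the epoch prints as local 16:00:00.001 the day before.
    assert(java.sql.Timestamp.valueOf("1969-12-31 16:00:00.001").getTime == 1L)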
@@ -143,26 +143,26 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
test("SPARK-3173 Timestamp support in the parser") {
checkAnswer(sql(
- "SELECT time FROM timestamps WHERE time=CAST('1970-01-01 00:00:00.001' AS TIMESTAMP)"),
- Row(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.001")))
+ "SELECT time FROM timestamps WHERE time=CAST('1969-12-31 16:00:00.001' AS TIMESTAMP)"),
+ Row(java.sql.Timestamp.valueOf("1969-12-31 16:00:00.001")))
checkAnswer(sql(
- "SELECT time FROM timestamps WHERE time='1970-01-01 00:00:00.001'"),
- Row(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.001")))
+ "SELECT time FROM timestamps WHERE time='1969-12-31 16:00:00.001'"),
+ Row(java.sql.Timestamp.valueOf("1969-12-31 16:00:00.001")))
checkAnswer(sql(
- "SELECT time FROM timestamps WHERE '1970-01-01 00:00:00.001'=time"),
- Row(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.001")))
+ "SELECT time FROM timestamps WHERE '1969-12-31 16:00:00.001'=time"),
+ Row(java.sql.Timestamp.valueOf("1969-12-31 16:00:00.001")))
checkAnswer(sql(
- """SELECT time FROM timestamps WHERE time<'1970-01-01 00:00:00.003'
- AND time>'1970-01-01 00:00:00.001'"""),
- Row(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.002")))
+ """SELECT time FROM timestamps WHERE time<'1969-12-31 16:00:00.003'
+ AND time>'1969-12-31 16:00:00.001'"""),
+ Row(java.sql.Timestamp.valueOf("1969-12-31 16:00:00.002")))
checkAnswer(sql(
- "SELECT time FROM timestamps WHERE time IN ('1970-01-01 00:00:00.001','1970-01-01 00:00:00.002')"),
- Seq(Row(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.001")),
- Row(java.sql.Timestamp.valueOf("1970-01-01 00:00:00.002"))))
+ "SELECT time FROM timestamps WHERE time IN ('1969-12-31 16:00:00.001','1969-12-31 16:00:00.002')"),
+ Seq(Row(java.sql.Timestamp.valueOf("1969-12-31 16:00:00.001")),
+ Row(java.sql.Timestamp.valueOf("1969-12-31 16:00:00.002"))))
checkAnswer(sql(
"SELECT time FROM timestamps WHERE time='123'"),
@@ -296,6 +296,13 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
mapData.collect().take(1).map(Row.fromTuple).toSeq)
}
+ test("date row") {
+ checkAnswer(sql(
+ """select cast("2015-01-28" as date) from testData limit 1"""),
+ Row(java.sql.Date.valueOf("2015-01-28"))
+ )
+ }
+
test("from follow multiple brackets") {
checkAnswer(sql(
"select key from ((select * from testData limit 1) union all (select * from testData limit 1)) x limit 1"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
index a015884bae..f26fcc0385 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
@@ -83,7 +83,8 @@ class ScalaReflectionRelationSuite extends FunSuite {
assert(sql("SELECT * FROM reflectData").collect().head ===
Row("a", 1, 1L, 1.toFloat, 1.toDouble, 1.toShort, 1.toByte, true,
- new java.math.BigDecimal(1), new Date(12345), new Timestamp(12345), Seq(1,2,3)))
+ new java.math.BigDecimal(1), new Date(70, 0, 1), // This is 1970-01-01
+ new Timestamp(12345), Seq(1,2,3)))
}
test("query case class RDD with nulls") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
index be2b34de07..581fccf8ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
@@ -30,7 +30,7 @@ class ColumnStatsSuite extends FunSuite {
testColumnStats(classOf[FloatColumnStats], FLOAT, Row(Float.MaxValue, Float.MinValue, 0))
testColumnStats(classOf[DoubleColumnStats], DOUBLE, Row(Double.MaxValue, Double.MinValue, 0))
testColumnStats(classOf[StringColumnStats], STRING, Row(null, null, 0))
- testColumnStats(classOf[DateColumnStats], DATE, Row(null, null, 0))
+ testColumnStats(classOf[DateColumnStats], DATE, Row(Int.MaxValue, Int.MinValue, 0))
testColumnStats(classOf[TimestampColumnStats], TIMESTAMP, Row(null, null, 0))
def testColumnStats[T <: NativeType, U <: ColumnStats](
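The expected initial statistics flip from (null, null, 0) to (Int.MaxValue, Int.MinValue, 0) because DateColumnStats now inherits the integer collector sketched earlier: before any row is gathered, the running minimum sits at Int.MaxValue and the running maximum at Int.MinValue, so the first value replaces both.

    val stats = new IntStatsSketch   // hypothetical collector from above
    // fresh state: lower = Int.MaxValue, upper = Int.MinValue, counts at 0
    stats.gatherStats(Some(16463))
    // after one value: lower == upper == 16463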
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
index 87e608a885..9ce845912f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.columnar
import java.nio.ByteBuffer
-import java.sql.{Date, Timestamp}
+import java.sql.Timestamp
import org.scalatest.FunSuite
@@ -34,7 +34,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
test("defaultSize") {
val checks = Map(
INT -> 4, SHORT -> 2, LONG -> 8, BYTE -> 1, DOUBLE -> 8, FLOAT -> 4, BOOLEAN -> 1,
- STRING -> 8, DATE -> 8, TIMESTAMP -> 12, BINARY -> 16, GENERIC -> 16)
+ STRING -> 8, DATE -> 4, TIMESTAMP -> 12, BINARY -> 16, GENERIC -> 16)
checks.foreach { case (columnType, expectedSize) =>
assertResult(expectedSize, s"Wrong defaultSize for $columnType") {
@@ -64,7 +64,7 @@ class ColumnTypeSuite extends FunSuite with Logging {
checkActualSize(FLOAT, Float.MaxValue, 4)
checkActualSize(BOOLEAN, true, 1)
checkActualSize(STRING, "hello", 4 + "hello".getBytes("utf-8").length)
- checkActualSize(DATE, new Date(0L), 8)
+ checkActualSize(DATE, 0, 4)
checkActualSize(TIMESTAMP, new Timestamp(0L), 12)
val binary = Array.fill[Byte](4)(0: Byte)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
index f941465fa3..60ed28cc97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
@@ -17,11 +17,11 @@
package org.apache.spark.sql.columnar
+import java.sql.Timestamp
+
import scala.collection.immutable.HashSet
import scala.util.Random
-import java.sql.{Date, Timestamp}
-
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.types.{DataType, NativeType}
@@ -50,7 +50,7 @@ object ColumnarTestUtils {
case STRING => Random.nextString(Random.nextInt(32))
case BOOLEAN => Random.nextBoolean()
case BINARY => randomBytes(Random.nextInt(32))
- case DATE => new Date(Random.nextLong())
+ case DATE => Random.nextInt()
case TIMESTAMP =>
val timestamp = new Timestamp(Random.nextLong())
timestamp.setNanos(Random.nextInt(999999999))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index cb615388da..1396c6b724 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -67,14 +67,15 @@ class JsonSuite extends QueryTest {
checkTypePromotion(Timestamp.valueOf(strTime), enforceCorrectType(strTime, TimestampType))
val strDate = "2014-10-15"
- checkTypePromotion(Date.valueOf(strDate), enforceCorrectType(strDate, DateType))
+ checkTypePromotion(
+ DateUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
checkTypePromotion(new Timestamp(3601000), enforceCorrectType(ISO8601Time1, TimestampType))
- checkTypePromotion(new Date(3601000), enforceCorrectType(ISO8601Time1, DateType))
+ checkTypePromotion(DateUtils.millisToDays(3601000), enforceCorrectType(ISO8601Time1, DateType))
val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
checkTypePromotion(new Timestamp(10801000), enforceCorrectType(ISO8601Time2, TimestampType))
- checkTypePromotion(new Date(10801000), enforceCorrectType(ISO8601Time2, DateType))
+ checkTypePromotion(DateUtils.millisToDays(10801000), enforceCorrectType(ISO8601Time2, DateType))
}
test("Get compatible type") {