author    Anatoliy Plastinin <anatoliy.plastinin@gmail.com>  2016-01-11 10:28:57 -0800
committer Yin Huai <yhuai@databricks.com>  2016-01-11 10:28:57 -0800
commit    9559ac5f74434cf4bf611bdcde9a216d39799826 (patch)
tree      b2a6b037a73007fddc8692ceb2f46ca1ca141b52
parent    8fe928b4fe380ba527164bd413402abfed13c0e1 (diff)
[SPARK-12744][SQL] Change parsing JSON integers to timestamps to treat integers as number of seconds
JIRA: https://issues.apache.org/jira/browse/SPARK-12744

This PR makes parsing JSON integers to timestamps consistent with casting behavior.

Author: Anatoliy Plastinin <anatoliy.plastinin@gmail.com>

Closes #10687 from antlypls/fix-json-timestamp-parsing.
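Note on the semantics: Spark SQL stores TimestampType internally as microseconds since the Unix epoch, and CAST from an integral type interprets the value as seconds. The old parser multiplied the raw long by 1000L, effectively treating JSON integers as milliseconds; multiplying by 1000000L treats them as seconds, matching the cast behavior. A minimal plain-Scala sketch of the two conversions (the value matches the test fixture added below):

    // Spark SQL's internal TimestampType unit is microseconds since the epoch.
    val jsonLong = 1451732645L           // integer field from the JSON input

    val oldMicros = jsonLong * 1000L     // old behavior: long treated as milliseconds
    val newMicros = jsonLong * 1000000L  // new behavior: long treated as seconds,
                                         // matching CAST(1451732645 AS TIMESTAMP)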
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala |  2 +-
 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala     | 17 +++++++++++++++--
 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala  |  4 ++++
 3 files changed, 20 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 2e3fe3da15..b2f5c1e964 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -90,7 +90,7 @@ object JacksonParser {
         DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
 
       case (VALUE_NUMBER_INT, TimestampType) =>
-        parser.getLongValue * 1000L
+        parser.getLongValue * 1000000L
 
       case (_, StringType) =>
         val writer = new ByteArrayOutputStream()
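The factor 1000000L converts seconds to Spark's internal microsecond representation. A quick plain-Scala sanity check of the resulting wall-clock time (java.sql.Timestamp's constructor takes milliseconds, and its toString renders in the JVM's default timezone):

    import java.sql.Timestamp

    val micros = 1451732645L * 1000000L
    val ts = new Timestamp(micros / 1000L) // Timestamp's constructor takes milliseconds
    println(ts)                            // prints 2016-01-02 11:04:05.0 under a UTC default timezone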
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index b3b6b7df0c..4ab148065a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -83,9 +83,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val doubleNumber: Double = 1.7976931348623157E308d
     checkTypePromotion(doubleNumber.toDouble, enforceCorrectType(doubleNumber, DoubleType))
 
-    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber)),
+    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber * 1000L)),
         enforceCorrectType(intNumber, TimestampType))
-    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong)),
+    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong * 1000L)),
         enforceCorrectType(intNumber.toLong, TimestampType))
     val strTime = "2014-09-30 12:34:56"
     checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
@@ -1465,4 +1465,17 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }
 
+  test("Casting long as timestamp") {
+    withTempTable("jsonTable") {
+      val schema = (new StructType).add("ts", TimestampType)
+      val jsonDF = sqlContext.read.schema(schema).json(timestampAsLong)
+
+      jsonDF.registerTempTable("jsonTable")
+
+      checkAnswer(
+        sql("select ts from jsonTable"),
+        Row(java.sql.Timestamp.valueOf("2016-01-02 03:04:05"))
+      )
+    }
+  }
 }
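The expected literal in this test is timezone-dependent: Timestamp.valueOf interprets its argument in the JVM's default timezone. 1451732645 seconds is 2016-01-02 11:04:05 UTC, which reads as 03:04:05 in America/Los_Angeles (UTC-8 in January), the timezone Spark's test builds conventionally set; that convention is an assumption here, not part of this patch. A sketch of the arithmetic:

    import java.sql.Timestamp
    import java.util.TimeZone

    // Assumption: the suite runs with America/Los_Angeles as the default timezone.
    TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
    val ts = new Timestamp(1451732645L * 1000L) // 2016-01-02 11:04:05 UTC
    println(ts)                                 // prints 2016-01-02 03:04:05.0 (PST)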
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index cb61f7eeca..a0836058d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -205,6 +205,10 @@ private[json] trait TestJsonData {
       """{"b": [{"c": {}}]}""" ::
       """]""" :: Nil)
 
+  def timestampAsLong: RDD[String] =
+    sqlContext.sparkContext.parallelize(
+      """{"ts":1451732645}""" :: Nil)
+
   lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
 
   def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())