diff options
author | Anatoliy Plastinin <anatoliy.plastinin@gmail.com> | 2016-01-11 10:28:57 -0800 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-01-11 10:28:57 -0800 |
commit | 9559ac5f74434cf4bf611bdcde9a216d39799826 (patch) | |
tree | b2a6b037a73007fddc8692ceb2f46ca1ca141b52 /sql | |
parent | 8fe928b4fe380ba527164bd413402abfed13c0e1 (diff) | |
download | spark-9559ac5f74434cf4bf611bdcde9a216d39799826.tar.gz spark-9559ac5f74434cf4bf611bdcde9a216d39799826.tar.bz2 spark-9559ac5f74434cf4bf611bdcde9a216d39799826.zip |
[SPARK-12744][SQL] Change parsing JSON integers to timestamps to treat integers as number of seconds
JIRA: https://issues.apache.org/jira/browse/SPARK-12744
This PR makes parsing JSON integers to timestamps consistent with casting behavior.
Author: Anatoliy Plastinin <anatoliy.plastinin@gmail.com>
Closes #10687 from antlypls/fix-json-timestamp-parsing.
Diffstat (limited to 'sql')
3 files changed, 20 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index 2e3fe3da15..b2f5c1e964 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -90,7 +90,7 @@ object JacksonParser { DateTimeUtils.stringToTime(parser.getText).getTime * 1000L case (VALUE_NUMBER_INT, TimestampType) => - parser.getLongValue * 1000L + parser.getLongValue * 1000000L case (_, StringType) => val writer = new ByteArrayOutputStream() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index b3b6b7df0c..4ab148065a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -83,9 +83,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val doubleNumber: Double = 1.7976931348623157E308d checkTypePromotion(doubleNumber.toDouble, enforceCorrectType(doubleNumber, DoubleType)) - checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber)), + checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber * 1000L)), enforceCorrectType(intNumber, TimestampType)) - checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong)), + checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong * 1000L)), enforceCorrectType(intNumber.toLong, TimestampType)) val strTime = "2014-09-30 12:34:56" checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)), @@ -1465,4 +1465,17 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } + test("Casting long as timestamp") { + withTempTable("jsonTable") { + val schema = (new StructType).add("ts", TimestampType) + val jsonDF = sqlContext.read.schema(schema).json(timestampAsLong) + + jsonDF.registerTempTable("jsonTable") + + checkAnswer( + sql("select ts from jsonTable"), + Row(java.sql.Timestamp.valueOf("2016-01-02 03:04:05")) + ) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala index cb61f7eeca..a0836058d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala @@ -205,6 +205,10 @@ private[json] trait TestJsonData { """{"b": [{"c": {}}]}""" :: """]""" :: Nil) + def timestampAsLong: RDD[String] = + sqlContext.sparkContext.parallelize( + """{"ts":1451732645}""" :: Nil) + lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil) def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]()) |