author     Yin Huai <yhuai@databricks.com>   2015-09-21 18:06:45 -0700
committer  Cheng Lian <lian@databricks.com>  2015-09-21 18:06:45 -0700
commit     0494c80ef54f6f3a8c6f2d92abfe1a77a91df8b0 (patch)
tree       7c7143500d3972ba790802954a44fa0801cdf7f9 /sql
parent     72869883f12b6e0a4e5aad79c0ac2cfdb4d83f09 (diff)
[SPARK-10495] [SQL] Read date values in JSON data stored by Spark 1.5.0.
https://issues.apache.org/jira/browse/SPARK-10681

Author: Yin Huai <yhuai@databricks.com>

Closes #8806 from yhuai/SPARK-10495.
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala |  36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala    |  15
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala        | 103
3 files changed, 152 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index f65c7bbd6e..23bada1ddd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -73,6 +73,38 @@ private[sql] object JacksonGenerator {
valWriter(field.dataType, v)
}
gen.writeEndObject()
+
+ // For a UDT, udt.serialize will produce values of the underlying SQL type. So, we need the following three cases.
+ case (ArrayType(ty, _), v: ArrayData) =>
+ gen.writeStartArray()
+ v.foreach(ty, (_, value) => valWriter(ty, value))
+ gen.writeEndArray()
+
+ case (MapType(kt, vt, _), v: MapData) =>
+ gen.writeStartObject()
+ v.foreach(kt, vt, { (k, v) =>
+ gen.writeFieldName(k.toString)
+ valWriter(vt, v)
+ })
+ gen.writeEndObject()
+
+ case (StructType(ty), v: InternalRow) =>
+ gen.writeStartObject()
+ var i = 0
+ while (i < ty.length) {
+ val field = ty(i)
+ val value = v.get(i, field.dataType)
+ if (value != null) {
+ gen.writeFieldName(field.name)
+ valWriter(field.dataType, value)
+ }
+ i += 1
+ }
+ gen.writeEndObject()
+
+ case (dt, v) =>
+ sys.error(
+ s"Failed to convert value $v (class of ${v.getClass}}) with the type of $dt to JSON.")
}
valWriter(rowSchema, row)
@@ -133,6 +165,10 @@ private[sql] object JacksonGenerator {
i += 1
}
gen.writeEndObject()
+
+ case (dt, v) =>
+ sys.error(
+ s"Failed to convert value $v (class of ${v.getClass}}) with the type of $dt to JSON.")
}
valWriter(rowSchema, row)
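The new ArrayType/MapType/StructType cases above are needed because a UDT's serialize method hands the generator Catalyst internal values (ArrayData, MapData, InternalRow) rather than external Scala objects. Below is a minimal sketch of such a UDT, assuming the Spark 1.5-era internal API; the ExampleVector classes are hypothetical, loosely modeled on the MyDenseVectorUDT used by the test suite.

  import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
  import org.apache.spark.sql.types._

  // Hypothetical UDT sketch: serialize() hands JacksonGenerator an ArrayData,
  // which is why the generator needs explicit ArrayData/MapData/InternalRow cases.
  class ExampleVector(val values: Array[Double]) extends Serializable

  class ExampleVectorUDT extends UserDefinedType[ExampleVector] {
    override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)

    // Converts the user-facing object into a Catalyst ArrayData (an SQL type).
    override def serialize(obj: Any): Any = obj match {
      case v: ExampleVector => new GenericArrayData(v.values.map(_.asInstanceOf[Any]))
    }

    // Converts the Catalyst ArrayData back into the user-facing object.
    override def deserialize(datum: Any): ExampleVector = datum match {
      case data: ArrayData => new ExampleVector(data.toDoubleArray())
    }

    override def userClass: Class[ExampleVector] = classOf[ExampleVector]
  }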
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index ff4d8c04e8..c51140749c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -62,10 +62,23 @@ private[sql] object JacksonParser {
// guard the non string type
null
+ case (VALUE_STRING, BinaryType) =>
+ parser.getBinaryValue
+
case (VALUE_STRING, DateType) =>
- DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
+ val stringValue = parser.getText
+ if (stringValue.contains("-")) {
+ // The format of this string will probably be "yyyy-mm-dd".
+ DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
+ } else {
+ // In Spark 1.5.0, the JSON data source stored a date as the number of days since the epoch,
+ // rendered as a string. So, we just convert it to an Int.
+ stringValue.toInt
+ }
case (VALUE_STRING, TimestampType) =>
+ // This conversion loses the microseconds part.
+ // See https://issues.apache.org/jira/browse/SPARK-10681.
DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
case (VALUE_NUMBER_INT, TimestampType) =>
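For context on the DateType branch above: the Spark 1.5.0 JSON data source wrote a date as its day count since the epoch rendered as a string (e.g. "16436"), while toJSON and earlier releases wrote a "yyyy-mm-dd" string. Here is a rough, JDK-only sketch of the same branching, ignoring the time-zone handling that DateTimeUtils actually performs; parseJsonDate is a made-up name used only for illustration.

  import java.time.{LocalDate, ZoneOffset}
  import java.util.concurrent.TimeUnit

  object DateCompatSketch {
    // Accepts both the "yyyy-MM-dd" form and the "days since epoch" form written by
    // the Spark 1.5.0 JSON data source, returning the day count since 1970-01-01.
    def parseJsonDate(stringValue: String): Int = {
      if (stringValue.contains("-")) {
        // Human-readable date string: convert to days since the epoch (UTC here).
        val millis = LocalDate.parse(stringValue)
          .atStartOfDay(ZoneOffset.UTC).toInstant.toEpochMilli
        TimeUnit.MILLISECONDS.toDays(millis).toInt
      } else {
        // Spark 1.5.0 stored the day count itself as a string.
        stringValue.toInt
      }
    }

    def main(args: Array[String]): Unit = {
      // Both representations of 2015-01-01 map to the same internal DateType value.
      assert(parseJsonDate("2015-01-01") == 16436)
      assert(parseJsonDate("16436") == 16436)
    }
  }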
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 6a18cc6d27..b614e6c414 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.core.JsonFactory
import org.apache.spark.rdd.RDD
import org.scalactic.Tolerance._
-import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
@@ -1159,4 +1159,105 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
"SELECT count(a) FROM test_myjson_with_part where d1 = 1"), Row(9))
})
}
+
+ test("backward compatibility") {
+ // This test makes sure that our JSON support can read JSON data generated by previous
+ // versions of Spark through the toJSON method and the JSON data source.
+ // The data is generated by the program below.
+ // Here are a few notes:
+ // - Spark 1.5.0 cannot save timestamp data. So, we manually added the timestamp field
+ //   (col13) to the JSON objects.
+ // - Spark versions before 1.5.1 do not generate UDT values. So, we manually added the UDT
+ //   value to the JSON objects generated by those Spark versions (col17).
+ // - If the type is NullType, we do not write the data out.
+
+ // Create the schema.
+ val struct =
+ StructType(
+ StructField("f1", FloatType, true) ::
+ StructField("f2", ArrayType(BooleanType), true) :: Nil)
+
+ val dataTypes =
+ Seq(
+ StringType, BinaryType, NullType, BooleanType,
+ ByteType, ShortType, IntegerType, LongType,
+ FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+ DateType, TimestampType,
+ ArrayType(IntegerType), MapType(StringType, LongType), struct,
+ new MyDenseVectorUDT())
+ val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
+ StructField(s"col$index", dataType, nullable = true)
+ }
+ val schema = StructType(fields)
+
+ val constantValues =
+ Seq(
+ "a string in binary".getBytes("UTF-8"),
+ null,
+ true,
+ 1.toByte,
+ 2.toShort,
+ 3,
+ Long.MaxValue,
+ 0.25.toFloat,
+ 0.75,
+ new java.math.BigDecimal("1234.23456"),
+ new java.math.BigDecimal("1.23456"),
+ java.sql.Date.valueOf("2015-01-01"),
+ java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
+ Seq(2, 3, 4),
+ Map("a string" -> 2000L),
+ Row(4.75.toFloat, Seq(false, true)),
+ new MyDenseVector(Array(0.25, 2.25, 4.25)))
+ val data =
+ Row.fromSeq(Seq("Spark " + sqlContext.sparkContext.version) ++ constantValues) :: Nil
+
+ // Data generated by previous versions.
+ // scalastyle:off
+ val existingJSONData =
+ """{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+ """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+ """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+ """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+ """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+ """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+ """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
+ // scalastyle:on
+
+ // Generate data for the current version.
+ val df = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(data, 1), schema)
+ withTempPath { path =>
+ df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
+
+ // df.toJSON converts internal rows to external rows first and then generates
+ // JSON objects, while df.write.format("json") writes internal rows directly.
+ val allJSON =
+ existingJSONData ++
+ df.toJSON.collect() ++
+ sparkContext.textFile(path.getCanonicalPath).collect()
+
+ Utils.deleteRecursively(path)
+ sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)
+
+ // Read data back with the schema specified.
+ val col0Values =
+ Seq(
+ "Spark 1.2.2",
+ "Spark 1.3.1",
+ "Spark 1.3.1",
+ "Spark 1.4.1",
+ "Spark 1.4.1",
+ "Spark 1.5.0",
+ "Spark 1.5.0",
+ "Spark " + sqlContext.sparkContext.version,
+ "Spark " + sqlContext.sparkContext.version)
+ val expectedResult = col0Values.map { v =>
+ Row.fromSeq(Seq(v) ++ constantValues)
+ }
+ checkAnswer(
+ sqlContext.read.format("json").schema(schema).load(path.getCanonicalPath),
+ expectedResult
+ )
+ }
+ }
}
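For users hitting SPARK-10495, the read path exercised at the end of this test is the relevant pattern: supply the schema explicitly so a date column is interpreted as DateType whether the file contains "2015-01-01" or "16436". Below is a trimmed-down, illustrative example; the path and the two-column schema are placeholders rather than anything from this patch, and sqlContext is assumed to be in scope (as in spark-shell).

  import org.apache.spark.sql.types._

  // Illustrative only: read JSON written by Spark 1.5.0 with an explicit schema
  // so the date column is parsed as DateType instead of being inferred as a string.
  val schema = StructType(
    StructField("col0", StringType) ::
    StructField("col12", DateType) :: Nil)

  val df = sqlContext.read
    .format("json")
    .schema(schema)
    .load("/path/to/json-written-by-spark-1.5.0")

  df.select("col12").show()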