aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2016-09-29 13:01:10 -0700
committerYin Huai <yhuai@databricks.com>2016-09-29 13:01:10 -0700
commitfe33121a53384811a8e094ab6c05dc85b7c7ca87 (patch)
treed0575a3d0eefe46ea4b8e200e70d0834b566b477 /sql/core/src/test
parent027dea8f294504bc5cd8bfedde546d171cb78657 (diff)
downloadspark-fe33121a53384811a8e094ab6c05dc85b7c7ca87.tar.gz
spark-fe33121a53384811a8e094ab6c05dc85b7c7ca87.tar.bz2
spark-fe33121a53384811a8e094ab6c05dc85b7c7ca87.zip
[SPARK-17699] Support for parsing JSON string columns
Spark SQL has great support for reading text files that contain JSON data. However, in many cases the JSON data is just one column amongst others. This is particularly true when reading from sources such as Kafka. This PR adds a new functions `from_json` that converts a string column into a nested `StructType` with a user specified schema. Example usage: ```scala val df = Seq("""{"a": 1}""").toDS() val schema = new StructType().add("a", IntegerType) df.select(from_json($"value", schema) as 'json) // => [json: <a: int>] ``` This PR adds support for java, scala and python. I leveraged our existing JSON parsing support by moving it into catalyst (so that we could define expressions using it). I left SQL out for now, because I'm not sure how users would specify a schema. Author: Michael Armbrust <michael@databricks.com> Closes #15274 from marmbrus/jsonParser.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala29
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala1
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala3
3 files changed, 32 insertions, 1 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 1391c9d57f..518d6e92b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -17,7 +17,9 @@
package org.apache.spark.sql
+import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructType}
class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -94,4 +96,31 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
checkAnswer(expr, expected)
}
+
+ test("json_parser") {
+ val df = Seq("""{"a": 1}""").toDS()
+ val schema = new StructType().add("a", IntegerType)
+
+ checkAnswer(
+ df.select(from_json($"value", schema)),
+ Row(Row(1)) :: Nil)
+ }
+
+ test("json_parser missing columns") {
+ val df = Seq("""{"a": 1}""").toDS()
+ val schema = new StructType().add("b", IntegerType)
+
+ checkAnswer(
+ df.select(from_json($"value", schema)),
+ Row(Row(null)) :: Nil)
+ }
+
+ test("json_parser invalid json") {
+ val df = Seq("""{"a" 1}""").toDS()
+ val schema = new StructType().add("a", IntegerType)
+
+ checkAnswer(
+ df.select(from_json($"value", schema)),
+ Row(null) :: Nil)
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
index c31dffedbd..0b72da5f37 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.json
import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.test.SharedSQLContext
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 3d533c14e1..456052f79a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -26,9 +26,10 @@ import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkException
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType