author    Michael Armbrust <michael@databricks.com>  2016-09-29 13:01:10 -0700
committer Yin Huai <yhuai@databricks.com>            2016-09-29 13:01:10 -0700
commit    fe33121a53384811a8e094ab6c05dc85b7c7ca87 (patch)
tree      d0575a3d0eefe46ea4b8e200e70d0834b566b477
parent    027dea8f294504bc5cd8bfedde546d171cb78657 (diff)
[SPARK-17699] Support for parsing JSON string columns
Spark SQL has great support for reading text files that contain JSON data. However, in many cases the JSON data is just one column amongst others. This is particularly true when reading from sources such as Kafka. This PR adds a new function `from_json` that converts a string column into a nested `StructType` with a user-specified schema. Example usage:

```scala
val df = Seq("""{"a": 1}""").toDS()
val schema = new StructType().add("a", IntegerType)

df.select(from_json($"value", schema) as 'json) // => [json: <a: int>]
```

This PR adds support for Java, Scala and Python. I leveraged our existing JSON parsing support by moving it into catalyst (so that we could define expressions using it). I left SQL out for now, because I'm not sure how users would specify a schema.

Author: Michael Armbrust <michael@databricks.com>

Closes #15274 from marmbrus/jsonParser.
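For context, a slightly fuller sketch of the workflow the description alludes to (assuming an active `SparkSession` named `spark`; the sample data is illustrative, standing in for a string column of the shape a source like Kafka produces):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}
import spark.implicits._

// A DataFrame with a single string column named "value".
val df = Seq("""{"a": 1}""", """{"a": 2}""").toDF("value")
val schema = new StructType().add("a", IntegerType)

df.select(from_json($"value", schema) as "json")
  .selectExpr("json.a") // project the parsed field back out as an int column
  .show()
```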
-rw-r--r--  python/pyspark/sql/functions.py  23
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala  31
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala)  6
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala)  13
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala)  4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala)  6
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala)  4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala  26
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala  1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala  1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/functions.scala  58
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala  29
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala  1
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala  3
19 files changed, 198 insertions, 23 deletions
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 89b3c07c07..45d6bf944b 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1706,6 +1706,29 @@ def json_tuple(col, *fields):
return Column(jc)
+@since(2.1)
+def from_json(col, schema, options={}):
+ """
+ Parses a column containing a JSON string into a :class:`StructType` with the
+ specified schema. Returns `null` in the case of an unparseable string.
+
+ :param col: string column in json format
+ :param schema: a StructType to use when parsing the json column
+ :param options: options to control parsing. Accepts the same options as the json datasource.
+
+ >>> from pyspark.sql.types import *
+ >>> data = [(1, '''{"a": 1}''')]
+ >>> schema = StructType([StructField("a", IntegerType())])
+ >>> df = spark.createDataFrame(data, ("key", "value"))
+ >>> df.select(from_json(df.value, schema).alias("json")).collect()
+ [Row(json=Row(a=1))]
+ """
+
+ sc = SparkContext._active_spark_context
+ jc = sc._jvm.functions.from_json(_to_java_column(col), schema.json(), options)
+ return Column(jc)
+
+
@since(1.5)
def size(col):
"""
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index c14a2fb122..65dbd6a4e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -23,10 +23,12 @@ import scala.util.parsing.combinator.RegexParsers
import com.fasterxml.jackson.core._
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException}
+import org.apache.spark.sql.catalyst.util.ParseModes
+import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -467,3 +469,28 @@ case class JsonTuple(children: Seq[Expression])
}
}
+/**
+ * Converts a JSON input string to a [[StructType]] with the specified schema.
+ */
+case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
+ extends Expression with CodegenFallback with ExpectsInputTypes {
+ override def nullable: Boolean = true
+
+ @transient
+ lazy val parser =
+ new JacksonParser(
+ schema,
+ "invalid", // Not used since we force fail fast. Invalid rows will be set to `null`.
+ new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))
+
+ override def dataType: DataType = schema
+ override def children: Seq[Expression] = child :: Nil
+
+ override def eval(input: InternalRow): Any = {
+ try parser.parse(child.eval(input).toString).head catch {
+ case _: SparkSQLJsonProcessingException => null
+ }
+ }
+
+ override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
+}
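A minimal sketch (not part of the patch) of how the new expression evaluates, based on the `eval`/`parse` code above; it assumes the catalyst internals are on the classpath:

```scala
import org.apache.spark.sql.catalyst.expressions.{JsonToStruct, Literal}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(StructField("a", IntegerType) :: Nil)

// Well-formed input parses into an InternalRow.
JsonToStruct(schema, Map.empty, Literal("""{"a": 1}""")).eval(null) // => [1]

// Malformed input: the forced FAILFAST mode throws
// SparkSQLJsonProcessingException internally, which eval converts to `null`.
JsonToStruct(schema, Map.empty, Literal("""{"a" 1}""")).eval(null)  // => null
```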
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 02d211d042..aec18922ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -15,16 +15,16 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources.json
+package org.apache.spark.sql.catalyst.json
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
+import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
/**
- * Options for the JSON data source.
+ * Options for parsing JSON data into Spark SQL rows.
*
* Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
*/
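A small sketch of building the relocated `JSONOptions` from a user-supplied map, mirroring the call made by `JsonToStruct` above. Note the class is Spark-internal, so this compiles only from within Spark's own sql packages; `allowSingleQuotes` is one of the standard JSON source options, used here as an illustrative assumption:

```scala
import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.catalyst.util.ParseModes

val userOptions = Map("allowSingleQuotes" -> "true")
// Force fail-fast parsing on top of whatever the caller passed in.
val parsingOptions = new JSONOptions(userOptions ++ Map("mode" -> ParseModes.FAIL_FAST_MODE))
```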
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 5ce1bf7432..f80e6373d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources.json
+package org.apache.spark.sql.catalyst.json
import java.io.ByteArrayOutputStream
@@ -28,19 +28,22 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, PERMISSIVE_MODE}
-import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
-private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
+private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
+/**
+ * Constructs a parser for a given schema that translates a json string to an [[InternalRow]].
+ */
class JacksonParser(
schema: StructType,
columnNameOfCorruptRecord: String,
options: JSONOptions) extends Logging {
+ import JacksonUtils._
+ import ParseModes._
import com.fasterxml.jackson.core.JsonToken._
// A `ValueConverter` is responsible for converting a value from `JsonParser`
@@ -65,7 +68,7 @@ class JacksonParser(
private def failedRecord(record: String): Seq[InternalRow] = {
// create a row even if no corrupt record column is present
if (options.failFast) {
- throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
+ throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record")
}
if (options.dropMalformed) {
if (!isWarningPrintedForMalformedRecord) {
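A hedged sketch of driving the relocated parser directly; the constructor and the `Seq[InternalRow]` return type of `parse` are taken from this patch, while the corrupt-record column name is an illustrative choice (and `JSONOptions` itself is Spark-internal):

```scala
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(StructField("a", IntegerType) :: Nil)
val parser = new JacksonParser(
  schema,
  "_corrupt_record", // destination column for malformed records in PERMISSIVE mode
  new JSONOptions(Map("mode" -> "PERMISSIVE")))

val rows = parser.parse("""{"a": 1}""") // Seq[InternalRow]
```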
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index 005546f37d..c4d9abb2c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -15,11 +15,11 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources.json
+package org.apache.spark.sql.catalyst.json
import com.fasterxml.jackson.core.{JsonParser, JsonToken}
-private object JacksonUtils {
+object JacksonUtils {
/**
* Advance the parser until a null or a specific token is found
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
index 41cff07472..435fba9d88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec}
+import org.apache.hadoop.io.compress._
import org.apache.spark.util.Utils
-private[datasources] object CompressionCodecs {
+object CompressionCodecs {
private val shortCompressionCodecNames = Map(
"none" -> null,
"uncompressed" -> null,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala
index 468228053c..0e466962b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala
@@ -15,9 +15,9 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
-private[datasources] object ParseModes {
+object ParseModes {
val PERMISSIVE_MODE = "PERMISSIVE"
val DROP_MALFORMED_MODE = "DROPMALFORMED"
val FAIL_FAST_MODE = "FAILFAST"
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 7b754091f4..84623934d9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.ParseModes
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -317,4 +319,28 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil),
InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc"))))
}
+
+ test("from_json") {
+ val jsonData = """{"a": 1}"""
+ val schema = StructType(StructField("a", IntegerType) :: Nil)
+ checkEvaluation(
+ JsonToStruct(schema, Map.empty, Literal(jsonData)),
+ InternalRow.fromSeq(1 :: Nil)
+ )
+ }
+
+ test("from_json - invalid data") {
+ val jsonData = """{"a" 1}"""
+ val schema = StructType(StructField("a", IntegerType) :: Nil)
+ checkEvaluation(
+ JsonToStruct(schema, Map.empty, Literal(jsonData)),
+ null
+ )
+
+ // Other modes should still return `null`.
+ checkEvaluation(
+ JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData)),
+ null
+ )
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b10d2c86ac..b84fb2fb95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -21,14 +21,15 @@ import java.util.Properties
import scala.collection.JavaConverters._
-import org.apache.spark.Partition
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
+import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.execution.datasources.json.{InferSchema, JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.json.InferSchema
import org.apache.spark.sql.types.StructType
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 9610746a81..4e662a52a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -29,6 +29,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index e7dcc22272..014614eb99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
+import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
private[csv] class CSVOptions(@transient private val parameters: Map[String, String])
extends Logging with Serializable {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 91c58d059d..dc8bd817f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -23,7 +23,8 @@ import com.fasterxml.jackson.core._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
-import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
+import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
+import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 270e7fbd3c..5b55b70186 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -21,8 +21,9 @@ import java.io.Writer
import com.fasterxml.jackson.core._
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
import org.apache.spark.sql.types._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 6882a6cdca..9fe38ccc9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -32,6 +32,8 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index a875b01ec2..9f96667311 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 47bf41a2da..3bc1c5b900 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql
+import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.runtime.universe.{typeTag, TypeTag}
import scala.util.Try
@@ -2819,6 +2820,63 @@ object functions {
}
/**
+ * (Scala-specific) Parses a column containing a JSON string into a [[StructType]] with the
+ * specified schema. Returns `null` in the case of an unparseable string.
+ *
+ * @param e a string column containing JSON data.
+ * @param schema the schema to use when parsing the json string
+ * @param options options to control how the json is parsed. Accepts the same options as the
+ * json data source.
+ *
+ * @group collection_funcs
+ * @since 2.1.0
+ */
+ def from_json(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+ JsonToStruct(schema, options, e.expr)
+ }
+
+ /**
+ * (Java-specific) Parses a column containing a JSON string into a [[StructType]] with the
+ * specified schema. Returns `null` in the case of an unparseable string.
+ *
+ * @param e a string column containing JSON data.
+ * @param schema the schema to use when parsing the json string
+ * @param options options to control how the json is parsed. Accepts the same options as the
+ * json data source.
+ *
+ * @group collection_funcs
+ * @since 2.1.0
+ */
+ def from_json(e: Column, schema: StructType, options: java.util.Map[String, String]): Column =
+ from_json(e, schema, options.asScala.toMap)
+
+ /**
+ * Parses a column containing a JSON string into a [[StructType]] with the specified schema.
+ * Returns `null` in the case of an unparseable string.
+ *
+ * @param e a string column containing JSON data.
+ * @param schema the schema to use when parsing the json string
+ *
+ * @group collection_funcs
+ * @since 2.1.0
+ */
+ def from_json(e: Column, schema: StructType): Column =
+ from_json(e, schema, Map.empty[String, String])
+
+ /**
+ * Parses a column containing a JSON string into a [[StructType]] with the specified schema.
+ * Returns `null` in the case of an unparseable string.
+ *
+ * @param e a string column containing JSON data.
+ * @param schema the schema to use when parsing the json string, specified as a json string
+ * @param options options to control how the json is parsed. Accepts the same options as the
+ * json data source.
+ *
+ * @group collection_funcs
+ * @since 2.1.0
+ */
+ def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
+ from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options)
+
+ /**
* Returns length of array or map.
*
* @group collection_funcs
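As a hedged recap, the four overloads added above can be exercised like this, reusing the illustrative `df` with a string `value` column from the earlier sketch (and assuming `spark.implicits._` for the `$` syntax; the option key is an assumption based on the JSON data source):

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}

val schema = new StructType().add("a", IntegerType)
val opts = Map("allowSingleQuotes" -> "true")

df.select(from_json($"value", schema))                   // no options
df.select(from_json($"value", schema, opts))             // Scala Map overload
df.select(from_json($"value", schema, opts.asJava))      // Java Map overload
df.select(from_json($"value", schema.json, opts.asJava)) // schema given as its JSON string
```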
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 1391c9d57f..518d6e92b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -17,7 +17,9 @@
package org.apache.spark.sql
+import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructType}
class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -94,4 +96,31 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
checkAnswer(expr, expected)
}
+
+ test("json_parser") {
+ val df = Seq("""{"a": 1}""").toDS()
+ val schema = new StructType().add("a", IntegerType)
+
+ checkAnswer(
+ df.select(from_json($"value", schema)),
+ Row(Row(1)) :: Nil)
+ }
+
+ test("json_parser missing columns") {
+ val df = Seq("""{"a": 1}""").toDS()
+ val schema = new StructType().add("b", IntegerType)
+
+ checkAnswer(
+ df.select(from_json($"value", schema)),
+ Row(Row(null)) :: Nil)
+ }
+
+ test("json_parser invalid json") {
+ val df = Seq("""{"a" 1}""").toDS()
+ val schema = new StructType().add("a", IntegerType)
+
+ checkAnswer(
+ df.select(from_json($"value", schema)),
+ Row(null) :: Nil)
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
index c31dffedbd..0b72da5f37 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.json
import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.test.SharedSQLContext
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 3d533c14e1..456052f79a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -26,9 +26,10 @@ import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkException
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType