Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala  31
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala)  6
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala)  13
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala)  4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala)  6
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala)  4
-rw-r--r--  sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala  26
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala  1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala  1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/functions.scala  58
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala  29
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala  1
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala  3
18 files changed, 175 insertions(+), 23 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index c14a2fb122..65dbd6a4e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -23,10 +23,12 @@ import scala.util.parsing.combinator.RegexParsers
import com.fasterxml.jackson.core._
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException}
+import org.apache.spark.sql.catalyst.util.ParseModes
+import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -467,3 +469,28 @@ case class JsonTuple(children: Seq[Expression])
}
}
+/**
+ * Converts a JSON input string to a [[StructType]] with the specified schema.
+ */
+case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
+ extends Expression with CodegenFallback with ExpectsInputTypes {
+ override def nullable: Boolean = true
+
+ @transient
+ lazy val parser =
+ new JacksonParser(
+ schema,
+ "invalid", // Not used since we force fail fast. Invalid rows will be set to `null`.
+ new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))
+
+ override def dataType: DataType = schema
+ override def children: Seq[Expression] = child :: Nil
+
+ override def eval(input: InternalRow): Any = {
+ try parser.parse(child.eval(input).toString).head catch {
+ case _: SparkSQLJsonProcessingException => null
+ }
+ }
+
+ override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
+}
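For reference, a minimal sketch of evaluating the new expression directly against literal children, mirroring the JsonExpressionsSuite tests later in this diff (`eval(null)` is safe here only because a literal child needs no input row):

    import org.apache.spark.sql.catalyst.expressions.{JsonToStruct, Literal}
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    val schema = StructType(StructField("a", IntegerType) :: Nil)
    // Well-formed input yields an InternalRow; malformed input yields null,
    // because the parser is forced into FAILFAST mode and the exception is caught.
    JsonToStruct(schema, Map.empty, Literal("""{"a": 1}""")).eval(null)  // [1]
    JsonToStruct(schema, Map.empty, Literal("""{"a" 1}""")).eval(null)   // null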
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 02d211d042..aec18922ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -15,16 +15,16 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources.json
+package org.apache.spark.sql.catalyst.json
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
+import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
/**
- * Options for the JSON data source.
+ * Options for parsing JSON data into Spark SQL rows.
*
* Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
*/
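Since the relocated class still takes a plain `Map[String, String]` (as the `JsonToStruct` constructor above relies on), it can now be built from catalyst code without touching the data source layer. A small sketch, using option names documented for the JSON data source:

    import org.apache.spark.sql.catalyst.json.JSONOptions

    val parseOptions = new JSONOptions(Map(
      "mode" -> "DROPMALFORMED",  // silently drop malformed records
      "allowComments" -> "true"   // tolerate Java/C++ style comments in the input
    ))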
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 5ce1bf7432..f80e6373d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources.json
+package org.apache.spark.sql.catalyst.json
import java.io.ByteArrayOutputStream
@@ -28,19 +28,22 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, PERMISSIVE_MODE}
-import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
-private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
+private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
+/**
+ * Constructs a parser for a given schema that translates a JSON string to an [[InternalRow]].
+ */
class JacksonParser(
schema: StructType,
columnNameOfCorruptRecord: String,
options: JSONOptions) extends Logging {
+ import JacksonUtils._
+ import ParseModes._
import com.fasterxml.jackson.core.JsonToken._
// A `ValueConverter` is responsible for converting a value from `JsonParser`
@@ -65,7 +68,7 @@ class JacksonParser(
private def failedRecord(record: String): Seq[InternalRow] = {
// create a row even if no corrupt record column is present
if (options.failFast) {
- throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
+ throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record")
}
if (options.dropMalformed) {
if (!isWarningPrintedForMalformedRecord) {
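Replacing the bare `RuntimeException` with `SparkSQLJsonProcessingException` is what lets `JsonToStruct` tell a parse failure apart from any other error. A hedged sketch of the fail-fast path (the corrupt-record column name is an arbitrary placeholder, unused under FAILFAST):

    import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException}
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    val schema = StructType(StructField("a", IntegerType) :: Nil)
    val parser = new JacksonParser(
      schema,
      "_corrupt_record",  // placeholder; never populated in FAILFAST mode
      new JSONOptions(Map("mode" -> "FAILFAST")))

    try parser.parse("""{"a" 1}""") catch {  // malformed: missing ':'
      case _: SparkSQLJsonProcessingException => // exactly what JsonToStruct catches
    }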
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index 005546f37d..c4d9abb2c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -15,11 +15,11 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources.json
+package org.apache.spark.sql.catalyst.json
import com.fasterxml.jackson.core.{JsonParser, JsonToken}
-private object JacksonUtils {
+object JacksonUtils {
/**
* Advance the parser until a null or a specific token is found
*/
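Widening the visibility lets `InferSchema`, which stays behind in sql/core, keep importing `nextUntil` (see its diff below). The helper advances the Jackson parser one token per call and returns false once it reaches end-of-input or the stop token, so it drives loops like this sketch:

    import com.fasterxml.jackson.core.{JsonFactory, JsonToken}
    import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil

    val parser = new JsonFactory().createParser("""{"a": 1, "b": 2}""")
    parser.nextToken()  // position on START_OBJECT
    while (nextUntil(parser, JsonToken.END_OBJECT)) {
      println(parser.getCurrentToken)  // alternating FIELD_NAME / VALUE_NUMBER_INT
    }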
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
index 41cff07472..435fba9d88 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
@@ -15,15 +15,15 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec}
+import org.apache.hadoop.io.compress._
import org.apache.spark.util.Utils
-private[datasources] object CompressionCodecs {
+object CompressionCodecs {
private val shortCompressionCodecNames = Map(
"none" -> null,
"uncompressed" -> null,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala
index 468228053c..0e466962b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala
@@ -15,9 +15,9 @@
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources
+package org.apache.spark.sql.catalyst.util
-private[datasources] object ParseModes {
+object ParseModes {
val PERMISSIVE_MODE = "PERMISSIVE"
val DROP_MALFORMED_MODE = "DROPMALFORMED"
val FAIL_FAST_MODE = "FAILFAST"
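With the object public, callers outside the datasources package can use the constants instead of hard-coding mode strings; this is exactly how `JsonToStruct` forces fail-fast parsing above:

    import org.apache.spark.sql.catalyst.util.ParseModes

    val opts = Map("mode" -> ParseModes.FAIL_FAST_MODE)  // "FAILFAST"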
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 7b754091f4..84623934d9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.ParseModes
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -317,4 +319,28 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil),
InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc"))))
}
+
+ test("from_json") {
+ val jsonData = """{"a": 1}"""
+ val schema = StructType(StructField("a", IntegerType) :: Nil)
+ checkEvaluation(
+ JsonToStruct(schema, Map.empty, Literal(jsonData)),
+ InternalRow.fromSeq(1 :: Nil)
+ )
+ }
+
+ test("from_json - invalid data") {
+ val jsonData = """{"a" 1}"""
+ val schema = StructType(StructField("a", IntegerType) :: Nil)
+ checkEvaluation(
+ JsonToStruct(schema, Map.empty, Literal(jsonData)),
+ null
+ )
+
+ // Other modes are overridden to FAILFAST internally, so they should still return `null`.
+ checkEvaluation(
+ JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData)),
+ null
+ )
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b10d2c86ac..b84fb2fb95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -21,14 +21,15 @@ import java.util.Properties
import scala.collection.JavaConverters._
-import org.apache.spark.Partition
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
+import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.execution.datasources.json.{InferSchema, JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.json.InferSchema
import org.apache.spark.sql.types.StructType
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 9610746a81..4e662a52a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -29,6 +29,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index e7dcc22272..014614eb99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
+import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
private[csv] class CSVOptions(@transient private val parameters: Map[String, String])
extends Logging with Serializable {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 91c58d059d..dc8bd817f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -23,7 +23,8 @@ import com.fasterxml.jackson.core._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
-import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
+import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
+import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 270e7fbd3c..5b55b70186 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -21,8 +21,9 @@ import java.io.Writer
import com.fasterxml.jackson.core._
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
import org.apache.spark.sql.types._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 6882a6cdca..9fe38ccc9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -32,6 +32,8 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index a875b01ec2..9f96667311 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 47bf41a2da..3bc1c5b900 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql
+import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.runtime.universe.{typeTag, TypeTag}
import scala.util.Try
@@ -2819,6 +2820,63 @@ object functions {
}
/**
+ * (Scala-specific) Parses a column containing a JSON string into a [[StructType]] with the
+ * specified schema. Returns `null` in the case of an unparseable string.
+ *
+ * @param schema the schema to use when parsing the json string
+ * @param options options to control how the json is parsed. Accepts the same options as the
+ * json data source.
+ * @param e a string column containing JSON data.
+ *
+ * @group collection_funcs
+ * @since 2.1.0
+ */
+ def from_json(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+ JsonToStruct(schema, options, e.expr)
+ }
+
+ /**
+ * (Java-specific) Parses a column containing a JSON string into a [[StructType]] with the
+ * specified schema. Returns `null` in the case of an unparseable string.
+ *
+ * @param e a string column containing JSON data.
+ * @param schema the schema to use when parsing the json string
+ * @param options options to control how the json is parsed. Accepts the same options as the
+ * json data source.
+ *
+ * @group collection_funcs
+ * @since 2.1.0
+ */
+ def from_json(e: Column, schema: StructType, options: java.util.Map[String, String]): Column =
+ from_json(e, schema, options.asScala.toMap)
+
+ /**
+ * Parses a column containing a JSON string into a [[StructType]] with the specified schema.
+ * Returns `null` in the case of an unparseable string.
+ *
+ * @param e a string column containing JSON data.
+ * @param schema the schema to use when parsing the json string
+ *
+ * @group collection_funcs
+ * @since 2.1.0
+ */
+ def from_json(e: Column, schema: StructType): Column =
+ from_json(e, schema, Map.empty[String, String])
+
+ /**
+ * Parses a column containing a JSON string into a [[StructType]] with the specified schema.
+ * Returns `null` in the case of an unparseable string.
+ *
+ * @param e a string column containing JSON data.
+ * @param schema the schema to use when parsing the json string, specified as a json string
+ * @param options options to control how the json is parsed. Accepts the same options as the
+ *                json data source.
+ *
+ * @group collection_funcs
+ * @since 2.1.0
+ */
+ def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
+ from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options)
+
+ /**
* Returns length of array or map.
*
* @group collection_funcs
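An end-to-end usage sketch of the new function, assuming a SparkSession `spark` with its implicits in scope (output rendering approximate):

    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types.{IntegerType, StructType}
    import spark.implicits._

    val schema = new StructType().add("a", IntegerType)
    val df = Seq("""{"a": 1}""", """{"a" 1}""").toDS()

    df.select(from_json($"value", schema).as("parsed")).show()
    // +------+
    // |parsed|
    // +------+
    // |   [1]|
    // |  null|   <- unparseable input becomes null
    // +------+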
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 1391c9d57f..518d6e92b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -17,7 +17,9 @@
package org.apache.spark.sql
+import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructType}
class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -94,4 +96,31 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
checkAnswer(expr, expected)
}
+
+ test("json_parser") {
+ val df = Seq("""{"a": 1}""").toDS()
+ val schema = new StructType().add("a", IntegerType)
+
+ checkAnswer(
+ df.select(from_json($"value", schema)),
+ Row(Row(1)) :: Nil)
+ }
+
+ test("json_parser missing columns") {
+ val df = Seq("""{"a": 1}""").toDS()
+ val schema = new StructType().add("b", IntegerType)
+
+ checkAnswer(
+ df.select(from_json($"value", schema)),
+ Row(Row(null)) :: Nil)
+ }
+
+ test("json_parser invalid json") {
+ val df = Seq("""{"a" 1}""").toDS()
+ val schema = new StructType().add("a", IntegerType)
+
+ checkAnswer(
+ df.select(from_json($"value", schema)),
+ Row(null) :: Nil)
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
index c31dffedbd..0b72da5f37 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.json
import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.test.SharedSQLContext
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 3d533c14e1..456052f79a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -26,9 +26,10 @@ import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkException
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType