aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorhyukjinkwon <gurwls223@gmail.com>2016-08-12 11:09:42 +0800
committerWenchen Fan <wenchen@databricks.com>2016-08-12 11:09:42 +0800
commitac84fb64dd85257da06f93a48fed9bb188140423 (patch)
tree3ff773f74d221f6d72422908b087c0299823f4f9 /sql
parent7a9e25c38380e6c62080d62ad38a4830e44fe753 (diff)
downloadspark-ac84fb64dd85257da06f93a48fed9bb188140423.tar.gz
spark-ac84fb64dd85257da06f93a48fed9bb188140423.tar.bz2
spark-ac84fb64dd85257da06f93a48fed9bb188140423.zip
[SPARK-16434][SQL] Avoid per-record type dispatch in JSON when reading
## What changes were proposed in this pull request? Currently, `JacksonParser.parse` is doing type-based dispatch for each row to convert the tokens to appropriate values for Spark. It might not have to be done like this because the schema is already kept. So, appropriate converters can be created first according to the schema once, and then apply them to each row. This PR corrects `JacksonParser` so that it creates all converters for the schema once and then applies them to each row rather than type dispatching for every row. Benchmark was proceeded with the codes below: #### Parser tests **Before** ```scala test("Benchmark for JSON converter") { val N = 500 << 8 val row = """{"struct":{"field1": true, "field2": 92233720368547758070}, "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]}, "arrayOfString":["str1", "str2"], "arrayOfInteger":[1, 2147483647, -2147483648], "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808], "arrayOfBigInteger":[922337203685477580700, -922337203685477580800], "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308], "arrayOfBoolean":[true, false, true], "arrayOfNull":[null, null, null, null], "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}], "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]], "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]] }""" val data = List.fill(N)(row) val dummyOption = new JSONOptions(Map.empty[String, String]) val schema = InferSchema.infer(spark.sparkContext.parallelize(Seq(row)), "", dummyOption) val factory = new JsonFactory() val benchmark = new Benchmark("JSON converter", N) benchmark.addCase("convert JSON file", 10) { _ => data.foreach { input => val parser = factory.createParser(input) parser.nextToken() JacksonParser.convertRootField(factory, parser, schema) } } benchmark.run() } ``` ``` JSON converter: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ convert JSON file 1697 / 1807 0.1 13256.9 1.0X ``` **After** ```scala test("Benchmark for JSON converter") { val N = 500 << 8 val row = """{"struct":{"field1": true, "field2": 92233720368547758070}, "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]}, "arrayOfString":["str1", "str2"], "arrayOfInteger":[1, 2147483647, -2147483648], "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808], "arrayOfBigInteger":[922337203685477580700, -922337203685477580800], "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308], "arrayOfBoolean":[true, false, true], "arrayOfNull":[null, null, null, null], "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}], "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]], "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]] }""" val data = List.fill(N)(row) val dummyOption = new JSONOptions(Map.empty[String, String], new SQLConf()) val schema = InferSchema.infer(spark.sparkContext.parallelize(Seq(row)), dummyOption) val benchmark = new Benchmark("JSON converter", N) benchmark.addCase("convert JSON file", 10) { _ => val parser = new JacksonParser(schema, dummyOption) data.foreach { input => parser.parse(input) } } benchmark.run() } ``` ``` JSON converter: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ convert JSON file 1401 / 1461 0.1 10947.4 1.0X ``` It seems parsing time is improved by roughly ~20% #### End-to-End test ```scala test("Benchmark for JSON reader") { val N = 500 << 8 val row = """{"struct":{"field1": true, "field2": 92233720368547758070}, "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]}, "arrayOfString":["str1", "str2"], "arrayOfInteger":[1, 2147483647, -2147483648], "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808], "arrayOfBigInteger":[922337203685477580700, -922337203685477580800], "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308], "arrayOfBoolean":[true, false, true], "arrayOfNull":[null, null, null, null], "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}], "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]], "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]] }""" val df = spark.sqlContext.read.json(spark.sparkContext.parallelize(List.fill(N)(row))) withTempPath { path => df.write.format("json").save(path.getCanonicalPath) val benchmark = new Benchmark("JSON reader", N) benchmark.addCase("reading JSON file", 10) { _ => spark.read.format("json").load(path.getCanonicalPath).collect() } benchmark.run() } } ``` **Before** ``` JSON reader: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ reading JSON file 6485 / 6924 0.0 50665.0 1.0X ``` **After** ``` JSON reader: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ reading JSON file 6350 / 6529 0.0 49609.3 1.0X ``` ## How was this patch tested? Existing test cases should cover this. Author: hyukjinkwon <gurwls223@gmail.com> Closes #14102 from HyukjinKwon/SPARK-16434.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala12
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala476
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala8
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala11
5 files changed, 297 insertions, 216 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e8c2885d77..e23dacc7a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -319,16 +319,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
columnNameOfCorruptRecord,
parsedOptions)
}
+ val parsed = jsonRDD.mapPartitions { iter =>
+ val parser = new JacksonParser(schema, columnNameOfCorruptRecord, parsedOptions)
+ iter.flatMap(parser.parse)
+ }
Dataset.ofRows(
sparkSession,
- LogicalRDD(
- schema.toAttributes,
- JacksonParser.parse(
- jsonRDD,
- schema,
- columnNameOfCorruptRecord,
- parsedOptions))(sparkSession))
+ LogicalRDD(schema.toAttributes, parsed)(sparkSession))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 579b036417..91c58d059d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -37,7 +37,7 @@ private[sql] object InferSchema {
*/
def infer(
json: RDD[String],
- columnNameOfCorruptRecords: String,
+ columnNameOfCorruptRecord: String,
configOptions: JSONOptions): StructType = {
require(configOptions.samplingRatio > 0,
s"samplingRatio (${configOptions.samplingRatio}) should be greater than 0")
@@ -60,13 +60,13 @@ private[sql] object InferSchema {
}
} catch {
case _: JsonParseException if shouldHandleCorruptRecord =>
- Some(StructType(Seq(StructField(columnNameOfCorruptRecords, StringType))))
+ Some(StructType(Seq(StructField(columnNameOfCorruptRecord, StringType))))
case _: JsonParseException =>
None
}
}
}.fold(StructType(Seq()))(
- compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord))
+ compatibleRootType(columnNameOfCorruptRecord, shouldHandleCorruptRecord))
canonicalizeType(rootType) match {
case Some(st: StructType) => st
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 733fcbfea1..4ae9376b5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -24,7 +24,6 @@ import scala.collection.mutable.ArrayBuffer
import com.fasterxml.jackson.core._
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
@@ -35,184 +34,289 @@ import org.apache.spark.util.Utils
private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
-object JacksonParser extends Logging {
+class JacksonParser(
+ schema: StructType,
+ columnNameOfCorruptRecord: String,
+ options: JSONOptions) extends Logging {
- def parse(
- input: RDD[String],
- schema: StructType,
- columnNameOfCorruptRecords: String,
- configOptions: JSONOptions): RDD[InternalRow] = {
+ import com.fasterxml.jackson.core.JsonToken._
+
+ // A `ValueConverter` is responsible for converting a value from `JsonParser`
+ // to a value in a field for `InternalRow`.
+ private type ValueConverter = (JsonParser) => Any
+
+ // `ValueConverter`s for the root schema for all fields in the schema
+ private val rootConverter: ValueConverter = makeRootConverter(schema)
- input.mapPartitions { iter =>
- parseJson(iter, schema, columnNameOfCorruptRecords, configOptions)
+ private val factory = new JsonFactory()
+ options.setJacksonOptions(factory)
+
+ /**
+ * This function deals with the cases it fails to parse. This function will be called
+ * when exceptions are caught during converting. This functions also deals with `mode` option.
+ */
+ private def failedRecord(record: String): Seq[InternalRow] = {
+ // create a row even if no corrupt record column is present
+ if (options.failFast) {
+ throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
+ }
+ if (options.dropMalformed) {
+ logWarning(s"Dropping malformed line: $record")
+ Nil
+ } else {
+ val row = new GenericMutableRow(schema.length)
+ for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
+ require(schema(corruptIndex).dataType == StringType)
+ row.update(corruptIndex, UTF8String.fromString(record))
+ }
+ Seq(row)
}
}
/**
- * Parse the current token (and related children) according to a desired schema
- * This is a wrapper for the method `convertField()` to handle a row wrapped
- * with an array.
+ * Create a converter which converts the JSON documents held by the `JsonParser`
+ * to a value according to a desired schema. This is a wrapper for the method
+ * `makeConverter()` to handle a row wrapped with an array.
*/
- def convertRootField(
- factory: JsonFactory,
- parser: JsonParser,
- schema: DataType): Any = {
- import com.fasterxml.jackson.core.JsonToken._
- (parser.getCurrentToken, schema) match {
- case (START_ARRAY, st: StructType) =>
- // SPARK-3308: support reading top level JSON arrays and take every element
- // in such an array as a row
- convertArray(factory, parser, st)
-
- case (START_OBJECT, ArrayType(st, _)) =>
+ def makeRootConverter(dataType: DataType): ValueConverter = dataType match {
+ case st: StructType =>
+ val elementConverter = makeConverter(st)
+ val fieldConverters = st.map(_.dataType).map(makeConverter)
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case START_OBJECT => convertObject(parser, st, fieldConverters)
+ // SPARK-3308: support reading top level JSON arrays and take every element
+ // in such an array as a row
+ //
+ // For example, we support, the JSON data as below:
+ //
+ // [{"a":"str_a_1"}]
+ // [{"a":"str_a_2"}, {"b":"str_b_3"}]
+ //
+ // resulting in:
+ //
+ // List([str_a_1,null])
+ // List([str_a_2,null], [null,str_b_3])
+ //
+ case START_ARRAY => convertArray(parser, elementConverter)
+ }
+
+ case ArrayType(st: StructType, _) =>
+ val elementConverter = makeConverter(st)
+ val fieldConverters = st.map(_.dataType).map(makeConverter)
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
// the business end of SPARK-3308:
- // when an object is found but an array is requested just wrap it in a list
- convertField(factory, parser, st) :: Nil
+ // when an object is found but an array is requested just wrap it in a list.
+ // This is being wrapped in `JacksonParser.parse`.
+ case START_OBJECT => convertObject(parser, st, fieldConverters)
+ case START_ARRAY => convertArray(parser, elementConverter)
+ }
- case _ =>
- convertField(factory, parser, schema)
- }
+ case _ => makeConverter(dataType)
}
- private def convertField(
- factory: JsonFactory,
- parser: JsonParser,
- schema: DataType): Any = {
- import com.fasterxml.jackson.core.JsonToken._
- (parser.getCurrentToken, schema) match {
- case (null | VALUE_NULL, _) =>
- null
+ /**
+ * Create a converter which converts the JSON documents held by the `JsonParser`
+ * to a value according to a desired schema.
+ */
+ private def makeConverter(dataType: DataType): ValueConverter = dataType match {
+ case BooleanType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_TRUE => true
+ case VALUE_FALSE => false
+ }
- case (FIELD_NAME, _) =>
- parser.nextToken()
- convertField(factory, parser, schema)
-
- case (VALUE_STRING, StringType) =>
- UTF8String.fromString(parser.getText)
-
- case (VALUE_STRING, _) if parser.getTextLength < 1 =>
- // guard the non string type
- null
-
- case (VALUE_STRING, BinaryType) =>
- parser.getBinaryValue
-
- case (VALUE_STRING, DateType) =>
- val stringValue = parser.getText
- if (stringValue.contains("-")) {
- // The format of this string will probably be "yyyy-mm-dd".
- DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
- } else {
- // In Spark 1.5.0, we store the data as number of days since epoch in string.
- // So, we just convert it to Int.
- stringValue.toInt
- }
+ case ByteType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_NUMBER_INT => parser.getByteValue
+ }
- case (VALUE_STRING, TimestampType) =>
- // This one will lose microseconds parts.
- // See https://issues.apache.org/jira/browse/SPARK-10681.
- DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
+ case ShortType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_NUMBER_INT => parser.getShortValue
+ }
- case (VALUE_NUMBER_INT, TimestampType) =>
- parser.getLongValue * 1000000L
+ case IntegerType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_NUMBER_INT => parser.getIntValue
+ }
- case (_, StringType) =>
- val writer = new ByteArrayOutputStream()
- Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
- generator => generator.copyCurrentStructure(parser)
- }
- UTF8String.fromBytes(writer.toByteArray)
-
- case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, FloatType) =>
- parser.getFloatValue
-
- case (VALUE_STRING, FloatType) =>
- // Special case handling for NaN and Infinity.
- val value = parser.getText
- val lowerCaseValue = value.toLowerCase()
- if (lowerCaseValue.equals("nan") ||
- lowerCaseValue.equals("infinity") ||
- lowerCaseValue.equals("-infinity") ||
- lowerCaseValue.equals("inf") ||
- lowerCaseValue.equals("-inf")) {
- value.toFloat
- } else {
- throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.")
- }
+ case LongType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_NUMBER_INT => parser.getLongValue
+ }
- case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, DoubleType) =>
- parser.getDoubleValue
-
- case (VALUE_STRING, DoubleType) =>
- // Special case handling for NaN and Infinity.
- val value = parser.getText
- val lowerCaseValue = value.toLowerCase()
- if (lowerCaseValue.equals("nan") ||
- lowerCaseValue.equals("infinity") ||
- lowerCaseValue.equals("-infinity") ||
- lowerCaseValue.equals("inf") ||
- lowerCaseValue.equals("-inf")) {
- value.toDouble
- } else {
- throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.")
- }
+ case FloatType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
+ parser.getFloatValue
+
+ case VALUE_STRING =>
+ // Special case handling for NaN and Infinity.
+ val value = parser.getText
+ val lowerCaseValue = value.toLowerCase
+ if (lowerCaseValue.equals("nan") ||
+ lowerCaseValue.equals("infinity") ||
+ lowerCaseValue.equals("-infinity") ||
+ lowerCaseValue.equals("inf") ||
+ lowerCaseValue.equals("-inf")) {
+ value.toFloat
+ } else {
+ throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.")
+ }
+ }
- case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, dt: DecimalType) =>
- Decimal(parser.getDecimalValue, dt.precision, dt.scale)
+ case DoubleType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
+ parser.getDoubleValue
+
+ case VALUE_STRING =>
+ // Special case handling for NaN and Infinity.
+ val value = parser.getText
+ val lowerCaseValue = value.toLowerCase
+ if (lowerCaseValue.equals("nan") ||
+ lowerCaseValue.equals("infinity") ||
+ lowerCaseValue.equals("-infinity") ||
+ lowerCaseValue.equals("inf") ||
+ lowerCaseValue.equals("-inf")) {
+ value.toDouble
+ } else {
+ throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.")
+ }
+ }
- case (VALUE_NUMBER_INT, ByteType) =>
- parser.getByteValue
+ case StringType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_STRING =>
+ UTF8String.fromString(parser.getText)
- case (VALUE_NUMBER_INT, ShortType) =>
- parser.getShortValue
+ case _ =>
+ // Note that it always tries to convert the data as string without the case of failure.
+ val writer = new ByteArrayOutputStream()
+ Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
+ generator => generator.copyCurrentStructure(parser)
+ }
+ UTF8String.fromBytes(writer.toByteArray)
+ }
- case (VALUE_NUMBER_INT, IntegerType) =>
- parser.getIntValue
+ case TimestampType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_STRING =>
+ // This one will lose microseconds parts.
+ // See https://issues.apache.org/jira/browse/SPARK-10681.
+ DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
- case (VALUE_NUMBER_INT, LongType) =>
- parser.getLongValue
+ case VALUE_NUMBER_INT =>
+ parser.getLongValue * 1000000L
+ }
- case (VALUE_TRUE, BooleanType) =>
- true
+ case DateType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_STRING =>
+ val stringValue = parser.getText
+ if (stringValue.contains("-")) {
+ // The format of this string will probably be "yyyy-mm-dd".
+ DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
+ } else {
+ // In Spark 1.5.0, we store the data as number of days since epoch in string.
+ // So, we just convert it to Int.
+ stringValue.toInt
+ }
+ }
- case (VALUE_FALSE, BooleanType) =>
- false
+ case BinaryType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_STRING => parser.getBinaryValue
+ }
- case (START_OBJECT, st: StructType) =>
- convertObject(factory, parser, st)
+ case dt: DecimalType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) =>
+ Decimal(parser.getDecimalValue, dt.precision, dt.scale)
+ }
- case (START_ARRAY, ArrayType(st, _)) =>
- convertArray(factory, parser, st)
+ case st: StructType =>
+ val fieldConverters = st.map(_.dataType).map(makeConverter)
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case START_OBJECT => convertObject(parser, st, fieldConverters)
+ }
- case (START_OBJECT, MapType(StringType, kt, _)) =>
- convertMap(factory, parser, kt)
+ case at: ArrayType =>
+ val elementConverter = makeConverter(at.elementType)
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case START_ARRAY => convertArray(parser, elementConverter)
+ }
- case (_, udt: UserDefinedType[_]) =>
- convertField(factory, parser, udt.sqlType)
+ case mt: MapType =>
+ val valueConverter = makeConverter(mt.valueType)
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case START_OBJECT => convertMap(parser, valueConverter)
+ }
+
+ case udt: UserDefinedType[_] =>
+ makeConverter(udt.sqlType)
+
+ case _ =>
+ (parser: JsonParser) =>
+ // Here, we pass empty `PartialFunction` so that this case can be
+ // handled as a failed conversion. It will throw an exception as
+ // long as the value is not null.
+ parseJsonToken(parser, dataType)(PartialFunction.empty[JsonToken, Any])
+ }
- case (token, dataType) =>
- // We cannot parse this token based on the given data type. So, we throw a
- // SparkSQLJsonProcessingException and this exception will be caught by
- // parseJson method.
- throw new SparkSQLJsonProcessingException(
- s"Failed to parse a value for data type $dataType (current token: $token).")
+ /**
+ * This method skips `FIELD_NAME`s at the beginning, and handles nulls ahead before trying
+ * to parse the JSON token using given function `f`. If the `f` failed to parse and convert the
+ * token, call `failedConversion` to handle the token.
+ */
+ private def parseJsonToken(
+ parser: JsonParser,
+ dataType: DataType)(f: PartialFunction[JsonToken, Any]): Any = {
+ parser.getCurrentToken match {
+ case FIELD_NAME =>
+ // There are useless FIELD_NAMEs between START_OBJECT and END_OBJECT tokens
+ parser.nextToken()
+ parseJsonToken(parser, dataType)(f)
+
+ case null | VALUE_NULL => null
+
+ case other => f.applyOrElse(other, failedConversion(parser, dataType))
}
}
/**
+ * This function throws an exception for failed conversion, but returns null for empty string,
+ * to guard the non string types.
+ */
+ private def failedConversion(
+ parser: JsonParser,
+ dataType: DataType): PartialFunction[JsonToken, Any] = {
+ case VALUE_STRING if parser.getTextLength < 1 =>
+ // If conversion is failed, this produces `null` rather than throwing exception.
+ // This will protect the mismatch of types.
+ null
+
+ case token =>
+ // We cannot parse this token based on the given data type. So, we throw a
+ // SparkSQLJsonProcessingException and this exception will be caught by
+ // `parse` method.
+ throw new SparkSQLJsonProcessingException(
+ s"Failed to parse a value for data type $dataType (current token: $token).")
+ }
+
+ /**
* Parse an object from the token stream into a new Row representing the schema.
- *
* Fields in the json that are not defined in the requested schema will be dropped.
*/
private def convertObject(
- factory: JsonFactory,
parser: JsonParser,
- schema: StructType): InternalRow = {
+ schema: StructType,
+ fieldConverters: Seq[ValueConverter]): InternalRow = {
val row = new GenericMutableRow(schema.length)
while (nextUntil(parser, JsonToken.END_OBJECT)) {
schema.getFieldIndex(parser.getCurrentName) match {
case Some(index) =>
- row.update(index, convertField(factory, parser, schema(index).dataType))
+ row.update(index, fieldConverters(index).apply(parser))
case None =>
parser.skipChildren()
@@ -223,87 +327,65 @@ object JacksonParser extends Logging {
}
/**
- * Parse an object as a Map, preserving all fields
+ * Parse an object as a Map, preserving all fields.
*/
private def convertMap(
- factory: JsonFactory,
parser: JsonParser,
- valueType: DataType): MapData = {
+ fieldConverter: ValueConverter): MapData = {
val keys = ArrayBuffer.empty[UTF8String]
val values = ArrayBuffer.empty[Any]
while (nextUntil(parser, JsonToken.END_OBJECT)) {
keys += UTF8String.fromString(parser.getCurrentName)
- values += convertField(factory, parser, valueType)
+ values += fieldConverter.apply(parser)
}
+
ArrayBasedMapData(keys.toArray, values.toArray)
}
+ /**
+ * Parse an object as a Array.
+ */
private def convertArray(
- factory: JsonFactory,
parser: JsonParser,
- elementType: DataType): ArrayData = {
+ fieldConverter: ValueConverter): ArrayData = {
val values = ArrayBuffer.empty[Any]
while (nextUntil(parser, JsonToken.END_ARRAY)) {
- values += convertField(factory, parser, elementType)
+ values += fieldConverter.apply(parser)
}
new GenericArrayData(values.toArray)
}
- def parseJson(
- input: Iterator[String],
- schema: StructType,
- columnNameOfCorruptRecords: String,
- configOptions: JSONOptions): Iterator[InternalRow] = {
-
- def failedRecord(record: String): Seq[InternalRow] = {
- // create a row even if no corrupt record column is present
- if (configOptions.failFast) {
- throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
- }
- if (configOptions.dropMalformed) {
- logWarning(s"Dropping malformed line: $record")
- Nil
- } else {
- val row = new GenericMutableRow(schema.length)
- for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) {
- require(schema(corruptIndex).dataType == StringType)
- row.update(corruptIndex, UTF8String.fromString(record))
- }
- Seq(row)
- }
- }
-
- val factory = new JsonFactory()
- configOptions.setJacksonOptions(factory)
-
- input.flatMap { record =>
- if (record.trim.isEmpty) {
- Nil
- } else {
- try {
- Utils.tryWithResource(factory.createParser(record)) { parser =>
- parser.nextToken()
-
- convertRootField(factory, parser, schema) match {
- case null => failedRecord(record)
- case row: InternalRow => row :: Nil
- case array: ArrayData =>
- if (array.numElements() == 0) {
- Nil
- } else {
- array.toArray[InternalRow](schema)
- }
- case _ =>
- failedRecord(record)
- }
+ /**
+ * Parse the string JSON input to the set of [[InternalRow]]s.
+ */
+ def parse(input: String): Seq[InternalRow] = {
+ if (input.trim.isEmpty) {
+ Nil
+ } else {
+ try {
+ Utils.tryWithResource(factory.createParser(input)) { parser =>
+ parser.nextToken()
+ rootConverter.apply(parser) match {
+ case null => failedRecord(input)
+ case row: InternalRow => row :: Nil
+ case array: ArrayData =>
+ // Here, as we support reading top level JSON arrays and take every element
+ // in such an array as a row, this case is possible.
+ if (array.numElements() == 0) {
+ Nil
+ } else {
+ array.toArray[InternalRow](schema)
+ }
+ case _ =>
+ failedRecord(input)
}
- } catch {
- case _: JsonProcessingException =>
- failedRecord(record)
- case _: SparkSQLJsonProcessingException =>
- failedRecord(record)
}
+ } catch {
+ case _: JsonProcessingException =>
+ failedRecord(input)
+ case _: SparkSQLJsonProcessingException =>
+ failedRecord(input)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index adca8d7af0..19681be604 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -106,12 +106,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
(file: PartitionedFile) => {
val lines = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map(_.toString)
-
- JacksonParser.parseJson(
- lines,
- requiredSchema,
- columnNameOfCorruptRecord,
- parsedOptions)
+ val parser = new JacksonParser(requiredSchema, columnNameOfCorruptRecord, parsedOptions)
+ lines.flatMap(parser.parse)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 177fc04b02..342fd3e82e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -61,9 +61,14 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
generator.flush()
}
- Utils.tryWithResource(factory.createParser(writer.toString)) { parser =>
- parser.nextToken()
- JacksonParser.convertRootField(factory, parser, dataType)
+ val dummyOption = new JSONOptions(Map.empty[String, String])
+ val dummySchema = StructType(Seq.empty)
+ val parser = new JacksonParser(dummySchema, "", dummyOption)
+
+ Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser =>
+ jsonParser.nextToken()
+ val converter = parser.makeRootConverter(dataType)
+ converter.apply(jsonParser)
}
}