 sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                           |  12 +-
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala    |   6 +-
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala  | 476 +-
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala |   8 +-
 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala      |  11 +-
 5 files changed, 297 insertions(+), 216 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e8c2885d77..e23dacc7a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -319,16 +319,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
columnNameOfCorruptRecord,
parsedOptions)
}
+ val parsed = jsonRDD.mapPartitions { iter =>
+ val parser = new JacksonParser(schema, columnNameOfCorruptRecord, parsedOptions)
+ iter.flatMap(parser.parse)
+ }
Dataset.ofRows(
sparkSession,
- LogicalRDD(
- schema.toAttributes,
- JacksonParser.parse(
- jsonRDD,
- schema,
- columnNameOfCorruptRecord,
- parsedOptions))(sparkSession))
+ LogicalRDD(schema.toAttributes, parsed)(sparkSession))
}
/**
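To make the change above concrete, here is a minimal, self-contained sketch of the per-partition pattern the patched `DataFrameReader.json` now uses: the parser is built once inside `mapPartitions` and reused for every line in that partition, so the `JsonFactory` and the converter tree are constructed per partition rather than per record. `LineParser` is a hypothetical stand-in for `JacksonParser` so the sketch compiles on its own; only a local `SparkContext` is assumed.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical stand-in for the comparatively expensive JacksonParser,
// so this sketch compiles without Spark SQL internals.
class LineParser extends Serializable {
  def parse(line: String): Seq[String] =
    if (line.trim.isEmpty) Nil else Seq(line.toUpperCase)
}

object PerPartitionParsing {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
    val jsonRDD = sc.parallelize(Seq("""{"a": 1}""", "", """{"a": 2}"""))

    // Same shape as the patched DataFrameReader.json: one parser per partition,
    // and flatMap so a single line may yield zero rows (blank or dropped lines).
    val parsed = jsonRDD.mapPartitions { iter =>
      val parser = new LineParser()
      iter.flatMap(parser.parse)
    }

    parsed.collect().foreach(println)
    sc.stop()
  }
}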
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 579b036417..91c58d059d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -37,7 +37,7 @@ private[sql] object InferSchema {
*/
def infer(
json: RDD[String],
- columnNameOfCorruptRecords: String,
+ columnNameOfCorruptRecord: String,
configOptions: JSONOptions): StructType = {
require(configOptions.samplingRatio > 0,
s"samplingRatio (${configOptions.samplingRatio}) should be greater than 0")
@@ -60,13 +60,13 @@ private[sql] object InferSchema {
}
} catch {
case _: JsonParseException if shouldHandleCorruptRecord =>
- Some(StructType(Seq(StructField(columnNameOfCorruptRecords, StringType))))
+ Some(StructType(Seq(StructField(columnNameOfCorruptRecord, StringType))))
case _: JsonParseException =>
None
}
}
}.fold(StructType(Seq()))(
- compatibleRootType(columnNameOfCorruptRecords, shouldHandleCorruptRecord))
+ compatibleRootType(columnNameOfCorruptRecord, shouldHandleCorruptRecord))
canonicalizeType(rootType) match {
case Some(st: StructType) => st
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 733fcbfea1..4ae9376b5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -24,7 +24,6 @@ import scala.collection.mutable.ArrayBuffer
import com.fasterxml.jackson.core._
import org.apache.spark.internal.Logging
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
@@ -35,184 +34,289 @@ import org.apache.spark.util.Utils
private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
-object JacksonParser extends Logging {
+class JacksonParser(
+ schema: StructType,
+ columnNameOfCorruptRecord: String,
+ options: JSONOptions) extends Logging {
- def parse(
- input: RDD[String],
- schema: StructType,
- columnNameOfCorruptRecords: String,
- configOptions: JSONOptions): RDD[InternalRow] = {
+ import com.fasterxml.jackson.core.JsonToken._
+
+ // A `ValueConverter` is responsible for converting a value read from the `JsonParser`
+ // into a field value for an `InternalRow`.
+ private type ValueConverter = (JsonParser) => Any
+
+ // The `ValueConverter` for the root schema, covering all fields in the schema
+ private val rootConverter: ValueConverter = makeRootConverter(schema)
- input.mapPartitions { iter =>
- parseJson(iter, schema, columnNameOfCorruptRecords, configOptions)
+ private val factory = new JsonFactory()
+ options.setJacksonOptions(factory)
+
+ /**
+ * This function deals with the cases where parsing fails. It will be called when
+ * exceptions are caught during conversion, and it also handles the `mode` option.
+ */
+ private def failedRecord(record: String): Seq[InternalRow] = {
+ // create a row even if no corrupt record column is present
+ if (options.failFast) {
+ throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
+ }
+ if (options.dropMalformed) {
+ logWarning(s"Dropping malformed line: $record")
+ Nil
+ } else {
+ val row = new GenericMutableRow(schema.length)
+ for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
+ require(schema(corruptIndex).dataType == StringType)
+ row.update(corruptIndex, UTF8String.fromString(record))
+ }
+ Seq(row)
}
}
/**
- * Parse the current token (and related children) according to a desired schema
- * This is a wrapper for the method `convertField()` to handle a row wrapped
- * with an array.
+ * Create a converter which converts the JSON documents held by the `JsonParser`
+ * to a value according to a desired schema. This is a wrapper for the method
+ * `makeConverter()` to handle a row wrapped with an array.
*/
- def convertRootField(
- factory: JsonFactory,
- parser: JsonParser,
- schema: DataType): Any = {
- import com.fasterxml.jackson.core.JsonToken._
- (parser.getCurrentToken, schema) match {
- case (START_ARRAY, st: StructType) =>
- // SPARK-3308: support reading top level JSON arrays and take every element
- // in such an array as a row
- convertArray(factory, parser, st)
-
- case (START_OBJECT, ArrayType(st, _)) =>
+ def makeRootConverter(dataType: DataType): ValueConverter = dataType match {
+ case st: StructType =>
+ val elementConverter = makeConverter(st)
+ val fieldConverters = st.map(_.dataType).map(makeConverter)
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case START_OBJECT => convertObject(parser, st, fieldConverters)
+ // SPARK-3308: support reading top level JSON arrays and take every element
+ // in such an array as a row
+ //
+ // For example, we support the JSON data below:
+ //
+ // [{"a":"str_a_1"}]
+ // [{"a":"str_a_2"}, {"b":"str_b_3"}]
+ //
+ // resulting in:
+ //
+ // List([str_a_1,null])
+ // List([str_a_2,null], [null,str_b_3])
+ //
+ case START_ARRAY => convertArray(parser, elementConverter)
+ }
+
+ case ArrayType(st: StructType, _) =>
+ val elementConverter = makeConverter(st)
+ val fieldConverters = st.map(_.dataType).map(makeConverter)
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
// the business end of SPARK-3308:
- // when an object is found but an array is requested just wrap it in a list
- convertField(factory, parser, st) :: Nil
+ // when an object is found but an array is requested just wrap it in a list.
+ // This is being wrapped in `JacksonParser.parse`.
+ case START_OBJECT => convertObject(parser, st, fieldConverters)
+ case START_ARRAY => convertArray(parser, elementConverter)
+ }
- case _ =>
- convertField(factory, parser, schema)
- }
+ case _ => makeConverter(dataType)
}
- private def convertField(
- factory: JsonFactory,
- parser: JsonParser,
- schema: DataType): Any = {
- import com.fasterxml.jackson.core.JsonToken._
- (parser.getCurrentToken, schema) match {
- case (null | VALUE_NULL, _) =>
- null
+ /**
+ * Create a converter which converts the JSON documents held by the `JsonParser`
+ * to a value according to a desired schema.
+ */
+ private def makeConverter(dataType: DataType): ValueConverter = dataType match {
+ case BooleanType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_TRUE => true
+ case VALUE_FALSE => false
+ }
- case (FIELD_NAME, _) =>
- parser.nextToken()
- convertField(factory, parser, schema)
-
- case (VALUE_STRING, StringType) =>
- UTF8String.fromString(parser.getText)
-
- case (VALUE_STRING, _) if parser.getTextLength < 1 =>
- // guard the non string type
- null
-
- case (VALUE_STRING, BinaryType) =>
- parser.getBinaryValue
-
- case (VALUE_STRING, DateType) =>
- val stringValue = parser.getText
- if (stringValue.contains("-")) {
- // The format of this string will probably be "yyyy-mm-dd".
- DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
- } else {
- // In Spark 1.5.0, we store the data as number of days since epoch in string.
- // So, we just convert it to Int.
- stringValue.toInt
- }
+ case ByteType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_NUMBER_INT => parser.getByteValue
+ }
- case (VALUE_STRING, TimestampType) =>
- // This one will lose microseconds parts.
- // See https://issues.apache.org/jira/browse/SPARK-10681.
- DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
+ case ShortType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_NUMBER_INT => parser.getShortValue
+ }
- case (VALUE_NUMBER_INT, TimestampType) =>
- parser.getLongValue * 1000000L
+ case IntegerType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_NUMBER_INT => parser.getIntValue
+ }
- case (_, StringType) =>
- val writer = new ByteArrayOutputStream()
- Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
- generator => generator.copyCurrentStructure(parser)
- }
- UTF8String.fromBytes(writer.toByteArray)
-
- case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, FloatType) =>
- parser.getFloatValue
-
- case (VALUE_STRING, FloatType) =>
- // Special case handling for NaN and Infinity.
- val value = parser.getText
- val lowerCaseValue = value.toLowerCase()
- if (lowerCaseValue.equals("nan") ||
- lowerCaseValue.equals("infinity") ||
- lowerCaseValue.equals("-infinity") ||
- lowerCaseValue.equals("inf") ||
- lowerCaseValue.equals("-inf")) {
- value.toFloat
- } else {
- throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.")
- }
+ case LongType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_NUMBER_INT => parser.getLongValue
+ }
- case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, DoubleType) =>
- parser.getDoubleValue
-
- case (VALUE_STRING, DoubleType) =>
- // Special case handling for NaN and Infinity.
- val value = parser.getText
- val lowerCaseValue = value.toLowerCase()
- if (lowerCaseValue.equals("nan") ||
- lowerCaseValue.equals("infinity") ||
- lowerCaseValue.equals("-infinity") ||
- lowerCaseValue.equals("inf") ||
- lowerCaseValue.equals("-inf")) {
- value.toDouble
- } else {
- throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.")
- }
+ case FloatType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
+ parser.getFloatValue
+
+ case VALUE_STRING =>
+ // Special case handling for NaN and Infinity.
+ val value = parser.getText
+ val lowerCaseValue = value.toLowerCase
+ if (lowerCaseValue.equals("nan") ||
+ lowerCaseValue.equals("infinity") ||
+ lowerCaseValue.equals("-infinity") ||
+ lowerCaseValue.equals("inf") ||
+ lowerCaseValue.equals("-inf")) {
+ value.toFloat
+ } else {
+ throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.")
+ }
+ }
- case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, dt: DecimalType) =>
- Decimal(parser.getDecimalValue, dt.precision, dt.scale)
+ case DoubleType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
+ parser.getDoubleValue
+
+ case VALUE_STRING =>
+ // Special case handling for NaN and Infinity.
+ val value = parser.getText
+ val lowerCaseValue = value.toLowerCase
+ if (lowerCaseValue.equals("nan") ||
+ lowerCaseValue.equals("infinity") ||
+ lowerCaseValue.equals("-infinity") ||
+ lowerCaseValue.equals("inf") ||
+ lowerCaseValue.equals("-inf")) {
+ value.toDouble
+ } else {
+ throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.")
+ }
+ }
- case (VALUE_NUMBER_INT, ByteType) =>
- parser.getByteValue
+ case StringType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_STRING =>
+ UTF8String.fromString(parser.getText)
- case (VALUE_NUMBER_INT, ShortType) =>
- parser.getShortValue
+ case _ =>
+ // Note that it always tries to convert the data as a string, without failing.
+ val writer = new ByteArrayOutputStream()
+ Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
+ generator => generator.copyCurrentStructure(parser)
+ }
+ UTF8String.fromBytes(writer.toByteArray)
+ }
- case (VALUE_NUMBER_INT, IntegerType) =>
- parser.getIntValue
+ case TimestampType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_STRING =>
+ // This one will lose microseconds parts.
+ // See https://issues.apache.org/jira/browse/SPARK-10681.
+ DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
- case (VALUE_NUMBER_INT, LongType) =>
- parser.getLongValue
+ case VALUE_NUMBER_INT =>
+ parser.getLongValue * 1000000L
+ }
- case (VALUE_TRUE, BooleanType) =>
- true
+ case DateType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_STRING =>
+ val stringValue = parser.getText
+ if (stringValue.contains("-")) {
+ // The format of this string will probably be "yyyy-mm-dd".
+ DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
+ } else {
+ // In Spark 1.5.0, we store the data as number of days since epoch in string.
+ // So, we just convert it to Int.
+ stringValue.toInt
+ }
+ }
- case (VALUE_FALSE, BooleanType) =>
- false
+ case BinaryType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_STRING => parser.getBinaryValue
+ }
- case (START_OBJECT, st: StructType) =>
- convertObject(factory, parser, st)
+ case dt: DecimalType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) =>
+ Decimal(parser.getDecimalValue, dt.precision, dt.scale)
+ }
- case (START_ARRAY, ArrayType(st, _)) =>
- convertArray(factory, parser, st)
+ case st: StructType =>
+ val fieldConverters = st.map(_.dataType).map(makeConverter)
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case START_OBJECT => convertObject(parser, st, fieldConverters)
+ }
- case (START_OBJECT, MapType(StringType, kt, _)) =>
- convertMap(factory, parser, kt)
+ case at: ArrayType =>
+ val elementConverter = makeConverter(at.elementType)
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case START_ARRAY => convertArray(parser, elementConverter)
+ }
- case (_, udt: UserDefinedType[_]) =>
- convertField(factory, parser, udt.sqlType)
+ case mt: MapType =>
+ val valueConverter = makeConverter(mt.valueType)
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case START_OBJECT => convertMap(parser, valueConverter)
+ }
+
+ case udt: UserDefinedType[_] =>
+ makeConverter(udt.sqlType)
+
+ case _ =>
+ (parser: JsonParser) =>
+ // Here, we pass an empty `PartialFunction` so that this case can be
+ // handled as a failed conversion. It will throw an exception as
+ // long as the value is not null.
+ parseJsonToken(parser, dataType)(PartialFunction.empty[JsonToken, Any])
+ }
- case (token, dataType) =>
- // We cannot parse this token based on the given data type. So, we throw a
- // SparkSQLJsonProcessingException and this exception will be caught by
- // parseJson method.
- throw new SparkSQLJsonProcessingException(
- s"Failed to parse a value for data type $dataType (current token: $token).")
+ /**
+ * This method skips `FIELD_NAME`s at the beginning, and handles nulls before trying to
+ * parse the JSON token using the given function `f`. If `f` fails to parse and convert the
+ * token, `failedConversion` is called to handle it.
+ */
+ private def parseJsonToken(
+ parser: JsonParser,
+ dataType: DataType)(f: PartialFunction[JsonToken, Any]): Any = {
+ parser.getCurrentToken match {
+ case FIELD_NAME =>
+ // There are useless FIELD_NAMEs between START_OBJECT and END_OBJECT tokens
+ parser.nextToken()
+ parseJsonToken(parser, dataType)(f)
+
+ case null | VALUE_NULL => null
+
+ case other => f.applyOrElse(other, failedConversion(parser, dataType))
}
}
/**
+ * This function throws an exception for a failed conversion, but returns null for an empty
+ * string, to guard the non-string types.
+ */
+ private def failedConversion(
+ parser: JsonParser,
+ dataType: DataType): PartialFunction[JsonToken, Any] = {
+ case VALUE_STRING if parser.getTextLength < 1 =>
+ // If the conversion fails, this produces `null` rather than throwing an exception.
+ // This guards against a mismatch of types.
+ null
+
+ case token =>
+ // We cannot parse this token based on the given data type. So, we throw a
+ // SparkSQLJsonProcessingException and this exception will be caught by
+ // the `parse` method.
+ throw new SparkSQLJsonProcessingException(
+ s"Failed to parse a value for data type $dataType (current token: $token).")
+ }
+
+ /**
* Parse an object from the token stream into a new Row representing the schema.
- *
* Fields in the json that are not defined in the requested schema will be dropped.
*/
private def convertObject(
- factory: JsonFactory,
parser: JsonParser,
- schema: StructType): InternalRow = {
+ schema: StructType,
+ fieldConverters: Seq[ValueConverter]): InternalRow = {
val row = new GenericMutableRow(schema.length)
while (nextUntil(parser, JsonToken.END_OBJECT)) {
schema.getFieldIndex(parser.getCurrentName) match {
case Some(index) =>
- row.update(index, convertField(factory, parser, schema(index).dataType))
+ row.update(index, fieldConverters(index).apply(parser))
case None =>
parser.skipChildren()
@@ -223,87 +327,65 @@ object JacksonParser extends Logging {
}
/**
- * Parse an object as a Map, preserving all fields
+ * Parse an object as a Map, preserving all fields.
*/
private def convertMap(
- factory: JsonFactory,
parser: JsonParser,
- valueType: DataType): MapData = {
+ fieldConverter: ValueConverter): MapData = {
val keys = ArrayBuffer.empty[UTF8String]
val values = ArrayBuffer.empty[Any]
while (nextUntil(parser, JsonToken.END_OBJECT)) {
keys += UTF8String.fromString(parser.getCurrentName)
- values += convertField(factory, parser, valueType)
+ values += fieldConverter.apply(parser)
}
+
ArrayBasedMapData(keys.toArray, values.toArray)
}
+ /**
+ * Parse a JSON array into a Catalyst ArrayData.
+ */
private def convertArray(
- factory: JsonFactory,
parser: JsonParser,
- elementType: DataType): ArrayData = {
+ fieldConverter: ValueConverter): ArrayData = {
val values = ArrayBuffer.empty[Any]
while (nextUntil(parser, JsonToken.END_ARRAY)) {
- values += convertField(factory, parser, elementType)
+ values += fieldConverter.apply(parser)
}
new GenericArrayData(values.toArray)
}
- def parseJson(
- input: Iterator[String],
- schema: StructType,
- columnNameOfCorruptRecords: String,
- configOptions: JSONOptions): Iterator[InternalRow] = {
-
- def failedRecord(record: String): Seq[InternalRow] = {
- // create a row even if no corrupt record column is present
- if (configOptions.failFast) {
- throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
- }
- if (configOptions.dropMalformed) {
- logWarning(s"Dropping malformed line: $record")
- Nil
- } else {
- val row = new GenericMutableRow(schema.length)
- for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) {
- require(schema(corruptIndex).dataType == StringType)
- row.update(corruptIndex, UTF8String.fromString(record))
- }
- Seq(row)
- }
- }
-
- val factory = new JsonFactory()
- configOptions.setJacksonOptions(factory)
-
- input.flatMap { record =>
- if (record.trim.isEmpty) {
- Nil
- } else {
- try {
- Utils.tryWithResource(factory.createParser(record)) { parser =>
- parser.nextToken()
-
- convertRootField(factory, parser, schema) match {
- case null => failedRecord(record)
- case row: InternalRow => row :: Nil
- case array: ArrayData =>
- if (array.numElements() == 0) {
- Nil
- } else {
- array.toArray[InternalRow](schema)
- }
- case _ =>
- failedRecord(record)
- }
+ /**
+ * Parse the string JSON input into the set of [[InternalRow]]s.
+ */
+ def parse(input: String): Seq[InternalRow] = {
+ if (input.trim.isEmpty) {
+ Nil
+ } else {
+ try {
+ Utils.tryWithResource(factory.createParser(input)) { parser =>
+ parser.nextToken()
+ rootConverter.apply(parser) match {
+ case null => failedRecord(input)
+ case row: InternalRow => row :: Nil
+ case array: ArrayData =>
+ // Here, as we support reading top-level JSON arrays and taking every element
+ // in such an array as a row, this case is possible.
+ if (array.numElements() == 0) {
+ Nil
+ } else {
+ array.toArray[InternalRow](schema)
+ }
+ case _ =>
+ failedRecord(input)
}
- } catch {
- case _: JsonProcessingException =>
- failedRecord(record)
- case _: SparkSQLJsonProcessingException =>
- failedRecord(record)
}
+ } catch {
+ case _: JsonProcessingException =>
+ failedRecord(input)
+ case _: SparkSQLJsonProcessingException =>
+ failedRecord(input)
}
}
}
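The core of this rewrite is that `JacksonParser` now builds a `ValueConverter` closure per schema field up front (in `makeConverter`) and routes unmatched tokens through `parseJsonToken`/`failedConversion`, instead of pattern-matching on `(token, dataType)` for every value. The plain-Scala sketch below illustrates those two techniques in isolation; `Token`, `DataType`, and the converters here are simplified stand-ins, not Spark or Jackson classes.

object ConverterSketch {
  sealed trait DataType
  case object IntType extends DataType
  case object StrType extends DataType

  // Stand-in for a Jackson token: its kind plus its text.
  final case class Token(kind: String, text: String)
  type ValueConverter = Token => Any

  // Mirrors failedConversion: an empty string yields null, anything else throws.
  private def failedConversion(dataType: DataType): PartialFunction[Token, Any] = {
    case Token("VALUE_STRING", text) if text.isEmpty => null
    case token =>
      throw new RuntimeException(s"Failed to parse a value for data type $dataType (token: $token)")
  }

  // Mirrors makeConverter: one closure built per field, reused for every row.
  def makeConverter(dataType: DataType): ValueConverter = dataType match {
    case IntType =>
      val f: PartialFunction[Token, Any] = { case Token("VALUE_NUMBER_INT", text) => text.toInt }
      (t: Token) => f.applyOrElse(t, failedConversion(dataType))
    case StrType =>
      val f: PartialFunction[Token, Any] = { case Token("VALUE_STRING", text) => text }
      (t: Token) => f.applyOrElse(t, failedConversion(dataType))
  }

  def main(args: Array[String]): Unit = {
    val fieldConverters = Seq(IntType, StrType).map(makeConverter)
    val tokens = Seq(Token("VALUE_NUMBER_INT", "42"), Token("VALUE_STRING", "spark"))
    val row = tokens.zip(fieldConverters).map { case (tok, conv) => conv(tok) }
    println(row)                                              // List(42, spark)
    println(fieldConverters.head(Token("VALUE_STRING", "")))  // null: empty strings guard non-string types
  }
}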
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index adca8d7af0..19681be604 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -106,12 +106,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
(file: PartitionedFile) => {
val lines = new HadoopFileLinesReader(file, broadcastedHadoopConf.value.value).map(_.toString)
-
- JacksonParser.parseJson(
- lines,
- requiredSchema,
- columnNameOfCorruptRecord,
- parsedOptions)
+ val parser = new JacksonParser(requiredSchema, columnNameOfCorruptRecord, parsedOptions)
+ lines.flatMap(parser.parse)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 177fc04b02..342fd3e82e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -61,9 +61,14 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
generator.flush()
}
- Utils.tryWithResource(factory.createParser(writer.toString)) { parser =>
- parser.nextToken()
- JacksonParser.convertRootField(factory, parser, dataType)
+ val dummyOption = new JSONOptions(Map.empty[String, String])
+ val dummySchema = StructType(Seq.empty)
+ val parser = new JacksonParser(dummySchema, "", dummyOption)
+
+ Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser =>
+ jsonParser.nextToken()
+ val converter = parser.makeRootConverter(dataType)
+ converter.apply(jsonParser)
}
}
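Finally, a small usage sketch of the behaviour that `failedRecord()` and the corrupt-record handling in `InferSchema` implement, assuming a local `SparkSession` and the standard JSON reader options `mode` and `columnNameOfCorruptRecord`; the input data, column name, and application name are made up.

import org.apache.spark.sql.SparkSession

object MalformedJsonModes {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("json-modes").getOrCreate()

    val lines = spark.sparkContext.parallelize(Seq(
      """{"a": 1}""",
      """{"a": """))   // the second record is malformed

    // PERMISSIVE (the default): the malformed line is kept, with its raw text in the
    // corrupt-record column that InferSchema adds and failedRecord() fills.
    spark.read
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_corrupt")
      .json(lines)
      .show(false)

    // "DROPMALFORMED" silently drops the bad line; "FAILFAST" throws instead.
    spark.stop()
  }
}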