aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src
diff options
context:
space:
mode:
authorNathan Howell <nhowell@godaddy.com>2017-02-16 20:51:19 -0800
committerWenchen Fan <wenchen@databricks.com>2017-02-16 20:51:19 -0800
commit21fde57f15db974b710e7b00e72c744da7c1ac3c (patch)
treee51d0ab5ad405ff66c6459738186406a597a8f1c /sql/catalyst/src
parentdcc2d540a53f0bd04baead43fdee1c170ef2b9f3 (diff)
downloadspark-21fde57f15db974b710e7b00e72c744da7c1ac3c.tar.gz
spark-21fde57f15db974b710e7b00e72c744da7c1ac3c.tar.bz2
spark-21fde57f15db974b710e7b00e72c744da7c1ac3c.zip
[SPARK-18352][SQL] Support parsing multiline json files
## What changes were proposed in this pull request? If a new option `wholeFile` is set to `true` the JSON reader will parse each file (instead of a single line) as a value. This is done with Jackson streaming and it should be capable of parsing very large documents, assuming the row will fit in memory. Because the file is not buffered in memory the corrupt record handling is also slightly different when `wholeFile` is enabled: the corrupt column will contain the filename instead of the literal JSON if there is a parsing failure. It would be easy to extend this to add the parser location (line, column and byte offsets) to the output if desired. These changes have allowed types other than `String` to be parsed. Support for `UTF8String` and `Text` have been added (alongside `String` and `InputFormat`) and no longer require a conversion to `String` just for parsing. I've also included a few other changes that generate slightly better bytecode and (imo) make it more obvious when and where boxing is occurring in the parser. These are included as separate commits, let me know if they should be flattened into this PR or moved to a new one. ## How was this patch tested? New and existing unit tests. No performance or load tests have been run. Author: Nathan Howell <nhowell@godaddy.com> Closes #16386 from NathanHowell/SPARK-18352.
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala10
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala46
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala20
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala287
4 files changed, 236 insertions, 127 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index bd852a50fe..1e690a4469 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -497,8 +497,7 @@ case class JsonToStruct(
lazy val parser =
new JacksonParser(
schema,
- "invalid", // Not used since we force fail fast. Invalid rows will be set to `null`.
- new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get))
+ new JSONOptions(options + ("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get))
override def dataType: DataType = schema
@@ -506,7 +505,12 @@ case class JsonToStruct(
copy(timeZoneId = Option(timeZoneId))
override def nullSafeEval(json: Any): Any = {
- try parser.parse(json.toString).headOption.orNull catch {
+ try {
+ parser.parse(
+ json.asInstanceOf[UTF8String],
+ CreateJacksonParser.utf8String,
+ identity[UTF8String]).headOption.orNull
+ } catch {
case _: SparkSQLJsonProcessingException => null
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
new file mode 100644
index 0000000000..e0ed03a689
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json
+
+import java.io.InputStream
+
+import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import org.apache.hadoop.io.Text
+
+import org.apache.spark.unsafe.types.UTF8String
+
+private[sql] object CreateJacksonParser extends Serializable {
+ def string(jsonFactory: JsonFactory, record: String): JsonParser = {
+ jsonFactory.createParser(record)
+ }
+
+ def utf8String(jsonFactory: JsonFactory, record: UTF8String): JsonParser = {
+ val bb = record.getByteBuffer
+ assert(bb.hasArray)
+
+ jsonFactory.createParser(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
+ }
+
+ def text(jsonFactory: JsonFactory, record: Text): JsonParser = {
+ jsonFactory.createParser(record.getBytes, 0, record.getLength)
+ }
+
+ def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = {
+ jsonFactory.createParser(record)
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 5307ce1cb7..5a91f9c193 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -31,11 +31,20 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
* Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
*/
private[sql] class JSONOptions(
- @transient private val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String)
+ @transient private val parameters: CaseInsensitiveMap[String],
+ defaultTimeZoneId: String,
+ defaultColumnNameOfCorruptRecord: String)
extends Logging with Serializable {
- def this(parameters: Map[String, String], defaultTimeZoneId: String) =
- this(CaseInsensitiveMap(parameters), defaultTimeZoneId)
+ def this(
+ parameters: Map[String, String],
+ defaultTimeZoneId: String,
+ defaultColumnNameOfCorruptRecord: String = "") = {
+ this(
+ CaseInsensitiveMap(parameters),
+ defaultTimeZoneId,
+ defaultColumnNameOfCorruptRecord)
+ }
val samplingRatio =
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
@@ -57,7 +66,8 @@ private[sql] class JSONOptions(
parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
- val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
+ val columnNameOfCorruptRecord =
+ parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId))
@@ -69,6 +79,8 @@ private[sql] class JSONOptions(
FastDateFormat.getInstance(
parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), timeZone, Locale.US)
+ val wholeFile = parameters.get("wholeFile").map(_.toBoolean).getOrElse(false)
+
// Parse mode flags
if (!ParseModes.isValidMode(parseMode)) {
logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 03e27ba934..995095969d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -39,7 +39,6 @@ private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeE
*/
class JacksonParser(
schema: StructType,
- columnNameOfCorruptRecord: String,
options: JSONOptions) extends Logging {
import JacksonUtils._
@@ -48,69 +47,110 @@ class JacksonParser(
// A `ValueConverter` is responsible for converting a value from `JsonParser`
// to a value in a field for `InternalRow`.
- private type ValueConverter = (JsonParser) => Any
+ private type ValueConverter = JsonParser => AnyRef
// `ValueConverter`s for the root schema for all fields in the schema
- private val rootConverter: ValueConverter = makeRootConverter(schema)
+ private val rootConverter = makeRootConverter(schema)
private val factory = new JsonFactory()
options.setJacksonOptions(factory)
private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))
+ private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
+ corruptFieldIndex.foreach(idx => require(schema(idx).dataType == StringType))
+
+ @transient
+ private[this] var isWarningPrinted: Boolean = false
+
@transient
- private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+ private def printWarningForMalformedRecord(record: () => UTF8String): Unit = {
+ def sampleRecord: String = {
+ if (options.wholeFile) {
+ ""
+ } else {
+ s"Sample record: ${record()}\n"
+ }
+ }
+
+ def footer: String = {
+ s"""Code example to print all malformed records (scala):
+ |===================================================
+ |// The corrupted record exists in column ${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+ """.stripMargin
+ }
+
+ if (options.permissive) {
+ logWarning(
+ s"""Found at least one malformed record. The JSON reader will replace
+ |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
+ |To find out which corrupted records have been replaced with null, please use the
+ |default inferred schema instead of providing a custom schema.
+ |
+ |${sampleRecord ++ footer}
+ |
+ """.stripMargin)
+ } else if (options.dropMalformed) {
+ logWarning(
+ s"""Found at least one malformed record. The JSON reader will drop
+ |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
+ |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
+ |mode and use the default inferred schema.
+ |
+ |${sampleRecord ++ footer}
+ |
+ """.stripMargin)
+ }
+ }
+
+ @transient
+ private def printWarningIfWholeFile(): Unit = {
+ if (options.wholeFile && corruptFieldIndex.isDefined) {
+ logWarning(
+ s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord may result
+ |in very large allocations or OutOfMemoryExceptions being raised.
+ |
+ """.stripMargin)
+ }
+ }
/**
* This function deals with the cases it fails to parse. This function will be called
* when exceptions are caught during converting. This functions also deals with `mode` option.
*/
- private def failedRecord(record: String): Seq[InternalRow] = {
- // create a row even if no corrupt record column is present
- if (options.failFast) {
- throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record")
- }
- if (options.dropMalformed) {
- if (!isWarningPrintedForMalformedRecord) {
- logWarning(
- s"""Found at least one malformed records (sample: $record). The JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
- |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===================================================
- |// The corrupted record exists in column ${columnNameOfCorruptRecord}
- |val parsedJson = spark.read.json("/path/to/json/file/test.json")
- |
- """.stripMargin)
- isWarningPrintedForMalformedRecord = true
- }
- Nil
- } else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
- if (!isWarningPrintedForMalformedRecord) {
- logWarning(
- s"""Found at least one malformed records (sample: $record). The JSON reader will replace
- |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
- |To find out which corrupted records have been replaced with null, please use the
- |default inferred schema instead of providing a custom schema.
- |
- |Code example to print all malformed records (scala):
- |===================================================
- |// The corrupted record exists in column ${columnNameOfCorruptRecord}.
- |val parsedJson = spark.read.json("/path/to/json/file/test.json")
- |
- """.stripMargin)
- isWarningPrintedForMalformedRecord = true
- }
- emptyRow
- } else {
- val row = new GenericInternalRow(schema.length)
- for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
- require(schema(corruptIndex).dataType == StringType)
- row.update(corruptIndex, UTF8String.fromString(record))
- }
- Seq(row)
+ private def failedRecord(record: () => UTF8String): Seq[InternalRow] = {
+ corruptFieldIndex match {
+ case _ if options.failFast =>
+ if (options.wholeFile) {
+ throw new SparkSQLJsonProcessingException("Malformed line in FAILFAST mode")
+ } else {
+ throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: ${record()}")
+ }
+
+ case _ if options.dropMalformed =>
+ if (!isWarningPrinted) {
+ printWarningForMalformedRecord(record)
+ isWarningPrinted = true
+ }
+ Nil
+
+ case None =>
+ if (!isWarningPrinted) {
+ printWarningForMalformedRecord(record)
+ isWarningPrinted = true
+ }
+ emptyRow
+
+ case Some(corruptIndex) =>
+ if (!isWarningPrinted) {
+ printWarningIfWholeFile()
+ isWarningPrinted = true
+ }
+ val row = new GenericInternalRow(schema.length)
+ row.update(corruptIndex, record())
+ Seq(row)
}
}
@@ -119,11 +159,11 @@ class JacksonParser(
* to a value according to a desired schema. This is a wrapper for the method
* `makeConverter()` to handle a row wrapped with an array.
*/
- private def makeRootConverter(st: StructType): ValueConverter = {
+ private def makeRootConverter(st: StructType): JsonParser => Seq[InternalRow] = {
val elementConverter = makeConverter(st)
- val fieldConverters = st.map(_.dataType).map(makeConverter)
- (parser: JsonParser) => parseJsonToken(parser, st) {
- case START_OBJECT => convertObject(parser, st, fieldConverters)
+ val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
+ (parser: JsonParser) => parseJsonToken[Seq[InternalRow]](parser, st) {
+ case START_OBJECT => convertObject(parser, st, fieldConverters) :: Nil
// SPARK-3308: support reading top level JSON arrays and take every element
// in such an array as a row
//
@@ -137,7 +177,15 @@ class JacksonParser(
// List([str_a_1,null])
// List([str_a_2,null], [null,str_b_3])
//
- case START_ARRAY => convertArray(parser, elementConverter)
+ case START_ARRAY =>
+ val array = convertArray(parser, elementConverter)
+ // Here, as we support reading top level JSON arrays and take every element
+ // in such an array as a row, this case is possible.
+ if (array.numElements() == 0) {
+ Nil
+ } else {
+ array.toArray[InternalRow](schema).toSeq
+ }
}
}
@@ -145,35 +193,35 @@ class JacksonParser(
* Create a converter which converts the JSON documents held by the `JsonParser`
* to a value according to a desired schema.
*/
- private[sql] def makeConverter(dataType: DataType): ValueConverter = dataType match {
+ def makeConverter(dataType: DataType): ValueConverter = dataType match {
case BooleanType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ (parser: JsonParser) => parseJsonToken[java.lang.Boolean](parser, dataType) {
case VALUE_TRUE => true
case VALUE_FALSE => false
}
case ByteType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ (parser: JsonParser) => parseJsonToken[java.lang.Byte](parser, dataType) {
case VALUE_NUMBER_INT => parser.getByteValue
}
case ShortType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ (parser: JsonParser) => parseJsonToken[java.lang.Short](parser, dataType) {
case VALUE_NUMBER_INT => parser.getShortValue
}
case IntegerType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ (parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
case VALUE_NUMBER_INT => parser.getIntValue
}
case LongType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
case VALUE_NUMBER_INT => parser.getLongValue
}
case FloatType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ (parser: JsonParser) => parseJsonToken[java.lang.Float](parser, dataType) {
case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
parser.getFloatValue
@@ -193,7 +241,7 @@ class JacksonParser(
}
case DoubleType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ (parser: JsonParser) => parseJsonToken[java.lang.Double](parser, dataType) {
case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
parser.getDoubleValue
@@ -213,7 +261,7 @@ class JacksonParser(
}
case StringType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ (parser: JsonParser) => parseJsonToken[UTF8String](parser, dataType) {
case VALUE_STRING =>
UTF8String.fromString(parser.getText)
@@ -227,66 +275,71 @@ class JacksonParser(
}
case TimestampType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
case VALUE_STRING =>
+ val stringValue = parser.getText
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.
- Try(options.timestampFormat.parse(parser.getText).getTime * 1000L)
- .getOrElse {
- // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
- // compatibility.
- DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
- }
+ Long.box {
+ Try(options.timestampFormat.parse(stringValue).getTime * 1000L)
+ .getOrElse {
+ // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+ // compatibility.
+ DateTimeUtils.stringToTime(stringValue).getTime * 1000L
+ }
+ }
case VALUE_NUMBER_INT =>
parser.getLongValue * 1000000L
}
case DateType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ (parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
case VALUE_STRING =>
val stringValue = parser.getText
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.x
- Try(DateTimeUtils.millisToDays(options.dateFormat.parse(parser.getText).getTime))
- .getOrElse {
- // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
- // compatibility.
- Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime))
+ Int.box {
+ Try(DateTimeUtils.millisToDays(options.dateFormat.parse(stringValue).getTime))
+ .orElse {
+ // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+ // compatibility.
+ Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(stringValue).getTime))
+ }
.getOrElse {
- // In Spark 1.5.0, we store the data as number of days since epoch in string.
- // So, we just convert it to Int.
- stringValue.toInt
- }
+ // In Spark 1.5.0, we store the data as number of days since epoch in string.
+ // So, we just convert it to Int.
+ stringValue.toInt
+ }
}
}
case BinaryType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ (parser: JsonParser) => parseJsonToken[Array[Byte]](parser, dataType) {
case VALUE_STRING => parser.getBinaryValue
}
case dt: DecimalType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ (parser: JsonParser) => parseJsonToken[Decimal](parser, dataType) {
case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) =>
Decimal(parser.getDecimalValue, dt.precision, dt.scale)
}
case st: StructType =>
- val fieldConverters = st.map(_.dataType).map(makeConverter)
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
+ (parser: JsonParser) => parseJsonToken[InternalRow](parser, dataType) {
case START_OBJECT => convertObject(parser, st, fieldConverters)
}
case at: ArrayType =>
val elementConverter = makeConverter(at.elementType)
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ (parser: JsonParser) => parseJsonToken[ArrayData](parser, dataType) {
case START_ARRAY => convertArray(parser, elementConverter)
}
case mt: MapType =>
val valueConverter = makeConverter(mt.valueType)
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ (parser: JsonParser) => parseJsonToken[MapData](parser, dataType) {
case START_OBJECT => convertMap(parser, valueConverter)
}
@@ -298,7 +351,7 @@ class JacksonParser(
// Here, we pass empty `PartialFunction` so that this case can be
// handled as a failed conversion. It will throw an exception as
// long as the value is not null.
- parseJsonToken(parser, dataType)(PartialFunction.empty[JsonToken, Any])
+ parseJsonToken[AnyRef](parser, dataType)(PartialFunction.empty[JsonToken, AnyRef])
}
/**
@@ -306,14 +359,14 @@ class JacksonParser(
* to parse the JSON token using given function `f`. If the `f` failed to parse and convert the
* token, call `failedConversion` to handle the token.
*/
- private def parseJsonToken(
+ private def parseJsonToken[R >: Null](
parser: JsonParser,
- dataType: DataType)(f: PartialFunction[JsonToken, Any]): Any = {
+ dataType: DataType)(f: PartialFunction[JsonToken, R]): R = {
parser.getCurrentToken match {
case FIELD_NAME =>
// There are useless FIELD_NAMEs between START_OBJECT and END_OBJECT tokens
parser.nextToken()
- parseJsonToken(parser, dataType)(f)
+ parseJsonToken[R](parser, dataType)(f)
case null | VALUE_NULL => null
@@ -325,9 +378,9 @@ class JacksonParser(
* This function throws an exception for failed conversion, but returns null for empty string,
* to guard the non string types.
*/
- private def failedConversion(
+ private def failedConversion[R >: Null](
parser: JsonParser,
- dataType: DataType): PartialFunction[JsonToken, Any] = {
+ dataType: DataType): PartialFunction[JsonToken, R] = {
case VALUE_STRING if parser.getTextLength < 1 =>
// If conversion is failed, this produces `null` rather than throwing exception.
// This will protect the mismatch of types.
@@ -348,7 +401,7 @@ class JacksonParser(
private def convertObject(
parser: JsonParser,
schema: StructType,
- fieldConverters: Seq[ValueConverter]): InternalRow = {
+ fieldConverters: Array[ValueConverter]): InternalRow = {
val row = new GenericInternalRow(schema.length)
while (nextUntil(parser, JsonToken.END_OBJECT)) {
schema.getFieldIndex(parser.getCurrentName) match {
@@ -394,36 +447,30 @@ class JacksonParser(
}
/**
- * Parse the string JSON input to the set of [[InternalRow]]s.
+ * Parse the JSON input to the set of [[InternalRow]]s.
+ *
+ * @param recordLiteral an optional function that will be used to generate
+ * the corrupt record text instead of record.toString
*/
- def parse(input: String): Seq[InternalRow] = {
- if (input.trim.isEmpty) {
- Nil
- } else {
- try {
- Utils.tryWithResource(factory.createParser(input)) { parser =>
- parser.nextToken()
- rootConverter.apply(parser) match {
- case null => failedRecord(input)
- case row: InternalRow => row :: Nil
- case array: ArrayData =>
- // Here, as we support reading top level JSON arrays and take every element
- // in such an array as a row, this case is possible.
- if (array.numElements() == 0) {
- Nil
- } else {
- array.toArray[InternalRow](schema)
- }
- case _ =>
- failedRecord(input)
+ def parse[T](
+ record: T,
+ createParser: (JsonFactory, T) => JsonParser,
+ recordLiteral: T => UTF8String): Seq[InternalRow] = {
+ try {
+ Utils.tryWithResource(createParser(factory, record)) { parser =>
+ // a null first token is equivalent to testing for input.trim.isEmpty
+ // but it works on any token stream and not just strings
+ parser.nextToken() match {
+ case null => Nil
+ case _ => rootConverter.apply(parser) match {
+ case null => throw new SparkSQLJsonProcessingException("Root converter returned null")
+ case rows => rows
}
}
- } catch {
- case _: JsonProcessingException =>
- failedRecord(input)
- case _: SparkSQLJsonProcessingException =>
- failedRecord(input)
}
+ } catch {
+ case _: JsonProcessingException | _: SparkSQLJsonProcessingException =>
+ failedRecord(() => recordLiteral(record))
}
}
}