aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/main/scala/org/apache
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2016-09-29 13:01:10 -0700
committerYin Huai <yhuai@databricks.com>2016-09-29 13:01:10 -0700
commitfe33121a53384811a8e094ab6c05dc85b7c7ca87 (patch)
treed0575a3d0eefe46ea4b8e200e70d0834b566b477 /sql/catalyst/src/main/scala/org/apache
parent027dea8f294504bc5cd8bfedde546d171cb78657 (diff)
downloadspark-fe33121a53384811a8e094ab6c05dc85b7c7ca87.tar.gz
spark-fe33121a53384811a8e094ab6c05dc85b7c7ca87.tar.bz2
spark-fe33121a53384811a8e094ab6c05dc85b7c7ca87.zip
[SPARK-17699] Support for parsing JSON string columns
Spark SQL has great support for reading text files that contain JSON data. However, in many cases the JSON data is just one column amongst others. This is particularly true when reading from sources such as Kafka. This PR adds a new functions `from_json` that converts a string column into a nested `StructType` with a user specified schema. Example usage: ```scala val df = Seq("""{"a": 1}""").toDS() val schema = new StructType().add("a", IntegerType) df.select(from_json($"value", schema) as 'json) // => [json: <a: int>] ``` This PR adds support for java, scala and python. I leveraged our existing JSON parsing support by moving it into catalyst (so that we could define expressions using it). I left SQL out for now, because I'm not sure how users would specify a schema. Author: Michael Armbrust <michael@databricks.com> Closes #15274 from marmbrus/jsonParser.
Diffstat (limited to 'sql/catalyst/src/main/scala/org/apache')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala31
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala84
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala443
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala32
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala72
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala41
6 files changed, 701 insertions, 2 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index c14a2fb122..65dbd6a4e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -23,10 +23,12 @@ import scala.util.parsing.combinator.RegexParsers
import com.fasterxml.jackson.core._
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException}
+import org.apache.spark.sql.catalyst.util.ParseModes
+import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -467,3 +469,28 @@ case class JsonTuple(children: Seq[Expression])
}
}
+/**
+ * Converts an json input string to a [[StructType]] with the specified schema.
+ */
+case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
+ extends Expression with CodegenFallback with ExpectsInputTypes {
+ override def nullable: Boolean = true
+
+ @transient
+ lazy val parser =
+ new JacksonParser(
+ schema,
+ "invalid", // Not used since we force fail fast. Invalid rows will be set to `null`.
+ new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))
+
+ override def dataType: DataType = schema
+ override def children: Seq[Expression] = child :: Nil
+
+ override def eval(input: InternalRow): Any = {
+ try parser.parse(child.eval(input).toString).head catch {
+ case _: SparkSQLJsonProcessingException => null
+ }
+ }
+
+ override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
new file mode 100644
index 0000000000..aec18922ea
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json
+
+import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import org.apache.commons.lang3.time.FastDateFormat
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
+
+/**
+ * Options for parsing JSON data into Spark SQL rows.
+ *
+ * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
+ */
+private[sql] class JSONOptions(
+ @transient private val parameters: Map[String, String])
+ extends Logging with Serializable {
+
+ val samplingRatio =
+ parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
+ val primitivesAsString =
+ parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
+ val prefersDecimal =
+ parameters.get("prefersDecimal").map(_.toBoolean).getOrElse(false)
+ val allowComments =
+ parameters.get("allowComments").map(_.toBoolean).getOrElse(false)
+ val allowUnquotedFieldNames =
+ parameters.get("allowUnquotedFieldNames").map(_.toBoolean).getOrElse(false)
+ val allowSingleQuotes =
+ parameters.get("allowSingleQuotes").map(_.toBoolean).getOrElse(true)
+ val allowNumericLeadingZeros =
+ parameters.get("allowNumericLeadingZeros").map(_.toBoolean).getOrElse(false)
+ val allowNonNumericNumbers =
+ parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true)
+ val allowBackslashEscapingAnyCharacter =
+ parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
+ val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
+ private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+ val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
+
+ // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
+ val dateFormat: FastDateFormat =
+ FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"))
+
+ val timestampFormat: FastDateFormat =
+ FastDateFormat.getInstance(
+ parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"))
+
+ // Parse mode flags
+ if (!ParseModes.isValidMode(parseMode)) {
+ logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
+ }
+
+ val failFast = ParseModes.isFailFastMode(parseMode)
+ val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
+ val permissive = ParseModes.isPermissiveMode(parseMode)
+
+ /** Sets config options on a Jackson [[JsonFactory]]. */
+ def setJacksonOptions(factory: JsonFactory): Unit = {
+ factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
+ factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, allowUnquotedFieldNames)
+ factory.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, allowSingleQuotes)
+ factory.configure(JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS, allowNumericLeadingZeros)
+ factory.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, allowNonNumericNumbers)
+ factory.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER,
+ allowBackslashEscapingAnyCharacter)
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
new file mode 100644
index 0000000000..f80e6373d2
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json
+
+import java.io.ByteArrayOutputStream
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Try
+
+import com.fasterxml.jackson.core._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
+
+private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
+
+/**
+ * Constructs a parser for a given schema that translates a json string to an [[InternalRow]].
+ */
+class JacksonParser(
+ schema: StructType,
+ columnNameOfCorruptRecord: String,
+ options: JSONOptions) extends Logging {
+
+ import JacksonUtils._
+ import ParseModes._
+ import com.fasterxml.jackson.core.JsonToken._
+
+ // A `ValueConverter` is responsible for converting a value from `JsonParser`
+ // to a value in a field for `InternalRow`.
+ private type ValueConverter = (JsonParser) => Any
+
+ // `ValueConverter`s for the root schema for all fields in the schema
+ private val rootConverter: ValueConverter = makeRootConverter(schema)
+
+ private val factory = new JsonFactory()
+ options.setJacksonOptions(factory)
+
+ private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))
+
+ @transient
+ private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+
+ /**
+ * This function deals with the cases it fails to parse. This function will be called
+ * when exceptions are caught during converting. This functions also deals with `mode` option.
+ */
+ private def failedRecord(record: String): Seq[InternalRow] = {
+ // create a row even if no corrupt record column is present
+ if (options.failFast) {
+ throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record")
+ }
+ if (options.dropMalformed) {
+ if (!isWarningPrintedForMalformedRecord) {
+ logWarning(
+ s"""Found at least one malformed records (sample: $record). The JSON reader will drop
+ |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
+ |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
+ |mode and use the default inferred schema.
+ |
+ |Code example to print all malformed records (scala):
+ |===================================================
+ |// The corrupted record exists in column ${columnNameOfCorruptRecord}
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+ """.stripMargin)
+ isWarningPrintedForMalformedRecord = true
+ }
+ Nil
+ } else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
+ if (!isWarningPrintedForMalformedRecord) {
+ logWarning(
+ s"""Found at least one malformed records (sample: $record). The JSON reader will replace
+ |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
+ |To find out which corrupted records have been replaced with null, please use the
+ |default inferred schema instead of providing a custom schema.
+ |
+ |Code example to print all malformed records (scala):
+ |===================================================
+ |// The corrupted record exists in column ${columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+ """.stripMargin)
+ isWarningPrintedForMalformedRecord = true
+ }
+ emptyRow
+ } else {
+ val row = new GenericMutableRow(schema.length)
+ for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
+ require(schema(corruptIndex).dataType == StringType)
+ row.update(corruptIndex, UTF8String.fromString(record))
+ }
+ Seq(row)
+ }
+ }
+
+ /**
+ * Create a converter which converts the JSON documents held by the `JsonParser`
+ * to a value according to a desired schema. This is a wrapper for the method
+ * `makeConverter()` to handle a row wrapped with an array.
+ */
+ def makeRootConverter(dataType: DataType): ValueConverter = dataType match {
+ case st: StructType =>
+ val elementConverter = makeConverter(st)
+ val fieldConverters = st.map(_.dataType).map(makeConverter)
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case START_OBJECT => convertObject(parser, st, fieldConverters)
+ // SPARK-3308: support reading top level JSON arrays and take every element
+ // in such an array as a row
+ //
+ // For example, we support, the JSON data as below:
+ //
+ // [{"a":"str_a_1"}]
+ // [{"a":"str_a_2"}, {"b":"str_b_3"}]
+ //
+ // resulting in:
+ //
+ // List([str_a_1,null])
+ // List([str_a_2,null], [null,str_b_3])
+ //
+ case START_ARRAY => convertArray(parser, elementConverter)
+ }
+
+ case ArrayType(st: StructType, _) =>
+ val elementConverter = makeConverter(st)
+ val fieldConverters = st.map(_.dataType).map(makeConverter)
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ // the business end of SPARK-3308:
+ // when an object is found but an array is requested just wrap it in a list.
+ // This is being wrapped in `JacksonParser.parse`.
+ case START_OBJECT => convertObject(parser, st, fieldConverters)
+ case START_ARRAY => convertArray(parser, elementConverter)
+ }
+
+ case _ => makeConverter(dataType)
+ }
+
+ /**
+ * Create a converter which converts the JSON documents held by the `JsonParser`
+ * to a value according to a desired schema.
+ */
+ private def makeConverter(dataType: DataType): ValueConverter = dataType match {
+ case BooleanType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_TRUE => true
+ case VALUE_FALSE => false
+ }
+
+ case ByteType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_NUMBER_INT => parser.getByteValue
+ }
+
+ case ShortType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_NUMBER_INT => parser.getShortValue
+ }
+
+ case IntegerType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_NUMBER_INT => parser.getIntValue
+ }
+
+ case LongType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_NUMBER_INT => parser.getLongValue
+ }
+
+ case FloatType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
+ parser.getFloatValue
+
+ case VALUE_STRING =>
+ // Special case handling for NaN and Infinity.
+ val value = parser.getText
+ val lowerCaseValue = value.toLowerCase
+ if (lowerCaseValue.equals("nan") ||
+ lowerCaseValue.equals("infinity") ||
+ lowerCaseValue.equals("-infinity") ||
+ lowerCaseValue.equals("inf") ||
+ lowerCaseValue.equals("-inf")) {
+ value.toFloat
+ } else {
+ throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.")
+ }
+ }
+
+ case DoubleType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
+ parser.getDoubleValue
+
+ case VALUE_STRING =>
+ // Special case handling for NaN and Infinity.
+ val value = parser.getText
+ val lowerCaseValue = value.toLowerCase
+ if (lowerCaseValue.equals("nan") ||
+ lowerCaseValue.equals("infinity") ||
+ lowerCaseValue.equals("-infinity") ||
+ lowerCaseValue.equals("inf") ||
+ lowerCaseValue.equals("-inf")) {
+ value.toDouble
+ } else {
+ throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.")
+ }
+ }
+
+ case StringType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_STRING =>
+ UTF8String.fromString(parser.getText)
+
+ case _ =>
+ // Note that it always tries to convert the data as string without the case of failure.
+ val writer = new ByteArrayOutputStream()
+ Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
+ generator => generator.copyCurrentStructure(parser)
+ }
+ UTF8String.fromBytes(writer.toByteArray)
+ }
+
+ case TimestampType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_STRING =>
+ // This one will lose microseconds parts.
+ // See https://issues.apache.org/jira/browse/SPARK-10681.
+ Try(options.timestampFormat.parse(parser.getText).getTime * 1000L)
+ .getOrElse {
+ // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+ // compatibility.
+ DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
+ }
+
+ case VALUE_NUMBER_INT =>
+ parser.getLongValue * 1000000L
+ }
+
+ case DateType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_STRING =>
+ val stringValue = parser.getText
+ // This one will lose microseconds parts.
+ // See https://issues.apache.org/jira/browse/SPARK-10681.x
+ Try(DateTimeUtils.millisToDays(options.dateFormat.parse(parser.getText).getTime))
+ .getOrElse {
+ // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
+ // compatibility.
+ Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime))
+ .getOrElse {
+ // In Spark 1.5.0, we store the data as number of days since epoch in string.
+ // So, we just convert it to Int.
+ stringValue.toInt
+ }
+ }
+ }
+
+ case BinaryType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case VALUE_STRING => parser.getBinaryValue
+ }
+
+ case dt: DecimalType =>
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) =>
+ Decimal(parser.getDecimalValue, dt.precision, dt.scale)
+ }
+
+ case st: StructType =>
+ val fieldConverters = st.map(_.dataType).map(makeConverter)
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case START_OBJECT => convertObject(parser, st, fieldConverters)
+ }
+
+ case at: ArrayType =>
+ val elementConverter = makeConverter(at.elementType)
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case START_ARRAY => convertArray(parser, elementConverter)
+ }
+
+ case mt: MapType =>
+ val valueConverter = makeConverter(mt.valueType)
+ (parser: JsonParser) => parseJsonToken(parser, dataType) {
+ case START_OBJECT => convertMap(parser, valueConverter)
+ }
+
+ case udt: UserDefinedType[_] =>
+ makeConverter(udt.sqlType)
+
+ case _ =>
+ (parser: JsonParser) =>
+ // Here, we pass empty `PartialFunction` so that this case can be
+ // handled as a failed conversion. It will throw an exception as
+ // long as the value is not null.
+ parseJsonToken(parser, dataType)(PartialFunction.empty[JsonToken, Any])
+ }
+
+ /**
+ * This method skips `FIELD_NAME`s at the beginning, and handles nulls ahead before trying
+ * to parse the JSON token using given function `f`. If the `f` failed to parse and convert the
+ * token, call `failedConversion` to handle the token.
+ */
+ private def parseJsonToken(
+ parser: JsonParser,
+ dataType: DataType)(f: PartialFunction[JsonToken, Any]): Any = {
+ parser.getCurrentToken match {
+ case FIELD_NAME =>
+ // There are useless FIELD_NAMEs between START_OBJECT and END_OBJECT tokens
+ parser.nextToken()
+ parseJsonToken(parser, dataType)(f)
+
+ case null | VALUE_NULL => null
+
+ case other => f.applyOrElse(other, failedConversion(parser, dataType))
+ }
+ }
+
+ /**
+ * This function throws an exception for failed conversion, but returns null for empty string,
+ * to guard the non string types.
+ */
+ private def failedConversion(
+ parser: JsonParser,
+ dataType: DataType): PartialFunction[JsonToken, Any] = {
+ case VALUE_STRING if parser.getTextLength < 1 =>
+ // If conversion is failed, this produces `null` rather than throwing exception.
+ // This will protect the mismatch of types.
+ null
+
+ case token =>
+ // We cannot parse this token based on the given data type. So, we throw a
+ // SparkSQLJsonProcessingException and this exception will be caught by
+ // `parse` method.
+ throw new SparkSQLJsonProcessingException(
+ s"Failed to parse a value for data type $dataType (current token: $token).")
+ }
+
+ /**
+ * Parse an object from the token stream into a new Row representing the schema.
+ * Fields in the json that are not defined in the requested schema will be dropped.
+ */
+ private def convertObject(
+ parser: JsonParser,
+ schema: StructType,
+ fieldConverters: Seq[ValueConverter]): InternalRow = {
+ val row = new GenericMutableRow(schema.length)
+ while (nextUntil(parser, JsonToken.END_OBJECT)) {
+ schema.getFieldIndex(parser.getCurrentName) match {
+ case Some(index) =>
+ row.update(index, fieldConverters(index).apply(parser))
+
+ case None =>
+ parser.skipChildren()
+ }
+ }
+
+ row
+ }
+
+ /**
+ * Parse an object as a Map, preserving all fields.
+ */
+ private def convertMap(
+ parser: JsonParser,
+ fieldConverter: ValueConverter): MapData = {
+ val keys = ArrayBuffer.empty[UTF8String]
+ val values = ArrayBuffer.empty[Any]
+ while (nextUntil(parser, JsonToken.END_OBJECT)) {
+ keys += UTF8String.fromString(parser.getCurrentName)
+ values += fieldConverter.apply(parser)
+ }
+
+ ArrayBasedMapData(keys.toArray, values.toArray)
+ }
+
+ /**
+ * Parse an object as a Array.
+ */
+ private def convertArray(
+ parser: JsonParser,
+ fieldConverter: ValueConverter): ArrayData = {
+ val values = ArrayBuffer.empty[Any]
+ while (nextUntil(parser, JsonToken.END_ARRAY)) {
+ values += fieldConverter.apply(parser)
+ }
+
+ new GenericArrayData(values.toArray)
+ }
+
+ /**
+ * Parse the string JSON input to the set of [[InternalRow]]s.
+ */
+ def parse(input: String): Seq[InternalRow] = {
+ if (input.trim.isEmpty) {
+ Nil
+ } else {
+ try {
+ Utils.tryWithResource(factory.createParser(input)) { parser =>
+ parser.nextToken()
+ rootConverter.apply(parser) match {
+ case null => failedRecord(input)
+ case row: InternalRow => row :: Nil
+ case array: ArrayData =>
+ // Here, as we support reading top level JSON arrays and take every element
+ // in such an array as a row, this case is possible.
+ if (array.numElements() == 0) {
+ Nil
+ } else {
+ array.toArray[InternalRow](schema)
+ }
+ case _ =>
+ failedRecord(input)
+ }
+ }
+ } catch {
+ case _: JsonProcessingException =>
+ failedRecord(input)
+ case _: SparkSQLJsonProcessingException =>
+ failedRecord(input)
+ }
+ }
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
new file mode 100644
index 0000000000..c4d9abb2c0
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json
+
+import com.fasterxml.jackson.core.{JsonParser, JsonToken}
+
+object JacksonUtils {
+ /**
+ * Advance the parser until a null or a specific token is found
+ */
+ def nextUntil(parser: JsonParser, stopOn: JsonToken): Boolean = {
+ parser.nextToken() match {
+ case null => false
+ case x => x != stopOn
+ }
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
new file mode 100644
index 0000000000..435fba9d88
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CompressionCodecs.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.SequenceFile.CompressionType
+import org.apache.hadoop.io.compress._
+
+import org.apache.spark.util.Utils
+
+object CompressionCodecs {
+ private val shortCompressionCodecNames = Map(
+ "none" -> null,
+ "uncompressed" -> null,
+ "bzip2" -> classOf[BZip2Codec].getName,
+ "deflate" -> classOf[DeflateCodec].getName,
+ "gzip" -> classOf[GzipCodec].getName,
+ "lz4" -> classOf[Lz4Codec].getName,
+ "snappy" -> classOf[SnappyCodec].getName)
+
+ /**
+ * Return the full version of the given codec class.
+ * If it is already a class name, just return it.
+ */
+ def getCodecClassName(name: String): String = {
+ val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
+ try {
+ // Validate the codec name
+ if (codecName != null) {
+ Utils.classForName(codecName)
+ }
+ codecName
+ } catch {
+ case e: ClassNotFoundException =>
+ throw new IllegalArgumentException(s"Codec [$codecName] " +
+ s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
+ }
+ }
+
+ /**
+ * Set compression configurations to Hadoop `Configuration`.
+ * `codec` should be a full class path
+ */
+ def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
+ if (codec != null) {
+ conf.set("mapreduce.output.fileoutputformat.compress", "true")
+ conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
+ conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
+ conf.set("mapreduce.map.output.compress", "true")
+ conf.set("mapreduce.map.output.compress.codec", codec)
+ } else {
+ // This infers the option `compression` is set to `uncompressed` or `none`.
+ conf.set("mapreduce.output.fileoutputformat.compress", "false")
+ conf.set("mapreduce.map.output.compress", "false")
+ }
+ }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala
new file mode 100644
index 0000000000..0e466962b4
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ParseModes.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+object ParseModes {
+ val PERMISSIVE_MODE = "PERMISSIVE"
+ val DROP_MALFORMED_MODE = "DROPMALFORMED"
+ val FAIL_FAST_MODE = "FAILFAST"
+
+ val DEFAULT = PERMISSIVE_MODE
+
+ def isValidMode(mode: String): Boolean = {
+ mode.toUpperCase match {
+ case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true
+ case _ => false
+ }
+ }
+
+ def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE
+ def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE
+ def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode)) {
+ mode.toUpperCase == PERMISSIVE_MODE
+ } else {
+ true // We default to permissive is the mode string is not valid
+ }
+}