Diffstat (limited to 'sql/core/src')
16 files changed, 102 insertions, 675 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b10d2c86ac..b84fb2fb95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -21,14 +21,15 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.Partition
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
+import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.execution.datasources.json.{InferSchema, JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.json.InferSchema
 import org.apache.spark.sql.types.StructType
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
deleted file mode 100644
index 41cff07472..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec}
-
-import org.apache.spark.util.Utils
-
-private[datasources] object CompressionCodecs {
-  private val shortCompressionCodecNames = Map(
-    "none" -> null,
-    "uncompressed" -> null,
-    "bzip2" -> classOf[BZip2Codec].getName,
-    "deflate" -> classOf[DeflateCodec].getName,
-    "gzip" -> classOf[GzipCodec].getName,
-    "lz4" -> classOf[Lz4Codec].getName,
-    "snappy" -> classOf[SnappyCodec].getName)
-
-  /**
-   * Return the full version of the given codec class.
-   * If it is already a class name, just return it.
-   */
-  def getCodecClassName(name: String): String = {
-    val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
-    try {
-      // Validate the codec name
-      if (codecName != null) {
-        Utils.classForName(codecName)
-      }
-      codecName
-    } catch {
-      case e: ClassNotFoundException =>
-        throw new IllegalArgumentException(s"Codec [$codecName] " +
-          s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
-    }
-  }
-
-  /**
-   * Set compression configurations to Hadoop `Configuration`.
-   * `codec` should be a full class path
-   */
-  def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
-    if (codec != null) {
-      conf.set("mapreduce.output.fileoutputformat.compress", "true")
-      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
-      conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
-      conf.set("mapreduce.map.output.compress", "true")
-      conf.set("mapreduce.map.output.compress.codec", codec)
-    } else {
-      // This infers the option `compression` is set to `uncompressed` or `none`.
-      conf.set("mapreduce.output.fileoutputformat.compress", "false")
-      conf.set("mapreduce.map.output.compress", "false")
-    }
-  }
-}
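The CompressionCodecs helper deleted above moves, unchanged, to org.apache.spark.sql.catalyst.util (the import rewrites in CSVOptions, CSVFileFormat, JsonFileFormat and TextFileFormat below pick up the new location). A minimal sketch of how callers consume it, assuming the relocated object keeps the same API:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.catalyst.util.CompressionCodecs

    // Resolve a user-facing short name such as "gzip" to the Hadoop codec class
    // name, then apply it to the job configuration used when writing files.
    val codecClassName = CompressionCodecs.getCodecClassName("gzip")
    val hadoopConf = new Configuration()
    CompressionCodecs.setCodecConfiguration(hadoopConf, codecClassName)
    // hadoopConf now carries mapreduce.output.fileoutputformat.compress=true and
    // the gzip codec class for both map output and final output.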
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
deleted file mode 100644
index 468228053c..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-private[datasources] object ParseModes {
-  val PERMISSIVE_MODE = "PERMISSIVE"
-  val DROP_MALFORMED_MODE = "DROPMALFORMED"
-  val FAIL_FAST_MODE = "FAILFAST"
-
-  val DEFAULT = PERMISSIVE_MODE
-
-  def isValidMode(mode: String): Boolean = {
-    mode.toUpperCase match {
-      case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true
-      case _ => false
-    }
-  }
-
-  def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE
-  def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE
-  def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode)) {
-    mode.toUpperCase == PERMISSIVE_MODE
-  } else {
-    true // We default to permissive is the mode string is not valid
-  }
-}
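ParseModes likewise moves verbatim to org.apache.spark.sql.catalyst.util. The deleted code above encodes the fallback behavior the JSON and CSV readers rely on; a small sketch of its semantics, assuming the relocated object:

    import org.apache.spark.sql.catalyst.util.ParseModes

    ParseModes.isValidMode("dropmalformed")   // true: matching is case-insensitive
    ParseModes.isFailFastMode("FAILFAST")     // true
    ParseModes.isPermissiveMode("bogus")      // true: unknown modes fall back to PERMISSIVE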
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 9610746a81..4e662a52a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -29,6 +29,7 @@ import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index e7dcc22272..014614eb99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets
 
 import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
+import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
 
 private[csv] class CSVOptions(@transient private val parameters: Map[String, String])
   extends Logging with Serializable {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 91c58d059d..dc8bd817f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -23,7 +23,8 @@ import com.fasterxml.jackson.core._
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
-import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
+import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
+import org.apache.spark.sql.catalyst.json.JSONOptions
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
deleted file mode 100644
index 02d211d042..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.json
-
-import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
-import org.apache.commons.lang3.time.FastDateFormat
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
-
-/**
- * Options for the JSON data source.
- *
- * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
- */
-private[sql] class JSONOptions(
-    @transient private val parameters: Map[String, String])
-  extends Logging with Serializable {
-
-  val samplingRatio =
-    parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
-  val primitivesAsString =
-    parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
-  val prefersDecimal =
-    parameters.get("prefersDecimal").map(_.toBoolean).getOrElse(false)
-  val allowComments =
-    parameters.get("allowComments").map(_.toBoolean).getOrElse(false)
-  val allowUnquotedFieldNames =
-    parameters.get("allowUnquotedFieldNames").map(_.toBoolean).getOrElse(false)
-  val allowSingleQuotes =
-    parameters.get("allowSingleQuotes").map(_.toBoolean).getOrElse(true)
-  val allowNumericLeadingZeros =
-    parameters.get("allowNumericLeadingZeros").map(_.toBoolean).getOrElse(false)
-  val allowNonNumericNumbers =
-    parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true)
-  val allowBackslashEscapingAnyCharacter =
-    parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
-  val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
-  private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
-  val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
-
-  // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
-  val dateFormat: FastDateFormat =
-    FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"))
-
-  val timestampFormat: FastDateFormat =
-    FastDateFormat.getInstance(
-      parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"))
-
-  // Parse mode flags
-  if (!ParseModes.isValidMode(parseMode)) {
-    logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
-  }
-
-  val failFast = ParseModes.isFailFastMode(parseMode)
-  val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
-  val permissive = ParseModes.isPermissiveMode(parseMode)
-
-  /** Sets config options on a Jackson [[JsonFactory]]. */
-  def setJacksonOptions(factory: JsonFactory): Unit = {
-    factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
-    factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, allowUnquotedFieldNames)
-    factory.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, allowSingleQuotes)
-    factory.configure(JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS, allowNumericLeadingZeros)
-    factory.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, allowNonNumericNumbers)
-    factory.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER,
-      allowBackslashEscapingAnyCharacter)
-  }
-}
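Every field of the deleted JSONOptions class corresponds to a reader option, so the relocation does not change the user-facing surface. A sketch of how these options reach the class through DataFrameReader, assuming a `spark` session is in scope (the input path is hypothetical):

    val df = spark.read
      .option("allowComments", "true")        // JSONOptions.allowComments
      .option("allowSingleQuotes", "true")    // JSONOptions.allowSingleQuotes (the default)
      .option("mode", "DROPMALFORMED")        // parseMode -> the dropMalformed flag
      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
      .json("/path/to/input.json")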
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 270e7fbd3c..5b55b70186 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -21,8 +21,9 @@ import java.io.Writer
 
 import com.fasterxml.jackson.core._
 
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.JSONOptions
 import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
 import org.apache.spark.sql.types._
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
deleted file mode 100644
index 5ce1bf7432..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ /dev/null
@@ -1,440 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.json
-
-import java.io.ByteArrayOutputStream
-
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Try
-
-import com.fasterxml.jackson.core._
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, PERMISSIVE_MODE}
-import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.Utils
-
-private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
-
-class JacksonParser(
-    schema: StructType,
-    columnNameOfCorruptRecord: String,
-    options: JSONOptions) extends Logging {
-
-  import com.fasterxml.jackson.core.JsonToken._
-
-  // A `ValueConverter` is responsible for converting a value from `JsonParser`
-  // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
-
-  // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
-
-  private val factory = new JsonFactory()
-  options.setJacksonOptions(factory)
-
-  private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))
-
-  @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
-
-  /**
-   * This function deals with the cases it fails to parse. This function will be called
-   * when exceptions are caught during converting. This functions also deals with `mode` option.
-   */
-  private def failedRecord(record: String): Seq[InternalRow] = {
-    // create a row even if no corrupt record column is present
-    if (options.failFast) {
-      throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
-    }
-    if (options.dropMalformed) {
-      if (!isWarningPrintedForMalformedRecord) {
-        logWarning(
-          s"""Found at least one malformed records (sample: $record). The JSON reader will drop
-             |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
-             |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
-             |mode and use the default inferred schema.
-             |
-             |Code example to print all malformed records (scala):
-             |===================================================
-             |// The corrupted record exists in column ${columnNameOfCorruptRecord}
-             |val parsedJson = spark.read.json("/path/to/json/file/test.json")
-             |
-           """.stripMargin)
-        isWarningPrintedForMalformedRecord = true
-      }
-      Nil
-    } else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
-      if (!isWarningPrintedForMalformedRecord) {
-        logWarning(
-          s"""Found at least one malformed records (sample: $record). The JSON reader will replace
-             |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
-             |To find out which corrupted records have been replaced with null, please use the
-             |default inferred schema instead of providing a custom schema.
-             |
-             |Code example to print all malformed records (scala):
-             |===================================================
-             |// The corrupted record exists in column ${columnNameOfCorruptRecord}.
-             |val parsedJson = spark.read.json("/path/to/json/file/test.json")
-             |
-           """.stripMargin)
-        isWarningPrintedForMalformedRecord = true
-      }
-      emptyRow
-    } else {
-      val row = new GenericMutableRow(schema.length)
-      for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
-        require(schema(corruptIndex).dataType == StringType)
-        row.update(corruptIndex, UTF8String.fromString(record))
-      }
-      Seq(row)
-    }
-  }
-
-  /**
-   * Create a converter which converts the JSON documents held by the `JsonParser`
-   * to a value according to a desired schema. This is a wrapper for the method
-   * `makeConverter()` to handle a row wrapped with an array.
-   */
-  def makeRootConverter(dataType: DataType): ValueConverter = dataType match {
-    case st: StructType =>
-      val elementConverter = makeConverter(st)
-      val fieldConverters = st.map(_.dataType).map(makeConverter)
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case START_OBJECT => convertObject(parser, st, fieldConverters)
-        // SPARK-3308: support reading top level JSON arrays and take every element
-        // in such an array as a row
-        //
-        // For example, we support, the JSON data as below:
-        //
-        // [{"a":"str_a_1"}]
-        // [{"a":"str_a_2"}, {"b":"str_b_3"}]
-        //
-        // resulting in:
-        //
-        // List([str_a_1,null])
-        // List([str_a_2,null], [null,str_b_3])
-        //
-        case START_ARRAY => convertArray(parser, elementConverter)
-      }
-
-    case ArrayType(st: StructType, _) =>
-      val elementConverter = makeConverter(st)
-      val fieldConverters = st.map(_.dataType).map(makeConverter)
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        // the business end of SPARK-3308:
-        // when an object is found but an array is requested just wrap it in a list.
-        // This is being wrapped in `JacksonParser.parse`.
-        case START_OBJECT => convertObject(parser, st, fieldConverters)
-        case START_ARRAY => convertArray(parser, elementConverter)
-      }
-
-    case _ => makeConverter(dataType)
-  }
-
-  /**
-   * Create a converter which converts the JSON documents held by the `JsonParser`
-   * to a value according to a desired schema.
-   */
-  private def makeConverter(dataType: DataType): ValueConverter = dataType match {
-    case BooleanType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_TRUE => true
-        case VALUE_FALSE => false
-      }
-
-    case ByteType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_NUMBER_INT => parser.getByteValue
-      }
-
-    case ShortType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_NUMBER_INT => parser.getShortValue
-      }
-
-    case IntegerType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_NUMBER_INT => parser.getIntValue
-      }
-
-    case LongType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_NUMBER_INT => parser.getLongValue
-      }
-
-    case FloatType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
-          parser.getFloatValue
-
-        case VALUE_STRING =>
-          // Special case handling for NaN and Infinity.
-          val value = parser.getText
-          val lowerCaseValue = value.toLowerCase
-          if (lowerCaseValue.equals("nan") ||
-            lowerCaseValue.equals("infinity") ||
-            lowerCaseValue.equals("-infinity") ||
-            lowerCaseValue.equals("inf") ||
-            lowerCaseValue.equals("-inf")) {
-            value.toFloat
-          } else {
-            throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.")
-          }
-      }
-
-    case DoubleType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
-          parser.getDoubleValue
-
-        case VALUE_STRING =>
-          // Special case handling for NaN and Infinity.
-          val value = parser.getText
-          val lowerCaseValue = value.toLowerCase
-          if (lowerCaseValue.equals("nan") ||
-            lowerCaseValue.equals("infinity") ||
-            lowerCaseValue.equals("-infinity") ||
-            lowerCaseValue.equals("inf") ||
-            lowerCaseValue.equals("-inf")) {
-            value.toDouble
-          } else {
-            throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.")
-          }
-      }
-
-    case StringType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_STRING =>
-          UTF8String.fromString(parser.getText)
-
-        case _ =>
-          // Note that it always tries to convert the data as string without the case of failure.
-          val writer = new ByteArrayOutputStream()
-          Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
-            generator => generator.copyCurrentStructure(parser)
-          }
-          UTF8String.fromBytes(writer.toByteArray)
-      }
-
-    case TimestampType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_STRING =>
-          // This one will lose microseconds parts.
-          // See https://issues.apache.org/jira/browse/SPARK-10681.
-          Try(options.timestampFormat.parse(parser.getText).getTime * 1000L)
-            .getOrElse {
-              // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-              // compatibility.
-              DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
-            }
-
-        case VALUE_NUMBER_INT =>
-          parser.getLongValue * 1000000L
-      }
-
-    case DateType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_STRING =>
-          val stringValue = parser.getText
-          // This one will lose microseconds parts.
-          // See https://issues.apache.org/jira/browse/SPARK-10681.x
-          Try(DateTimeUtils.millisToDays(options.dateFormat.parse(parser.getText).getTime))
-            .getOrElse {
-              // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
-              // compatibility.
-              Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime))
-                .getOrElse {
-                  // In Spark 1.5.0, we store the data as number of days since epoch in string.
-                  // So, we just convert it to Int.
-                  stringValue.toInt
-                }
-            }
-      }
-
-    case BinaryType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case VALUE_STRING => parser.getBinaryValue
-      }
-
-    case dt: DecimalType =>
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) =>
-          Decimal(parser.getDecimalValue, dt.precision, dt.scale)
-      }
-
-    case st: StructType =>
-      val fieldConverters = st.map(_.dataType).map(makeConverter)
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case START_OBJECT => convertObject(parser, st, fieldConverters)
-      }
-
-    case at: ArrayType =>
-      val elementConverter = makeConverter(at.elementType)
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case START_ARRAY => convertArray(parser, elementConverter)
-      }
-
-    case mt: MapType =>
-      val valueConverter = makeConverter(mt.valueType)
-      (parser: JsonParser) => parseJsonToken(parser, dataType) {
-        case START_OBJECT => convertMap(parser, valueConverter)
-      }
-
-    case udt: UserDefinedType[_] =>
-      makeConverter(udt.sqlType)
-
-    case _ =>
-      (parser: JsonParser) =>
-        // Here, we pass empty `PartialFunction` so that this case can be
-        // handled as a failed conversion. It will throw an exception as
-        // long as the value is not null.
-        parseJsonToken(parser, dataType)(PartialFunction.empty[JsonToken, Any])
-  }
-
-  /**
-   * This method skips `FIELD_NAME`s at the beginning, and handles nulls ahead before trying
-   * to parse the JSON token using given function `f`. If the `f` failed to parse and convert the
-   * token, call `failedConversion` to handle the token.
-   */
-  private def parseJsonToken(
-      parser: JsonParser,
-      dataType: DataType)(f: PartialFunction[JsonToken, Any]): Any = {
-    parser.getCurrentToken match {
-      case FIELD_NAME =>
-        // There are useless FIELD_NAMEs between START_OBJECT and END_OBJECT tokens
-        parser.nextToken()
-        parseJsonToken(parser, dataType)(f)
-
-      case null | VALUE_NULL => null
-
-      case other => f.applyOrElse(other, failedConversion(parser, dataType))
-    }
-  }
-
-  /**
-   * This function throws an exception for failed conversion, but returns null for empty string,
-   * to guard the non string types.
-   */
-  private def failedConversion(
-      parser: JsonParser,
-      dataType: DataType): PartialFunction[JsonToken, Any] = {
-    case VALUE_STRING if parser.getTextLength < 1 =>
-      // If conversion is failed, this produces `null` rather than throwing exception.
-      // This will protect the mismatch of types.
-      null
-
-    case token =>
-      // We cannot parse this token based on the given data type. So, we throw a
-      // SparkSQLJsonProcessingException and this exception will be caught by
-      // `parse` method.
-      throw new SparkSQLJsonProcessingException(
-        s"Failed to parse a value for data type $dataType (current token: $token).")
-  }
-
-  /**
-   * Parse an object from the token stream into a new Row representing the schema.
-   * Fields in the json that are not defined in the requested schema will be dropped.
-   */
-  private def convertObject(
-      parser: JsonParser,
-      schema: StructType,
-      fieldConverters: Seq[ValueConverter]): InternalRow = {
-    val row = new GenericMutableRow(schema.length)
-    while (nextUntil(parser, JsonToken.END_OBJECT)) {
-      schema.getFieldIndex(parser.getCurrentName) match {
-        case Some(index) =>
-          row.update(index, fieldConverters(index).apply(parser))
-
-        case None =>
-          parser.skipChildren()
-      }
-    }
-
-    row
-  }
-
-  /**
-   * Parse an object as a Map, preserving all fields.
-   */
-  private def convertMap(
-      parser: JsonParser,
-      fieldConverter: ValueConverter): MapData = {
-    val keys = ArrayBuffer.empty[UTF8String]
-    val values = ArrayBuffer.empty[Any]
-    while (nextUntil(parser, JsonToken.END_OBJECT)) {
-      keys += UTF8String.fromString(parser.getCurrentName)
-      values += fieldConverter.apply(parser)
-    }
-
-    ArrayBasedMapData(keys.toArray, values.toArray)
-  }
-
-  /**
-   * Parse an object as a Array.
-   */
-  private def convertArray(
-      parser: JsonParser,
-      fieldConverter: ValueConverter): ArrayData = {
-    val values = ArrayBuffer.empty[Any]
-    while (nextUntil(parser, JsonToken.END_ARRAY)) {
-      values += fieldConverter.apply(parser)
-    }
-
-    new GenericArrayData(values.toArray)
-  }
-
-  /**
-   * Parse the string JSON input to the set of [[InternalRow]]s.
-   */
-  def parse(input: String): Seq[InternalRow] = {
-    if (input.trim.isEmpty) {
-      Nil
-    } else {
-      try {
-        Utils.tryWithResource(factory.createParser(input)) { parser =>
-          parser.nextToken()
-          rootConverter.apply(parser) match {
-            case null => failedRecord(input)
-            case row: InternalRow => row :: Nil
-            case array: ArrayData =>
-              // Here, as we support reading top level JSON arrays and take every element
-              // in such an array as a row, this case is possible.
-              if (array.numElements() == 0) {
-                Nil
-              } else {
-                array.toArray[InternalRow](schema)
-              }
-            case _ =>
-              failedRecord(input)
-          }
-        }
-      } catch {
-        case _: JsonProcessingException =>
-          failedRecord(input)
-        case _: SparkSQLJsonProcessingException =>
-          failedRecord(input)
-      }
-    }
-  }
-}
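The parser deleted above moves to org.apache.spark.sql.catalyst.json so the new JsonToStruct expression can reach it from catalyst. Its failedRecord method is what gives the three parse modes their user-visible behavior; a sketch of that behavior from a spark-shell session (assumes `spark` and `sc` are in scope):

    import org.apache.spark.sql.types.{IntegerType, StructType}

    val schema = new StructType().add("a", IntegerType)
    val records = sc.parallelize(Seq("""{"a": 1}""", """{"a" 1}"""))  // second line is malformed

    // PERMISSIVE (default): the malformed line becomes a row of nulls.
    spark.read.schema(schema).option("mode", "PERMISSIVE").json(records).collect()
    // DROPMALFORMED: the malformed line is dropped after a one-time warning.
    spark.read.schema(schema).option("mode", "DROPMALFORMED").json(records).count()  // 1
    // FAILFAST: the read throws "Malformed line in FAILFAST mode: ..." instead.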
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala
deleted file mode 100644
index 005546f37d..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.json
-
-import com.fasterxml.jackson.core.{JsonParser, JsonToken}
-
-private object JacksonUtils {
-  /**
-   * Advance the parser until a null or a specific token is found
-   */
-  def nextUntil(parser: JsonParser, stopOn: JsonToken): Boolean = {
-    parser.nextToken() match {
-      case null => false
-      case x => x != stopOn
-    }
-  }
-}
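JacksonUtils.nextUntil is the small cursor helper that convertObject, convertMap and convertArray above loop on. A standalone sketch of the same token walk against plain jackson-core, outside Spark:

    import com.fasterxml.jackson.core.{JsonFactory, JsonParser, JsonToken}

    def nextUntil(parser: JsonParser, stopOn: JsonToken): Boolean =
      parser.nextToken() match {
        case null => false
        case x => x != stopOn
      }

    val parser = new JsonFactory().createParser("""{"a": 1, "b": 2}""")
    parser.nextToken()  // position the cursor on START_OBJECT
    while (nextUntil(parser, JsonToken.END_OBJECT)) {
      // visits the FIELD_NAME and value tokens of "a" and "b", then stops
      println(s"${parser.getCurrentName}: ${parser.getCurrentToken}")
    }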
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 6882a6cdca..9fe38ccc9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -32,6 +32,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index a875b01ec2..9f96667311 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{StringType, StructType}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 47bf41a2da..3bc1c5b900 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.{typeTag, TypeTag}
 import scala.util.Try
@@ -2819,6 +2820,63 @@ object functions {
   }
 
   /**
+   * (Scala-specific) Parses a column containing a JSON string into a [[StructType]] with the
+   * specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param schema the schema to use when parsing the json string
+   * @param options options to control how the json is parsed. accepts the same options and the
+   *                json data source.
+   * @param e a string column containing JSON data.
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def from_json(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+    JsonToStruct(schema, options, e.expr)
+  }
+
+  /**
+   * (Java-specific) Parses a column containing a JSON string into a [[StructType]] with the
+   * specified schema. Returns `null`, in the case of an unparseable string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string
+   * @param options options to control how the json is parsed. accepts the same options and the
+   *                json data source.
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def from_json(e: Column, schema: StructType, options: java.util.Map[String, String]): Column =
+    from_json(e, schema, options.asScala.toMap)
+
+  /**
+   * Parses a column containing a JSON string into a [[StructType]] with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def from_json(e: Column, schema: StructType): Column =
+    from_json(e, schema, Map.empty[String, String])
+
+  /**
+   * Parses a column containing a JSON string into a [[StructType]] with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string as a json string
+   *
+   * @group collection_funcs
+   * @since 2.1.0
+   */
+  def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
+    from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options)
+
+  /**
    * Returns length of array or map.
    *
    * @group collection_funcs
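The from_json overloads above are the user-facing payoff of the refactoring; the new JsonToStruct expression runs the relocated JacksonParser once per row. A usage sketch, assuming `spark.implicits._` is imported:

    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types.{IntegerType, StructType}

    val schema = new StructType().add("a", IntegerType)
    val df = Seq("""{"a": 1}""", """not json""").toDF("value")

    df.select(from_json($"value", schema).as("parsed")).show()
    // The first row yields the struct [1]; the unparseable second row yields null.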
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 1391c9d57f..518d6e92b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.functions.from_json
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructType}
 
 class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
@@ -94,4 +96,31 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
 
     checkAnswer(expr, expected)
   }
+
+  test("json_parser") {
+    val df = Seq("""{"a": 1}""").toDS()
+    val schema = new StructType().add("a", IntegerType)
+
+    checkAnswer(
+      df.select(from_json($"value", schema)),
+      Row(Row(1)) :: Nil)
+  }
+
+  test("json_parser missing columns") {
+    val df = Seq("""{"a": 1}""").toDS()
+    val schema = new StructType().add("b", IntegerType)
+
+    checkAnswer(
+      df.select(from_json($"value", schema)),
+      Row(Row(null)) :: Nil)
+  }
+
+  test("json_parser invalid json") {
+    val df = Seq("""{"a" 1}""").toDS()
+    val schema = new StructType().add("a", IntegerType)
+
+    checkAnswer(
+      df.select(from_json($"value", schema)),
+      Row(null) :: Nil)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
index c31dffedbd..0b72da5f37 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.json
 
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.json.JSONOptions
 import org.apache.spark.sql.test.SharedSQLContext
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 3d533c14e1..456052f79a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -26,9 +26,10 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 
-import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
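The remaining overloads accept parser options and a JSON-serialized schema. A closing sketch exercising both, reusing the `df` and `schema` from the sketch above (the option map mirrors the JSON data source options):

    import scala.collection.JavaConverters._

    // Scala-specific overload with options: tolerate comments inside records.
    df.select(from_json($"value", schema, Map("allowComments" -> "true")))

    // Java-friendly overload: the schema is passed as its JSON representation,
    // which DataType.fromJson round-trips back into a StructType.
    df.select(from_json($"value", schema.json, Map.empty[String, String].asJava))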