Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala  |    5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala  |   72
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala  |   41
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala  |    1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala  |    2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala  |    3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala  |   84
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala  |    3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala  |  440
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala  |   32
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala  |    2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala  |    1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/functions.scala  |   58
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala  |   29
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala  |    1
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala  |    3
16 files changed, 102 insertions, 675 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b10d2c86ac..b84fb2fb95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -21,14 +21,15 @@ import java.util.Properties
import scala.collection.JavaConverters._
-import org.apache.spark.Partition
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
+import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.execution.datasources.json.{InferSchema, JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.json.InferSchema
import org.apache.spark.sql.types.StructType
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
deleted file mode 100644
index 41cff07472..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CompressionCodecs.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec}
-
-import org.apache.spark.util.Utils
-
-private[datasources] object CompressionCodecs {
- private val shortCompressionCodecNames = Map(
- "none" -> null,
- "uncompressed" -> null,
- "bzip2" -> classOf[BZip2Codec].getName,
- "deflate" -> classOf[DeflateCodec].getName,
- "gzip" -> classOf[GzipCodec].getName,
- "lz4" -> classOf[Lz4Codec].getName,
- "snappy" -> classOf[SnappyCodec].getName)
-
- /**
- * Return the full version of the given codec class.
- * If it is already a class name, just return it.
- */
- def getCodecClassName(name: String): String = {
- val codecName = shortCompressionCodecNames.getOrElse(name.toLowerCase, name)
- try {
- // Validate the codec name
- if (codecName != null) {
- Utils.classForName(codecName)
- }
- codecName
- } catch {
- case e: ClassNotFoundException =>
- throw new IllegalArgumentException(s"Codec [$codecName] " +
- s"is not available. Known codecs are ${shortCompressionCodecNames.keys.mkString(", ")}.")
- }
- }
-
- /**
- * Set compression configurations to Hadoop `Configuration`.
- * `codec` should be a full class path
- */
- def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
- if (codec != null) {
- conf.set("mapreduce.output.fileoutputformat.compress", "true")
- conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
- conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
- conf.set("mapreduce.map.output.compress", "true")
- conf.set("mapreduce.map.output.compress.codec", codec)
- } else {
- // This infers the option `compression` is set to `uncompressed` or `none`.
- conf.set("mapreduce.output.fileoutputformat.compress", "false")
- conf.set("mapreduce.map.output.compress", "false")
- }
- }
-}
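
The helper deleted above moves to org.apache.spark.sql.catalyst.util in this change (see the updated imports in CSVFileFormat, JsonFileFormat and TextFileFormat below), but it is still reached through the writer-side `compression` option. A minimal usage sketch, assuming a local Spark build with the usual Hadoop codecs on the classpath; the object name and output path are illustrative:

    import org.apache.spark.sql.SparkSession

    object CompressionOptionExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("codec-demo").getOrCreate()
        import spark.implicits._

        val df = Seq(("a", 1), ("b", 2)).toDF("key", "value")
        // "gzip" is resolved to org.apache.hadoop.io.compress.GzipCodec by
        // CompressionCodecs.getCodecClassName before being set on the Hadoop conf.
        df.write.mode("overwrite").option("compression", "gzip").json("/tmp/codec-demo-json")

        spark.stop()
      }
    }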
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
deleted file mode 100644
index 468228053c..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ParseModes.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-private[datasources] object ParseModes {
- val PERMISSIVE_MODE = "PERMISSIVE"
- val DROP_MALFORMED_MODE = "DROPMALFORMED"
- val FAIL_FAST_MODE = "FAILFAST"
-
- val DEFAULT = PERMISSIVE_MODE
-
- def isValidMode(mode: String): Boolean = {
- mode.toUpperCase match {
- case PERMISSIVE_MODE | DROP_MALFORMED_MODE | FAIL_FAST_MODE => true
- case _ => false
- }
- }
-
- def isDropMalformedMode(mode: String): Boolean = mode.toUpperCase == DROP_MALFORMED_MODE
- def isFailFastMode(mode: String): Boolean = mode.toUpperCase == FAIL_FAST_MODE
- def isPermissiveMode(mode: String): Boolean = if (isValidMode(mode)) {
- mode.toUpperCase == PERMISSIVE_MODE
- } else {
- true // We default to permissive is the mode string is not valid
- }
-}
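
The mode constants above back the reader-level `mode` option for both the CSV and JSON sources. A minimal sketch of the three behaviours against one malformed JSON record, assuming a local SparkSession; the sample records are illustrative:

    import org.apache.spark.sql.SparkSession

    object ParseModeExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("parse-modes").getOrCreate()

        // One well-formed and one malformed record.
        val records = spark.sparkContext.parallelize(Seq("""{"a": 1}""", """{"a" 2}"""))

        // PERMISSIVE (the default): malformed records are kept and surfaced via the
        // corrupt-record column when the schema is inferred.
        spark.read.json(records).show()

        // DROPMALFORMED: malformed records are silently dropped (with a one-time warning).
        spark.read.option("mode", "DROPMALFORMED").json(records).show()

        // FAILFAST: the first malformed record raises an exception, so it is left commented out.
        // spark.read.option("mode", "FAILFAST").json(records).show()

        spark.stop()
      }
    }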
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 9610746a81..4e662a52a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -29,6 +29,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index e7dcc22272..014614eb99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
+import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
private[csv] class CSVOptions(@transient private val parameters: Map[String, String])
extends Logging with Serializable {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 91c58d059d..dc8bd817f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -23,7 +23,8 @@ import com.fasterxml.jackson.core._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
-import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
+import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
+import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
deleted file mode 100644
index 02d211d042..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.json
-
-import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
-import org.apache.commons.lang3.time.FastDateFormat
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
-
-/**
- * Options for the JSON data source.
- *
- * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
- */
-private[sql] class JSONOptions(
- @transient private val parameters: Map[String, String])
- extends Logging with Serializable {
-
- val samplingRatio =
- parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
- val primitivesAsString =
- parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
- val prefersDecimal =
- parameters.get("prefersDecimal").map(_.toBoolean).getOrElse(false)
- val allowComments =
- parameters.get("allowComments").map(_.toBoolean).getOrElse(false)
- val allowUnquotedFieldNames =
- parameters.get("allowUnquotedFieldNames").map(_.toBoolean).getOrElse(false)
- val allowSingleQuotes =
- parameters.get("allowSingleQuotes").map(_.toBoolean).getOrElse(true)
- val allowNumericLeadingZeros =
- parameters.get("allowNumericLeadingZeros").map(_.toBoolean).getOrElse(false)
- val allowNonNumericNumbers =
- parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true)
- val allowBackslashEscapingAnyCharacter =
- parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
- val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
- private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
- val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
-
- // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
- val dateFormat: FastDateFormat =
- FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"))
-
- val timestampFormat: FastDateFormat =
- FastDateFormat.getInstance(
- parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"))
-
- // Parse mode flags
- if (!ParseModes.isValidMode(parseMode)) {
- logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
- }
-
- val failFast = ParseModes.isFailFastMode(parseMode)
- val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
- val permissive = ParseModes.isPermissiveMode(parseMode)
-
- /** Sets config options on a Jackson [[JsonFactory]]. */
- def setJacksonOptions(factory: JsonFactory): Unit = {
- factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
- factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, allowUnquotedFieldNames)
- factory.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, allowSingleQuotes)
- factory.configure(JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS, allowNumericLeadingZeros)
- factory.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, allowNonNumericNumbers)
- factory.configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER,
- allowBackslashEscapingAnyCharacter)
- }
-}
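
Each field of the deleted class corresponds one-to-one to a reader option, and most of them are forwarded to Jackson's JsonParser.Feature flags in setJacksonOptions. A minimal sketch of turning a few of them on, assuming a local SparkSession; the sample record is illustrative:

    import org.apache.spark.sql.SparkSession

    object JsonOptionsExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("json-options").getOrCreate()

        // Unquoted field name, single quotes and an inline comment -- all rejected by default.
        val records = spark.sparkContext.parallelize(Seq(
          """{name: 'spark', /* inline comment */ "version": 2}"""))

        val df = spark.read
          .option("allowUnquotedFieldNames", "true")
          .option("allowSingleQuotes", "true")   // already the default, shown for completeness
          .option("allowComments", "true")
          .json(records)

        df.printSchema()
        df.show()

        spark.stop()
      }
    }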
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index 270e7fbd3c..5b55b70186 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -21,8 +21,9 @@ import java.io.Writer
import com.fasterxml.jackson.core._
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
import org.apache.spark.sql.types._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
deleted file mode 100644
index 5ce1bf7432..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ /dev/null
@@ -1,440 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.json
-
-import java.io.ByteArrayOutputStream
-
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Try
-
-import com.fasterxml.jackson.core._
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, PERMISSIVE_MODE}
-import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
-import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.Utils
-
-private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
-
-class JacksonParser(
- schema: StructType,
- columnNameOfCorruptRecord: String,
- options: JSONOptions) extends Logging {
-
- import com.fasterxml.jackson.core.JsonToken._
-
- // A `ValueConverter` is responsible for converting a value from `JsonParser`
- // to a value in a field for `InternalRow`.
- private type ValueConverter = (JsonParser) => Any
-
- // `ValueConverter`s for the root schema for all fields in the schema
- private val rootConverter: ValueConverter = makeRootConverter(schema)
-
- private val factory = new JsonFactory()
- options.setJacksonOptions(factory)
-
- private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))
-
- @transient
- private[this] var isWarningPrintedForMalformedRecord: Boolean = false
-
- /**
- * This function deals with the cases it fails to parse. This function will be called
- * when exceptions are caught during converting. This functions also deals with `mode` option.
- */
- private def failedRecord(record: String): Seq[InternalRow] = {
- // create a row even if no corrupt record column is present
- if (options.failFast) {
- throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
- }
- if (options.dropMalformed) {
- if (!isWarningPrintedForMalformedRecord) {
- logWarning(
- s"""Found at least one malformed records (sample: $record). The JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
- |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===================================================
- |// The corrupted record exists in column ${columnNameOfCorruptRecord}
- |val parsedJson = spark.read.json("/path/to/json/file/test.json")
- |
- """.stripMargin)
- isWarningPrintedForMalformedRecord = true
- }
- Nil
- } else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
- if (!isWarningPrintedForMalformedRecord) {
- logWarning(
- s"""Found at least one malformed records (sample: $record). The JSON reader will replace
- |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
- |To find out which corrupted records have been replaced with null, please use the
- |default inferred schema instead of providing a custom schema.
- |
- |Code example to print all malformed records (scala):
- |===================================================
- |// The corrupted record exists in column ${columnNameOfCorruptRecord}.
- |val parsedJson = spark.read.json("/path/to/json/file/test.json")
- |
- """.stripMargin)
- isWarningPrintedForMalformedRecord = true
- }
- emptyRow
- } else {
- val row = new GenericMutableRow(schema.length)
- for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecord)) {
- require(schema(corruptIndex).dataType == StringType)
- row.update(corruptIndex, UTF8String.fromString(record))
- }
- Seq(row)
- }
- }
-
- /**
- * Create a converter which converts the JSON documents held by the `JsonParser`
- * to a value according to a desired schema. This is a wrapper for the method
- * `makeConverter()` to handle a row wrapped with an array.
- */
- def makeRootConverter(dataType: DataType): ValueConverter = dataType match {
- case st: StructType =>
- val elementConverter = makeConverter(st)
- val fieldConverters = st.map(_.dataType).map(makeConverter)
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
- case START_OBJECT => convertObject(parser, st, fieldConverters)
- // SPARK-3308: support reading top level JSON arrays and take every element
- // in such an array as a row
- //
- // For example, we support, the JSON data as below:
- //
- // [{"a":"str_a_1"}]
- // [{"a":"str_a_2"}, {"b":"str_b_3"}]
- //
- // resulting in:
- //
- // List([str_a_1,null])
- // List([str_a_2,null], [null,str_b_3])
- //
- case START_ARRAY => convertArray(parser, elementConverter)
- }
-
- case ArrayType(st: StructType, _) =>
- val elementConverter = makeConverter(st)
- val fieldConverters = st.map(_.dataType).map(makeConverter)
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
- // the business end of SPARK-3308:
- // when an object is found but an array is requested just wrap it in a list.
- // This is being wrapped in `JacksonParser.parse`.
- case START_OBJECT => convertObject(parser, st, fieldConverters)
- case START_ARRAY => convertArray(parser, elementConverter)
- }
-
- case _ => makeConverter(dataType)
- }
-
- /**
- * Create a converter which converts the JSON documents held by the `JsonParser`
- * to a value according to a desired schema.
- */
- private def makeConverter(dataType: DataType): ValueConverter = dataType match {
- case BooleanType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
- case VALUE_TRUE => true
- case VALUE_FALSE => false
- }
-
- case ByteType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
- case VALUE_NUMBER_INT => parser.getByteValue
- }
-
- case ShortType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
- case VALUE_NUMBER_INT => parser.getShortValue
- }
-
- case IntegerType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
- case VALUE_NUMBER_INT => parser.getIntValue
- }
-
- case LongType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
- case VALUE_NUMBER_INT => parser.getLongValue
- }
-
- case FloatType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
- case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
- parser.getFloatValue
-
- case VALUE_STRING =>
- // Special case handling for NaN and Infinity.
- val value = parser.getText
- val lowerCaseValue = value.toLowerCase
- if (lowerCaseValue.equals("nan") ||
- lowerCaseValue.equals("infinity") ||
- lowerCaseValue.equals("-infinity") ||
- lowerCaseValue.equals("inf") ||
- lowerCaseValue.equals("-inf")) {
- value.toFloat
- } else {
- throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.")
- }
- }
-
- case DoubleType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
- case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
- parser.getDoubleValue
-
- case VALUE_STRING =>
- // Special case handling for NaN and Infinity.
- val value = parser.getText
- val lowerCaseValue = value.toLowerCase
- if (lowerCaseValue.equals("nan") ||
- lowerCaseValue.equals("infinity") ||
- lowerCaseValue.equals("-infinity") ||
- lowerCaseValue.equals("inf") ||
- lowerCaseValue.equals("-inf")) {
- value.toDouble
- } else {
- throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.")
- }
- }
-
- case StringType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
- case VALUE_STRING =>
- UTF8String.fromString(parser.getText)
-
- case _ =>
- // Note that it always tries to convert the data as string without the case of failure.
- val writer = new ByteArrayOutputStream()
- Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
- generator => generator.copyCurrentStructure(parser)
- }
- UTF8String.fromBytes(writer.toByteArray)
- }
-
- case TimestampType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
- case VALUE_STRING =>
- // This one will lose microseconds parts.
- // See https://issues.apache.org/jira/browse/SPARK-10681.
- Try(options.timestampFormat.parse(parser.getText).getTime * 1000L)
- .getOrElse {
- // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
- // compatibility.
- DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
- }
-
- case VALUE_NUMBER_INT =>
- parser.getLongValue * 1000000L
- }
-
- case DateType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
- case VALUE_STRING =>
- val stringValue = parser.getText
- // This one will lose microseconds parts.
- // See https://issues.apache.org/jira/browse/SPARK-10681.x
- Try(DateTimeUtils.millisToDays(options.dateFormat.parse(parser.getText).getTime))
- .getOrElse {
- // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
- // compatibility.
- Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime))
- .getOrElse {
- // In Spark 1.5.0, we store the data as number of days since epoch in string.
- // So, we just convert it to Int.
- stringValue.toInt
- }
- }
- }
-
- case BinaryType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
- case VALUE_STRING => parser.getBinaryValue
- }
-
- case dt: DecimalType =>
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
- case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) =>
- Decimal(parser.getDecimalValue, dt.precision, dt.scale)
- }
-
- case st: StructType =>
- val fieldConverters = st.map(_.dataType).map(makeConverter)
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
- case START_OBJECT => convertObject(parser, st, fieldConverters)
- }
-
- case at: ArrayType =>
- val elementConverter = makeConverter(at.elementType)
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
- case START_ARRAY => convertArray(parser, elementConverter)
- }
-
- case mt: MapType =>
- val valueConverter = makeConverter(mt.valueType)
- (parser: JsonParser) => parseJsonToken(parser, dataType) {
- case START_OBJECT => convertMap(parser, valueConverter)
- }
-
- case udt: UserDefinedType[_] =>
- makeConverter(udt.sqlType)
-
- case _ =>
- (parser: JsonParser) =>
- // Here, we pass empty `PartialFunction` so that this case can be
- // handled as a failed conversion. It will throw an exception as
- // long as the value is not null.
- parseJsonToken(parser, dataType)(PartialFunction.empty[JsonToken, Any])
- }
-
- /**
- * This method skips `FIELD_NAME`s at the beginning, and handles nulls ahead before trying
- * to parse the JSON token using given function `f`. If the `f` failed to parse and convert the
- * token, call `failedConversion` to handle the token.
- */
- private def parseJsonToken(
- parser: JsonParser,
- dataType: DataType)(f: PartialFunction[JsonToken, Any]): Any = {
- parser.getCurrentToken match {
- case FIELD_NAME =>
- // There are useless FIELD_NAMEs between START_OBJECT and END_OBJECT tokens
- parser.nextToken()
- parseJsonToken(parser, dataType)(f)
-
- case null | VALUE_NULL => null
-
- case other => f.applyOrElse(other, failedConversion(parser, dataType))
- }
- }
-
- /**
- * This function throws an exception for failed conversion, but returns null for empty string,
- * to guard the non string types.
- */
- private def failedConversion(
- parser: JsonParser,
- dataType: DataType): PartialFunction[JsonToken, Any] = {
- case VALUE_STRING if parser.getTextLength < 1 =>
- // If conversion is failed, this produces `null` rather than throwing exception.
- // This will protect the mismatch of types.
- null
-
- case token =>
- // We cannot parse this token based on the given data type. So, we throw a
- // SparkSQLJsonProcessingException and this exception will be caught by
- // `parse` method.
- throw new SparkSQLJsonProcessingException(
- s"Failed to parse a value for data type $dataType (current token: $token).")
- }
-
- /**
- * Parse an object from the token stream into a new Row representing the schema.
- * Fields in the json that are not defined in the requested schema will be dropped.
- */
- private def convertObject(
- parser: JsonParser,
- schema: StructType,
- fieldConverters: Seq[ValueConverter]): InternalRow = {
- val row = new GenericMutableRow(schema.length)
- while (nextUntil(parser, JsonToken.END_OBJECT)) {
- schema.getFieldIndex(parser.getCurrentName) match {
- case Some(index) =>
- row.update(index, fieldConverters(index).apply(parser))
-
- case None =>
- parser.skipChildren()
- }
- }
-
- row
- }
-
- /**
- * Parse an object as a Map, preserving all fields.
- */
- private def convertMap(
- parser: JsonParser,
- fieldConverter: ValueConverter): MapData = {
- val keys = ArrayBuffer.empty[UTF8String]
- val values = ArrayBuffer.empty[Any]
- while (nextUntil(parser, JsonToken.END_OBJECT)) {
- keys += UTF8String.fromString(parser.getCurrentName)
- values += fieldConverter.apply(parser)
- }
-
- ArrayBasedMapData(keys.toArray, values.toArray)
- }
-
- /**
- * Parse an object as a Array.
- */
- private def convertArray(
- parser: JsonParser,
- fieldConverter: ValueConverter): ArrayData = {
- val values = ArrayBuffer.empty[Any]
- while (nextUntil(parser, JsonToken.END_ARRAY)) {
- values += fieldConverter.apply(parser)
- }
-
- new GenericArrayData(values.toArray)
- }
-
- /**
- * Parse the string JSON input to the set of [[InternalRow]]s.
- */
- def parse(input: String): Seq[InternalRow] = {
- if (input.trim.isEmpty) {
- Nil
- } else {
- try {
- Utils.tryWithResource(factory.createParser(input)) { parser =>
- parser.nextToken()
- rootConverter.apply(parser) match {
- case null => failedRecord(input)
- case row: InternalRow => row :: Nil
- case array: ArrayData =>
- // Here, as we support reading top level JSON arrays and take every element
- // in such an array as a row, this case is possible.
- if (array.numElements() == 0) {
- Nil
- } else {
- array.toArray[InternalRow](schema)
- }
- case _ =>
- failedRecord(input)
- }
- }
- } catch {
- case _: JsonProcessingException =>
- failedRecord(input)
- case _: SparkSQLJsonProcessingException =>
- failedRecord(input)
- }
- }
- }
-}
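
The parser deleted above (re-imported below from org.apache.spark.sql.catalyst.json) is built on Jackson's streaming token API rather than a DOM. A minimal standalone sketch of that token-walking style, independent of any Spark classes; the object name and sample document are made up for illustration:

    import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

    object TokenWalkSketch {
      def main(args: Array[String]): Unit = {
        val factory = new JsonFactory()
        val parser = factory.createParser("""{"a": 1, "b": "x"}""")
        try {
          parser.nextToken() // START_OBJECT
          // Mirrors JacksonUtils.nextUntil: advance until END_OBJECT or end of input.
          var token = parser.nextToken()
          while (token != null && token != JsonToken.END_OBJECT) {
            val field = parser.getCurrentName
            parser.nextToken() // move onto the value token
            println(s"$field -> ${parser.getText}")
            token = parser.nextToken()
          }
        } finally {
          parser.close()
        }
      }
    }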
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala
deleted file mode 100644
index 005546f37d..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonUtils.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.json
-
-import com.fasterxml.jackson.core.{JsonParser, JsonToken}
-
-private object JacksonUtils {
- /**
- * Advance the parser until a null or a specific token is found
- */
- def nextUntil(parser: JsonParser, stopOn: JsonToken): Boolean = {
- parser.nextToken() match {
- case null => false
- case x => x != stopOn
- }
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 6882a6cdca..9fe38ccc9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -32,6 +32,8 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index a875b01ec2..9f96667311 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 47bf41a2da..3bc1c5b900 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql
+import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.runtime.universe.{typeTag, TypeTag}
import scala.util.Try
@@ -2819,6 +2820,63 @@ object functions {
}
/**
+ * (Scala-specific) Parses a column containing a JSON string into a [[StructType]] with the
+ * specified schema. Returns `null` in the case of an unparseable string.
+ *
+ * @param schema the schema to use when parsing the json string
+ * @param options options to control how the json is parsed. Accepts the same options as the
+ * json data source.
+ * @param e a string column containing JSON data.
+ *
+ * @group collection_funcs
+ * @since 2.1.0
+ */
+ def from_json(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
+ JsonToStruct(schema, options, e.expr)
+ }
+
+ /**
+ * (Java-specific) Parses a column containing a JSON string into a [[StructType]] with the
+ * specified schema. Returns `null` in the case of an unparseable string.
+ *
+ * @param e a string column containing JSON data.
+ * @param schema the schema to use when parsing the json string
+ * @param options options to control how the json is parsed. Accepts the same options as the
+ * json data source.
+ *
+ * @group collection_funcs
+ * @since 2.1.0
+ */
+ def from_json(e: Column, schema: StructType, options: java.util.Map[String, String]): Column =
+ from_json(e, schema, options.asScala.toMap)
+
+ /**
+ * Parses a column containing a JSON string into a [[StructType]] with the specified schema.
+ * Returns `null` in the case of an unparseable string.
+ *
+ * @param e a string column containing JSON data.
+ * @param schema the schema to use when parsing the json string
+ *
+ * @group collection_funcs
+ * @since 2.1.0
+ */
+ def from_json(e: Column, schema: StructType): Column =
+ from_json(e, schema, Map.empty[String, String])
+
+ /**
+ * Parses a column containing a JSON string into a [[StructType]] with the specified schema.
+ * Returns `null` in the case of an unparseable string.
+ *
+ * @param e a string column containing JSON data.
+ * @param schema the schema, as a JSON-formatted string, to use when parsing the json string
+ *
+ * @group collection_funcs
+ * @since 2.1.0
+ */
+ def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
+ from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options)
+
+ /**
* Returns length of array or map.
*
* @group collection_funcs
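
The new from_json variants are covered by the test additions below; a minimal end-to-end usage sketch, assuming a local SparkSession (the object name and sample records are illustrative):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types.{IntegerType, StructType}

    object FromJsonExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("from_json-demo").getOrCreate()
        import spark.implicits._

        val df = Seq("""{"a": 1}""", """{"a" 2}""").toDF("value")
        val schema = new StructType().add("a", IntegerType)

        // The first row parses into a struct; the malformed second row yields null.
        df.select(from_json($"value", schema).as("parsed")).show()

        spark.stop()
      }
    }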
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 1391c9d57f..518d6e92b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -17,7 +17,9 @@
package org.apache.spark.sql
+import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructType}
class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -94,4 +96,31 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
checkAnswer(expr, expected)
}
+
+ test("json_parser") {
+ val df = Seq("""{"a": 1}""").toDS()
+ val schema = new StructType().add("a", IntegerType)
+
+ checkAnswer(
+ df.select(from_json($"value", schema)),
+ Row(Row(1)) :: Nil)
+ }
+
+ test("json_parser missing columns") {
+ val df = Seq("""{"a": 1}""").toDS()
+ val schema = new StructType().add("b", IntegerType)
+
+ checkAnswer(
+ df.select(from_json($"value", schema)),
+ Row(Row(null)) :: Nil)
+ }
+
+ test("json_parser invalid json") {
+ val df = Seq("""{"a" 1}""").toDS()
+ val schema = new StructType().add("a", IntegerType)
+
+ checkAnswer(
+ df.select(from_json($"value", schema)),
+ Row(null) :: Nil)
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
index c31dffedbd..0b72da5f37 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.json
import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.test.SharedSQLContext
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 3d533c14e1..456052f79a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -26,9 +26,10 @@ import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkException
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType