author    Reynold Xin <rxin@databricks.com>  2015-11-16 00:06:14 -0800
committer Reynold Xin <rxin@databricks.com>  2015-11-16 00:06:14 -0800
commit    42de5253f327bd7ee258b0efb5024f3847fa3b51 (patch)
tree      67437c76160ebee36d3b378599b1d519e6c72f8a
parent    fd50fa4c3eff42e8adeeabe399ddba0edac930c8 (diff)
[SPARK-11745][SQL] Enable more JSON parsing options
This patch adds the following options to the JSON data source, for dealing with non-standard JSON files:

* `allowComments` (default `false`): ignores Java/C++-style comments in JSON records
* `allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names
* `allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes
* `allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers (e.g. 00012)

To avoid passing a lot of options throughout the json package, I introduced a new JSONOptions case class to define all JSON config options. Also updated documentation to explain these options.

Scala
![screen shot 2015-11-15 at 6 12 12 pm](https://cloud.githubusercontent.com/assets/323388/11172965/e3ace6ec-8bc4-11e5-805e-2d78f80d0ed6.png)

Python
![screen shot 2015-11-15 at 6 11 28 pm](https://cloud.githubusercontent.com/assets/323388/11172964/e23ed6ee-8bc4-11e5-8216-312f5983acd5.png)

Author: Reynold Xin <rxin@databricks.com>

Closes #9724 from rxin/SPARK-11745.
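A minimal usage sketch of the new options, mirroring the test cases added below in `JsonParsingOptionsSuite`; it assumes a `SQLContext` named `sqlContext` is in scope, and the record contents and variable names are illustrative only:

```scala
// Illustrative record that is invalid under strict JSON: unquoted field name,
// single quotes, a /* comment */, and a leading zero in the number.
val json = """{name: 'Reynold Xin', /* inline comment */ 'age': 0038}"""
val rdd = sqlContext.sparkContext.parallelize(Seq(json))

val df = sqlContext.read
  .option("allowComments", "true")
  .option("allowUnquotedFieldNames", "true")
  .option("allowSingleQuotes", "true")          // already the default
  .option("allowNumericLeadingZeros", "true")
  .json(rdd)

df.printSchema()   // age: bigint, name: string
df.show()          // +---+-----------+  (18 parses as 38 here: 0038 -> 38)
```

Without the options, the same record would be routed to the `_corrupt_record` column, as the "off" test cases below demonstrate.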
-rw-r--r--  python/pyspark/sql/readwriter.py                                                                          |  10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                                        |  22
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala                                    |  17
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala                 |  34
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala                 |  64
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala                |  20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala               |  82
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala     | 114
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala                   |  29
9 files changed, 286 insertions, 106 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 927f407742..7b8ddb9feb 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -153,6 +153,16 @@ class DataFrameReader(object):
or RDD of Strings storing JSON objects.
:param schema: an optional :class:`StructType` for the input schema.
+ You can set the following JSON-specific options to deal with non-standard JSON files:
+ * ``primitivesAsString`` (default ``false``): infers all primitive values as a string \
+ type
+ * ``allowComments`` (default ``false``): ignores Java/C++ style comment in JSON records
+ * ``allowUnquotedFieldNames`` (default ``false``): allows unquoted JSON field names
+ * ``allowSingleQuotes`` (default ``true``): allows single quotes in addition to double \
+ quotes
+ * ``allowNumericLeadingZeros`` (default ``false``): allows leading zeros in numbers \
+ (e.g. 00012)
+
>>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
[('age', 'bigint'), ('name', 'string')]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 6a194a443a..5872fbded3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -29,7 +29,7 @@ import org.apache.spark.api.java.JavaRDD
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.execution.datasources.json.JSONRelation
+import org.apache.spark.sql.execution.datasources.json.{JSONOptions, JSONRelation}
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.types.StructType
@@ -227,6 +227,15 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* This function goes through the input once to determine the input schema. If you know the
* schema in advance, use the version that specifies the schema to avoid the extra scan.
*
+ * You can set the following JSON-specific options to deal with non-standard JSON files:
+ * <li>`primitivesAsString` (default `false`): infers all primitive values as a string type</li>
+ * <li>`allowComments` (default `false`): ignores Java/C++ style comment in JSON records</li>
+ * <li>`allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names</li>
+ * <li>`allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes
+ * </li>
+ * <li>`allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers
+ * (e.g. 00012)</li>
+ *
* @param path input path
* @since 1.4.0
*/
@@ -255,16 +264,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* @since 1.4.0
*/
def json(jsonRDD: RDD[String]): DataFrame = {
- val samplingRatio = extraOptions.getOrElse("samplingRatio", "1.0").toDouble
- val primitivesAsString = extraOptions.getOrElse("primitivesAsString", "false").toBoolean
sqlContext.baseRelationToDataFrame(
new JSONRelation(
Some(jsonRDD),
- samplingRatio,
- primitivesAsString,
- userSpecifiedSchema,
- None,
- None)(sqlContext)
+ maybeDataSchema = userSpecifiedSchema,
+ maybePartitionSpec = None,
+ userDefinedPartitionColumns = None,
+ parameters = extraOptions.toMap)(sqlContext)
)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 1b833002f4..534a3bcb83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -221,22 +221,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
private[this] def isTesting: Boolean = sys.props.contains("spark.testing")
- protected def newProjection(
- expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
- log.debug(s"Creating Projection: $expressions, inputSchema: $inputSchema")
- try {
- GenerateProjection.generate(expressions, inputSchema)
- } catch {
- case e: Exception =>
- if (isTesting) {
- throw e
- } else {
- log.error("Failed to generate projection, fallback to interpret", e)
- new InterpretedProjection(expressions, inputSchema)
- }
- }
- }
-
protected def newMutableProjection(
expressions: Seq[Expression], inputSchema: Seq[Attribute]): () => MutableProjection = {
log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
@@ -282,6 +266,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
}
}
}
+
/**
* Creates a row ordering for the given schema, in natural ascending order.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index b9914c581a..922fd5b211 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -25,33 +25,36 @@ import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
-private[sql] object InferSchema {
+
+private[json] object InferSchema {
+
/**
* Infer the type of a collection of json records in three stages:
* 1. Infer the type of each record
* 2. Merge types by choosing the lowest type necessary to cover equal keys
* 3. Replace any remaining null fields with string, the top type
*/
- def apply(
+ def infer(
json: RDD[String],
- samplingRatio: Double = 1.0,
columnNameOfCorruptRecords: String,
- primitivesAsString: Boolean = false): StructType = {
- require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
- val schemaData = if (samplingRatio > 0.99) {
+ configOptions: JSONOptions): StructType = {
+ require(configOptions.samplingRatio > 0,
+ s"samplingRatio (${configOptions.samplingRatio}) should be greater than 0")
+ val schemaData = if (configOptions.samplingRatio > 0.99) {
json
} else {
- json.sample(withReplacement = false, samplingRatio, 1)
+ json.sample(withReplacement = false, configOptions.samplingRatio, 1)
}
// perform schema inference on each row and merge afterwards
val rootType = schemaData.mapPartitions { iter =>
val factory = new JsonFactory()
+ configOptions.setJacksonOptions(factory)
iter.map { row =>
try {
Utils.tryWithResource(factory.createParser(row)) { parser =>
parser.nextToken()
- inferField(parser, primitivesAsString)
+ inferField(parser, configOptions)
}
} catch {
case _: JsonParseException =>
@@ -71,14 +74,14 @@ private[sql] object InferSchema {
/**
* Infer the type of a json document from the parser's token stream
*/
- private def inferField(parser: JsonParser, primitivesAsString: Boolean): DataType = {
+ private def inferField(parser: JsonParser, configOptions: JSONOptions): DataType = {
import com.fasterxml.jackson.core.JsonToken._
parser.getCurrentToken match {
case null | VALUE_NULL => NullType
case FIELD_NAME =>
parser.nextToken()
- inferField(parser, primitivesAsString)
+ inferField(parser, configOptions)
case VALUE_STRING if parser.getTextLength < 1 =>
// Zero length strings and nulls have special handling to deal
@@ -95,7 +98,7 @@ private[sql] object InferSchema {
while (nextUntil(parser, END_OBJECT)) {
builder += StructField(
parser.getCurrentName,
- inferField(parser, primitivesAsString),
+ inferField(parser, configOptions),
nullable = true)
}
@@ -107,14 +110,15 @@ private[sql] object InferSchema {
// the type as we pass through all JSON objects.
var elementType: DataType = NullType
while (nextUntil(parser, END_ARRAY)) {
- elementType = compatibleType(elementType, inferField(parser, primitivesAsString))
+ elementType = compatibleType(
+ elementType, inferField(parser, configOptions))
}
ArrayType(elementType)
- case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) if primitivesAsString => StringType
+ case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) if configOptions.primitivesAsString => StringType
- case (VALUE_TRUE | VALUE_FALSE) if primitivesAsString => StringType
+ case (VALUE_TRUE | VALUE_FALSE) if configOptions.primitivesAsString => StringType
case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
import JsonParser.NumberType._
@@ -178,7 +182,7 @@ private[sql] object InferSchema {
/**
* Returns the most general data type for two given data types.
*/
- private[json] def compatibleType(t1: DataType, t2: DataType): DataType = {
+ def compatibleType(t1: DataType, t2: DataType): DataType = {
HiveTypeCoercion.findTightestCommonTypeOfTwo(t1, t2).getOrElse {
// t1 or t2 is a StructType, ArrayType, or an unexpected type.
(t1, t2) match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
new file mode 100644
index 0000000000..c132ead20e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import com.fasterxml.jackson.core.{JsonParser, JsonFactory}
+
+/**
+ * Options for the JSON data source.
+ *
+ * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
+ */
+case class JSONOptions(
+ samplingRatio: Double = 1.0,
+ primitivesAsString: Boolean = false,
+ allowComments: Boolean = false,
+ allowUnquotedFieldNames: Boolean = false,
+ allowSingleQuotes: Boolean = true,
+ allowNumericLeadingZeros: Boolean = false,
+ allowNonNumericNumbers: Boolean = false) {
+
+ /** Sets config options on a Jackson [[JsonFactory]]. */
+ def setJacksonOptions(factory: JsonFactory): Unit = {
+ factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
+ factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, allowUnquotedFieldNames)
+ factory.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, allowSingleQuotes)
+ factory.configure(JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS, allowNumericLeadingZeros)
+ factory.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, allowNonNumericNumbers)
+ }
+}
+
+
+object JSONOptions {
+ def createFromConfigMap(parameters: Map[String, String]): JSONOptions = JSONOptions(
+ samplingRatio =
+ parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0),
+ primitivesAsString =
+ parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false),
+ allowComments =
+ parameters.get("allowComments").map(_.toBoolean).getOrElse(false),
+ allowUnquotedFieldNames =
+ parameters.get("allowUnquotedFieldNames").map(_.toBoolean).getOrElse(false),
+ allowSingleQuotes =
+ parameters.get("allowSingleQuotes").map(_.toBoolean).getOrElse(true),
+ allowNumericLeadingZeros =
+ parameters.get("allowNumericLeadingZeros").map(_.toBoolean).getOrElse(false),
+ allowNonNumericNumbers =
+ parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true)
+ )
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index dca638b7f6..3e61ba35be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -52,13 +52,9 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
dataSchema: Option[StructType],
partitionColumns: Option[StructType],
parameters: Map[String, String]): HadoopFsRelation = {
- val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
- val primitivesAsString = parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
new JSONRelation(
inputRDD = None,
- samplingRatio = samplingRatio,
- primitivesAsString = primitivesAsString,
maybeDataSchema = dataSchema,
maybePartitionSpec = None,
userDefinedPartitionColumns = partitionColumns,
@@ -69,8 +65,6 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
private[sql] class JSONRelation(
val inputRDD: Option[RDD[String]],
- val samplingRatio: Double,
- val primitivesAsString: Boolean,
val maybeDataSchema: Option[StructType],
val maybePartitionSpec: Option[PartitionSpec],
override val userDefinedPartitionColumns: Option[StructType],
@@ -79,6 +73,8 @@ private[sql] class JSONRelation(
(@transient val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec, parameters) {
+ val options: JSONOptions = JSONOptions.createFromConfigMap(parameters)
+
/** Constraints to be imposed on schema to be stored. */
private def checkConstraints(schema: StructType): Unit = {
if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
@@ -109,17 +105,16 @@ private[sql] class JSONRelation(
classOf[Text]).map(_._2.toString) // get the text line
}
- override lazy val dataSchema = {
+ override lazy val dataSchema: StructType = {
val jsonSchema = maybeDataSchema.getOrElse {
val files = cachedLeafStatuses().filterNot { status =>
val name = status.getPath.getName
name.startsWith("_") || name.startsWith(".")
}.toArray
- InferSchema(
+ InferSchema.infer(
inputRDD.getOrElse(createBaseRdd(files)),
- samplingRatio,
sqlContext.conf.columnNameOfCorruptRecord,
- primitivesAsString)
+ options)
}
checkConstraints(jsonSchema)
@@ -132,10 +127,11 @@ private[sql] class JSONRelation(
inputPaths: Array[FileStatus],
broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
val requiredDataSchema = StructType(requiredColumns.map(dataSchema(_)))
- val rows = JacksonParser(
+ val rows = JacksonParser.parse(
inputRDD.getOrElse(createBaseRdd(inputPaths)),
requiredDataSchema,
- sqlContext.conf.columnNameOfCorruptRecord)
+ sqlContext.conf.columnNameOfCorruptRecord,
+ options)
rows.mapPartitions { iterator =>
val unsafeProjection = UnsafeProjection.create(requiredDataSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index 4f53eeb081..bfa1405041 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -18,11 +18,10 @@
package org.apache.spark.sql.execution.datasources.json
import java.io.ByteArrayOutputStream
+import scala.collection.mutable.ArrayBuffer
import com.fasterxml.jackson.core._
-import scala.collection.mutable.ArrayBuffer
-
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -32,18 +31,23 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
-private[sql] object JacksonParser {
- def apply(
- json: RDD[String],
+object JacksonParser {
+
+ def parse(
+ input: RDD[String],
schema: StructType,
- columnNameOfCorruptRecords: String): RDD[InternalRow] = {
- parseJson(json, schema, columnNameOfCorruptRecords)
+ columnNameOfCorruptRecords: String,
+ configOptions: JSONOptions): RDD[InternalRow] = {
+
+ input.mapPartitions { iter =>
+ parseJson(iter, schema, columnNameOfCorruptRecords, configOptions)
+ }
}
/**
* Parse the current token (and related children) according to a desired schema
*/
- private[sql] def convertField(
+ def convertField(
factory: JsonFactory,
parser: JsonParser,
schema: DataType): Any = {
@@ -226,9 +230,10 @@ private[sql] object JacksonParser {
}
private def parseJson(
- json: RDD[String],
+ input: Iterator[String],
schema: StructType,
- columnNameOfCorruptRecords: String): RDD[InternalRow] = {
+ columnNameOfCorruptRecords: String,
+ configOptions: JSONOptions): Iterator[InternalRow] = {
def failedRecord(record: String): Seq[InternalRow] = {
// create a row even if no corrupt record column is present
@@ -241,37 +246,36 @@ private[sql] object JacksonParser {
Seq(row)
}
- json.mapPartitions { iter =>
- val factory = new JsonFactory()
-
- iter.flatMap { record =>
- if (record.trim.isEmpty) {
- Nil
- } else {
- try {
- Utils.tryWithResource(factory.createParser(record)) { parser =>
- parser.nextToken()
-
- convertField(factory, parser, schema) match {
- case null => failedRecord(record)
- case row: InternalRow => row :: Nil
- case array: ArrayData =>
- if (array.numElements() == 0) {
- Nil
- } else {
- array.toArray[InternalRow](schema)
- }
- case _ =>
- sys.error(
- s"Failed to parse record $record. Please make sure that each line of " +
- "the file (or each string in the RDD) is a valid JSON object or " +
- "an array of JSON objects.")
- }
+ val factory = new JsonFactory()
+ configOptions.setJacksonOptions(factory)
+
+ input.flatMap { record =>
+ if (record.trim.isEmpty) {
+ Nil
+ } else {
+ try {
+ Utils.tryWithResource(factory.createParser(record)) { parser =>
+ parser.nextToken()
+
+ convertField(factory, parser, schema) match {
+ case null => failedRecord(record)
+ case row: InternalRow => row :: Nil
+ case array: ArrayData =>
+ if (array.numElements() == 0) {
+ Nil
+ } else {
+ array.toArray[InternalRow](schema)
+ }
+ case _ =>
+ sys.error(
+ s"Failed to parse record $record. Please make sure that each line of " +
+ "the file (or each string in the RDD) is a valid JSON object or " +
+ "an array of JSON objects.")
}
- } catch {
- case _: JsonProcessingException =>
- failedRecord(record)
}
+ } catch {
+ case _: JsonProcessingException =>
+ failedRecord(record)
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
new file mode 100644
index 0000000000..4cc0a3a958
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+/**
+ * Test cases for various [[JSONOptions]].
+ */
+class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
+
+ test("allowComments off") {
+ val str = """{'name': /* hello */ 'Reynold Xin'}"""
+ val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+ val df = sqlContext.read.json(rdd)
+
+ assert(df.schema.head.name == "_corrupt_record")
+ }
+
+ test("allowComments on") {
+ val str = """{'name': /* hello */ 'Reynold Xin'}"""
+ val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+ val df = sqlContext.read.option("allowComments", "true").json(rdd)
+
+ assert(df.schema.head.name == "name")
+ assert(df.first().getString(0) == "Reynold Xin")
+ }
+
+ test("allowSingleQuotes off") {
+ val str = """{'name': 'Reynold Xin'}"""
+ val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+ val df = sqlContext.read.option("allowSingleQuotes", "false").json(rdd)
+
+ assert(df.schema.head.name == "_corrupt_record")
+ }
+
+ test("allowSingleQuotes on") {
+ val str = """{'name': 'Reynold Xin'}"""
+ val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+ val df = sqlContext.read.json(rdd)
+
+ assert(df.schema.head.name == "name")
+ assert(df.first().getString(0) == "Reynold Xin")
+ }
+
+ test("allowUnquotedFieldNames off") {
+ val str = """{name: 'Reynold Xin'}"""
+ val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+ val df = sqlContext.read.json(rdd)
+
+ assert(df.schema.head.name == "_corrupt_record")
+ }
+
+ test("allowUnquotedFieldNames on") {
+ val str = """{name: 'Reynold Xin'}"""
+ val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+ val df = sqlContext.read.option("allowUnquotedFieldNames", "true").json(rdd)
+
+ assert(df.schema.head.name == "name")
+ assert(df.first().getString(0) == "Reynold Xin")
+ }
+
+ test("allowNumericLeadingZeros off") {
+ val str = """{"age": 0018}"""
+ val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+ val df = sqlContext.read.json(rdd)
+
+ assert(df.schema.head.name == "_corrupt_record")
+ }
+
+ test("allowNumericLeadingZeros on") {
+ val str = """{"age": 0018}"""
+ val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+ val df = sqlContext.read.option("allowNumericLeadingZeros", "true").json(rdd)
+
+ assert(df.schema.head.name == "age")
+ assert(df.first().getLong(0) == 18)
+ }
+
+ // The following two tests are not really working - need to look into Jackson's
+ // JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS.
+ ignore("allowNonNumericNumbers off") {
+ val str = """{"age": NaN}"""
+ val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+ val df = sqlContext.read.json(rdd)
+
+ assert(df.schema.head.name == "_corrupt_record")
+ }
+
+ ignore("allowNonNumericNumbers on") {
+ val str = """{"age": NaN}"""
+ val rdd = sqlContext.sparkContext.parallelize(Seq(str))
+ val df = sqlContext.read.option("allowNonNumericNumbers", "true").json(rdd)
+
+ assert(df.schema.head.name == "age")
+ assert(df.first().getDouble(0).isNaN)
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 28b8f02bdf..6042b1178a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -588,7 +588,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
relation.isInstanceOf[JSONRelation],
"The DataFrame returned by jsonFile should be based on JSONRelation.")
assert(relation.asInstanceOf[JSONRelation].paths === Array(path))
- assert(relation.asInstanceOf[JSONRelation].samplingRatio === (0.49 +- 0.001))
+ assert(relation.asInstanceOf[JSONRelation].options.samplingRatio === (0.49 +- 0.001))
val schema = StructType(StructField("a", LongType, true) :: Nil)
val logicalRelation =
@@ -597,7 +597,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val relationWithSchema = logicalRelation.relation.asInstanceOf[JSONRelation]
assert(relationWithSchema.paths === Array(path))
assert(relationWithSchema.schema === schema)
- assert(relationWithSchema.samplingRatio > 0.99)
+ assert(relationWithSchema.options.samplingRatio > 0.99)
}
test("Loading a JSON dataset from a text file") {
@@ -1165,31 +1165,28 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
test("JSONRelation equality test") {
val relation0 = new JSONRelation(
Some(empty),
- 1.0,
- false,
Some(StructType(StructField("a", IntegerType, true) :: Nil)),
- None, None)(sqlContext)
+ None,
+ None)(sqlContext)
val logicalRelation0 = LogicalRelation(relation0)
val relation1 = new JSONRelation(
Some(singleRow),
- 1.0,
- false,
Some(StructType(StructField("a", IntegerType, true) :: Nil)),
- None, None)(sqlContext)
+ None,
+ None)(sqlContext)
val logicalRelation1 = LogicalRelation(relation1)
val relation2 = new JSONRelation(
Some(singleRow),
- 0.5,
- false,
Some(StructType(StructField("a", IntegerType, true) :: Nil)),
- None, None)(sqlContext)
+ None,
+ None,
+ parameters = Map("samplingRatio" -> "0.5"))(sqlContext)
val logicalRelation2 = LogicalRelation(relation2)
val relation3 = new JSONRelation(
Some(singleRow),
- 1.0,
- false,
Some(StructType(StructField("b", IntegerType, true) :: Nil)),
- None, None)(sqlContext)
+ None,
+ None)(sqlContext)
val logicalRelation3 = LogicalRelation(relation3)
assert(relation0 !== relation1)
@@ -1232,7 +1229,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
test("SPARK-6245 JsonRDD.inferSchema on empty RDD") {
// This is really a test that it doesn't throw an exception
- val emptySchema = InferSchema(empty, 1.0, "")
+ val emptySchema = InferSchema.infer(empty, "", JSONOptions())
assert(StructType(Seq()) === emptySchema)
}
@@ -1256,7 +1253,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
test("SPARK-8093 Erase empty structs") {
- val emptySchema = InferSchema(emptyRecords, 1.0, "")
+ val emptySchema = InferSchema.infer(emptyRecords, "", JSONOptions())
assert(StructType(Seq()) === emptySchema)
}