author    hyukjinkwon <gurwls223@gmail.com>    2016-03-22 20:30:48 +0800
committer Wenchen Fan <wenchen@databricks.com> 2016-03-22 20:30:48 +0800
commit    4e09a0d5ea50d1cfc936bc87cf3372b4a0aa7dc2 (patch)
tree      deb8c64a0a23977ad4a3bfd66794e904c817a104
parent    f2e855fba8eb73475cf312cdf880c1297d4323bb (diff)
[SPARK-13953][SQL] Specify the field name for corrupted records via an option in the JSON datasource
## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-13953

Currently, the JSON data source creates a new field in `PERMISSIVE` mode for storing the malformed string. This field can be renamed via the `spark.sql.columnNameOfCorruptRecord` configuration, but that is a global setting. This PR makes the option applicable per read, so that it can be specified via `option()`; when set, it overrides `spark.sql.columnNameOfCorruptRecord`.

## How was this patch tested?

Unit tests, plus `./dev/run_tests` for coding-style checks.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #11881 from HyukjinKwon/SPARK-13953.
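For context, a minimal usage sketch of the per-read option this commit introduces; it assumes an existing `sqlContext` (e.g. the spark-shell of this era), and the input path is hypothetical:

```scala
// Minimal usage sketch of the per-read option added by this commit.
// Assumes an existing `sqlContext`; the input path is hypothetical.
val df = sqlContext.read
  .option("columnNameOfCorruptRecord", "_malformed") // per-read override
  .json("/path/to/records.json")

// In PERMISSIVE mode (the default), malformed lines are kept and their raw
// text lands in the renamed column instead of the global `_corrupt_record`.
df.select("_malformed").show()
```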
-rw-r--r--  python/pyspark/sql/readwriter.py                                                    |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                  | 20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala   |  1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala  | 10
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala     | 21
5 files changed, 50 insertions(+), 7 deletions(-)
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index bae9e69df8..cca57a385c 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -166,10 +166,13 @@ class DataFrameReader(object):
during parsing.
* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
record and puts the malformed string into a new field configured by \
- ``spark.sql.columnNameOfCorruptRecord``. When a schema is set by user, it sets \
+ ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
``null`` for extra fields.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
+ * ``columnNameOfCorruptRecord`` (default ``_corrupt_record``): allows renaming the \
+ new field holding the malformed string created by ``PERMISSIVE`` mode. \
+ This overrides ``spark.sql.columnNameOfCorruptRecord``.
>>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
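For reference, a hedged Scala sketch of the three parse modes the docstring above describes; it assumes an existing `sqlContext` and a hypothetical input file mixing valid and malformed JSON lines:

```scala
// Hedged sketch of the three parse modes documented above.
val permissive = sqlContext.read
  .option("mode", "PERMISSIVE")     // default: keep rows, stash the raw
  .json("/path/to/maybe-bad.json")  // text in the corrupt-record column

val dropMalformed = sqlContext.read
  .option("mode", "DROPMALFORMED")  // silently drop corrupted records
  .json("/path/to/maybe-bad.json")

val failFast = sqlContext.read
  .option("mode", "FAILFAST")       // throw as soon as a corrupted record
  .json("/path/to/maybe-bad.json")  // is encountered during parsing
```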
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 0dc0d44d6c..1d4693f54f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -293,11 +293,14 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* during parsing.<li>
* <ul>
* <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts the
- * malformed string into a new field configured by `spark.sql.columnNameOfCorruptRecord`. When
+ * malformed string into a new field configured by `columnNameOfCorruptRecord`. When
* a schema is set by user, it sets `null` for extra fields.</li>
* <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
* <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
* </ul>
+ * <li>`columnNameOfCorruptRecord` (default `_corrupt_record`): allows renaming the new field
+ * holding the malformed string created by `PERMISSIVE` mode. This overrides
+ * `spark.sql.columnNameOfCorruptRecord`.</li>
*
* @since 1.4.0
*/
@@ -326,11 +329,14 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* during parsing.<li>
* <ul>
* <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts the
- * malformed string into a new field configured by `spark.sql.columnNameOfCorruptRecord`. When
+ * malformed string into a new field configured by `columnNameOfCorruptRecord`. When
* a schema is set by user, it sets `null` for extra fields.</li>
* <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
* <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
* </ul>
+ * <li>`columnNameOfCorruptRecord` (default `_corrupt_record`): allows renaming the new field
+ * holding the malformed string created by `PERMISSIVE` mode. This overrides
+ * `spark.sql.columnNameOfCorruptRecord`.</li>
*
* @since 1.6.0
*/
@@ -360,8 +366,14 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
*/
def json(jsonRDD: RDD[String]): DataFrame = {
val parsedOptions: JSONOptions = new JSONOptions(extraOptions.toMap)
+ val columnNameOfCorruptRecord =
+ parsedOptions.columnNameOfCorruptRecord
+ .getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
val schema = userSpecifiedSchema.getOrElse {
- InferSchema.infer(jsonRDD, sqlContext.conf.columnNameOfCorruptRecord, parsedOptions)
+ InferSchema.infer(
+ jsonRDD,
+ columnNameOfCorruptRecord,
+ parsedOptions)
}
Dataset.newDataFrame(
@@ -371,7 +383,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
JacksonParser.parse(
jsonRDD,
schema,
- sqlContext.conf.columnNameOfCorruptRecord,
+ columnNameOfCorruptRecord,
parsedOptions))(sqlContext))
}
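The hunks above follow a common pattern: a per-read option, when present, wins over the session-wide configuration. A minimal standalone sketch of just that fallback, where `readerOptions` and `sessionConf` are illustrative names rather than Spark internals:

```scala
// Standalone sketch of the option-vs-conf fallback used above; the names
// readerOptions and sessionConf are illustrative, not Spark internals.
object CorruptRecordFallbackDemo extends App {
  val readerOptions: Map[String, String] =
    Map("columnNameOfCorruptRecord" -> "_malformed")
  val sessionConf: Map[String, String] =
    Map("spark.sql.columnNameOfCorruptRecord" -> "_corrupt_record")

  val columnNameOfCorruptRecord: String =
    readerOptions
      .get("columnNameOfCorruptRecord")
      .getOrElse(sessionConf("spark.sql.columnNameOfCorruptRecord"))

  println(columnNameOfCorruptRecord) // prints "_malformed"
}
```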
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index 93c3d47c1d..c0ad9efcb7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -51,6 +51,7 @@ private[sql] class JSONOptions(
parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+ val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
// Parse mode flags
if (!ParseModes.isValidMode(parseMode)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 4f8de2587e..3bf0af0efa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -51,6 +51,9 @@ class DefaultSource extends FileFormat with DataSourceRegister {
None
} else {
val parsedOptions: JSONOptions = new JSONOptions(options)
+ val columnNameOfCorruptRecord =
+ parsedOptions.columnNameOfCorruptRecord
+ .getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
val jsonFiles = files.filterNot { status =>
val name = status.getPath.getName
name.startsWith("_") || name.startsWith(".")
@@ -58,7 +61,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
val jsonSchema = InferSchema.infer(
createBaseRdd(sqlContext, jsonFiles),
- sqlContext.conf.columnNameOfCorruptRecord,
+ columnNameOfCorruptRecord,
parsedOptions)
checkConstraints(jsonSchema)
@@ -102,10 +105,13 @@ class DefaultSource extends FileFormat with DataSourceRegister {
val parsedOptions: JSONOptions = new JSONOptions(options)
val requiredDataSchema = StructType(requiredColumns.map(dataSchema(_)))
+ val columnNameOfCorruptRecord =
+ parsedOptions.columnNameOfCorruptRecord
+ .getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
val rows = JacksonParser.parse(
createBaseRdd(sqlContext, jsonFiles),
requiredDataSchema,
- sqlContext.conf.columnNameOfCorruptRecord,
+ columnNameOfCorruptRecord,
parsedOptions)
rows.mapPartitions { iterator =>
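The same three-line resolution now appears once in `DataFrameReader` and twice in `DefaultSource`. A hypothetical refactor, not part of this commit, could factor it into one helper; `JsonOptionsSketch` below stands in for `JSONOptions` purely for illustration:

```scala
// Hypothetical refactor sketch, not part of this commit: the repeated
// option-vs-conf resolution could live in one helper. JsonOptionsSketch
// models JSONOptions as a plain case class for illustration only.
object CorruptRecordResolution {
  final case class JsonOptionsSketch(columnNameOfCorruptRecord: Option[String])

  def resolve(options: JsonOptionsSketch, confDefault: => String): String =
    options.columnNameOfCorruptRecord.getOrElse(confDefault)

  def main(args: Array[String]): Unit = {
    println(resolve(JsonOptionsSketch(Some("_malformed")), "_corrupt_record")) // _malformed
    println(resolve(JsonOptionsSketch(None), "_corrupt_record"))               // _corrupt_record
  }
}
```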
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 0a5699b99c..c108d81b18 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1067,6 +1067,27 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
}
+ test("SPARK-13953 Rename the corrupt record field via option") {
+ val jsonDF = sqlContext.read
+ .option("columnNameOfCorruptRecord", "_malformed")
+ .json(corruptRecords)
+ val schema = StructType(
+ StructField("_malformed", StringType, true) ::
+ StructField("a", StringType, true) ::
+ StructField("b", StringType, true) ::
+ StructField("c", StringType, true) :: Nil)
+
+ assert(schema === jsonDF.schema)
+ checkAnswer(
+ jsonDF.selectExpr("a", "b", "c", "_malformed"),
+ Row(null, null, null, "{") ::
+ Row(null, null, null, """{"a":1, b:2}""") ::
+ Row(null, null, null, """{"a":{, b:3}""") ::
+ Row("str_a_4", "str_b_4", "str_c_4", null) ::
+ Row(null, null, null, "]") :: Nil
+ )
+ }
+
test("SPARK-4068: nulls in arrays") {
val jsonDF = sqlContext.read.json(nullsInArrays)
jsonDF.registerTempTable("jsonTable")