From 4e09a0d5ea50d1cfc936bc87cf3372b4a0aa7dc2 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 22 Mar 2016 20:30:48 +0800
Subject: [SPARK-13953][SQL] Specifying the field name for corrupted record
 via option at JSON datasource

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-13953

Currently, the JSON data source creates a new field in `PERMISSIVE` mode for
storing the malformed string. This field can be renamed via the
`spark.sql.columnNameOfCorruptRecord` option, but that is a global
configuration. This PR makes the name configurable per read, so it can be
specified via `option()`; when set, it overrides
`spark.sql.columnNameOfCorruptRecord`.
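For example, the field can now be renamed for a single read, as the new test
in this patch does (a minimal sketch; the input path is illustrative and
`sqlContext` is assumed to be an existing `SQLContext`):

```scala
// Per-read override: malformed records are stored in "_malformed" instead of
// the default "_corrupt_record"; the global conf is left untouched.
val df = sqlContext.read
  .option("columnNameOfCorruptRecord", "_malformed")
  .json("/path/to/records.json") // illustrative path

// Under PERMISSIVE mode (the default), a record that fails to parse keeps its
// raw text in the renamed field while the regular columns are set to null.
// Note: the field only appears in the inferred schema when the input actually
// contains malformed records.
df.select("_malformed").show()
```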
## How was this patch tested?

Unit tests, plus `./dev/run_tests` for coding style checks.

Author: hyukjinkwon

Closes #11881 from HyukjinKwon/SPARK-13953.
---
 python/pyspark/sql/readwriter.py                    |  5 ++++-
 .../org/apache/spark/sql/DataFrameReader.scala      | 20 ++++++++++++++++----
 .../execution/datasources/json/JSONOptions.scala    |  1 +
 .../execution/datasources/json/JSONRelation.scala   | 10 ++++++++--
 .../sql/execution/datasources/json/JsonSuite.scala  | 21 +++++++++++++++++++++
 5 files changed, 50 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index bae9e69df8..cca57a385c 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -166,10 +166,13 @@ class DataFrameReader(object):
                 during parsing.
                 * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
                   record and puts the malformed string into a new field configured by \
-                  ``spark.sql.columnNameOfCorruptRecord``. When a schema is set by user, it sets \
+                  ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \
                   ``null`` for extra fields.
                 * ``DROPMALFORMED`` : ignores the whole corrupted records.
                 * ``FAILFAST`` : throws an exception when it meets corrupted records.
+            * ``columnNameOfCorruptRecord`` (default ``_corrupt_record``): allows renaming the \
+                new field having malformed string created by ``PERMISSIVE`` mode. \
+                This overrides ``spark.sql.columnNameOfCorruptRecord``.
 
         >>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 0dc0d44d6c..1d4693f54f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -293,11 +293,14 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * during parsing.
    * <ul>
    * <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts the
-   * malformed string into a new field configured by `spark.sql.columnNameOfCorruptRecord`. When
+   * malformed string into a new field configured by `columnNameOfCorruptRecord`. When
    * a schema is set by user, it sets `null` for extra fields.</li>
    * <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
    * <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    * </ul>
+   * <li>`columnNameOfCorruptRecord` (default `_corrupt_record`): allows renaming the new field
+   * having malformed string created by `PERMISSIVE` mode. This overrides
+   * `spark.sql.columnNameOfCorruptRecord`.</li>
    *
    * @since 1.4.0
    */
@@ -326,11 +329,14 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * during parsing.
    * <ul>
    * <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts the
-   * malformed string into a new field configured by `spark.sql.columnNameOfCorruptRecord`. When
+   * malformed string into a new field configured by `columnNameOfCorruptRecord`. When
    * a schema is set by user, it sets `null` for extra fields.</li>
    * <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
    * <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
    * </ul>
+   * <li>`columnNameOfCorruptRecord` (default `_corrupt_record`): allows renaming the new field
+   * having malformed string created by `PERMISSIVE` mode. This overrides
+   * `spark.sql.columnNameOfCorruptRecord`.</li>
    *
    * @since 1.6.0
    */
@@ -360,8 +366,14 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    */
   def json(jsonRDD: RDD[String]): DataFrame = {
     val parsedOptions: JSONOptions = new JSONOptions(extraOptions.toMap)
+    val columnNameOfCorruptRecord =
+      parsedOptions.columnNameOfCorruptRecord
+        .getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
     val schema = userSpecifiedSchema.getOrElse {
-      InferSchema.infer(jsonRDD, sqlContext.conf.columnNameOfCorruptRecord, parsedOptions)
+      InferSchema.infer(
+        jsonRDD,
+        columnNameOfCorruptRecord,
+        parsedOptions)
     }
 
     Dataset.newDataFrame(
@@ -371,7 +383,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
       JacksonParser.parse(
         jsonRDD,
         schema,
-        sqlContext.conf.columnNameOfCorruptRecord,
+        columnNameOfCorruptRecord,
         parsedOptions))(sqlContext))
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
index 93c3d47c1d..c0ad9efcb7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala
@@ -51,6 +51,7 @@ private[sql] class JSONOptions(
     parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
   val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
   private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+  val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
 
   // Parse mode flags
   if (!ParseModes.isValidMode(parseMode)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 4f8de2587e..3bf0af0efa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -51,6 +51,9 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       None
     } else {
       val parsedOptions: JSONOptions = new JSONOptions(options)
+      val columnNameOfCorruptRecord =
+        parsedOptions.columnNameOfCorruptRecord
+          .getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
       val jsonFiles = files.filterNot { status =>
         val name = status.getPath.getName
         name.startsWith("_") || name.startsWith(".")
@@ -58,7 +61,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       val jsonSchema = InferSchema.infer(
         createBaseRdd(sqlContext, jsonFiles),
-        sqlContext.conf.columnNameOfCorruptRecord,
+        columnNameOfCorruptRecord,
         parsedOptions)
       checkConstraints(jsonSchema)
@@ -102,10 +105,13 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     val parsedOptions: JSONOptions = new JSONOptions(options)
     val requiredDataSchema = StructType(requiredColumns.map(dataSchema(_)))
+    val columnNameOfCorruptRecord =
+      parsedOptions.columnNameOfCorruptRecord
+        .getOrElse(sqlContext.conf.columnNameOfCorruptRecord)
     val rows = JacksonParser.parse(
       createBaseRdd(sqlContext, jsonFiles),
       requiredDataSchema,
-      sqlContext.conf.columnNameOfCorruptRecord,
+      columnNameOfCorruptRecord,
       parsedOptions)
 
     rows.mapPartitions { iterator =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 0a5699b99c..c108d81b18 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1067,6 +1067,27 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }
 
+  test("SPARK-13953 Rename the corrupt record field via option") {
+    val jsonDF = sqlContext.read
+      .option("columnNameOfCorruptRecord", "_malformed")
+      .json(corruptRecords)
+    val schema = StructType(
+      StructField("_malformed", StringType, true) ::
+      StructField("a", StringType, true) ::
+      StructField("b", StringType, true) ::
+      StructField("c", StringType, true) :: Nil)
+
+    assert(schema === jsonDF.schema)
+    checkAnswer(
+      jsonDF.selectExpr("a", "b", "c", "_malformed"),
+      Row(null, null, null, "{") ::
+      Row(null, null, null, """{"a":1, b:2}""") ::
+      Row(null, null, null, """{"a":{, b:3}""") ::
+      Row("str_a_4", "str_b_4", "str_c_4", null) ::
+      Row(null, null, null, "]") :: Nil
+    )
+  }
+
   test("SPARK-4068: nulls in arrays") {
     val jsonDF = sqlContext.read.json(nullsInArrays)
     jsonDF.registerTempTable("jsonTable")
-- 
cgit v1.2.3
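The precedence this patch introduces — the per-read option first, falling back
to `spark.sql.columnNameOfCorruptRecord` — can be exercised as follows (a
sketch; the paths and field names are illustrative):

```scala
// Set the global name, then override it for one read.
sqlContext.setConf("spark.sql.columnNameOfCorruptRecord", "_corrupt_global")

// The read option takes precedence for this read only.
val renamed = sqlContext.read
  .option("columnNameOfCorruptRecord", "_malformed")
  .json("/path/to/records.json")

// A read without the option falls back to the global conf, so any malformed
// records here land in "_corrupt_global".
val fallback = sqlContext.read.json("/path/to/records.json")
```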