author     Wenchen Fan <wenchen@databricks.com>    2017-03-20 21:43:14 -0700
committer  Xiao Li <gatorsmile@gmail.com>          2017-03-20 21:43:14 -0700
commit     68d65fae71e475ad811a9716098aca03a2af9532 (patch)
tree       8eb62ef41f500b43cdfe1325c35dc39498841020 /sql/catalyst
parent     21e366aea5a7f49e42e78dce06ff6b3ee1e36f06 (diff)
download   spark-68d65fae71e475ad811a9716098aca03a2af9532.tar.gz
           spark-68d65fae71e475ad811a9716098aca03a2af9532.tar.bz2
           spark-68d65fae71e475ad811a9716098aca03a2af9532.zip
[SPARK-19949][SQL] unify bad record handling in CSV and JSON
## What changes were proposed in this pull request?

Currently, JSON and CSV have exactly the same logic for handling bad records. This PR abstracts that logic into an upper level to reduce code duplication. The overall idea is that the JSON and CSV parsers throw a BadRecordException, and the upper level, FailureSafeParser, handles the bad record according to the parse mode.

Behavior changes:
1. In PERMISSIVE mode, if the number of tokens doesn't match the schema, the CSV parser previously treated the record as legal and parsed as many tokens as possible. After this PR, such a record is treated as illegal and its raw string is put in a special column, but we still parse as many tokens as possible.
2. All logging is removed, as it was not very useful in practice.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>
Author: hyukjinkwon <gurwls223@gmail.com>
Author: Wenchen Fan <cloud0fan@gmail.com>

Closes #17315 from cloud-fan/bad-record2.
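(Illustration, not part of the commit message: the snippet below sketches the user-visible handling of a malformed record under the parse modes this PR unifies. The file path /tmp/people.json and the column name _corrupt_record are made-up example values.)

```scala
// Minimal sketch of the behavior described above; path and column names are examples only.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("bad-record-modes").getOrCreate()

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("_corrupt_record", StringType, nullable = true)))

// PERMISSIVE (default): a malformed line becomes a row with null data fields and the
// raw text stored in the corrupt-record column.
spark.read.schema(schema)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json("/tmp/people.json")
  .show(truncate = false)

// DROPMALFORMED silently drops bad lines; FAILFAST rethrows the underlying parse error.
spark.read.schema(schema).option("mode", "DROPMALFORMED").json("/tmp/people.json").show()
```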
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala |   4
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala            |   2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala          | 122
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala      |  80
4 files changed, 91 insertions, 117 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index e4e08a8665..08af5522d8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, ParseModes}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, GenericArrayData, ParseModes}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -583,7 +583,7 @@ case class JsonToStructs(
CreateJacksonParser.utf8String,
identity[UTF8String]))
} catch {
- case _: SparkSQLJsonProcessingException => null
+ case _: BadRecordException => null
}
}
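(For context, not part of the patch: the user-visible effect of catching the new BadRecordException in JsonToStructs is unchanged, i.e. from_json still yields null for unparseable input. A minimal sketch, assuming a local SparkSession:)

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{from_json, lit}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("from_json-demo").getOrCreate()
import spark.implicits._

val schema = StructType(Seq(StructField("a", StringType)))
// The second value is truncated JSON that cannot be parsed, so from_json returns null for it.
Seq("""{"a": "x"}""", """{"a": """).toDF("json")
  .select(from_json($"json", schema).as("parsed"))
  .show(truncate = false)
```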
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 5f222ec602..355c26afa6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -65,7 +65,7 @@ private[sql] class JSONOptions(
val allowBackslashEscapingAnyCharacter =
parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
- private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+ val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
val columnNameOfCorruptRecord =
parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 9b80c0fc87..fdb7d88d5b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -32,17 +32,14 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
-private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
-
/**
* Constructs a parser for a given schema that translates a json string to an [[InternalRow]].
*/
class JacksonParser(
schema: StructType,
- options: JSONOptions) extends Logging {
+ val options: JSONOptions) extends Logging {
import JacksonUtils._
- import ParseModes._
import com.fasterxml.jackson.core.JsonToken._
// A `ValueConverter` is responsible for converting a value from `JsonParser`
@@ -55,108 +52,6 @@ class JacksonParser(
private val factory = new JsonFactory()
options.setJacksonOptions(factory)
- private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length))
-
- private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord)
- corruptFieldIndex.foreach { corrFieldIndex =>
- require(schema(corrFieldIndex).dataType == StringType)
- require(schema(corrFieldIndex).nullable)
- }
-
- @transient
- private[this] var isWarningPrinted: Boolean = false
-
- @transient
- private def printWarningForMalformedRecord(record: () => UTF8String): Unit = {
- def sampleRecord: String = {
- if (options.wholeFile) {
- ""
- } else {
- s"Sample record: ${record()}\n"
- }
- }
-
- def footer: String = {
- s"""Code example to print all malformed records (scala):
- |===================================================
- |// The corrupted record exists in column ${options.columnNameOfCorruptRecord}.
- |val parsedJson = spark.read.json("/path/to/json/file/test.json")
- |
- """.stripMargin
- }
-
- if (options.permissive) {
- logWarning(
- s"""Found at least one malformed record. The JSON reader will replace
- |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode.
- |To find out which corrupted records have been replaced with null, please use the
- |default inferred schema instead of providing a custom schema.
- |
- |${sampleRecord ++ footer}
- |
- """.stripMargin)
- } else if (options.dropMalformed) {
- logWarning(
- s"""Found at least one malformed record. The JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which
- |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |${sampleRecord ++ footer}
- |
- """.stripMargin)
- }
- }
-
- @transient
- private def printWarningIfWholeFile(): Unit = {
- if (options.wholeFile && corruptFieldIndex.isDefined) {
- logWarning(
- s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord may result
- |in very large allocations or OutOfMemoryExceptions being raised.
- |
- """.stripMargin)
- }
- }
-
- /**
- * This function deals with the cases it fails to parse. This function will be called
- * when exceptions are caught during converting. This functions also deals with `mode` option.
- */
- private def failedRecord(record: () => UTF8String): Seq[InternalRow] = {
- corruptFieldIndex match {
- case _ if options.failFast =>
- if (options.wholeFile) {
- throw new SparkSQLJsonProcessingException("Malformed line in FAILFAST mode")
- } else {
- throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: ${record()}")
- }
-
- case _ if options.dropMalformed =>
- if (!isWarningPrinted) {
- printWarningForMalformedRecord(record)
- isWarningPrinted = true
- }
- Nil
-
- case None =>
- if (!isWarningPrinted) {
- printWarningForMalformedRecord(record)
- isWarningPrinted = true
- }
- emptyRow
-
- case Some(corruptIndex) =>
- if (!isWarningPrinted) {
- printWarningIfWholeFile()
- isWarningPrinted = true
- }
- val row = new GenericInternalRow(schema.length)
- row.update(corruptIndex, record())
- Seq(row)
- }
- }
-
/**
* Create a converter which converts the JSON documents held by the `JsonParser`
* to a value according to a desired schema. This is a wrapper for the method
@@ -239,7 +134,7 @@ class JacksonParser(
lowerCaseValue.equals("-inf")) {
value.toFloat
} else {
- throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.")
+ throw new RuntimeException(s"Cannot parse $value as FloatType.")
}
}
@@ -259,7 +154,7 @@ class JacksonParser(
lowerCaseValue.equals("-inf")) {
value.toDouble
} else {
- throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.")
+ throw new RuntimeException(s"Cannot parse $value as DoubleType.")
}
}
@@ -391,9 +286,8 @@ class JacksonParser(
case token =>
// We cannot parse this token based on the given data type. So, we throw a
- // SparkSQLJsonProcessingException and this exception will be caught by
- // `parse` method.
- throw new SparkSQLJsonProcessingException(
+ // RuntimeException and this exception will be caught by `parse` method.
+ throw new RuntimeException(
s"Failed to parse a value for data type $dataType (current token: $token).")
}
@@ -466,14 +360,14 @@ class JacksonParser(
parser.nextToken() match {
case null => Nil
case _ => rootConverter.apply(parser) match {
- case null => throw new SparkSQLJsonProcessingException("Root converter returned null")
+ case null => throw new RuntimeException("Root converter returned null")
case rows => rows
}
}
}
} catch {
- case _: JsonProcessingException | _: SparkSQLJsonProcessingException =>
- failedRecord(() => recordLiteral(record))
+ case e @ (_: RuntimeException | _: JsonProcessingException) =>
+ throw BadRecordException(() => recordLiteral(record), () => None, e)
}
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
new file mode 100644
index 0000000000..e8da10d65e
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class FailureSafeParser[IN](
+ rawParser: IN => Seq[InternalRow],
+ mode: String,
+ schema: StructType,
+ columnNameOfCorruptRecord: String) {
+
+ private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
+ private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
+ private val resultRow = new GenericInternalRow(schema.length)
+ private val nullResult = new GenericInternalRow(schema.length)
+
+ // This function takes 2 parameters: an optional partial result, and the bad record. If the given
+ // schema doesn't contain a field for corrupted record, we just return the partial result or a
+ // row with all fields null. If the given schema contains a field for corrupted record, we will
+ // set the bad record to this field, and set other fields according to the partial result or null.
+ private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
+ if (corruptFieldIndex.isDefined) {
+ (row, badRecord) => {
+ var i = 0
+ while (i < actualSchema.length) {
+ val from = actualSchema(i)
+ resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
+ i += 1
+ }
+ resultRow(corruptFieldIndex.get) = badRecord()
+ resultRow
+ }
+ } else {
+ (row, _) => row.getOrElse(nullResult)
+ }
+ }
+
+ def parse(input: IN): Iterator[InternalRow] = {
+ try {
+ rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
+ } catch {
+ case e: BadRecordException if ParseModes.isPermissiveMode(mode) =>
+ Iterator(toResultRow(e.partialResult(), e.record))
+ case _: BadRecordException if ParseModes.isDropMalformedMode(mode) =>
+ Iterator.empty
+ case e: BadRecordException => throw e.cause
+ }
+ }
+}
+
+/**
+ * Exception thrown when the underlying parser meets a bad record and can't parse it.
+ * @param record a function to return the record that caused the parser to fail
+ * @param partialResult a function that returns an optional row, which is the partial result of
+ * parsing this bad record.
+ * @param cause the actual exception about why the record is bad and can't be parsed.
+ */
+case class BadRecordException(
+ record: () => UTF8String,
+ partialResult: () => Option[InternalRow],
+ cause: Throwable) extends Exception(cause)
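(For illustration only, not part of the patch: a minimal sketch of how the new pieces compose. The raw parser throws BadRecordException on failure, and FailureSafeParser maps that failure to the configured parse mode; the toy string parser and the _corrupt_record name below are made up for the example.)

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{BadRecordException, FailureSafeParser}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

val schema = StructType(Seq(
  StructField("value", StringType, nullable = true),
  StructField("_corrupt_record", StringType, nullable = true)))

// Hypothetical raw parser: only the literal input "ok" parses; everything else is a bad record.
val rawParser: String => Seq[InternalRow] = { line =>
  if (line == "ok") {
    Seq(InternalRow(UTF8String.fromString(line)))
  } else {
    throw BadRecordException(
      record = () => UTF8String.fromString(line),
      partialResult = () => None,
      cause = new RuntimeException("malformed line"))
  }
}

val parser = new FailureSafeParser[String](rawParser, "PERMISSIVE", schema, "_corrupt_record")
parser.parse("ok").foreach(println)       // value = "ok", corrupt column = null
parser.parse("garbage").foreach(println)  // value = null, corrupt column holds the raw text
```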