author     Yin Huai <yhuai@databricks.com>      2015-12-16 23:18:53 -0800
committer  Reynold Xin <rxin@databricks.com>    2015-12-16 23:18:53 -0800
commit     9d66c4216ad830812848c657bbcd8cd50949e199 (patch)
tree       05e072cb1e4b77d67ec33fec6d30cf4b2a23e361 /sql
parent     437583f692e30b8dc03b339a34e92595d7b992ba (diff)
[SPARK-12057][SQL] Prevent failure on corrupt JSON records
This PR makes the JSON parser and schema inference handle more cases where we have records that cannot be parsed. It is based on #10043. The last commit fixes the failed test and updates the logic of schema inference.

Regarding the schema inference change, if we have input like

```
{"f1":1}
[1,2,3]
```

we originally got a DataFrame without any columns. After this change, we get a DataFrame with columns `f1` and `_corrupt_record`; for the second row, `[1,2,3]` becomes the value of `_corrupt_record`.

When merging this PR, please make sure that the author is simplyianm.

JIRA: https://issues.apache.org/jira/browse/SPARK-12057

Closes #10043

Author: Ian Macalinao <me@ian.pw>
Author: Yin Huai <yhuai@databricks.com>

Closes #10288 from yhuai/handleCorruptJson.
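For readers who want to try the change locally, here is a minimal sketch of the behavior described above (not part of the patch). It assumes a Spark 1.6-style local SQLContext; the master, app name, and the printed schema/rows are illustrative approximations.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical local setup; master and app name are illustrative only.
val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("corrupt-json-sketch"))
val sqlContext = new SQLContext(sc)

// One parsable JSON object and one record that cannot be parsed into a row.
val records = sc.parallelize("""{"f1":1}""" :: """[1,2,3]""" :: Nil)

val df = sqlContext.read.json(records)

// Before this patch the inferred schema had no columns; after it, schema
// inference keeps `f1` and adds the corrupt-record column.
df.printSchema()
// root
//  |-- _corrupt_record: string (nullable = true)
//  |-- f1: long (nullable = true)

// The unparsable record surfaces as the raw text in `_corrupt_record`
// (column order and formatting are approximate).
df.show()
// +---------------+----+
// |_corrupt_record|  f1|
// +---------------+----+
// |           null|   1|
// |        [1,2,3]|null|
// +---------------+----+
```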
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala    | 37
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala  | 19
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala      | 37
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala   |  9
4 files changed, 90 insertions, 12 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index 922fd5b211..59ba4ae2cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -61,7 +61,10 @@ private[json] object InferSchema {
StructType(Seq(StructField(columnNameOfCorruptRecords, StringType)))
}
}
- }.treeAggregate[DataType](StructType(Seq()))(compatibleRootType, compatibleRootType)
+ }.treeAggregate[DataType](
+ StructType(Seq()))(
+ compatibleRootType(columnNameOfCorruptRecords),
+ compatibleRootType(columnNameOfCorruptRecords))
canonicalizeType(rootType) match {
case Some(st: StructType) => st
@@ -170,12 +173,38 @@ private[json] object InferSchema {
case other => Some(other)
}
+ private def withCorruptField(
+ struct: StructType,
+ columnNameOfCorruptRecords: String): StructType = {
+ if (!struct.fieldNames.contains(columnNameOfCorruptRecords)) {
+ // If this given struct does not have a column used for corrupt records,
+ // add this field.
+ struct.add(columnNameOfCorruptRecords, StringType, nullable = true)
+ } else {
+ // Otherwise, just return this struct.
+ struct
+ }
+ }
+
/**
* Remove top-level ArrayType wrappers and merge the remaining schemas
*/
- private def compatibleRootType: (DataType, DataType) => DataType = {
- case (ArrayType(ty1, _), ty2) => compatibleRootType(ty1, ty2)
- case (ty1, ArrayType(ty2, _)) => compatibleRootType(ty1, ty2)
+ private def compatibleRootType(
+ columnNameOfCorruptRecords: String): (DataType, DataType) => DataType = {
+ // Since we support array of json objects at the top level,
+ // we need to check the element type and find the root level data type.
+ case (ArrayType(ty1, _), ty2) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2)
+ case (ty1, ArrayType(ty2, _)) => compatibleRootType(columnNameOfCorruptRecords)(ty1, ty2)
+ // If we see any other data type at the root level, we get records that cannot be
+ // parsed. So, we use the struct as the data type and add the corrupt field to the schema.
+ case (struct: StructType, NullType) => struct
+ case (NullType, struct: StructType) => struct
+ case (struct: StructType, o) if !o.isInstanceOf[StructType] =>
+ withCorruptField(struct, columnNameOfCorruptRecords)
+ case (o, struct: StructType) if !o.isInstanceOf[StructType] =>
+ withCorruptField(struct, columnNameOfCorruptRecords)
+ // If we get anything else, we call compatibleType.
+ // Usually, when we reach here, ty1 and ty2 are two StructTypes.
case (ty1, ty2) => compatibleType(ty1, ty2)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index bfa1405041..55a1c24e9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -31,6 +31,8 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
+private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
+
object JacksonParser {
def parse(
@@ -110,7 +112,7 @@ object JacksonParser {
lowerCaseValue.equals("-inf")) {
value.toFloat
} else {
- sys.error(s"Cannot parse $value as FloatType.")
+ throw new SparkSQLJsonProcessingException(s"Cannot parse $value as FloatType.")
}
case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, DoubleType) =>
@@ -127,7 +129,7 @@ object JacksonParser {
lowerCaseValue.equals("-inf")) {
value.toDouble
} else {
- sys.error(s"Cannot parse $value as DoubleType.")
+ throw new SparkSQLJsonProcessingException(s"Cannot parse $value as DoubleType.")
}
case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, dt: DecimalType) =>
@@ -174,7 +176,11 @@ object JacksonParser {
convertField(factory, parser, udt.sqlType)
case (token, dataType) =>
- sys.error(s"Failed to parse a value for data type $dataType (current token: $token).")
+ // We cannot parse this token based on the given data type, so we throw a
+ // SparkSQLJsonProcessingException, which will be caught by the
+ // parseJson method.
+ throw new SparkSQLJsonProcessingException(
+ s"Failed to parse a value for data type $dataType (current token: $token).")
}
}
@@ -267,15 +273,14 @@ object JacksonParser {
array.toArray[InternalRow](schema)
}
case _ =>
- sys.error(
- s"Failed to parse record $record. Please make sure that each line of " +
- "the file (or each string in the RDD) is a valid JSON object or " +
- "an array of JSON objects.")
+ failedRecord(record)
}
}
} catch {
case _: JsonProcessingException =>
failedRecord(record)
+ case _: SparkSQLJsonProcessingException =>
+ failedRecord(record)
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index ba7718c864..baa258ad26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1427,4 +1427,41 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
}
}
+
+ test("SPARK-12057 additional corrupt records do not throw exceptions") {
+ // Test if we can query corrupt records.
+ withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
+ withTempTable("jsonTable") {
+ val schema = StructType(
+ StructField("_unparsed", StringType, true) ::
+ StructField("dummy", StringType, true) :: Nil)
+
+ {
+ // We need to make sure we can infer the schema.
+ val jsonDF = sqlContext.read.json(additionalCorruptRecords)
+ assert(jsonDF.schema === schema)
+ }
+
+ {
+ val jsonDF = sqlContext.read.schema(schema).json(additionalCorruptRecords)
+ jsonDF.registerTempTable("jsonTable")
+
+ // In HiveContext, backticks should be used to access columns starting with an underscore.
+ checkAnswer(
+ sql(
+ """
+ |SELECT dummy, _unparsed
+ |FROM jsonTable
+ """.stripMargin),
+ Row("test", null) ::
+ Row(null, """[1,2,3]""") ::
+ Row(null, """":"test", "a":1}""") ::
+ Row(null, """42""") ::
+ Row(null, """ ","ian":"test"}""") :: Nil
+ )
+ }
+ }
+ }
+ }
+
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index 713d1da1cb..cb61f7eeca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -188,6 +188,14 @@ private[json] trait TestJsonData {
"""{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
"""]""" :: Nil)
+ def additionalCorruptRecords: RDD[String] =
+ sqlContext.sparkContext.parallelize(
+ """{"dummy":"test"}""" ::
+ """[1,2,3]""" ::
+ """":"test", "a":1}""" ::
+ """42""" ::
+ """ ","ian":"test"}""" :: Nil)
+
def emptyRecords: RDD[String] =
sqlContext.sparkContext.parallelize(
"""{""" ::
@@ -197,7 +205,6 @@ private[json] trait TestJsonData {
"""{"b": [{"c": {}}]}""" ::
"""]""" :: Nil)
-
lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())
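As a usage note on the new test data above, the following is a minimal, hypothetical sketch of querying corrupt records under a custom corrupt-record column name, mirroring the SPARK-12057 test. It assumes an existing SQLContext named sqlContext and access to the additionalCorruptRecords RDD defined in TestJsonData; the temp table name is illustrative.

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Route unparsable records into a column named "_unparsed" instead of
// the default "_corrupt_record".
sqlContext.setConf("spark.sql.columnNameOfCorruptRecord", "_unparsed")

val schema = StructType(
  StructField("_unparsed", StringType, nullable = true) ::
  StructField("dummy", StringType, nullable = true) :: Nil)

// With an explicit schema, records that cannot be parsed land in
// `_unparsed` rather than failing the query.
val jsonDF = sqlContext.read.schema(schema).json(additionalCorruptRecords)
jsonDF.registerTempTable("jsonTable")

sqlContext.sql("SELECT dummy, _unparsed FROM jsonTable").show()
```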