aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test
diff options
context:
space:
mode:
authorNathan Howell <nhowell@godaddy.com>2017-02-16 20:51:19 -0800
committerWenchen Fan <wenchen@databricks.com>2017-02-16 20:51:19 -0800
commit21fde57f15db974b710e7b00e72c744da7c1ac3c (patch)
treee51d0ab5ad405ff66c6459738186406a597a8f1c /sql/core/src/test
parentdcc2d540a53f0bd04baead43fdee1c170ef2b9f3 (diff)
downloadspark-21fde57f15db974b710e7b00e72c744da7c1ac3c.tar.gz
spark-21fde57f15db974b710e7b00e72c744da7c1ac3c.tar.bz2
spark-21fde57f15db974b710e7b00e72c744da7c1ac3c.zip
[SPARK-18352][SQL] Support parsing multiline json files
## What changes were proposed in this pull request? If a new option `wholeFile` is set to `true` the JSON reader will parse each file (instead of a single line) as a value. This is done with Jackson streaming and it should be capable of parsing very large documents, assuming the row will fit in memory. Because the file is not buffered in memory the corrupt record handling is also slightly different when `wholeFile` is enabled: the corrupt column will contain the filename instead of the literal JSON if there is a parsing failure. It would be easy to extend this to add the parser location (line, column and byte offsets) to the output if desired. These changes have allowed types other than `String` to be parsed. Support for `UTF8String` and `Text` have been added (alongside `String` and `InputFormat`) and no longer require a conversion to `String` just for parsing. I've also included a few other changes that generate slightly better bytecode and (imo) make it more obvious when and where boxing is occurring in the parser. These are included as separate commits, let me know if they should be flattened into this PR or moved to a new one. ## How was this patch tested? New and existing unit tests. No performance or load tests have been run. Author: Nathan Howell <nhowell@godaddy.com> Closes #16386 from NathanHowell/SPARK-18352.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala152
1 files changed, 147 insertions, 5 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 9344aeda00..05aa2ab2ce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -28,8 +28,8 @@ import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkException
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.{functions => F, _}
+import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.json.JsonInferSchema.compatibleType
@@ -64,7 +64,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
val dummySchema = StructType(Seq.empty)
- val parser = new JacksonParser(dummySchema, "", dummyOption)
+ val parser = new JacksonParser(dummySchema, dummyOption)
Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser =>
jsonParser.nextToken()
@@ -1367,7 +1367,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
test("SPARK-6245 JsonRDD.inferSchema on empty RDD") {
// This is really a test that it doesn't throw an exception
val emptySchema = JsonInferSchema.infer(
- empty, "", new JSONOptions(Map.empty[String, String], "GMT"))
+ empty,
+ new JSONOptions(Map.empty[String, String], "GMT"),
+ CreateJacksonParser.string)
assert(StructType(Seq()) === emptySchema)
}
@@ -1392,7 +1394,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
test("SPARK-8093 Erase empty structs") {
val emptySchema = JsonInferSchema.infer(
- emptyRecords, "", new JSONOptions(Map.empty[String, String], "GMT"))
+ emptyRecords,
+ new JSONOptions(Map.empty[String, String], "GMT"),
+ CreateJacksonParser.string)
assert(StructType(Seq()) === emptySchema)
}
@@ -1802,4 +1806,142 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
assert(df2.schema == schema)
}
+
+ test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ primitiveFieldAndType
+ .toDF("value")
+ .write
+ .option("compression", "GzIp")
+ .text(path)
+
+ assert(new File(path).listFiles().exists(_.getName.endsWith(".gz")))
+
+ val jsonDF = spark.read.option("wholeFile", true).json(path)
+ val jsonDir = new File(dir, "json").getCanonicalPath
+ jsonDF.coalesce(1).write
+ .option("compression", "gZiP")
+ .json(jsonDir)
+
+ assert(new File(jsonDir).listFiles().exists(_.getName.endsWith(".json.gz")))
+
+ val originalData = spark.read.json(primitiveFieldAndType)
+ checkAnswer(jsonDF, originalData)
+ checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData)
+ }
+ }
+
+ test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ primitiveFieldAndType
+ .toDF("value")
+ .write
+ .text(path)
+
+ val jsonDF = spark.read.option("wholeFile", true).json(path)
+ val jsonDir = new File(dir, "json").getCanonicalPath
+ jsonDF.coalesce(1).write.json(jsonDir)
+
+ val compressedFiles = new File(jsonDir).listFiles()
+ assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+ val originalData = spark.read.json(primitiveFieldAndType)
+ checkAnswer(jsonDF, originalData)
+ checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData)
+ }
+ }
+
+ test("SPARK-18352: Expect one JSON document per file") {
+ // the json parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY token.
+ // this might not be the optimal behavior but this test verifies that only the first value
+ // is parsed and the rest are discarded.
+
+ // alternatively the parser could continue parsing following objects, which may further reduce
+ // allocations by skipping the line reader entirely
+
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ spark
+ .createDataFrame(Seq(Tuple1("{}{invalid}")))
+ .coalesce(1)
+ .write
+ .text(path)
+
+ val jsonDF = spark.read.option("wholeFile", true).json(path)
+ // no corrupt record column should be created
+ assert(jsonDF.schema === StructType(Seq()))
+ // only the first object should be read
+ assert(jsonDF.count() === 1)
+ }
+ }
+
+ test("SPARK-18352: Handle multi-line corrupt documents (PERMISSIVE)") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val corruptRecordCount = additionalCorruptRecords.count().toInt
+ assert(corruptRecordCount === 5)
+
+ additionalCorruptRecords
+ .toDF("value")
+ // this is the minimum partition count that avoids hash collisions
+ .repartition(corruptRecordCount * 4, F.hash($"value"))
+ .write
+ .text(path)
+
+ val jsonDF = spark.read.option("wholeFile", true).option("mode", "PERMISSIVE").json(path)
+ assert(jsonDF.count() === corruptRecordCount)
+ assert(jsonDF.schema === new StructType()
+ .add("_corrupt_record", StringType)
+ .add("dummy", StringType))
+ val counts = jsonDF
+ .join(
+ additionalCorruptRecords.toDF("value"),
+ F.regexp_replace($"_corrupt_record", "(^\\s+|\\s+$)", "") === F.trim($"value"),
+ "outer")
+ .agg(
+ F.count($"dummy").as("valid"),
+ F.count($"_corrupt_record").as("corrupt"),
+ F.count("*").as("count"))
+ checkAnswer(counts, Row(1, 4, 6))
+ }
+ }
+
+ test("SPARK-18352: Handle multi-line corrupt documents (FAILFAST)") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val corruptRecordCount = additionalCorruptRecords.count().toInt
+ assert(corruptRecordCount === 5)
+
+ additionalCorruptRecords
+ .toDF("value")
+ // this is the minimum partition count that avoids hash collisions
+ .repartition(corruptRecordCount * 4, F.hash($"value"))
+ .write
+ .text(path)
+
+ val schema = new StructType().add("dummy", StringType)
+
+ // `FAILFAST` mode should throw an exception for corrupt records.
+ val exceptionOne = intercept[SparkException] {
+ spark.read
+ .option("wholeFile", true)
+ .option("mode", "FAILFAST")
+ .json(path)
+ .collect()
+ }
+ assert(exceptionOne.getMessage.contains("Malformed line in FAILFAST mode"))
+
+ val exceptionTwo = intercept[SparkException] {
+ spark.read
+ .option("wholeFile", true)
+ .option("mode", "FAILFAST")
+ .schema(schema)
+ .json(path)
+ .collect()
+ }
+ assert(exceptionTwo.getMessage.contains("Malformed line in FAILFAST mode"))
+ }
+ }
}