Diffstat (limited to 'sql/core/src/test')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala | 152
1 file changed, 147 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 9344aeda00..05aa2ab2ce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -28,8 +28,8 @@ import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkException
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.{functions => F, _}
+import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.json.JsonInferSchema.compatibleType
@@ -64,7 +64,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
val dummySchema = StructType(Seq.empty)
- val parser = new JacksonParser(dummySchema, "", dummyOption)
+ val parser = new JacksonParser(dummySchema, dummyOption)
Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser =>
jsonParser.nextToken()
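
With this change the record no longer travels through the `JacksonParser` constructor, so a single parser instance can be reused for every record in a partition. A minimal sketch of the new construction, mirroring the hunk above (the schema and options are dummies, as in the suite):

    // The parser is now built from schema + options only; records are fed
    // to it afterwards rather than being baked into the constructor.
    val options = new JSONOptions(Map.empty[String, String], "GMT")
    val parser = new JacksonParser(StructType(Seq.empty), options)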
@@ -1367,7 +1367,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
test("SPARK-6245 JsonRDD.inferSchema on empty RDD") {
// This is really a test that it doesn't throw an exception
val emptySchema = JsonInferSchema.infer(
- empty, "", new JSONOptions(Map.empty[String, String], "GMT"))
+ empty,
+ new JSONOptions(Map.empty[String, String], "GMT"),
+ CreateJacksonParser.string)
assert(StructType(Seq()) === emptySchema)
}
@@ -1392,7 +1394,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
test("SPARK-8093 Erase empty structs") {
val emptySchema = JsonInferSchema.infer(
- emptyRecords, "", new JSONOptions(Map.empty[String, String], "GMT"))
+ emptyRecords,
+ new JSONOptions(Map.empty[String, String], "GMT"),
+ CreateJacksonParser.string)
assert(StructType(Seq()) === emptySchema)
}
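
`JsonInferSchema.infer` likewise drops the per-record string argument and instead takes a `CreateJacksonParser` function, so inference can run over whatever record type the input provides. A hedged sketch of the call shape used by both tests above (`stringRDD` is a placeholder for an `RDD[String]` of JSON documents):

    // CreateJacksonParser.string tells the inferrer how to open a Jackson
    // parser over each String element of the input.
    val inferred = JsonInferSchema.infer(
      stringRDD,
      new JSONOptions(Map.empty[String, String], "GMT"),
      CreateJacksonParser.string)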
@@ -1802,4 +1806,142 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
assert(df2.schema == schema)
}
+
+ test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ primitiveFieldAndType
+ .toDF("value")
+ .write
+ .option("compression", "GzIp")
+ .text(path)
+
+ assert(new File(path).listFiles().exists(_.getName.endsWith(".gz")))
+
+ val jsonDF = spark.read.option("wholeFile", true).json(path)
+ val jsonDir = new File(dir, "json").getCanonicalPath
+ jsonDF.coalesce(1).write
+ .option("compression", "gZiP")
+ .json(jsonDir)
+
+ assert(new File(jsonDir).listFiles().exists(_.getName.endsWith(".json.gz")))
+
+ val originalData = spark.read.json(primitiveFieldAndType)
+ checkAnswer(jsonDF, originalData)
+ checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData)
+ }
+ }
+
+ test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ primitiveFieldAndType
+ .toDF("value")
+ .write
+ .text(path)
+
+ val jsonDF = spark.read.option("wholeFile", true).json(path)
+ val jsonDir = new File(dir, "json").getCanonicalPath
+ jsonDF.coalesce(1).write.json(jsonDir)
+
+ val compressedFiles = new File(jsonDir).listFiles()
+ assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+ val originalData = spark.read.json(primitiveFieldAndType)
+ checkAnswer(jsonDF, originalData)
+ checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData)
+ }
+ }
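+
Both tests exercise the new `wholeFile` option, under which each input file is parsed as a single JSON document (possibly spanning many lines) rather than one document per line. The user-facing pattern, with a placeholder path:

    // Read one multi-line JSON document per file; this works for plain
    // .json files as well as compressed ones such as .json.gz.
    val df = spark.read
      .option("wholeFile", true)
      .json("/path/to/json/dir") // placeholder path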
+
+ test("SPARK-18352: Expect one JSON document per file") {
+ // The JSON parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY token.
+ // This might not be the optimal behavior, but this test verifies that only the first value
+ // is parsed and the rest are discarded.
+
+ // Alternatively, the parser could continue parsing the following objects, which may further
+ // reduce allocations by skipping the line reader entirely.
+
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ spark
+ .createDataFrame(Seq(Tuple1("{}{invalid}")))
+ .coalesce(1)
+ .write
+ .text(path)
+
+ val jsonDF = spark.read.option("wholeFile", true).json(path)
+ // no corrupt record column should be created
+ assert(jsonDF.schema === StructType(Seq()))
+ // only the first object should be read
+ assert(jsonDF.count() === 1)
+ }
+ }
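+
The behavior the comment describes is Jackson's, not Spark's: token-level parsing stops at the first matching END_OBJECT, so the trailing "{invalid}" bytes are never tokenized. A standalone sketch with plain Jackson core illustrating this (the string literal matches the test input):

    import com.fasterxml.jackson.core.JsonFactory
    val p = new JsonFactory().createParser("{}{invalid}")
    p.nextToken() // START_OBJECT
    p.nextToken() // END_OBJECT: the first document is complete here; the
                  // trailing garbage is never reached unless parsing continues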
+
+ test("SPARK-18352: Handle multi-line corrupt documents (PERMISSIVE)") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val corruptRecordCount = additionalCorruptRecords.count().toInt
+ assert(corruptRecordCount === 5)
+
+ additionalCorruptRecords
+ .toDF("value")
+ // this is the minimum partition count that avoids hash collisions
+ .repartition(corruptRecordCount * 4, F.hash($"value"))
+ .write
+ .text(path)
+
+ val jsonDF = spark.read.option("wholeFile", true).option("mode", "PERMISSIVE").json(path)
+ assert(jsonDF.count() === corruptRecordCount)
+ assert(jsonDF.schema === new StructType()
+ .add("_corrupt_record", StringType)
+ .add("dummy", StringType))
+ val counts = jsonDF
+ .join(
+ additionalCorruptRecords.toDF("value"),
+ F.regexp_replace($"_corrupt_record", "(^\\s+|\\s+$)", "") === F.trim($"value"),
+ "outer")
+ .agg(
+ F.count($"dummy").as("valid"),
+ F.count($"_corrupt_record").as("corrupt"),
+ F.count("*").as("count"))
+ checkAnswer(counts, Row(1, 4, 6))
+ }
+ }
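+
The expected `Row(1, 4, 6)` follows from the outer join: of the five documents, one is valid (`dummy` non-null) and four are corrupt (`_corrupt_record` non-null); the valid row carries a null `_corrupt_record`, so neither it nor its source line finds a join partner, giving 4 + 1 + 1 = 6 rows in total. The user-facing PERMISSIVE pattern, with a placeholder path:

    // Corrupt whole-file documents are kept verbatim in _corrupt_record
    // instead of failing the query.
    val df = spark.read
      .option("wholeFile", true)
      .option("mode", "PERMISSIVE")
      .json("/path/to/json/dir") // placeholder path
    df.filter("_corrupt_record IS NOT NULL").show()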
+
+ test("SPARK-18352: Handle multi-line corrupt documents (FAILFAST)") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ val corruptRecordCount = additionalCorruptRecords.count().toInt
+ assert(corruptRecordCount === 5)
+
+ additionalCorruptRecords
+ .toDF("value")
+ // this is the minimum partition count that avoids hash collisions
+ .repartition(corruptRecordCount * 4, F.hash($"value"))
+ .write
+ .text(path)
+
+ val schema = new StructType().add("dummy", StringType)
+
+ // `FAILFAST` mode should throw an exception for corrupt records.
+ val exceptionOne = intercept[SparkException] {
+ spark.read
+ .option("wholeFile", true)
+ .option("mode", "FAILFAST")
+ .json(path)
+ .collect()
+ }
+ assert(exceptionOne.getMessage.contains("Malformed line in FAILFAST mode"))
+
+ val exceptionTwo = intercept[SparkException] {
+ spark.read
+ .option("wholeFile", true)
+ .option("mode", "FAILFAST")
+ .schema(schema)
+ .json(path)
+ .collect()
+ }
+ assert(exceptionTwo.getMessage.contains("Malformed line in FAILFAST mode"))
+ }
+ }
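+
In FAILFAST mode the same corrupt input surfaces as a `SparkException` at execution time, whether the schema is inferred or user-supplied, which is what the two intercepts above verify. A minimal sketch of handling it at a call site (placeholder path; `SparkException` is `org.apache.spark.SparkException`, as imported by the suite):

    try {
      spark.read
        .option("wholeFile", true)
        .option("mode", "FAILFAST")
        .json("/path/to/json/dir") // placeholder path
        .collect()
    } catch {
      case e: SparkException =>
        // The message includes "Malformed line in FAILFAST mode".
        println(e.getMessage)
    }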
}