diff options
Diffstat (limited to 'sql/core/src/test')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala | 152 |
1 files changed, 147 insertions, 5 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 9344aeda00..05aa2ab2ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -28,8 +28,8 @@ import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.rdd.RDD import org.apache.spark.SparkException -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions} +import org.apache.spark.sql.{functions => F, _} +import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.datasources.json.JsonInferSchema.compatibleType @@ -64,7 +64,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val dummyOption = new JSONOptions(Map.empty[String, String], "GMT") val dummySchema = StructType(Seq.empty) - val parser = new JacksonParser(dummySchema, "", dummyOption) + val parser = new JacksonParser(dummySchema, dummyOption) Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser => jsonParser.nextToken() @@ -1367,7 +1367,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("SPARK-6245 JsonRDD.inferSchema on empty RDD") { // This is really a test that it doesn't throw an exception val emptySchema = JsonInferSchema.infer( - empty, "", new JSONOptions(Map.empty[String, String], "GMT")) + empty, + new JSONOptions(Map.empty[String, String], "GMT"), + CreateJacksonParser.string) assert(StructType(Seq()) === emptySchema) } @@ -1392,7 +1394,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("SPARK-8093 Erase empty structs") { val emptySchema = JsonInferSchema.infer( - emptyRecords, "", new JSONOptions(Map.empty[String, String], "GMT")) + emptyRecords, + new JSONOptions(Map.empty[String, String], "GMT"), + CreateJacksonParser.string) assert(StructType(Seq()) === emptySchema) } @@ -1802,4 +1806,142 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { + withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType + .toDF("value") + .write + .option("compression", "GzIp") + .text(path) + + assert(new File(path).listFiles().exists(_.getName.endsWith(".gz"))) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write + .option("compression", "gZiP") + .json(jsonDir) + + assert(new File(jsonDir).listFiles().exists(_.getName.endsWith(".json.gz"))) + + val originalData = spark.read.json(primitiveFieldAndType) + checkAnswer(jsonDF, originalData) + checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData) + } + } + + test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") { + withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType + .toDF("value") + .write + .text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write.json(jsonDir) + + val compressedFiles = new File(jsonDir).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".json"))) + + val originalData = spark.read.json(primitiveFieldAndType) + checkAnswer(jsonDF, originalData) + checkAnswer(spark.read.schema(originalData.schema).json(jsonDir), originalData) + } + } + + test("SPARK-18352: Expect one JSON document per file") { + // the json parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY token. + // this might not be the optimal behavior but this test verifies that only the first value + // is parsed and the rest are discarded. + + // alternatively the parser could continue parsing following objects, which may further reduce + // allocations by skipping the line reader entirely + + withTempPath { dir => + val path = dir.getCanonicalPath + spark + .createDataFrame(Seq(Tuple1("{}{invalid}"))) + .coalesce(1) + .write + .text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + // no corrupt record column should be created + assert(jsonDF.schema === StructType(Seq())) + // only the first object should be read + assert(jsonDF.count() === 1) + } + } + + test("SPARK-18352: Handle multi-line corrupt documents (PERMISSIVE)") { + withTempPath { dir => + val path = dir.getCanonicalPath + val corruptRecordCount = additionalCorruptRecords.count().toInt + assert(corruptRecordCount === 5) + + additionalCorruptRecords + .toDF("value") + // this is the minimum partition count that avoids hash collisions + .repartition(corruptRecordCount * 4, F.hash($"value")) + .write + .text(path) + + val jsonDF = spark.read.option("wholeFile", true).option("mode", "PERMISSIVE").json(path) + assert(jsonDF.count() === corruptRecordCount) + assert(jsonDF.schema === new StructType() + .add("_corrupt_record", StringType) + .add("dummy", StringType)) + val counts = jsonDF + .join( + additionalCorruptRecords.toDF("value"), + F.regexp_replace($"_corrupt_record", "(^\\s+|\\s+$)", "") === F.trim($"value"), + "outer") + .agg( + F.count($"dummy").as("valid"), + F.count($"_corrupt_record").as("corrupt"), + F.count("*").as("count")) + checkAnswer(counts, Row(1, 4, 6)) + } + } + + test("SPARK-18352: Handle multi-line corrupt documents (FAILFAST)") { + withTempPath { dir => + val path = dir.getCanonicalPath + val corruptRecordCount = additionalCorruptRecords.count().toInt + assert(corruptRecordCount === 5) + + additionalCorruptRecords + .toDF("value") + // this is the minimum partition count that avoids hash collisions + .repartition(corruptRecordCount * 4, F.hash($"value")) + .write + .text(path) + + val schema = new StructType().add("dummy", StringType) + + // `FAILFAST` mode should throw an exception for corrupt records. + val exceptionOne = intercept[SparkException] { + spark.read + .option("wholeFile", true) + .option("mode", "FAILFAST") + .json(path) + .collect() + } + assert(exceptionOne.getMessage.contains("Malformed line in FAILFAST mode")) + + val exceptionTwo = intercept[SparkException] { + spark.read + .option("wholeFile", true) + .option("mode", "FAILFAST") + .schema(schema) + .json(path) + .collect() + } + assert(exceptionTwo.getMessage.contains("Malformed line in FAILFAST mode")) + } + } } |