From 7e5359be5ca038fdb579712b18e7f226d705c276 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 28 Feb 2017 13:34:33 -0800 Subject: [SPARK-19610][SQL] Support parsing multiline CSV files ## What changes were proposed in this pull request? This PR proposes the support for multiple lines for CSV by resembling the multiline supports in JSON datasource (in case of JSON, per file). So, this PR introduces `wholeFile` option which makes the format not splittable and reads each whole file. Since Univocity parser can produces each row from a stream, it should be capable of parsing very large documents when the internal rows are fix in the memory. ## How was this patch tested? Unit tests in `CSVSuite` and `tests.py` Manual tests with a single 9GB CSV file in local file system, for example, ```scala spark.read.option("wholeFile", true).option("inferSchema", true).csv("tmp.csv").count() ``` Author: hyukjinkwon Closes #16976 from HyukjinKwon/SPARK-19610. --- .../sql/execution/datasources/csv/CSVSuite.scala | 192 ++++++++++++++------- 1 file changed, 132 insertions(+), 60 deletions(-) (limited to 'sql/core/src/test') diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 371d4311ba..d94eb66201 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -24,11 +24,12 @@ import java.text.SimpleDateFormat import java.util.Locale import org.apache.commons.lang3.time.FastDateFormat -import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.GzipCodec +import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, UDT} +import org.apache.spark.sql.functions.{col, regexp_replace} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} import org.apache.spark.sql.types._ @@ -243,12 +244,15 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } test("test for DROPMALFORMED parsing mode") { - val cars = spark.read - .format("csv") - .options(Map("header" -> "true", "mode" -> "dropmalformed")) - .load(testFile(carsFile)) + Seq(false, true).foreach { wholeFile => + val cars = spark.read + .format("csv") + .option("wholeFile", wholeFile) + .options(Map("header" -> "true", "mode" -> "dropmalformed")) + .load(testFile(carsFile)) - assert(cars.select("year").collect().size === 2) + assert(cars.select("year").collect().size === 2) + } } test("test for blank column names on read and select columns") { @@ -263,14 +267,17 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } test("test for FAILFAST parsing mode") { - val exception = intercept[SparkException]{ - spark.read - .format("csv") - .options(Map("header" -> "true", "mode" -> "failfast")) - .load(testFile(carsFile)).collect() - } + Seq(false, true).foreach { wholeFile => + val exception = intercept[SparkException] { + spark.read + .format("csv") + .option("wholeFile", wholeFile) + .options(Map("header" -> "true", "mode" -> "failfast")) + .load(testFile(carsFile)).collect() + } - assert(exception.getMessage.contains("Malformed line in FAILFAST mode: 2015,Chevy,Volt")) + assert(exception.getMessage.contains("Malformed line in FAILFAST mode: 2015,Chevy,Volt")) + } } test("test for tokens more than the fields in the schema") { @@ -961,56 +968,121 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } test("SPARK-18699 put malformed records in a `columnNameOfCorruptRecord` field") { - val schema = new StructType().add("a", IntegerType).add("b", TimestampType) - val df1 = spark - .read - .option("mode", "PERMISSIVE") - .schema(schema) - .csv(testFile(valueMalformedFile)) - checkAnswer(df1, - Row(null, null) :: - Row(1, java.sql.Date.valueOf("1983-08-04")) :: - Nil) - - // If `schema` has `columnNameOfCorruptRecord`, it should handle corrupt records - val columnNameOfCorruptRecord = "_unparsed" - val schemaWithCorrField1 = schema.add(columnNameOfCorruptRecord, StringType) - val df2 = spark - .read - .option("mode", "PERMISSIVE") - .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) - .schema(schemaWithCorrField1) - .csv(testFile(valueMalformedFile)) - checkAnswer(df2, - Row(null, null, "0,2013-111-11 12:13:14") :: - Row(1, java.sql.Date.valueOf("1983-08-04"), null) :: - Nil) - - // We put a `columnNameOfCorruptRecord` field in the middle of a schema - val schemaWithCorrField2 = new StructType() - .add("a", IntegerType) - .add(columnNameOfCorruptRecord, StringType) - .add("b", TimestampType) - val df3 = spark - .read - .option("mode", "PERMISSIVE") - .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) - .schema(schemaWithCorrField2) - .csv(testFile(valueMalformedFile)) - checkAnswer(df3, - Row(null, "0,2013-111-11 12:13:14", null) :: - Row(1, null, java.sql.Date.valueOf("1983-08-04")) :: - Nil) - - val errMsg = intercept[AnalysisException] { - spark + Seq(false, true).foreach { wholeFile => + val schema = new StructType().add("a", IntegerType).add("b", TimestampType) + val df1 = spark + .read + .option("mode", "PERMISSIVE") + .option("wholeFile", wholeFile) + .schema(schema) + .csv(testFile(valueMalformedFile)) + checkAnswer(df1, + Row(null, null) :: + Row(1, java.sql.Date.valueOf("1983-08-04")) :: + Nil) + + // If `schema` has `columnNameOfCorruptRecord`, it should handle corrupt records + val columnNameOfCorruptRecord = "_unparsed" + val schemaWithCorrField1 = schema.add(columnNameOfCorruptRecord, StringType) + val df2 = spark .read .option("mode", "PERMISSIVE") .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) - .schema(schema.add(columnNameOfCorruptRecord, IntegerType)) + .option("wholeFile", wholeFile) + .schema(schemaWithCorrField1) .csv(testFile(valueMalformedFile)) - .collect - }.getMessage - assert(errMsg.startsWith("The field for corrupt records must be string type and nullable")) + checkAnswer(df2, + Row(null, null, "0,2013-111-11 12:13:14") :: + Row(1, java.sql.Date.valueOf("1983-08-04"), null) :: + Nil) + + // We put a `columnNameOfCorruptRecord` field in the middle of a schema + val schemaWithCorrField2 = new StructType() + .add("a", IntegerType) + .add(columnNameOfCorruptRecord, StringType) + .add("b", TimestampType) + val df3 = spark + .read + .option("mode", "PERMISSIVE") + .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) + .option("wholeFile", wholeFile) + .schema(schemaWithCorrField2) + .csv(testFile(valueMalformedFile)) + checkAnswer(df3, + Row(null, "0,2013-111-11 12:13:14", null) :: + Row(1, null, java.sql.Date.valueOf("1983-08-04")) :: + Nil) + + val errMsg = intercept[AnalysisException] { + spark + .read + .option("mode", "PERMISSIVE") + .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) + .option("wholeFile", wholeFile) + .schema(schema.add(columnNameOfCorruptRecord, IntegerType)) + .csv(testFile(valueMalformedFile)) + .collect + }.getMessage + assert(errMsg.startsWith("The field for corrupt records must be string type and nullable")) + } + } + + test("SPARK-19610: Parse normal multi-line CSV files") { + val primitiveFieldAndType = Seq( + """" + |string","integer + | + | + |","long + | + |","bigInteger",double,boolean,null""".stripMargin, + """"this is a + |simple + |string."," + | + |10"," + |21474836470","92233720368547758070"," + | + |1.7976931348623157E308",true,""".stripMargin) + + withTempPath { path => + primitiveFieldAndType.toDF("value").coalesce(1).write.text(path.getAbsolutePath) + + val df = spark.read + .option("header", true) + .option("wholeFile", true) + .csv(path.getAbsolutePath) + + // Check if headers have new lines in the names. + val actualFields = df.schema.fieldNames.toSeq + val expectedFields = + Seq("\nstring", "integer\n\n\n", "long\n\n", "bigInteger", "double", "boolean", "null") + assert(actualFields === expectedFields) + + // Check if the rows have new lines in the values. + val expected = Row( + "this is a\nsimple\nstring.", + "\n\n10", + "\n21474836470", + "92233720368547758070", + "\n\n1.7976931348623157E308", + "true", + null) + checkAnswer(df, expected) + } + } + + test("Empty file produces empty dataframe with empty schema - wholeFile option") { + withTempPath { path => + path.createNewFile() + + val df = spark.read.format("csv") + .option("header", true) + .option("wholeFile", true) + .load(path.getAbsolutePath) + + assert(df.schema === spark.emptyDataFrame.schema) + checkAnswer(df, spark.emptyDataFrame) + } } } -- cgit v1.2.3