aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test
diff options
context:
space:
mode:
authorhyukjinkwon <gurwls223@gmail.com>2017-02-28 13:34:33 -0800
committerWenchen Fan <wenchen@databricks.com>2017-02-28 13:34:33 -0800
commit7e5359be5ca038fdb579712b18e7f226d705c276 (patch)
tree6fca55568b53c2ded63bcbf846a8463ffcafc92a /sql/core/src/test
parentce233f18e381fa1ea00be74ca26e97d35baa6c9c (diff)
downloadspark-7e5359be5ca038fdb579712b18e7f226d705c276.tar.gz
spark-7e5359be5ca038fdb579712b18e7f226d705c276.tar.bz2
spark-7e5359be5ca038fdb579712b18e7f226d705c276.zip
[SPARK-19610][SQL] Support parsing multiline CSV files
## What changes were proposed in this pull request? This PR proposes the support for multiple lines for CSV by resembling the multiline supports in JSON datasource (in case of JSON, per file). So, this PR introduces `wholeFile` option which makes the format not splittable and reads each whole file. Since Univocity parser can produces each row from a stream, it should be capable of parsing very large documents when the internal rows are fix in the memory. ## How was this patch tested? Unit tests in `CSVSuite` and `tests.py` Manual tests with a single 9GB CSV file in local file system, for example, ```scala spark.read.option("wholeFile", true).option("inferSchema", true).csv("tmp.csv").count() ``` Author: hyukjinkwon <gurwls223@gmail.com> Closes #16976 from HyukjinKwon/SPARK-19610.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala192
1 files changed, 132 insertions, 60 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 371d4311ba..d94eb66201 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -24,11 +24,12 @@ import java.text.SimpleDateFormat
import java.util.Locale
import org.apache.commons.lang3.time.FastDateFormat
-import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, UDT}
+import org.apache.spark.sql.functions.{col, regexp_replace}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
@@ -243,12 +244,15 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
test("test for DROPMALFORMED parsing mode") {
- val cars = spark.read
- .format("csv")
- .options(Map("header" -> "true", "mode" -> "dropmalformed"))
- .load(testFile(carsFile))
+ Seq(false, true).foreach { wholeFile =>
+ val cars = spark.read
+ .format("csv")
+ .option("wholeFile", wholeFile)
+ .options(Map("header" -> "true", "mode" -> "dropmalformed"))
+ .load(testFile(carsFile))
- assert(cars.select("year").collect().size === 2)
+ assert(cars.select("year").collect().size === 2)
+ }
}
test("test for blank column names on read and select columns") {
@@ -263,14 +267,17 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
test("test for FAILFAST parsing mode") {
- val exception = intercept[SparkException]{
- spark.read
- .format("csv")
- .options(Map("header" -> "true", "mode" -> "failfast"))
- .load(testFile(carsFile)).collect()
- }
+ Seq(false, true).foreach { wholeFile =>
+ val exception = intercept[SparkException] {
+ spark.read
+ .format("csv")
+ .option("wholeFile", wholeFile)
+ .options(Map("header" -> "true", "mode" -> "failfast"))
+ .load(testFile(carsFile)).collect()
+ }
- assert(exception.getMessage.contains("Malformed line in FAILFAST mode: 2015,Chevy,Volt"))
+ assert(exception.getMessage.contains("Malformed line in FAILFAST mode: 2015,Chevy,Volt"))
+ }
}
test("test for tokens more than the fields in the schema") {
@@ -961,56 +968,121 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
test("SPARK-18699 put malformed records in a `columnNameOfCorruptRecord` field") {
- val schema = new StructType().add("a", IntegerType).add("b", TimestampType)
- val df1 = spark
- .read
- .option("mode", "PERMISSIVE")
- .schema(schema)
- .csv(testFile(valueMalformedFile))
- checkAnswer(df1,
- Row(null, null) ::
- Row(1, java.sql.Date.valueOf("1983-08-04")) ::
- Nil)
-
- // If `schema` has `columnNameOfCorruptRecord`, it should handle corrupt records
- val columnNameOfCorruptRecord = "_unparsed"
- val schemaWithCorrField1 = schema.add(columnNameOfCorruptRecord, StringType)
- val df2 = spark
- .read
- .option("mode", "PERMISSIVE")
- .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
- .schema(schemaWithCorrField1)
- .csv(testFile(valueMalformedFile))
- checkAnswer(df2,
- Row(null, null, "0,2013-111-11 12:13:14") ::
- Row(1, java.sql.Date.valueOf("1983-08-04"), null) ::
- Nil)
-
- // We put a `columnNameOfCorruptRecord` field in the middle of a schema
- val schemaWithCorrField2 = new StructType()
- .add("a", IntegerType)
- .add(columnNameOfCorruptRecord, StringType)
- .add("b", TimestampType)
- val df3 = spark
- .read
- .option("mode", "PERMISSIVE")
- .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
- .schema(schemaWithCorrField2)
- .csv(testFile(valueMalformedFile))
- checkAnswer(df3,
- Row(null, "0,2013-111-11 12:13:14", null) ::
- Row(1, null, java.sql.Date.valueOf("1983-08-04")) ::
- Nil)
-
- val errMsg = intercept[AnalysisException] {
- spark
+ Seq(false, true).foreach { wholeFile =>
+ val schema = new StructType().add("a", IntegerType).add("b", TimestampType)
+ val df1 = spark
+ .read
+ .option("mode", "PERMISSIVE")
+ .option("wholeFile", wholeFile)
+ .schema(schema)
+ .csv(testFile(valueMalformedFile))
+ checkAnswer(df1,
+ Row(null, null) ::
+ Row(1, java.sql.Date.valueOf("1983-08-04")) ::
+ Nil)
+
+ // If `schema` has `columnNameOfCorruptRecord`, it should handle corrupt records
+ val columnNameOfCorruptRecord = "_unparsed"
+ val schemaWithCorrField1 = schema.add(columnNameOfCorruptRecord, StringType)
+ val df2 = spark
.read
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
- .schema(schema.add(columnNameOfCorruptRecord, IntegerType))
+ .option("wholeFile", wholeFile)
+ .schema(schemaWithCorrField1)
.csv(testFile(valueMalformedFile))
- .collect
- }.getMessage
- assert(errMsg.startsWith("The field for corrupt records must be string type and nullable"))
+ checkAnswer(df2,
+ Row(null, null, "0,2013-111-11 12:13:14") ::
+ Row(1, java.sql.Date.valueOf("1983-08-04"), null) ::
+ Nil)
+
+ // We put a `columnNameOfCorruptRecord` field in the middle of a schema
+ val schemaWithCorrField2 = new StructType()
+ .add("a", IntegerType)
+ .add(columnNameOfCorruptRecord, StringType)
+ .add("b", TimestampType)
+ val df3 = spark
+ .read
+ .option("mode", "PERMISSIVE")
+ .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+ .option("wholeFile", wholeFile)
+ .schema(schemaWithCorrField2)
+ .csv(testFile(valueMalformedFile))
+ checkAnswer(df3,
+ Row(null, "0,2013-111-11 12:13:14", null) ::
+ Row(1, null, java.sql.Date.valueOf("1983-08-04")) ::
+ Nil)
+
+ val errMsg = intercept[AnalysisException] {
+ spark
+ .read
+ .option("mode", "PERMISSIVE")
+ .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+ .option("wholeFile", wholeFile)
+ .schema(schema.add(columnNameOfCorruptRecord, IntegerType))
+ .csv(testFile(valueMalformedFile))
+ .collect
+ }.getMessage
+ assert(errMsg.startsWith("The field for corrupt records must be string type and nullable"))
+ }
+ }
+
+ test("SPARK-19610: Parse normal multi-line CSV files") {
+ val primitiveFieldAndType = Seq(
+ """"
+ |string","integer
+ |
+ |
+ |","long
+ |
+ |","bigInteger",double,boolean,null""".stripMargin,
+ """"this is a
+ |simple
+ |string.","
+ |
+ |10","
+ |21474836470","92233720368547758070","
+ |
+ |1.7976931348623157E308",true,""".stripMargin)
+
+ withTempPath { path =>
+ primitiveFieldAndType.toDF("value").coalesce(1).write.text(path.getAbsolutePath)
+
+ val df = spark.read
+ .option("header", true)
+ .option("wholeFile", true)
+ .csv(path.getAbsolutePath)
+
+ // Check if headers have new lines in the names.
+ val actualFields = df.schema.fieldNames.toSeq
+ val expectedFields =
+ Seq("\nstring", "integer\n\n\n", "long\n\n", "bigInteger", "double", "boolean", "null")
+ assert(actualFields === expectedFields)
+
+ // Check if the rows have new lines in the values.
+ val expected = Row(
+ "this is a\nsimple\nstring.",
+ "\n\n10",
+ "\n21474836470",
+ "92233720368547758070",
+ "\n\n1.7976931348623157E308",
+ "true",
+ null)
+ checkAnswer(df, expected)
+ }
+ }
+
+ test("Empty file produces empty dataframe with empty schema - wholeFile option") {
+ withTempPath { path =>
+ path.createNewFile()
+
+ val df = spark.read.format("csv")
+ .option("header", true)
+ .option("wholeFile", true)
+ .load(path.getAbsolutePath)
+
+ assert(df.schema === spark.emptyDataFrame.schema)
+ checkAnswer(df, spark.emptyDataFrame)
+ }
}
}