aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test
diff options
context:
space:
mode:
authorTakeshi Yamamuro <yamamuro@apache.org>2017-02-23 12:09:36 -0800
committerWenchen Fan <wenchen@databricks.com>2017-02-23 12:09:36 -0800
commit09ed6e7711d0758c24944516a263b8bd4e1728fc (patch)
tree14decfedc993886ff382f9313f042053dc564f48 /sql/core/src/test
parent9bf4e2baad0e2851da554d85223ffaa029cfa490 (diff)
downloadspark-09ed6e7711d0758c24944516a263b8bd4e1728fc.tar.gz
spark-09ed6e7711d0758c24944516a263b8bd4e1728fc.tar.bz2
spark-09ed6e7711d0758c24944516a263b8bd4e1728fc.zip
[SPARK-18699][SQL] Put malformed tokens into a new field when parsing CSV data
## What changes were proposed in this pull request? This pr added a logic to put malformed tokens into a new field when parsing CSV data in case of permissive modes. In the current master, if the CSV parser hits these malformed ones, it throws an exception below (and then a job fails); ``` Caused by: java.lang.IllegalArgumentException at java.sql.Date.valueOf(Date.java:143) at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:137) at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply$mcJ$sp(CSVInferSchema.scala:272) at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:272) at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$$anonfun$castTo$6.apply(CSVInferSchema.scala:272) at scala.util.Try.getOrElse(Try.scala:79) at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:269) at ``` In case that users load large CSV-formatted data, the job failure makes users get some confused. So, this fix set NULL for original columns and put malformed tokens in a new field. ## How was this patch tested? Added tests in `CSVSuite`. Author: Takeshi Yamamuro <yamamuro@apache.org> Closes #16928 from maropu/SPARK-18699-2.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--sql/core/src/test/resources/test-data/value-malformed.csv2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala63
2 files changed, 61 insertions, 4 deletions
diff --git a/sql/core/src/test/resources/test-data/value-malformed.csv b/sql/core/src/test/resources/test-data/value-malformed.csv
new file mode 100644
index 0000000000..8945ed73d2
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/value-malformed.csv
@@ -0,0 +1,2 @@
+0,2013-111-11 12:13:14
+1,1983-08-04
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 0c9a7298c3..371d4311ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkException
-import org.apache.spark.sql.{DataFrame, QueryTest, Row, UDT}
+import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, UDT}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
@@ -53,6 +53,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
private val numbersFile = "test-data/numbers.csv"
private val datesFile = "test-data/dates.csv"
private val unescapedQuotesFile = "test-data/unescaped-quotes.csv"
+ private val valueMalformedFile = "test-data/value-malformed.csv"
private def testFile(fileName: String): String = {
Thread.currentThread().getContextClassLoader.getResource(fileName).toString
@@ -700,12 +701,12 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}.getMessage
assert(msg.contains("CSV data source does not support array<double> data type"))
- msg = intercept[SparkException] {
+ msg = intercept[UnsupportedOperationException] {
val schema = StructType(StructField("a", new UDT.MyDenseVectorUDT(), true) :: Nil)
spark.range(1).write.csv(csvDir)
spark.read.schema(schema).csv(csvDir).collect()
- }.getCause.getMessage
- assert(msg.contains("Unsupported type: array"))
+ }.getMessage
+ assert(msg.contains("CSV data source does not support array<double> data type."))
}
}
@@ -958,4 +959,58 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
checkAnswer(df, Row(1, null))
}
}
+
+ test("SPARK-18699 put malformed records in a `columnNameOfCorruptRecord` field") {
+ val schema = new StructType().add("a", IntegerType).add("b", TimestampType)
+ val df1 = spark
+ .read
+ .option("mode", "PERMISSIVE")
+ .schema(schema)
+ .csv(testFile(valueMalformedFile))
+ checkAnswer(df1,
+ Row(null, null) ::
+ Row(1, java.sql.Date.valueOf("1983-08-04")) ::
+ Nil)
+
+ // If `schema` has `columnNameOfCorruptRecord`, it should handle corrupt records
+ val columnNameOfCorruptRecord = "_unparsed"
+ val schemaWithCorrField1 = schema.add(columnNameOfCorruptRecord, StringType)
+ val df2 = spark
+ .read
+ .option("mode", "PERMISSIVE")
+ .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+ .schema(schemaWithCorrField1)
+ .csv(testFile(valueMalformedFile))
+ checkAnswer(df2,
+ Row(null, null, "0,2013-111-11 12:13:14") ::
+ Row(1, java.sql.Date.valueOf("1983-08-04"), null) ::
+ Nil)
+
+ // We put a `columnNameOfCorruptRecord` field in the middle of a schema
+ val schemaWithCorrField2 = new StructType()
+ .add("a", IntegerType)
+ .add(columnNameOfCorruptRecord, StringType)
+ .add("b", TimestampType)
+ val df3 = spark
+ .read
+ .option("mode", "PERMISSIVE")
+ .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+ .schema(schemaWithCorrField2)
+ .csv(testFile(valueMalformedFile))
+ checkAnswer(df3,
+ Row(null, "0,2013-111-11 12:13:14", null) ::
+ Row(1, null, java.sql.Date.valueOf("1983-08-04")) ::
+ Nil)
+
+ val errMsg = intercept[AnalysisException] {
+ spark
+ .read
+ .option("mode", "PERMISSIVE")
+ .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
+ .schema(schema.add(columnNameOfCorruptRecord, IntegerType))
+ .csv(testFile(valueMalformedFile))
+ .collect
+ }.getMessage
+ assert(errMsg.startsWith("The field for corrupt records must be string type and nullable"))
+ }
}