aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorsureshthalamati <suresh.thalamati@gmail.com>2016-05-23 17:15:19 -0700
committerWenchen Fan <wenchen@databricks.com>2016-05-23 17:15:19 -0700
commit03c7b7c4b9374f0cb6a29aeaf495bd21c2563de4 (patch)
treeb9f6341429b87d3f0a10a18e9001a3d55ac07372 /sql
parent37c617e4f580482b59e1abbe3c0c27c7125cf605 (diff)
downloadspark-03c7b7c4b9374f0cb6a29aeaf495bd21c2563de4.tar.gz
spark-03c7b7c4b9374f0cb6a29aeaf495bd21c2563de4.tar.bz2
spark-03c7b7c4b9374f0cb6a29aeaf495bd21c2563de4.zip
[SPARK-15315][SQL] Adding error check to the CSV datasource writer for unsupported complex data types.
## What changes were proposed in this pull request? Adds error handling to the CSV writer for unsupported complex data types. Currently garbage gets written to the output csv files if the data frame schema has complex data types. ## How was this patch tested? Added new unit test case. Author: sureshthalamati <suresh.thalamati@gmail.com> Closes #13105 from sureshthalamati/csv_complex_types_SPARK-15315.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala14
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala22
2 files changed, 35 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index f47ed76cba..057bde1a75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
/**
@@ -86,6 +86,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
+ verifySchema(dataSchema)
val conf = job.getConfiguration
val csvOptions = new CSVOptions(options)
csvOptions.compressionCodec.foreach { codec =>
@@ -172,4 +173,15 @@ class DefaultSource extends FileFormat with DataSourceRegister {
.mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
}
}
+
+ private def verifySchema(schema: StructType): Unit = {
+ schema.foreach { field =>
+ field.dataType match {
+ case _: ArrayType | _: MapType | _: StructType =>
+ throw new UnsupportedOperationException(
+ s"CSV data source does not support ${field.dataType.simpleString} data type.")
+ case _ =>
+ }
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 27d6dc9197..bae290776f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -31,6 +31,8 @@ import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
+ import testImplicits._
+
private val carsFile = "cars.csv"
private val carsMalformedFile = "cars-malformed.csv"
private val carsFile8859 = "cars_iso-8859-1.csv"
@@ -582,4 +584,24 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
assert(numbers.count() == 8)
}
+
+ test("error handling for unsupported data types.") {
+ withTempDir { dir =>
+ val csvDir = new File(dir, "csv").getCanonicalPath
+ var msg = intercept[UnsupportedOperationException] {
+ Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, b)").write.csv(csvDir)
+ }.getMessage
+ assert(msg.contains("CSV data source does not support struct<a:int,b:string> data type"))
+
+ msg = intercept[UnsupportedOperationException] {
+ Seq((1, Map("Tesla" -> 3))).toDF("id", "cars").write.csv(csvDir)
+ }.getMessage
+ assert(msg.contains("CSV data source does not support map<string,int> data type"))
+
+ msg = intercept[UnsupportedOperationException] {
+ Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands").write.csv(csvDir)
+ }.getMessage
+ assert(msg.contains("CSV data source does not support array<string> data type"))
+ }
+ }
}