-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala | 14
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala      | 22
2 files changed, 35 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index f47ed76cba..057bde1a75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
/**
@@ -86,6 +86,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
+    verifySchema(dataSchema)
val conf = job.getConfiguration
val csvOptions = new CSVOptions(options)
csvOptions.compressionCodec.foreach { codec =>
@@ -172,4 +173,15 @@ class DefaultSource extends FileFormat with DataSourceRegister {
.mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
}
}
+
+  private def verifySchema(schema: StructType): Unit = {
+    schema.foreach { field =>
+      field.dataType match {
+        case _: ArrayType | _: MapType | _: StructType =>
+          throw new UnsupportedOperationException(
+            s"CSV data source does not support ${field.dataType.simpleString} data type.")
+        case _ =>
+      }
+    }
+  }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 27d6dc9197..bae290776f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -31,6 +31,8 @@ import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
+  import testImplicits._
+
private val carsFile = "cars.csv"
private val carsMalformedFile = "cars-malformed.csv"
private val carsFile8859 = "cars_iso-8859-1.csv"
@@ -582,4 +584,24 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
assert(numbers.count() == 8)
}
+
+  test("error handling for unsupported data types.") {
+    withTempDir { dir =>
+      val csvDir = new File(dir, "csv").getCanonicalPath
+      var msg = intercept[UnsupportedOperationException] {
+        Seq((1, "Tesla")).toDF("a", "b").selectExpr("struct(a, b)").write.csv(csvDir)
+      }.getMessage
+      assert(msg.contains("CSV data source does not support struct<a:int,b:string> data type"))
+
+      msg = intercept[UnsupportedOperationException] {
+        Seq((1, Map("Tesla" -> 3))).toDF("id", "cars").write.csv(csvDir)
+      }.getMessage
+      assert(msg.contains("CSV data source does not support map<string,int> data type"))
+
+      msg = intercept[UnsupportedOperationException] {
+        Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands").write.csv(csvDir)
+      }.getMessage
+      assert(msg.contains("CSV data source does not support array<string> data type"))
+    }
+  }
}
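
For context, a rough sketch of how the verifySchema guard added in prepareWrite surfaces to a user of the CSV writer. The spark-shell setup (the sqlContext handle, the implicits import, and the /tmp/csv-out output path) is assumed for illustration and is not part of this patch; only the exception type and message come from the change above.

// Hedged sketch, assuming a spark-shell session where `sqlContext` is in scope
// and /tmp/csv-out is writable.
import sqlContext.implicits._

val df = Seq((1, Map("Tesla" -> 3))).toDF("id", "cars")

// With this patch the write fails eagerly on the driver, when prepareWrite
// validates the schema, rather than while writing rows:
//   java.lang.UnsupportedOperationException:
//     CSV data source does not support map<string,int> data type.
df.write.csv("/tmp/csv-out")

// Flattening the unsupported column into atomic types first avoids the check.
df.selectExpr("id", "cars['Tesla'] as tesla_count").write.csv("/tmp/csv-out")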