about | summary | refs | log | tree | commit | diff
path: root/sql/core
diff options
context:
space:
mode:
Diffstat (limited to 'sql/core')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala | 17
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala | 1
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala | 16
3 files changed, 26 insertions, 8 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 1bf57882ce..9a118fe5a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -186,13 +186,18 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
}
private def verifySchema(schema: StructType): Unit = {
- schema.foreach { field =>
- field.dataType match {
- case _: ArrayType | _: MapType | _: StructType =>
- throw new UnsupportedOperationException(
- s"CSV data source does not support ${field.dataType.simpleString} data type.")
+ def verifyType(dataType: DataType): Unit = dataType match {
+ case ByteType | ShortType | IntegerType | LongType | FloatType |
+ DoubleType | BooleanType | _: DecimalType | TimestampType |
+ DateType | StringType =>
+
+ case udt: UserDefinedType[_] => verifyType(udt.sqlType)
+
case _ =>
- }
+ throw new UnsupportedOperationException(
+ s"CSV data source does not support ${dataType.simpleString} data type.")
}
+
+ schema.foreach(field => verifyType(field.dataType))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
index f1b4c11878..1ca6eff1b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
@@ -290,6 +290,7 @@ private[csv] object CSVTypeCast {
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
}
case _: StringType => UTF8String.fromString(datum)
+ case udt: UserDefinedType[_] => castTo(datum, udt.sqlType, nullable, options)
case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 2befad6d72..1930862118 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -27,8 +27,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkException
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.{DataFrame, QueryTest, Row, UDT}
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
@@ -681,6 +680,19 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands").write.csv(csvDir)
}.getMessage
assert(msg.contains("CSV data source does not support array<string> data type"))
+
+ msg = intercept[UnsupportedOperationException] {
+ Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25)))).toDF("id", "vectors")
+ .write.csv(csvDir)
+ }.getMessage
+ assert(msg.contains("CSV data source does not support array<double> data type"))
+
+ msg = intercept[SparkException] {
+ val schema = StructType(StructField("a", new UDT.MyDenseVectorUDT(), true) :: Nil)
+ spark.range(1).write.csv(csvDir)
+ spark.read.schema(schema).csv(csvDir).collect()
+ }.getCause.getMessage
+ assert(msg.contains("Unsupported type: array"))
}
}