aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhyukjinkwon <gurwls223@gmail.com>2016-08-26 17:29:37 +0200
committerHerman van Hovell <hvanhovell@databricks.com>2016-08-26 17:29:37 +0200
commit6063d5963fcf01768570c1a9b542be6175a3bcbc (patch)
tree7e3e9843d0765987224ba734803563d0e2441b94
parent341e0e778dff8c404b47d34ee7661b658bb91880 (diff)
downloadspark-6063d5963fcf01768570c1a9b542be6175a3bcbc.tar.gz
spark-6063d5963fcf01768570c1a9b542be6175a3bcbc.tar.bz2
spark-6063d5963fcf01768570c1a9b542be6175a3bcbc.zip
[SPARK-16216][SQL][FOLLOWUP] Enable timestamp type tests for JSON and verify all unsupported types in CSV
## What changes were proposed in this pull request? This PR enables the tests for `TimestampType` for JSON and unifies the logics for verifying schema when writing in CSV. In more details, this PR, - Enables the tests for `TimestampType` for JSON and This was disabled due to an issue in `DatatypeConverter.parseDateTime` which parses dates incorrectly, for example as below: ```scala val d = javax.xml.bind.DatatypeConverter.parseDateTime("0900-01-01T00:00:00.000").getTime println(d.toString) ``` ``` Fri Dec 28 00:00:00 KST 899 ``` However, since we use `FastDateFormat`, it seems we are safe now. ```scala val d = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSS").parse("0900-01-01T00:00:00.000") println(d) ``` ``` Tue Jan 01 00:00:00 PST 900 ``` - Verifies all unsupported types in CSV There is a separate logics to verify the schemas in `CSVFileFormat`. This is actually not quite correct enough because we don't support `NullType` and `CalanderIntervalType` as well `StructType`, `ArrayType`, `MapType`. So, this PR adds both types. ## How was this patch tested? Tests in `JsonHadoopFsRelation` and `CSVSuite` Author: hyukjinkwon <gurwls223@gmail.com> Closes #14829 from HyukjinKwon/SPARK-16216-followup.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala17
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala1
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala16
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala4
4 files changed, 26 insertions, 12 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 1bf57882ce..9a118fe5a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -186,13 +186,18 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
}
private def verifySchema(schema: StructType): Unit = {
- schema.foreach { field =>
- field.dataType match {
- case _: ArrayType | _: MapType | _: StructType =>
- throw new UnsupportedOperationException(
- s"CSV data source does not support ${field.dataType.simpleString} data type.")
+ def verifyType(dataType: DataType): Unit = dataType match {
+ case ByteType | ShortType | IntegerType | LongType | FloatType |
+ DoubleType | BooleanType | _: DecimalType | TimestampType |
+ DateType | StringType =>
+
+ case udt: UserDefinedType[_] => verifyType(udt.sqlType)
+
case _ =>
- }
+ throw new UnsupportedOperationException(
+ s"CSV data source does not support ${dataType.simpleString} data type.")
}
+
+ schema.foreach(field => verifyType(field.dataType))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
index f1b4c11878..1ca6eff1b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
@@ -290,6 +290,7 @@ private[csv] object CSVTypeCast {
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
}
case _: StringType => UTF8String.fromString(datum)
+ case udt: UserDefinedType[_] => castTo(datum, udt.sqlType, nullable, options)
case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 2befad6d72..1930862118 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -27,8 +27,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkException
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.{DataFrame, QueryTest, Row, UDT}
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
@@ -681,6 +680,19 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
Seq((1, Array("Tesla", "Chevy", "Ford"))).toDF("id", "brands").write.csv(csvDir)
}.getMessage
assert(msg.contains("CSV data source does not support array<string> data type"))
+
+ msg = intercept[UnsupportedOperationException] {
+ Seq((1, new UDT.MyDenseVector(Array(0.25, 2.25, 4.25)))).toDF("id", "vectors")
+ .write.csv(csvDir)
+ }.getMessage
+ assert(msg.contains("CSV data source does not support array<double> data type"))
+
+ msg = intercept[SparkException] {
+ val schema = StructType(StructField("a", new UDT.MyDenseVectorUDT(), true) :: Nil)
+ spark.range(1).write.csv(csvDir)
+ spark.read.schema(schema).csv(csvDir).collect()
+ }.getCause.getMessage
+ assert(msg.contains("Unsupported type: array"))
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
index 52486b122a..d79edee5b1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/JsonHadoopFsRelationSuite.scala
@@ -32,10 +32,6 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
override protected def supportsDataType(dataType: DataType): Boolean = dataType match {
case _: NullType => false
case _: BinaryType => false
- // `TimestampType` is disabled because `DatatypeConverter.parseDateTime()`
- // in `DateTimeUtils` parses the formatted string wrongly when the date is
- // too early. (e.g. "1600-07-13T08:36:32.847").
- case _: TimestampType => false
case _: CalendarIntervalType => false
case _ => true
}