author     hyukjinkwon <gurwls223@gmail.com>             2016-08-24 22:16:20 +0200
committer  Herman van Hovell <hvanhovell@databricks.com> 2016-08-24 22:16:20 +0200
commit     29952ed096fd2a0a19079933ff691671d6f00835 (patch)
tree       88846c8853b674b098e0c85ee1c462a8b99fb93a /sql/core/src/test/scala
parent     891ac2b914fb6f90a62c6fbc0a3960a89d1c1d92 (diff)
[SPARK-16216][SQL] Read/write timestamps and dates in ISO 8601 and dateFormat/timestampFormat option for CSV and JSON
## What changes were proposed in this pull request?

### Default - ISO 8601

Currently, the CSV data source writes `Timestamp` and `Date` in numeric form, and the JSON data source writes both as below:

- CSV

  ```
  // TimestampType
  1414459800000000
  // DateType
  16673
  ```

- JSON

  ```
  // TimestampType
  1970-01-01 11:46:40.0
  // DateType
  1970-01-01
  ```

So, for CSV we cannot read back what we write, and for JSON the values are ambiguous because the timezone is missing. This PR makes both data sources **write** `Timestamp` and `Date` as ISO 8601 formatted strings (please refer to the [ISO 8601 specification](https://www.w3.org/TR/NOTE-datetime)).

- `Timestamp` is written as below (`yyyy-MM-dd'T'HH:mm:ss.SSSZZ`):

  ```
  1970-01-01T02:00:01.000-01:00
  ```

- `Date` is written as below (`yyyy-MM-dd`):

  ```
  1970-01-01
  ```

### Custom date format option - `dateFormat`

This PR also adds support for writing and reading dates and timestamps as custom formatted strings, as below:

- **DateType** - with the `dateFormat` option (e.g. `yyyy/MM/dd`)

  ```
  +----------+
  |      date|
  +----------+
  |2015/08/26|
  |2014/10/27|
  |2016/01/28|
  +----------+
  ```

### Custom timestamp format option - `timestampFormat`

- **TimestampType** - with the `timestampFormat` option (e.g. `dd/MM/yyyy HH:mm`)

  ```
  +----------------+
  |            date|
  +----------------+
  |2015/08/26 18:00|
  |2014/10/27 18:30|
  |2016/01/28 20:00|
  +----------------+
  ```

## How was this patch tested?

Unit tests were added in `CSVSuite` and `JsonSuite`. For JSON, existing tests cover the default cases.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14279 from HyukjinKwon/SPARK-16216-json-csv.
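For illustration, here is a minimal sketch of how the new options are used through the `DataFrameReader`/`DataFrameWriter` API. This snippet is not part of the patch; the `SparkSession`, file paths, and input data are assumed:

```scala
import org.apache.spark.sql.SparkSession

// Assumed session and paths; the input CSV is hypothetical and holds a
// "date" column with values such as "26/08/2015 18:00".
val spark = SparkSession.builder().appName("date-format-demo").getOrCreate()

// Read, parsing the column with a custom timestamp pattern.
val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("timestampFormat", "dd/MM/yyyy HH:mm")
  .load("timestamps.csv")

// With no format option, timestamps are now written as ISO 8601 strings,
// e.g. 1970-01-01T02:00:01.000-01:00.
df.write
  .format("csv")
  .option("header", "true")
  .save("out-iso8601")

// Or supply an explicit pattern; the same options work for JSON.
df.write
  .format("json")
  .option("timestampFormat", "yyyy/MM/dd HH:mm")
  .save("out-json")
```

Both options take `SimpleDateFormat`-style patterns, which is why the tests below use `HH` (hour 0-23) rather than `hh` (hour 1-12).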
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala |   4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala            | 157
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala    |  17
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala          |  67
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala       |   6
5 files changed, 236 insertions, 15 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index dbe3af49c9..5e00f669b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -60,9 +60,9 @@ class CSVInferSchemaSuite extends SparkFunSuite {
}
test("Timestamp field types are inferred correctly via custom data format") {
- var options = new CSVOptions(Map("dateFormat" -> "yyyy-mm"))
+ var options = new CSVOptions(Map("timestampFormat" -> "yyyy-mm"))
assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
- options = new CSVOptions(Map("dateFormat" -> "yyyy"))
+ options = new CSVOptions(Map("timestampFormat" -> "yyyy"))
assert(CSVInferSchema.inferField(TimestampType, "2015", options) == TimestampType)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 8cd76ddf20..2befad6d72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -22,11 +22,13 @@ import java.nio.charset.UnsupportedCharsetException
import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
+import org.apache.commons.lang3.time.FastDateFormat
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._
@@ -477,7 +479,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
val options = Map(
"header" -> "true",
"inferSchema" -> "true",
- "dateFormat" -> "dd/MM/yyyy hh:mm")
+ "timestampFormat" -> "dd/MM/yyyy HH:mm")
val results = spark.read
.format("csv")
.options(options)
@@ -485,7 +487,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
.select("date")
.collect()
- val dateFormat = new SimpleDateFormat("dd/MM/yyyy hh:mm")
+ val dateFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm")
val expected =
Seq(Seq(new Timestamp(dateFormat.parse("26/08/2015 18:00").getTime)),
Seq(new Timestamp(dateFormat.parse("27/10/2014 18:30").getTime)),
@@ -691,4 +693,155 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
verifyCars(cars, withHeader = true, checkValues = false)
}
+
+ test("Write timestamps correctly in ISO8601 format by default") {
+ withTempDir { dir =>
+ val iso8601timestampsPath = s"${dir.getCanonicalPath}/iso8601timestamps.csv"
+ val timestamps = spark.read
+ .format("csv")
+ .option("inferSchema", "true")
+ .option("header", "true")
+ .option("timestampFormat", "dd/MM/yyyy HH:mm")
+ .load(testFile(datesFile))
+ timestamps.write
+ .format("csv")
+ .option("header", "true")
+ .save(iso8601timestampsPath)
+
+ // This will load back the timestamps as string.
+ val iso8601Timestamps = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "false")
+ .load(iso8601timestampsPath)
+
+ val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
+ val expectedTimestamps = timestamps.collect().map { r =>
+ // This should be ISO8601 formatted string.
+ Row(iso8601.format(r.toSeq.head))
+ }
+
+ checkAnswer(iso8601Timestamps, expectedTimestamps)
+ }
+ }
+
+ test("Write dates correctly in ISO8601 format by default") {
+ withTempDir { dir =>
+ val customSchema = new StructType(Array(StructField("date", DateType, true)))
+ val iso8601datesPath = s"${dir.getCanonicalPath}/iso8601dates.csv"
+ val dates = spark.read
+ .format("csv")
+ .schema(customSchema)
+ .option("header", "true")
+ .option("inferSchema", "false")
+ .option("dateFormat", "dd/MM/yyyy HH:mm")
+ .load(testFile(datesFile))
+ dates.write
+ .format("csv")
+ .option("header", "true")
+ .save(iso8601datesPath)
+
+ // This will load back the dates as string.
+ val iso8601dates = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "false")
+ .load(iso8601datesPath)
+
+ val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd")
+ val expectedDates = dates.collect().map { r =>
+ // This should be ISO8601 formatted string.
+ Row(iso8601.format(r.toSeq.head))
+ }
+
+ checkAnswer(iso8601dates, expectedDates)
+ }
+ }
+
+ test("Roundtrip in reading and writing timestamps") {
+ withTempDir { dir =>
+ val iso8601timestampsPath = s"${dir.getCanonicalPath}/iso8601timestamps.csv"
+ val timestamps = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "true")
+ .load(testFile(datesFile))
+
+ timestamps.write
+ .format("csv")
+ .option("header", "true")
+ .save(iso8601timestampsPath)
+
+ val iso8601timestamps = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "true")
+ .load(iso8601timestampsPath)
+
+ checkAnswer(iso8601timestamps, timestamps)
+ }
+ }
+
+ test("Write dates correctly with dateFormat option") {
+ val customSchema = new StructType(Array(StructField("date", DateType, true)))
+ withTempDir { dir =>
+ // With dateFormat option.
+ val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.csv"
+ val datesWithFormat = spark.read
+ .format("csv")
+ .schema(customSchema)
+ .option("header", "true")
+ .option("dateFormat", "dd/MM/yyyy HH:mm")
+ .load(testFile(datesFile))
+ datesWithFormat.write
+ .format("csv")
+ .option("header", "true")
+ .option("dateFormat", "yyyy/MM/dd")
+ .save(datesWithFormatPath)
+
+ // This will load back the dates as string.
+ val stringDatesWithFormat = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "false")
+ .load(datesWithFormatPath)
+ val expectedStringDatesWithFormat = Seq(
+ Row("2015/08/26"),
+ Row("2014/10/27"),
+ Row("2016/01/28"))
+
+ checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat)
+ }
+ }
+
+ test("Write timestamps correctly with dateFormat option") {
+ withTempDir { dir =>
+ // With dateFormat option.
+ val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.csv"
+ val timestampsWithFormat = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "true")
+ .option("timestampFormat", "dd/MM/yyyy HH:mm")
+ .load(testFile(datesFile))
+ timestampsWithFormat.write
+ .format("csv")
+ .option("header", "true")
+ .option("timestampFormat", "yyyy/MM/dd HH:mm")
+ .save(timestampsWithFormatPath)
+
+ // This will load back the timestamps as string.
+ val stringTimestampsWithFormat = spark.read
+ .format("csv")
+ .option("header", "true")
+ .option("inferSchema", "false")
+ .load(timestampsWithFormatPath)
+ val expectedStringTimestampsWithFormat = Seq(
+ Row("2015/08/26 18:00"),
+ Row("2014/10/27 18:30"),
+ Row("2016/01/28 20:00"))
+
+ checkAnswer(stringTimestampsWithFormat, expectedStringTimestampsWithFormat)
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
index 26b33b24ef..3ce643e667 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
@@ -96,13 +96,18 @@ class CSVTypeCastSuite extends SparkFunSuite {
assert(CSVTypeCast.castTo("1.00", DoubleType) == 1.0)
assert(CSVTypeCast.castTo("true", BooleanType) == true)
- val options = CSVOptions("dateFormat", "dd/MM/yyyy hh:mm")
+ val timestampsOptions = CSVOptions("timestampFormat", "dd/MM/yyyy hh:mm")
val customTimestamp = "31/01/2015 00:00"
- val expectedTime = options.dateFormat.parse("31/01/2015 00:00").getTime
- assert(CSVTypeCast.castTo(customTimestamp, TimestampType, nullable = true, options) ==
- expectedTime * 1000L)
- assert(CSVTypeCast.castTo(customTimestamp, DateType, nullable = true, options) ==
- DateTimeUtils.millisToDays(expectedTime))
+ val expectedTime = timestampsOptions.timestampFormat.parse(customTimestamp).getTime
+ val castedTimestamp =
+ CSVTypeCast.castTo(customTimestamp, TimestampType, nullable = true, timestampsOptions)
+ assert(castedTimestamp == expectedTime * 1000L)
+
+ val customDate = "31/01/2015"
+ val dateOptions = CSVOptions("dateFormat", "dd/MM/yyyy")
+ val expectedDate = dateOptions.dateFormat.parse(customDate).getTime
+ val castedDate = CSVTypeCast.castTo(customDate, DateType, nullable = true, dateOptions)
+ assert(castedDate == DateTimeUtils.millisToDays(expectedDate))
val timestamp = "2015-01-01 00:00:00"
assert(CSVTypeCast.castTo(timestamp, TimestampType) ==
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 342fd3e82e..63a9061210 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -101,15 +101,15 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
+ val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)),
enforceCorrectType(ISO8601Time1, TimestampType))
- checkTypePromotion(DateTimeUtils.millisToDays(3601000),
- enforceCorrectType(ISO8601Time1, DateType))
- val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)),
enforceCorrectType(ISO8601Time2, TimestampType))
- checkTypePromotion(DateTimeUtils.millisToDays(10801000),
- enforceCorrectType(ISO8601Time2, DateType))
+
+ val ISO8601Date = "1970-01-01"
+ checkTypePromotion(DateTimeUtils.millisToDays(32400000),
+ enforceCorrectType(ISO8601Date, DateType))
}
test("Get compatible type") {
@@ -1664,4 +1664,61 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(df.schema.size === 2)
df.collect()
}
+
+ test("Write dates correctly with dateFormat option") {
+ val customSchema = new StructType(Array(StructField("date", DateType, true)))
+ withTempDir { dir =>
+ // With dateFormat option.
+ val datesWithFormatPath = s"${dir.getCanonicalPath}/datesWithFormat.json"
+ val datesWithFormat = spark.read
+ .schema(customSchema)
+ .option("dateFormat", "dd/MM/yyyy HH:mm")
+ .json(datesRecords)
+
+ datesWithFormat.write
+ .format("json")
+ .option("dateFormat", "yyyy/MM/dd")
+ .save(datesWithFormatPath)
+
+ // This will load back the dates as string.
+ val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
+ val stringDatesWithFormat = spark.read
+ .schema(stringSchema)
+ .json(datesWithFormatPath)
+ val expectedStringDatesWithFormat = Seq(
+ Row("2015/08/26"),
+ Row("2014/10/27"),
+ Row("2016/01/28"))
+
+ checkAnswer(stringDatesWithFormat, expectedStringDatesWithFormat)
+ }
+ }
+
+ test("Write timestamps correctly with dateFormat option") {
+ val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
+ withTempDir { dir =>
+ // With dateFormat option.
+ val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
+ val timestampsWithFormat = spark.read
+ .schema(customSchema)
+ .option("timestampFormat", "dd/MM/yyyy HH:mm")
+ .json(datesRecords)
+ timestampsWithFormat.write
+ .format("json")
+ .option("timestampFormat", "yyyy/MM/dd HH:mm")
+ .save(timestampsWithFormatPath)
+
+ // This will load back the timestamps as string.
+ val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
+ val stringTimestampsWithFormat = spark.read
+ .schema(stringSchema)
+ .json(timestampsWithFormatPath)
+ val expectedStringTimestampsWithFormat = Seq(
+ Row("2015/08/26 18:00"),
+ Row("2014/10/27 18:30"),
+ Row("2016/01/28 20:00"))
+
+ checkAnswer(stringTimestampsWithFormat, expectedStringTimestampsWithFormat)
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index f4a3336643..a400940db9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -222,6 +222,12 @@ private[json] trait TestJsonData {
spark.sparkContext.parallelize(
s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil)
+ def datesRecords: RDD[String] =
+ spark.sparkContext.parallelize(
+ """{"date": "26/08/2015 18:00"}""" ::
+ """{"date": "27/10/2014 18:30"}""" ::
+ """{"date": "28/01/2016 20:00"}""" :: Nil)
+
lazy val singleRow: RDD[String] = spark.sparkContext.parallelize("""{"a":123}""" :: Nil)
def empty: RDD[String] = spark.sparkContext.parallelize(Seq[String]())