diff options
author | Takuya UESHIN <ueshin@databricks.com> | 2017-03-14 13:57:23 -0700 |
---|---|---|
committer | Xiao Li <gatorsmile@gmail.com> | 2017-03-14 13:57:23 -0700 |
commit | 7ded39c223429265b23940ca8244660dbee8320c (patch) | |
tree | e52bc19910347af47ecf24deaae2431866d043b4 | |
parent | 6eac96823c7b244773bd810812b369e336a65837 (diff) | |
download | spark-7ded39c223429265b23940ca8244660dbee8320c.tar.gz spark-7ded39c223429265b23940ca8244660dbee8320c.tar.bz2 spark-7ded39c223429265b23940ca8244660dbee8320c.zip |
[SPARK-19817][SQL] Make it clear that `timeZone` option is a general option in DataFrameReader/Writer.
## What changes were proposed in this pull request?
As timezone setting can also affect partition values, it works for all formats, we should make it clear.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes #17281 from ueshin/issues/SPARK-19817.
17 files changed, 101 insertions, 48 deletions
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 4354345ebc..705803791d 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -109,6 +109,11 @@ class DataFrameReader(OptionUtils): @since(1.5) def option(self, key, value): """Adds an input option for the underlying data source. + + You can set the following option(s) for reading files: + * ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps + in the JSON/CSV datasources or parttion values. + If it isn't set, it uses the default value, session local timezone. """ self._jreader = self._jreader.option(key, to_str(value)) return self @@ -116,6 +121,11 @@ class DataFrameReader(OptionUtils): @since(1.4) def options(self, **options): """Adds input options for the underlying data source. + + You can set the following option(s) for reading files: + * ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps + in the JSON/CSV datasources or parttion values. + If it isn't set, it uses the default value, session local timezone. """ for k in options: self._jreader = self._jreader.option(k, to_str(options[k])) @@ -159,7 +169,7 @@ class DataFrameReader(OptionUtils): allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None, - timeZone=None, wholeFile=None): + wholeFile=None): """ Loads JSON files and returns the results as a :class:`DataFrame`. @@ -214,8 +224,6 @@ class DataFrameReader(OptionUtils): formats follow the formats at ``java.text.SimpleDateFormat``. This applies to timestamp type. If None is set, it uses the default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. - :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. - If None is set, it uses the default value, session local timezone. :param wholeFile: parse one record, which may span multiple lines, per file. If None is set, it uses the default value, ``false``. @@ -234,7 +242,7 @@ class DataFrameReader(OptionUtils): allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero, allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter, mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat, - timestampFormat=timestampFormat, timeZone=timeZone, wholeFile=wholeFile) + timestampFormat=timestampFormat, wholeFile=wholeFile) if isinstance(path, basestring): path = [path] if type(path) == list: @@ -307,7 +315,7 @@ class DataFrameReader(OptionUtils): comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, - maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, timeZone=None, + maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, wholeFile=None): """Loads a CSV file and returns the result as a :class:`DataFrame`. @@ -367,8 +375,6 @@ class DataFrameReader(OptionUtils): uses the default value, ``10``. :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. - :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. - If None is set, it uses the default value, session local timezone. * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ record, and puts the malformed string into a field configured by \ @@ -399,7 +405,7 @@ class DataFrameReader(OptionUtils): nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, - maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, timeZone=timeZone, + maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, wholeFile=wholeFile) if isinstance(path, basestring): path = [path] @@ -521,6 +527,11 @@ class DataFrameWriter(OptionUtils): @since(1.5) def option(self, key, value): """Adds an output option for the underlying data source. + + You can set the following option(s) for writing files: + * ``timeZone``: sets the string that indicates a timezone to be used to format + timestamps in the JSON/CSV datasources or parttion values. + If it isn't set, it uses the default value, session local timezone. """ self._jwrite = self._jwrite.option(key, to_str(value)) return self @@ -528,6 +539,11 @@ class DataFrameWriter(OptionUtils): @since(1.4) def options(self, **options): """Adds output options for the underlying data source. + + You can set the following option(s) for writing files: + * ``timeZone``: sets the string that indicates a timezone to be used to format + timestamps in the JSON/CSV datasources or parttion values. + If it isn't set, it uses the default value, session local timezone. """ for k in options: self._jwrite = self._jwrite.option(k, to_str(options[k])) @@ -619,8 +635,7 @@ class DataFrameWriter(OptionUtils): self._jwrite.saveAsTable(name) @since(1.4) - def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None, - timeZone=None): + def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None): """Saves the content of the :class:`DataFrame` in JSON format at the specified path. :param path: the path in any Hadoop supported file system @@ -641,15 +656,12 @@ class DataFrameWriter(OptionUtils): formats follow the formats at ``java.text.SimpleDateFormat``. This applies to timestamp type. If None is set, it uses the default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. - :param timeZone: sets the string that indicates a timezone to be used to format timestamps. - If None is set, it uses the default value, session local timezone. >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data')) """ self.mode(mode) self._set_opts( - compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat, - timeZone=timeZone) + compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat) self._jwrite.json(path) @since(1.4) @@ -696,7 +708,7 @@ class DataFrameWriter(OptionUtils): @since(2.0) def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, - timestampFormat=None, timeZone=None): + timestampFormat=None): """Saves the content of the :class:`DataFrame` in CSV format at the specified path. :param path: the path in any Hadoop supported file system @@ -736,15 +748,13 @@ class DataFrameWriter(OptionUtils): formats follow the formats at ``java.text.SimpleDateFormat``. This applies to timestamp type. If None is set, it uses the default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``. - :param timeZone: sets the string that indicates a timezone to be used to parse timestamps. - If None is set, it uses the default value, session local timezone. >>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data')) """ self.mode(mode) self._set_opts(compression=compression, sep=sep, quote=quote, escape=escape, header=header, nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll, - dateFormat=dateFormat, timestampFormat=timestampFormat, timeZone=timeZone) + dateFormat=dateFormat, timestampFormat=timestampFormat) self._jwrite.csv(path) @since(1.5) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index e3631b0c07..b862deaf36 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -113,7 +113,8 @@ case class CatalogTablePartition( */ def toRow(partitionSchema: StructType, defaultTimeZondId: String): InternalRow = { val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties) - val timeZoneId = caseInsensitiveProperties.getOrElse("timeZone", defaultTimeZondId) + val timeZoneId = caseInsensitiveProperties.getOrElse( + DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId) InternalRow.fromSeq(partitionSchema.map { field => Cast(Literal(spec(field.name)), field.dataType, Option(timeZoneId)).eval() }) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index 5a91f9c193..5f222ec602 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -23,7 +23,7 @@ import com.fasterxml.jackson.core.{JsonFactory, JsonParser} import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes} +import org.apache.spark.sql.catalyst.util._ /** * Options for parsing JSON data into Spark SQL rows. @@ -69,7 +69,8 @@ private[sql] class JSONOptions( val columnNameOfCorruptRecord = parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord) - val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId)) + val timeZone: TimeZone = TimeZone.getTimeZone( + parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId)) // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe. val dateFormat: FastDateFormat = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 9e1de0fd2f..9b94c1e2b4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -60,6 +60,8 @@ object DateTimeUtils { final val TimeZoneGMT = TimeZone.getTimeZone("GMT") final val MonthOf31Days = Set(1, 3, 5, 7, 8, 10, 12) + val TIMEZONE_OPTION = "timeZone" + def defaultTimeZone(): TimeZone = TimeZone.getDefault() // Reuse the Calendar object in each thread as it is expensive to create in each method call. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala index e3584909dd..19d0c8eb92 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala @@ -471,7 +471,8 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation( JsonToStruct( schema, - Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> tz.getID), + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", + DateTimeUtils.TIMEZONE_OPTION -> tz.getID), Literal(jsonData2), gmtId), InternalRow(c.getTimeInMillis * 1000L) @@ -523,14 +524,16 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation( StructToJson( - Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> gmtId.get), + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", + DateTimeUtils.TIMEZONE_OPTION -> gmtId.get), struct, gmtId), """{"t":"2016-01-01T00:00:00"}""" ) checkEvaluation( StructToJson( - Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", "timeZone" -> "PST"), + Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss", + DateTimeUtils.TIMEZONE_OPTION -> "PST"), struct, gmtId), """{"t":"2015-12-31T16:00:00"}""" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 4f4cc93117..f1bce1aa41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -70,6 +70,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { /** * Adds an input option for the underlying data source. * + * You can set the following option(s): + * <ul> + * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone + * to be used to parse timestamps in the JSON/CSV datasources or parttion values.</li> + * </ul> + * * @since 1.4.0 */ def option(key: String, value: String): DataFrameReader = { @@ -101,6 +107,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { /** * (Scala-specific) Adds input options for the underlying data source. * + * You can set the following option(s): + * <ul> + * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone + * to be used to parse timestamps in the JSON/CSV datasources or parttion values.</li> + * </ul> + * * @since 1.4.0 */ def options(options: scala.collection.Map[String, String]): DataFrameReader = { @@ -111,6 +123,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { /** * Adds input options for the underlying data source. * + * You can set the following option(s): + * <ul> + * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone + * to be used to parse timestamps in the JSON/CSV datasources or parttion values.</li> + * </ul> + * * @since 1.4.0 */ def options(options: java.util.Map[String, String]): DataFrameReader = { @@ -305,8 +323,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that * indicates a timestamp format. Custom date formats follow the formats at * `java.text.SimpleDateFormat`. This applies to timestamp type.</li> - * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone - * to be used to parse timestamps.</li> * <li>`wholeFile` (default `false`): parse one record, which may span multiple lines, * per file</li> * </ul> @@ -478,8 +494,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that * indicates a timestamp format. Custom date formats follow the formats at * `java.text.SimpleDateFormat`. This applies to timestamp type.</li> - * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone - * to be used to parse timestamps.</li> * <li>`maxColumns` (default `20480`): defines a hard limit of how many columns * a record can have.</li> * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 49e85dc7b1..608160a214 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -90,6 +90,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { /** * Adds an output option for the underlying data source. * + * You can set the following option(s): + * <ul> + * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone + * to be used to format timestamps in the JSON/CSV datasources or parttion values.</li> + * </ul> + * * @since 1.4.0 */ def option(key: String, value: String): DataFrameWriter[T] = { @@ -121,6 +127,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { /** * (Scala-specific) Adds output options for the underlying data source. * + * You can set the following option(s): + * <ul> + * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone + * to be used to format timestamps in the JSON/CSV datasources or parttion values.</li> + * </ul> + * * @since 1.4.0 */ def options(options: scala.collection.Map[String, String]): DataFrameWriter[T] = { @@ -131,6 +143,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { /** * Adds output options for the underlying data source. * + * You can set the following option(s): + * <ul> + * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone + * to be used to format timestamps in the JSON/CSV datasources or parttion values.</li> + * </ul> + * * @since 1.4.0 */ def options(options: java.util.Map[String, String]): DataFrameWriter[T] = { @@ -457,8 +475,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that * indicates a timestamp format. Custom date formats follow the formats at * `java.text.SimpleDateFormat`. This applies to timestamp type.</li> - * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone - * to be used to format timestamps.</li> * </ul> * * @since 1.4.0 @@ -565,8 +581,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { * <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that * indicates a timestamp format. Custom date formats follow the formats at * `java.text.SimpleDateFormat`. This applies to timestamp type.</li> - * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone - * to be used to format timestamps.</li> * </ul> * * @since 2.0.0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala index aa578f4d23..769deb1890 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala @@ -105,7 +105,7 @@ case class OptimizeMetadataOnlyQuery( val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation) val caseInsensitiveProperties = CaseInsensitiveMap(relation.tableMeta.storage.properties) - val timeZoneId = caseInsensitiveProperties.get("timeZone") + val timeZoneId = caseInsensitiveProperties.get(DateTimeUtils.TIMEZONE_OPTION) .getOrElse(conf.sessionLocalTimeZone) val partitionData = catalog.listPartitions(relation.tableMeta.identifier).map { p => InternalRow.fromSeq(partAttrs.map { attr => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 30a09a9ad3..ce33298aeb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -141,7 +141,7 @@ object FileFormatWriter extends Logging { customPartitionLocations = outputSpec.customPartitionLocations, maxRecordsPerFile = caseInsensitiveOptions.get("maxRecordsPerFile").map(_.toLong) .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile), - timeZoneId = caseInsensitiveOptions.get("timeZone") + timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION) .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone) ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index c8097a7fab..a5fa8b3f93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ -127,7 +127,7 @@ abstract class PartitioningAwareFileIndex( }.keys.toSeq val caseInsensitiveOptions = CaseInsensitiveMap(parameters) - val timeZoneId = caseInsensitiveOptions.get("timeZone") + val timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION) .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone) userPartitionSchema match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index 0b1e5dac2d..2632e87971 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -24,7 +24,7 @@ import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, Unescape import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes} +import org.apache.spark.sql.catalyst.util._ class CSVOptions( @transient private val parameters: CaseInsensitiveMap[String], @@ -120,7 +120,8 @@ class CSVOptions( name.map(CompressionCodecs.getCodecClassName) } - val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId)) + val timeZone: TimeZone = TimeZone.getTimeZone( + parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId)) // Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe. val dateFormat: FastDateFormat = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 4435e4df38..95dfdf5b29 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, UDT} +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.functions.{col, regexp_replace} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} @@ -912,7 +913,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { .format("csv") .option("header", "true") .option("timestampFormat", "yyyy/MM/dd HH:mm") - .option("timeZone", "GMT") + .option(DateTimeUtils.TIMEZONE_OPTION, "GMT") .save(timestampsWithFormatPath) // This will load back the timestamps as string. @@ -934,7 +935,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { .option("header", "true") .option("inferSchema", "true") .option("timestampFormat", "yyyy/MM/dd HH:mm") - .option("timeZone", "GMT") + .option(DateTimeUtils.TIMEZONE_OPTION, "GMT") .load(timestampsWithFormatPath) checkAnswer(readBack, timestampsWithFormat) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 0aaf148dac..9b0efcbdaf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1767,7 +1767,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { timestampsWithFormat.write .format("json") .option("timestampFormat", "yyyy/MM/dd HH:mm") - .option("timeZone", "GMT") + .option(DateTimeUtils.TIMEZONE_OPTION, "GMT") .save(timestampsWithFormatPath) // This will load back the timestamps as string. @@ -1785,7 +1785,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val readBack = spark.read .schema(customSchema) .option("timestampFormat", "yyyy/MM/dd HH:mm") - .option("timeZone", "GMT") + .option(DateTimeUtils.TIMEZONE_OPTION, "GMT") .json(timestampsWithFormatPath) checkAnswer(readBack, timestampsWithFormat) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 88cb8a0bad..2b20b9716b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition} import org.apache.spark.sql.functions._ @@ -708,10 +709,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha } withTempPath { dir => - df.write.option("timeZone", "GMT") + df.write.option(DateTimeUtils.TIMEZONE_OPTION, "GMT") .format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString) val fields = schema.map(f => Column(f.name).cast(f.dataType)) - checkAnswer(spark.read.option("timeZone", "GMT").load(dir.toString).select(fields: _*), row) + checkAnswer(spark.read.option(DateTimeUtils.TIMEZONE_OPTION, "GMT") + .load(dir.toString).select(fields: _*), row) } } @@ -749,10 +751,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha } withTempPath { dir => - df.write.option("timeZone", "GMT") + df.write.option(DateTimeUtils.TIMEZONE_OPTION, "GMT") .format("parquet").partitionBy(partitionColumns.map(_.name): _*).save(dir.toString) val fields = schema.map(f => Column(f.name)) - checkAnswer(spark.read.option("timeZone", "GMT").load(dir.toString).select(fields: _*), row) + checkAnswer(spark.read.option(DateTimeUtils.TIMEZONE_OPTION, "GMT") + .load(dir.toString).select(fields: _*), row) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala index f251290583..a2f3afe3ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala @@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.spark.internal.Logging import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -142,7 +143,8 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext { checkPartitionValues(files.head, "2016-12-01 00:00:00") } withTempPath { f => - df.write.option("timeZone", "GMT").partitionBy("ts").parquet(f.getAbsolutePath) + df.write.option(DateTimeUtils.TIMEZONE_OPTION, "GMT") + .partitionBy("ts").parquet(f.getAbsolutePath) val files = recursiveList(f).filter(_.getAbsolutePath.endsWith("parquet")) assert(files.length == 1) // use timeZone option "GMT" to format partition value. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala index 9b5e364e51..0f97fd78d2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala @@ -27,7 +27,8 @@ class ResolvedDataSourceSuite extends SparkFunSuite { DataSource( sparkSession = null, className = name, - options = Map("timeZone" -> DateTimeUtils.defaultTimeZone().getID)).providingClass + options = Map(DateTimeUtils.TIMEZONE_OPTION -> DateTimeUtils.defaultTimeZone().getID) + ).providingClass test("jdbc") { assert( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 33802ae623..8860b7dc07 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.ColumnStat -import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.hive.client.HiveClient |