Diffstat (limited to 'sql/core/src/main/scala/org/apache')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                               8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                               4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala                                       6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala       8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala         21
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala     2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala     9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala                    4
9 files changed, 39 insertions, 25 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1830839aee..780fe51ac6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -27,6 +27,7 @@ import org.apache.spark.Partition
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
@@ -298,6 +299,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+ * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to parse timestamps.</li>
* </ul>
*
* @since 2.0.0
@@ -329,7 +332,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* @since 1.4.0
*/
def json(jsonRDD: RDD[String]): DataFrame = {
- val parsedOptions: JSONOptions = new JSONOptions(extraOptions.toMap)
+ val parsedOptions: JSONOptions =
+ new JSONOptions(extraOptions.toMap, sparkSession.sessionState.conf.sessionLocalTimeZone)
val columnNameOfCorruptRecord =
parsedOptions.columnNameOfCorruptRecord
.getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
@@ -401,6 +405,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+ * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to parse timestamps.</li>
* <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
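
Note: the new `timeZone` option documented above is consumed per read call. A minimal usage sketch; the path, format string, and timezone ID are illustrative, not part of this patch:

    // Parse timestamps in the input as Tokyo-local time instead of the
    // session-local default; any ID accepted by java.util.TimeZone works.
    val df = spark.read
      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
      .option("timeZone", "Asia/Tokyo")
      .json("path/to/input.json")
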
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 748ebba3e8..1d834b1821 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -456,6 +456,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+ * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to format timestamps.</li>
* </ul>
*
* @since 1.4.0
@@ -562,6 +564,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+ * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to format timestamps.</li>
* </ul>
*
* @since 2.0.0
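
Note: on the write path the same option controls formatting rather than parsing. A hedged sketch, assuming a DataFrame `df` with a timestamp column and an illustrative output path:

    // Timestamps are rendered in UTC regardless of the session timezone.
    df.write
      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
      .option("timeZone", "UTC")
      .csv("path/to/output")
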
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 6b80ff48bb..e62cd9f7bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.json.JacksonGenerator
+import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
import org.apache.spark.sql.catalyst.optimizer.CombineUnions
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans._
@@ -2678,10 +2678,12 @@ class Dataset[T] private[sql](
*/
def toJSON: Dataset[String] = {
val rowSchema = this.schema
+ val sessionLocalTimeZone = sparkSession.sessionState.conf.sessionLocalTimeZone
val rdd: RDD[String] = queryExecution.toRdd.mapPartitions { iter =>
val writer = new CharArrayWriter()
// create the Generator without separator inserted between 2 records
- val gen = new JacksonGenerator(rowSchema, writer)
+ val gen = new JacksonGenerator(rowSchema, writer,
+ new JSONOptions(Map.empty[String, String], sessionLocalTimeZone))
new Iterator[String] {
override def hasNext: Boolean = iter.hasNext
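
Note: `toJSON` takes no options, so with this change it picks up the session-local timezone from the session conf (backed by the `spark.sql.session.timeZone` key). A sketch of the observable effect:

    spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
    // Timestamp fields in the emitted JSON are now formatted in that timezone.
    val json: Dataset[String] = ds.toJSON
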
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index 1d2bf07047..566f40f454 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -29,7 +29,7 @@ import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.CompressionCodecs
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
import org.apache.spark.sql.sources._
@@ -55,7 +55,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
files: Seq[FileStatus]): Option[StructType] = {
require(files.nonEmpty, "Cannot infer schema from an empty set of files")
- val csvOptions = new CSVOptions(options)
+ val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
val paths = files.map(_.getPath.toString)
val lines: Dataset[String] = createBaseDataset(sparkSession, csvOptions, paths)
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
@@ -69,7 +69,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
dataSchema: StructType): OutputWriterFactory = {
CSVUtils.verifySchema(dataSchema)
val conf = job.getConfiguration
- val csvOptions = new CSVOptions(options)
+ val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
csvOptions.compressionCodec.foreach { codec =>
CompressionCodecs.setCodecConfiguration(conf, codec)
}
@@ -96,7 +96,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
- val csvOptions = new CSVOptions(options)
+ val csvOptions = new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 9d79ea6ed1..b7fbaa4f44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.datasources.csv
import java.nio.charset.StandardCharsets
-import java.util.Locale
+import java.util.{Locale, TimeZone}
import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}
import org.apache.commons.lang3.time.FastDateFormat
@@ -26,10 +26,12 @@ import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes}
-private[csv] class CSVOptions(@transient private val parameters: CaseInsensitiveMap[String])
+private[csv] class CSVOptions(
+ @transient private val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String)
extends Logging with Serializable {
- def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+ def this(parameters: Map[String, String], defaultTimeZoneId: String) =
+ this(CaseInsensitiveMap(parameters), defaultTimeZoneId)
private def getChar(paramName: String, default: Char): Char = {
val paramValue = parameters.get(paramName)
@@ -106,13 +108,15 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive
name.map(CompressionCodecs.getCodecClassName)
}
+ val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId))
+
// Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
val dateFormat: FastDateFormat =
FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), Locale.US)
val timestampFormat: FastDateFormat =
FastDateFormat.getInstance(
- parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), Locale.US)
+ parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), timeZone, Locale.US)
val maxColumns = getInt("maxColumns", 20480)
@@ -161,12 +165,3 @@ private[csv] class CSVOptions(@transient private val parameters: CaseInsensitive
settings
}
}
-
-object CSVOptions {
-
- def apply(): CSVOptions = new CSVOptions(CaseInsensitiveMap(Map.empty))
-
- def apply(paramName: String, paramValue: String): CSVOptions = {
- new CSVOptions(Map(paramName -> paramValue))
- }
-}
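
Note: with the zero-arg and single-pair `apply` helpers removed, every caller must now supply a default timezone ID explicitly. A hypothetical illustration of the resolution order (CSVOptions is private[csv], so this only compiles inside that package):

    import java.util.TimeZone

    // An explicit "timeZone" option wins; otherwise the caller-supplied
    // session default is used.
    val opts = new CSVOptions(Map("timeZone" -> "UTC"), defaultTimeZoneId = "Asia/Tokyo")
    assert(opts.timeZone == TimeZone.getTimeZone("UTC"))
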
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
index ee79138c0f..4082a0df8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityGenerator.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types._
private[csv] class UnivocityGenerator(
schema: StructType,
writer: Writer,
- options: CSVOptions = new CSVOptions(Map.empty[String, String])) {
+ options: CSVOptions) {
private val writerSettings = options.asWriterSettings
writerSettings.setHeaders(schema.fieldNames: _*)
private val gen = new CsvWriter(writer, writerSettings)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index 3b42aa60b0..2e409b3f5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -76,7 +76,7 @@ private[csv] class UnivocityParser(
name: String,
dataType: DataType,
nullable: Boolean = true,
- options: CSVOptions = CSVOptions()): ValueConverter = dataType match {
+ options: CSVOptions): ValueConverter = dataType match {
case _: ByteType => (d: String) =>
nullSafeDatum(d, name, nullable, options)(_.toByte)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 98ab9d2850..b4a8ff2cf0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -47,7 +47,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
if (files.isEmpty) {
None
} else {
- val parsedOptions: JSONOptions = new JSONOptions(options)
+ val parsedOptions: JSONOptions =
+ new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
val columnNameOfCorruptRecord =
parsedOptions.columnNameOfCorruptRecord
.getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
@@ -67,7 +68,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
val conf = job.getConfiguration
- val parsedOptions: JSONOptions = new JSONOptions(options)
+ val parsedOptions: JSONOptions =
+ new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
parsedOptions.compressionCodec.foreach { codec =>
CompressionCodecs.setCodecConfiguration(conf, codec)
}
@@ -97,7 +99,8 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
- val parsedOptions: JSONOptions = new JSONOptions(options)
+ val parsedOptions: JSONOptions =
+ new JSONOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
.getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index b7ffb3cddb..4e706da184 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -181,6 +181,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+ * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to parse timestamps.</li>
* </ul>
*
* @since 2.0.0
@@ -230,6 +232,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSZZ`): sets the string that
* indicates a timestamp format. Custom date formats follow the formats at
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
+ * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
+ * to be used to parse timestamps.</li>
* <li>`maxColumns` (default `20480`): defines a hard limit of how many columns
* a record can have.</li>
* <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
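
Note: the streaming reader mirrors the batch reader. A minimal sketch, assuming a user-defined `eventSchema` (file-based streaming sources require an explicit schema) and an illustrative directory path:

    val stream = spark.readStream
      .schema(eventSchema)
      .option("timeZone", "UTC")
      .csv("path/to/watched/dir")
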