aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2016-09-20 22:36:24 -0700
committerShixiong Zhu <shixiong@databricks.com>2016-09-20 22:36:24 -0700
commite48ebc4e403ca3a0e580b47aadffe9fbfcf3c655 (patch)
tree5a2861951926c7e2be2b339403b134eab112b104
parent1ea49916acc46b0a74e5c85eef907920c5e31142 (diff)
downloadspark-e48ebc4e403ca3a0e580b47aadffe9fbfcf3c655.tar.gz
spark-e48ebc4e403ca3a0e580b47aadffe9fbfcf3c655.tar.bz2
spark-e48ebc4e403ca3a0e580b47aadffe9fbfcf3c655.zip
[SPARK-15698][SQL][STREAMING][FOLLW-UP] Fix FileStream source and sink log get configuration issue
## What changes were proposed in this pull request? This issue was introduced in the previous commit of SPARK-15698. Mistakenly change the way to get configuration back to original one, so here with the follow up PR to revert them up. ## How was this patch tested? N/A Ping zsxwing , please review again, sorry to bring the inconvenience. Thanks a lot. Author: jerryshao <sshao@hortonworks.com> Closes #15173 from jerryshao/SPARK-15698-follow.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala9
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala8
3 files changed, 13 insertions, 11 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 64f2f00484..f9e24167a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -84,14 +84,11 @@ class FileStreamSinkLog(
private implicit val formats = Serialization.formats(NoTypeHints)
- protected override val fileCleanupDelayMs =
- sparkSession.conf.get(SQLConf.FILE_SINK_LOG_CLEANUP_DELAY)
+ protected override val fileCleanupDelayMs = sparkSession.sessionState.conf.fileSinkLogCleanupDelay
- protected override val isDeletingExpiredLog =
- sparkSession.conf.get(SQLConf.FILE_SINK_LOG_DELETION)
+ protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSinkLogDeletion
- protected override val compactInterval =
- sparkSession.conf.get(SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL)
+ protected override val compactInterval = sparkSession.sessionState.conf.fileSinkLogCompactInterval
require(compactInterval > 0,
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $compactInterval) " +
"to a positive value.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
index 8103309aff..4681f2ba08 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceLog.scala
@@ -39,16 +39,15 @@ class FileStreamSourceLog(
// Configurations about metadata compaction
protected override val compactInterval =
- sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
+ sparkSession.sessionState.conf.fileSourceLogCompactInterval
require(compactInterval > 0,
s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " +
s"positive value.")
protected override val fileCleanupDelayMs =
- sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)
+ sparkSession.sessionState.conf.fileSourceLogCleanupDelay
- protected override val isDeletingExpiredLog =
- sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)
+ protected override val isDeletingExpiredLog = sparkSession.sessionState.conf.fileSourceLogDeletion
private implicit val formats = Serialization.formats(NoTypeHints)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f8b7a7f8ef..e67140fefe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -620,10 +620,16 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
- def fileSinkLogCompatInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL)
+ def fileSinkLogCompactInterval: Int = getConf(FILE_SINK_LOG_COMPACT_INTERVAL)
def fileSinkLogCleanupDelay: Long = getConf(FILE_SINK_LOG_CLEANUP_DELAY)
+ def fileSourceLogDeletion: Boolean = getConf(FILE_SOURCE_LOG_DELETION)
+
+ def fileSourceLogCompactInterval: Int = getConf(FILE_SOURCE_LOG_COMPACT_INTERVAL)
+
+ def fileSourceLogCleanupDelay: Long = getConf(FILE_SOURCE_LOG_CLEANUP_DELAY)
+
def streamingSchemaInference: Boolean = getConf(STREAMING_SCHEMA_INFERENCE)
def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)