aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala6
2 files changed, 7 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 4161792976..08bab4bf27 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -548,7 +548,9 @@ private[spark] object SparkConf extends Logging {
"spark.rpc.askTimeout" -> Seq(
AlternateConfig("spark.akka.askTimeout", "1.4")),
"spark.rpc.lookupTimeout" -> Seq(
- AlternateConfig("spark.akka.lookupTimeout", "1.4"))
+ AlternateConfig("spark.akka.lookupTimeout", "1.4")),
+ "spark.streaming.fileStream.minRememberDuration" -> Seq(
+ AlternateConfig("spark.streaming.minRememberDuration", "1.5"))
)
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index dd4da9d9ca..c358f5b5bd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -86,8 +86,10 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* Files with mod times older than this "window" of remembering will be ignored. So if new
* files are visible within this window, then the file will get selected in the next batch.
*/
- private val minRememberDurationS =
- Seconds(ssc.conf.getTimeAsSeconds("spark.streaming.minRememberDuration", "60s"))
+ private val minRememberDurationS = {
+ Seconds(ssc.conf.getTimeAsSeconds("spark.streaming.fileStream.minRememberDuration",
+ ssc.conf.get("spark.streaming.minRememberDuration", "60s")))
+ }
// This is a def so that it works during checkpoint recovery:
private def clock = ssc.scheduler.clock