diff options
author | Rohit Agarwal <rohita@qubole.com> | 2015-08-17 10:31:57 -0700 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2015-08-17 10:31:57 -0700 |
commit | ed092a06c28dfa8204b473a5c964a9ef9a6b655e (patch) | |
tree | 7c7a47e3c6ef72b4aaa65524dd22af0ec850d530 | |
parent | 76c155dd4483d58499e5cb66e5e9373bb771dbeb (diff) | |
download | spark-ed092a06c28dfa8204b473a5c964a9ef9a6b655e.tar.gz spark-ed092a06c28dfa8204b473a5c964a9ef9a6b655e.tar.bz2 spark-ed092a06c28dfa8204b473a5c964a9ef9a6b655e.zip |
[SPARK-9924] [WEB UI] Don't schedule checkForLogs while some of them are already running.
Author: Rohit Agarwal <rohita@qubole.com>
Closes #8153 from mindprince/SPARK-9924.
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 28 |
1 files changed, 21 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 53c18ca3ff..e573ff16c5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -126,11 +126,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // Disable the background thread during tests. if (!conf.contains("spark.testing")) { // A task that periodically checks for event log updates on disk. - pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS) + pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS) if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) { // A task that periodically cleans event logs on disk. - pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS) + pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS) } } } @@ -204,11 +204,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) mod1 >= mod2 } - logInfos.sliding(20, 20).foreach { batch => - replayExecutor.submit(new Runnable { - override def run(): Unit = mergeApplicationListing(batch) - }) - } + logInfos.grouped(20) + .map { batch => + replayExecutor.submit(new Runnable { + override def run(): Unit = mergeApplicationListing(batch) + }) + } + .foreach { task => + try { + // Wait for all tasks to finish. This makes sure that checkForLogs + // is not scheduled again while some tasks are already running in + // the replayExecutor. + task.get() + } catch { + case e: InterruptedException => + throw e + case e: Exception => + logError("Exception while merging application listings", e) + } + } lastModifiedTime = newLastModifiedTime } catch { |