aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRohit Agarwal <rohita@qubole.com>2015-08-17 10:31:57 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2015-08-17 10:31:57 -0700
commited092a06c28dfa8204b473a5c964a9ef9a6b655e (patch)
tree7c7a47e3c6ef72b4aaa65524dd22af0ec850d530
parent76c155dd4483d58499e5cb66e5e9373bb771dbeb (diff)
downloadspark-ed092a06c28dfa8204b473a5c964a9ef9a6b655e.tar.gz
spark-ed092a06c28dfa8204b473a5c964a9ef9a6b655e.tar.bz2
spark-ed092a06c28dfa8204b473a5c964a9ef9a6b655e.zip
[SPARK-9924] [WEB UI] Don't schedule checkForLogs while some of them are already running.
Author: Rohit Agarwal <rohita@qubole.com> Closes #8153 from mindprince/SPARK-9924.
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala28
1 files changed, 21 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 53c18ca3ff..e573ff16c5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -126,11 +126,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// Disable the background thread during tests.
if (!conf.contains("spark.testing")) {
// A task that periodically checks for event log updates on disk.
- pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
+ pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
// A task that periodically cleans event logs on disk.
- pool.scheduleAtFixedRate(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
+ pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
}
}
}
@@ -204,11 +204,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
mod1 >= mod2
}
- logInfos.sliding(20, 20).foreach { batch =>
- replayExecutor.submit(new Runnable {
- override def run(): Unit = mergeApplicationListing(batch)
- })
- }
+ logInfos.grouped(20)
+ .map { batch =>
+ replayExecutor.submit(new Runnable {
+ override def run(): Unit = mergeApplicationListing(batch)
+ })
+ }
+ .foreach { task =>
+ try {
+ // Wait for all tasks to finish. This makes sure that checkForLogs
+ // is not scheduled again while some tasks are already running in
+ // the replayExecutor.
+ task.get()
+ } catch {
+ case e: InterruptedException =>
+ throw e
+ case e: Exception =>
+ logError("Exception while merging application listings", e)
+ }
+ }
lastModifiedTime = newLastModifiedTime
} catch {