aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTejas Patil <tejasp@fb.com>2016-08-08 06:22:37 +0100
committerSean Owen <sowen@cloudera.com>2016-08-08 06:22:37 +0100
commite076fb05ac83a3ed6995e29bb03ea07ea05e39db (patch)
treed2c6f150bbc53bb7068162ac9c1dfb2eb98d1050
parenta16983c97b4c6539f97e5d26f163fed49872df2b (diff)
downloadspark-e076fb05ac83a3ed6995e29bb03ea07ea05e39db.tar.gz
spark-e076fb05ac83a3ed6995e29bb03ea07ea05e39db.tar.bz2
spark-e076fb05ac83a3ed6995e29bb03ea07ea05e39db.zip
[SPARK-16919] Configurable update interval for console progress bar
## What changes were proposed in this pull request? Currently the update interval for the console progress bar is hardcoded. This PR makes it configurable for users. ## How was this patch tested? Ran a long running job and with a high value of update interval, the updates were shown less frequently. Author: Tejas Patil <tejasp@fb.com> Closes #14507 from tejasapatil/SPARK-16919.
-rw-r--r--core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala23
1 files changed, 12 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
index 2719e1ee98..3ae80ecfd2 100644
--- a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
@@ -30,22 +30,23 @@ import org.apache.spark.internal.Logging
*/
private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
// Carriage return
- val CR = '\r'
+ private val CR = '\r'
// Update period of progress bar, in milliseconds
- val UPDATE_PERIOD = 200L
+ private val updatePeriodMSec =
+ sc.getConf.getTimeAsMs("spark.ui.consoleProgress.update.interval", "200")
// Delay to show up a progress bar, in milliseconds
- val FIRST_DELAY = 500L
+ private val firstDelayMSec = 500L
// The width of terminal
- val TerminalWidth = if (!sys.env.getOrElse("COLUMNS", "").isEmpty) {
+ private val TerminalWidth = if (!sys.env.getOrElse("COLUMNS", "").isEmpty) {
sys.env.get("COLUMNS").get.toInt
} else {
80
}
- var lastFinishTime = 0L
- var lastUpdateTime = 0L
- var lastProgressBar = ""
+ private var lastFinishTime = 0L
+ private var lastUpdateTime = 0L
+ private var lastProgressBar = ""
// Schedule a refresh thread to run periodically
private val timer = new Timer("refresh progress", true)
@@ -53,19 +54,19 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
override def run() {
refresh()
}
- }, FIRST_DELAY, UPDATE_PERIOD)
+ }, firstDelayMSec, updatePeriodMSec)
/**
* Try to refresh the progress bar in every cycle
*/
private def refresh(): Unit = synchronized {
val now = System.currentTimeMillis()
- if (now - lastFinishTime < FIRST_DELAY) {
+ if (now - lastFinishTime < firstDelayMSec) {
return
}
val stageIds = sc.statusTracker.getActiveStageIds()
val stages = stageIds.flatMap(sc.statusTracker.getStageInfo).filter(_.numTasks() > 1)
- .filter(now - _.submissionTime() > FIRST_DELAY).sortBy(_.stageId())
+ .filter(now - _.submissionTime() > firstDelayMSec).sortBy(_.stageId())
if (stages.length > 0) {
show(now, stages.take(3)) // display at most 3 stages in same time
}
@@ -94,7 +95,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
header + bar + tailer
}.mkString("")
- // only refresh if it's changed of after 1 minute (or the ssh connection will be closed
+ // only refresh if it's changed OR after 1 minute (or the ssh connection will be closed
// after idle some time)
if (bar != lastProgressBar || now - lastUpdateTime > 60 * 1000L) {
System.err.print(CR + bar)