diff options
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala | 23 |
1 files changed, 12 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala index 2719e1ee98..3ae80ecfd2 100644 --- a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala +++ b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala @@ -30,22 +30,23 @@ import org.apache.spark.internal.Logging */ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging { // Carriage return - val CR = '\r' + private val CR = '\r' // Update period of progress bar, in milliseconds - val UPDATE_PERIOD = 200L + private val updatePeriodMSec = + sc.getConf.getTimeAsMs("spark.ui.consoleProgress.update.interval", "200") // Delay to show up a progress bar, in milliseconds - val FIRST_DELAY = 500L + private val firstDelayMSec = 500L // The width of terminal - val TerminalWidth = if (!sys.env.getOrElse("COLUMNS", "").isEmpty) { + private val TerminalWidth = if (!sys.env.getOrElse("COLUMNS", "").isEmpty) { sys.env.get("COLUMNS").get.toInt } else { 80 } - var lastFinishTime = 0L - var lastUpdateTime = 0L - var lastProgressBar = "" + private var lastFinishTime = 0L + private var lastUpdateTime = 0L + private var lastProgressBar = "" // Schedule a refresh thread to run periodically private val timer = new Timer("refresh progress", true) @@ -53,19 +54,19 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging { override def run() { refresh() } - }, FIRST_DELAY, UPDATE_PERIOD) + }, firstDelayMSec, updatePeriodMSec) /** * Try to refresh the progress bar in every cycle */ private def refresh(): Unit = synchronized { val now = System.currentTimeMillis() - if (now - lastFinishTime < FIRST_DELAY) { + if (now - lastFinishTime < firstDelayMSec) { return } val stageIds = sc.statusTracker.getActiveStageIds() val stages = stageIds.flatMap(sc.statusTracker.getStageInfo).filter(_.numTasks() > 1) - .filter(now - _.submissionTime() > FIRST_DELAY).sortBy(_.stageId()) + .filter(now - _.submissionTime() > firstDelayMSec).sortBy(_.stageId()) if (stages.length > 0) { show(now, stages.take(3)) // display at most 3 stages in same time } @@ -94,7 +95,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging { header + bar + tailer }.mkString("") - // only refresh if it's changed of after 1 minute (or the ssh connection will be closed + // only refresh if it's changed OR after 1 minute (or the ssh connection will be closed // after idle some time) if (bar != lastProgressBar || now - lastUpdateTime > 60 * 1000L) { System.err.print(CR + bar) |