aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala23
1 files changed, 12 insertions, 11 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
index 2719e1ee98..3ae80ecfd2 100644
--- a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
@@ -30,22 +30,23 @@ import org.apache.spark.internal.Logging
*/
private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
// Carriage return
- val CR = '\r'
+ private val CR = '\r'
// Update period of progress bar, in milliseconds
- val UPDATE_PERIOD = 200L
+ private val updatePeriodMSec =
+ sc.getConf.getTimeAsMs("spark.ui.consoleProgress.update.interval", "200")
// Delay to show up a progress bar, in milliseconds
- val FIRST_DELAY = 500L
+ private val firstDelayMSec = 500L
// The width of terminal
- val TerminalWidth = if (!sys.env.getOrElse("COLUMNS", "").isEmpty) {
+ private val TerminalWidth = if (!sys.env.getOrElse("COLUMNS", "").isEmpty) {
sys.env.get("COLUMNS").get.toInt
} else {
80
}
- var lastFinishTime = 0L
- var lastUpdateTime = 0L
- var lastProgressBar = ""
+ private var lastFinishTime = 0L
+ private var lastUpdateTime = 0L
+ private var lastProgressBar = ""
// Schedule a refresh thread to run periodically
private val timer = new Timer("refresh progress", true)
@@ -53,19 +54,19 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
override def run() {
refresh()
}
- }, FIRST_DELAY, UPDATE_PERIOD)
+ }, firstDelayMSec, updatePeriodMSec)
/**
* Try to refresh the progress bar in every cycle
*/
private def refresh(): Unit = synchronized {
val now = System.currentTimeMillis()
- if (now - lastFinishTime < FIRST_DELAY) {
+ if (now - lastFinishTime < firstDelayMSec) {
return
}
val stageIds = sc.statusTracker.getActiveStageIds()
val stages = stageIds.flatMap(sc.statusTracker.getStageInfo).filter(_.numTasks() > 1)
- .filter(now - _.submissionTime() > FIRST_DELAY).sortBy(_.stageId())
+ .filter(now - _.submissionTime() > firstDelayMSec).sortBy(_.stageId())
if (stages.length > 0) {
show(now, stages.take(3)) // display at most 3 stages in same time
}
@@ -94,7 +95,7 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
header + bar + tailer
}.mkString("")
- // only refresh if it's changed of after 1 minute (or the ssh connection will be closed
+ // only refresh if it's changed OR after 1 minute (or the ssh connection will be closed
// after idle some time)
if (bar != lastProgressBar || now - lastUpdateTime > 60 * 1000L) {
System.err.print(CR + bar)