aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xbin/spark-submit3
-rw-r--r--core/src/main/java/org/apache/spark/SparkStageInfo.java1
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/SparkStatusTracker.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/StatusAPIImpl.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala124
-rw-r--r--pom.xml1
-rw-r--r--project/SparkBuild.scala1
8 files changed, 141 insertions, 1 deletions
diff --git a/bin/spark-submit b/bin/spark-submit
index c557311b4b..f92d90c3a6 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
@@ -22,6 +22,9 @@
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
ORIG_ARGS=("$@")
+# Set COLUMNS for progress bar
+export COLUMNS=`tput cols`
+
while (($#)); do
if [ "$1" = "--deploy-mode" ]; then
SPARK_SUBMIT_DEPLOY_MODE=$2
diff --git a/core/src/main/java/org/apache/spark/SparkStageInfo.java b/core/src/main/java/org/apache/spark/SparkStageInfo.java
index 04e2247210..fd74321093 100644
--- a/core/src/main/java/org/apache/spark/SparkStageInfo.java
+++ b/core/src/main/java/org/apache/spark/SparkStageInfo.java
@@ -26,6 +26,7 @@ package org.apache.spark;
public interface SparkStageInfo {
int stageId();
int currentAttemptId();
+ long submissionTime();
String name();
int numTasks();
int numActiveTasks();
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7cccf74003..37013121c5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -50,7 +50,7 @@ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SparkD
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage._
-import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.{SparkUI, ConsoleProgressBar}
import org.apache.spark.ui.jobs.JobProgressListener
import org.apache.spark.util._
@@ -245,6 +245,13 @@ class SparkContext(config: SparkConf) extends Logging {
val statusTracker = new SparkStatusTracker(this)
+ private[spark] val progressBar: Option[ConsoleProgressBar] =
+ if (conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
+ Some(new ConsoleProgressBar(this))
+ } else {
+ None
+ }
+
// Initialize the Spark UI
private[spark] val ui: Option[SparkUI] =
if (conf.getBoolean("spark.ui.enabled", true)) {
@@ -1274,6 +1281,7 @@ class SparkContext(config: SparkConf) extends Logging {
logInfo("Starting job: " + callSite.shortForm)
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
+ progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index c18d763d7f..edbdda8a0b 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -96,6 +96,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
new SparkStageInfoImpl(
stageId,
info.attemptId,
+ info.submissionTime.getOrElse(0),
info.name,
info.numTasks,
data.numActiveTasks,
diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
index 90b47c847f..e5c7c8d0db 100644
--- a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
+++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
@@ -26,6 +26,7 @@ private class SparkJobInfoImpl (
private class SparkStageInfoImpl(
val stageId: Int,
val currentAttemptId: Int,
+ val submissionTime: Long,
val name: String,
val numTasks: Int,
val numActiveTasks: Int,
diff --git a/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
new file mode 100644
index 0000000000..27ba9e1823
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/ConsoleProgressBar.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import java.util.{Timer, TimerTask}
+
+import org.apache.spark._
+
+/**
+ * ConsoleProgressBar shows the progress of stages in the next line of the console. It poll the
+ * status of active stages from `sc.statusTracker` periodically, the progress bar will be showed
+ * up after the stage has ran at least 500ms. If multiple stages run in the same time, the status
+ * of them will be combined together, showed in one line.
+ */
+private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
+
+ // Carrige return
+ val CR = '\r'
+ // Update period of progress bar, in milliseconds
+ val UPDATE_PERIOD = 200L
+ // Delay to show up a progress bar, in milliseconds
+ val FIRST_DELAY = 500L
+
+ // The width of terminal
+ val TerminalWidth = if (!sys.env.getOrElse("COLUMNS", "").isEmpty) {
+ sys.env.get("COLUMNS").get.toInt
+ } else {
+ 80
+ }
+
+ var lastFinishTime = 0L
+ var lastUpdateTime = 0L
+ var lastProgressBar = ""
+
+ // Schedule a refresh thread to run periodically
+ private val timer = new Timer("refresh progress", true)
+ timer.schedule(new TimerTask{
+ override def run() {
+ refresh()
+ }
+ }, FIRST_DELAY, UPDATE_PERIOD)
+
+ /**
+ * Try to refresh the progress bar in every cycle
+ */
+ private def refresh(): Unit = synchronized {
+ val now = System.currentTimeMillis()
+ if (now - lastFinishTime < FIRST_DELAY) {
+ return
+ }
+ val stageIds = sc.statusTracker.getActiveStageIds()
+ val stages = stageIds.map(sc.statusTracker.getStageInfo).flatten.filter(_.numTasks() > 1)
+ .filter(now - _.submissionTime() > FIRST_DELAY).sortBy(_.stageId())
+ if (stages.size > 0) {
+ show(now, stages.take(3)) // display at most 3 stages in same time
+ }
+ }
+
+ /**
+ * Show progress bar in console. The progress bar is displayed in the next line
+ * after your last output, keeps overwriting itself to hold in one line. The logging will follow
+ * the progress bar, then progress bar will be showed in next line without overwrite logs.
+ */
+ private def show(now: Long, stages: Seq[SparkStageInfo]) {
+ val width = TerminalWidth / stages.size
+ val bar = stages.map { s =>
+ val total = s.numTasks()
+ val header = s"[Stage ${s.stageId()}:"
+ val tailer = s"(${s.numCompletedTasks()} + ${s.numActiveTasks()}) / $total]"
+ val w = width - header.size - tailer.size
+ val bar = if (w > 0) {
+ val percent = w * s.numCompletedTasks() / total
+ (0 until w).map { i =>
+ if (i < percent) "=" else if (i == percent) ">" else " "
+ }.mkString("")
+ } else {
+ ""
+ }
+ header + bar + tailer
+ }.mkString("")
+
+ // only refresh if it's changed of after 1 minute (or the ssh connection will be closed
+ // after idle some time)
+ if (bar != lastProgressBar || now - lastUpdateTime > 60 * 1000L) {
+ System.err.print(CR + bar)
+ lastUpdateTime = now
+ }
+ lastProgressBar = bar
+ }
+
+ /**
+ * Clear the progress bar if showed.
+ */
+ private def clear() {
+ if (!lastProgressBar.isEmpty) {
+ System.err.printf(CR + " " * TerminalWidth + CR)
+ lastProgressBar = ""
+ }
+ }
+
+ /**
+ * Mark all the stages as finished, clear the progress bar if showed, then the progress will not
+ * interweave with output of jobs.
+ */
+ def finishAll(): Unit = synchronized {
+ clear()
+ lastFinishTime = System.currentTimeMillis()
+ }
+}
diff --git a/pom.xml b/pom.xml
index 41f4ec1844..418c4af8d3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -977,6 +977,7 @@
<spark.test.home>${session.executionRootDirectory}</spark.test.home>
<spark.testing>1</spark.testing>
<spark.ui.enabled>false</spark.ui.enabled>
+ <spark.ui.showConsoleProgress>false</spark.ui.showConsoleProgress>
<spark.executor.extraClassPath>${test_classpath}</spark.executor.extraClassPath>
<spark.driver.allowMultipleContexts>true</spark.driver.allowMultipleContexts>
</systemProperties>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 1697b6d4f2..c1879ce4ba 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -377,6 +377,7 @@ object TestSettings {
javaOptions in Test += "-Dspark.testing=1",
javaOptions in Test += "-Dspark.port.maxRetries=100",
javaOptions in Test += "-Dspark.ui.enabled=false",
+ javaOptions in Test += "-Dspark.ui.showConsoleProgress=false",
javaOptions in Test += "-Dspark.driver.allowMultipleContexts=true",
javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")