author    Reynold Xin <rxin@databricks.com>    2016-04-04 13:26:18 -0700
committer Andrew Or <andrew@databricks.com>    2016-04-04 13:26:18 -0700
commit    7143904700435265975d36f073cce2833467e121 (patch)
tree      1e8778c194afa169572d7a1cd947bfbc62105b3f
parent    27dad6f658f04815e1f3b93c68974bfd31500bed (diff)
[SPARK-14358] Change SparkListener from a trait to an abstract class
## What changes were proposed in this pull request?

It is difficult to maintain binary compatibility for Scala traits, and as a result we had to introduce JavaSparkListener. In Spark 2.0 we can change SparkListener from a trait to an abstract class and then remove JavaSparkListener.

## How was this patch tested?

Updated related unit tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #12142 from rxin/SPARK-14358.
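A quick illustration of the new API shape (not part of this diff): because SparkListener is now an abstract class with no-op defaults, a user-defined listener overrides only the callbacks it needs, and callbacks added in later releases do not break already-compiled subclasses. The class name below is hypothetical.

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Hypothetical listener: logs job boundaries and ignores every other event,
// relying on the no-op defaults inherited from the SparkListener abstract class.
class JobLoggingListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stages")

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} ended with result ${jobEnd.jobResult}")
}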
-rw-r--r--  core/src/main/java/org/apache/spark/JavaSparkListener.java                                 |  88
-rw-r--r--  core/src/main/java/org/apache/spark/SparkFirehoseListener.java                             |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala                               |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala                         | 251
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala                   | 199
-rw-r--r--  project/MimaExcludes.scala                                                                  |  11
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala  |   2
7 files changed, 276 insertions(+), 279 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/JavaSparkListener.java b/core/src/main/java/org/apache/spark/JavaSparkListener.java
deleted file mode 100644
index 23bc9a2e81..0000000000
--- a/core/src/main/java/org/apache/spark/JavaSparkListener.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark;
-
-import org.apache.spark.scheduler.*;
-
-/**
- * Java clients should extend this class instead of implementing
- * SparkListener directly. This is to prevent java clients
- * from breaking when new events are added to the SparkListener
- * trait.
- *
- * This is a concrete class instead of abstract to enforce
- * new events get added to both the SparkListener and this adapter
- * in lockstep.
- */
-public class JavaSparkListener implements SparkListener {
-
- @Override
- public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { }
-
- @Override
- public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { }
-
- @Override
- public void onTaskStart(SparkListenerTaskStart taskStart) { }
-
- @Override
- public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { }
-
- @Override
- public void onTaskEnd(SparkListenerTaskEnd taskEnd) { }
-
- @Override
- public void onJobStart(SparkListenerJobStart jobStart) { }
-
- @Override
- public void onJobEnd(SparkListenerJobEnd jobEnd) { }
-
- @Override
- public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { }
-
- @Override
- public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { }
-
- @Override
- public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { }
-
- @Override
- public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { }
-
- @Override
- public void onApplicationStart(SparkListenerApplicationStart applicationStart) { }
-
- @Override
- public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { }
-
- @Override
- public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { }
-
- @Override
- public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { }
-
- @Override
- public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
-
- @Override
- public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { }
-
- @Override
- public void onOtherEvent(SparkListenerEvent event) { }
-
-}
diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index e6b24afd88..97eed611e8 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -28,7 +28,7 @@ import org.apache.spark.scheduler.*;
* this was a concrete Scala class, default implementations of new event handlers would be inherited
* from the SparkListener trait).
*/
-public class SparkFirehoseListener implements SparkListener {
+public class SparkFirehoseListener implements SparkListenerInterface {
public void onEvent(SparkListenerEvent event) { }
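For context (not shown in this diff): SparkFirehoseListener funnels every scheduler event into a single onEvent call, which is why it can implement SparkListenerInterface directly now that SparkListener is no longer a trait. A hedged sketch of a firehose-style listener follows; the class name and the logging sink are hypothetical.

import org.apache.spark.SparkFirehoseListener
import org.apache.spark.scheduler.SparkListenerEvent

// Hypothetical forwarder: every event type arrives through the single onEvent callback.
class EventForwarder extends SparkFirehoseListener {
  override def onEvent(event: SparkListenerEvent): Unit = {
    // A real implementation might serialize the event to an external sink;
    // here we only print its type.
    println(s"Spark event: ${event.getClass.getSimpleName}")
  }
}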
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 61f689ec8c..2bdbd3fae9 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -56,7 +56,7 @@ private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
* Lives in the driver to receive heartbeats from executors..
*/
private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
- extends ThreadSafeRpcEndpoint with SparkListener with Logging {
+ extends SparkListener with ThreadSafeRpcEndpoint with Logging {
def this(sc: SparkContext) {
this(sc, new SystemClock)
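The reordering in HeartbeatReceiver above is required, not cosmetic: once SparkListener is a class rather than a trait, Scala requires it to appear first in the extends clause, with the remaining traits mixed in via `with`. A minimal sketch with placeholder names:

// Placeholder names only: Base stands in for the SparkListener class,
// A and B for the ThreadSafeRpcEndpoint and Logging traits.
abstract class Base
trait A
trait B

class Works extends Base with A with B        // compiles: the class comes first
// class Fails extends A with Base with B     // rejected: "class Base needs to be a trait to be mixed in"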
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 586173f180..080ea6c33a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -151,275 +151,152 @@ private[spark] trait SparkHistoryListenerFactory {
def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener]
}
+
/**
- * :: DeveloperApi ::
- * Interface for listening to events from the Spark scheduler. Note that this is an internal
- * interface which might change in different Spark releases. Java clients should extend
- * {@link JavaSparkListener}
+ * Interface for listening to events from the Spark scheduler. Most applications should probably
+ * extend SparkListener or SparkFirehoseListener directly, rather than implementing this class.
+ *
+ * Note that this is an internal interface which might change in different Spark releases.
*/
-@DeveloperApi
-trait SparkListener {
+private[spark] trait SparkListenerInterface {
+
/**
* Called when a stage completes successfully or fails, with information on the completed stage.
*/
- def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { }
+ def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit
/**
* Called when a stage is submitted
*/
- def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
+ def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit
/**
* Called when a task starts
*/
- def onTaskStart(taskStart: SparkListenerTaskStart) { }
+ def onTaskStart(taskStart: SparkListenerTaskStart): Unit
/**
* Called when a task begins remotely fetching its result (will not be called for tasks that do
* not need to fetch the result remotely).
*/
- def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) { }
+ def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit
/**
* Called when a task ends
*/
- def onTaskEnd(taskEnd: SparkListenerTaskEnd) { }
+ def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit
/**
* Called when a job starts
*/
- def onJobStart(jobStart: SparkListenerJobStart) { }
+ def onJobStart(jobStart: SparkListenerJobStart): Unit
/**
* Called when a job ends
*/
- def onJobEnd(jobEnd: SparkListenerJobEnd) { }
+ def onJobEnd(jobEnd: SparkListenerJobEnd): Unit
/**
* Called when environment properties have been updated
*/
- def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { }
+ def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit
/**
* Called when a new block manager has joined
*/
- def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded) { }
+ def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit
/**
* Called when an existing block manager has been removed
*/
- def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) { }
+ def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit
/**
* Called when an RDD is manually unpersisted by the application
*/
- def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) { }
+ def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit
/**
* Called when the application starts
*/
- def onApplicationStart(applicationStart: SparkListenerApplicationStart) { }
+ def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit
/**
* Called when the application ends
*/
- def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { }
+ def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit
/**
* Called when the driver receives task metrics from an executor in a heartbeat.
*/
- def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) { }
+ def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit
/**
* Called when the driver registers a new executor.
*/
- def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) { }
+ def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit
/**
* Called when the driver removes an executor.
*/
- def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved) { }
+ def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit
/**
* Called when the driver receives a block update info.
*/
- def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated) { }
+ def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit
/**
* Called when other events like SQL-specific events are posted.
*/
- def onOtherEvent(event: SparkListenerEvent) { }
+ def onOtherEvent(event: SparkListenerEvent): Unit
}
+
/**
* :: DeveloperApi ::
- * Simple SparkListener that logs a few summary statistics when each stage completes
+ * A default implementation for [[SparkListenerInterface]] that has no-op implementations for
+ * all callbacks.
+ *
+ * Note that this is an internal interface which might change in different Spark releases.
*/
@DeveloperApi
-class StatsReportListener extends SparkListener with Logging {
-
- import org.apache.spark.scheduler.StatsReportListener._
-
- private val taskInfoMetrics = mutable.Buffer[(TaskInfo, TaskMetrics)]()
-
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- val info = taskEnd.taskInfo
- val metrics = taskEnd.taskMetrics
- if (info != null && metrics != null) {
- taskInfoMetrics += ((info, metrics))
- }
- }
-
- override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
- implicit val sc = stageCompleted
- this.logInfo(s"Finished stage: ${getStatusDetail(stageCompleted.stageInfo)}")
- showMillisDistribution("task runtime:", (info, _) => Some(info.duration), taskInfoMetrics)
-
- // Shuffle write
- showBytesDistribution("shuffle bytes written:",
- (_, metric) => metric.shuffleWriteMetrics.map(_.bytesWritten), taskInfoMetrics)
-
- // Fetch & I/O
- showMillisDistribution("fetch wait time:",
- (_, metric) => metric.shuffleReadMetrics.map(_.fetchWaitTime), taskInfoMetrics)
- showBytesDistribution("remote bytes read:",
- (_, metric) => metric.shuffleReadMetrics.map(_.remoteBytesRead), taskInfoMetrics)
- showBytesDistribution("task result size:",
- (_, metric) => Some(metric.resultSize), taskInfoMetrics)
-
- // Runtime breakdown
- val runtimePcts = taskInfoMetrics.map { case (info, metrics) =>
- RuntimePercentage(info.duration, metrics)
- }
- showDistribution("executor (non-fetch) time pct: ",
- Distribution(runtimePcts.map(_.executorPct * 100)), "%2.0f %%")
- showDistribution("fetch wait time pct: ",
- Distribution(runtimePcts.flatMap(_.fetchPct.map(_ * 100))), "%2.0f %%")
- showDistribution("other time pct: ", Distribution(runtimePcts.map(_.other * 100)), "%2.0f %%")
- taskInfoMetrics.clear()
- }
-
- private def getStatusDetail(info: StageInfo): String = {
- val failureReason = info.failureReason.map("(" + _ + ")").getOrElse("")
- val timeTaken = info.submissionTime.map(
- x => info.completionTime.getOrElse(System.currentTimeMillis()) - x
- ).getOrElse("-")
-
- s"Stage(${info.stageId}, ${info.attemptId}); Name: '${info.name}'; " +
- s"Status: ${info.getStatusString}$failureReason; numTasks: ${info.numTasks}; " +
- s"Took: $timeTaken msec"
- }
+abstract class SparkListener extends SparkListenerInterface {
+ override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { }
-}
+ override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { }
-private[spark] object StatsReportListener extends Logging {
-
- // For profiling, the extremes are more interesting
- val percentiles = Array[Int](0, 5, 10, 25, 50, 75, 90, 95, 100)
- val probabilities = percentiles.map(_ / 100.0)
- val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
-
- def extractDoubleDistribution(
- taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)],
- getMetric: (TaskInfo, TaskMetrics) => Option[Double]): Option[Distribution] = {
- Distribution(taskInfoMetrics.flatMap { case (info, metric) => getMetric(info, metric) })
- }
-
- // Is there some way to setup the types that I can get rid of this completely?
- def extractLongDistribution(
- taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)],
- getMetric: (TaskInfo, TaskMetrics) => Option[Long]): Option[Distribution] = {
- extractDoubleDistribution(
- taskInfoMetrics,
- (info, metric) => { getMetric(info, metric).map(_.toDouble) })
- }
-
- def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
- val stats = d.statCounter
- val quantiles = d.getQuantiles(probabilities).map(formatNumber)
- logInfo(heading + stats)
- logInfo(percentilesHeader)
- logInfo("\t" + quantiles.mkString("\t"))
- }
-
- def showDistribution(
- heading: String,
- dOpt: Option[Distribution],
- formatNumber: Double => String) {
- dOpt.foreach { d => showDistribution(heading, d, formatNumber)}
- }
-
- def showDistribution(heading: String, dOpt: Option[Distribution], format: String) {
- def f(d: Double): String = format.format(d)
- showDistribution(heading, dOpt, f _)
- }
-
- def showDistribution(
- heading: String,
- format: String,
- getMetric: (TaskInfo, TaskMetrics) => Option[Double],
- taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
- showDistribution(heading, extractDoubleDistribution(taskInfoMetrics, getMetric), format)
- }
-
- def showBytesDistribution(
- heading: String,
- getMetric: (TaskInfo, TaskMetrics) => Option[Long],
- taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
- showBytesDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric))
- }
-
- def showBytesDistribution(heading: String, dOpt: Option[Distribution]) {
- dOpt.foreach { dist => showBytesDistribution(heading, dist) }
- }
-
- def showBytesDistribution(heading: String, dist: Distribution) {
- showDistribution(heading, dist, (d => Utils.bytesToString(d.toLong)): Double => String)
- }
-
- def showMillisDistribution(heading: String, dOpt: Option[Distribution]) {
- showDistribution(heading, dOpt,
- (d => StatsReportListener.millisToString(d.toLong)): Double => String)
- }
-
- def showMillisDistribution(
- heading: String,
- getMetric: (TaskInfo, TaskMetrics) => Option[Long],
- taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
- showMillisDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric))
- }
-
- val seconds = 1000L
- val minutes = seconds * 60
- val hours = minutes * 60
+ override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }
- /**
- * Reformat a time interval in milliseconds to a prettier format for output
- */
- def millisToString(ms: Long): String = {
- val (size, units) =
- if (ms > hours) {
- (ms.toDouble / hours, "hours")
- } else if (ms > minutes) {
- (ms.toDouble / minutes, "min")
- } else if (ms > seconds) {
- (ms.toDouble / seconds, "s")
- } else {
- (ms.toDouble, "ms")
- }
- "%.1f %s".format(size, units)
- }
-}
+ override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult): Unit = { }
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = { }
+
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = { }
+
+ override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { }
+
+ override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate): Unit = { }
+
+ override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = { }
+
+ override def onBlockManagerRemoved(
+ blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = { }
+
+ override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = { }
+
+ override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { }
+
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { }
+
+ override def onExecutorMetricsUpdate(
+ executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }
+
+ override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }
+
+ override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }
+
+ override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { }
-private case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
-
-private object RuntimePercentage {
- def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
- val denom = totalTime.toDouble
- val fetchTime = metrics.shuffleReadMetrics.map(_.fetchWaitTime)
- val fetch = fetchTime.map(_ / denom)
- val exec = (metrics.executorRunTime - fetchTime.getOrElse(0L)) / denom
- val other = 1.0 - (exec + fetch.getOrElse(0d))
- RuntimePercentage(exec, fetch, other)
- }
+ override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala b/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala
new file mode 100644
index 0000000000..309f4b806b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/StatsReportListener.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{Distribution, Utils}
+
+
+/**
+ * :: DeveloperApi ::
+ * Simple SparkListener that logs a few summary statistics when each stage completes.
+ */
+@DeveloperApi
+class StatsReportListener extends SparkListener with Logging {
+
+ import org.apache.spark.scheduler.StatsReportListener._
+
+ private val taskInfoMetrics = mutable.Buffer[(TaskInfo, TaskMetrics)]()
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ val info = taskEnd.taskInfo
+ val metrics = taskEnd.taskMetrics
+ if (info != null && metrics != null) {
+ taskInfoMetrics += ((info, metrics))
+ }
+ }
+
+ override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
+ implicit val sc = stageCompleted
+ this.logInfo(s"Finished stage: ${getStatusDetail(stageCompleted.stageInfo)}")
+ showMillisDistribution("task runtime:", (info, _) => Some(info.duration), taskInfoMetrics)
+
+ // Shuffle write
+ showBytesDistribution("shuffle bytes written:",
+ (_, metric) => metric.shuffleWriteMetrics.map(_.bytesWritten), taskInfoMetrics)
+
+ // Fetch & I/O
+ showMillisDistribution("fetch wait time:",
+ (_, metric) => metric.shuffleReadMetrics.map(_.fetchWaitTime), taskInfoMetrics)
+ showBytesDistribution("remote bytes read:",
+ (_, metric) => metric.shuffleReadMetrics.map(_.remoteBytesRead), taskInfoMetrics)
+ showBytesDistribution("task result size:",
+ (_, metric) => Some(metric.resultSize), taskInfoMetrics)
+
+ // Runtime breakdown
+ val runtimePcts = taskInfoMetrics.map { case (info, metrics) =>
+ RuntimePercentage(info.duration, metrics)
+ }
+ showDistribution("executor (non-fetch) time pct: ",
+ Distribution(runtimePcts.map(_.executorPct * 100)), "%2.0f %%")
+ showDistribution("fetch wait time pct: ",
+ Distribution(runtimePcts.flatMap(_.fetchPct.map(_ * 100))), "%2.0f %%")
+ showDistribution("other time pct: ", Distribution(runtimePcts.map(_.other * 100)), "%2.0f %%")
+ taskInfoMetrics.clear()
+ }
+
+ private def getStatusDetail(info: StageInfo): String = {
+ val failureReason = info.failureReason.map("(" + _ + ")").getOrElse("")
+ val timeTaken = info.submissionTime.map(
+ x => info.completionTime.getOrElse(System.currentTimeMillis()) - x
+ ).getOrElse("-")
+
+ s"Stage(${info.stageId}, ${info.attemptId}); Name: '${info.name}'; " +
+ s"Status: ${info.getStatusString}$failureReason; numTasks: ${info.numTasks}; " +
+ s"Took: $timeTaken msec"
+ }
+
+}
+
+private[spark] object StatsReportListener extends Logging {
+
+ // For profiling, the extremes are more interesting
+ val percentiles = Array[Int](0, 5, 10, 25, 50, 75, 90, 95, 100)
+ val probabilities = percentiles.map(_ / 100.0)
+ val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
+
+ def extractDoubleDistribution(
+ taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)],
+ getMetric: (TaskInfo, TaskMetrics) => Option[Double]): Option[Distribution] = {
+ Distribution(taskInfoMetrics.flatMap { case (info, metric) => getMetric(info, metric) })
+ }
+
+ // Is there some way to setup the types that I can get rid of this completely?
+ def extractLongDistribution(
+ taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)],
+ getMetric: (TaskInfo, TaskMetrics) => Option[Long]): Option[Distribution] = {
+ extractDoubleDistribution(
+ taskInfoMetrics,
+ (info, metric) => { getMetric(info, metric).map(_.toDouble) })
+ }
+
+ def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
+ val stats = d.statCounter
+ val quantiles = d.getQuantiles(probabilities).map(formatNumber)
+ logInfo(heading + stats)
+ logInfo(percentilesHeader)
+ logInfo("\t" + quantiles.mkString("\t"))
+ }
+
+ def showDistribution(
+ heading: String,
+ dOpt: Option[Distribution],
+ formatNumber: Double => String) {
+ dOpt.foreach { d => showDistribution(heading, d, formatNumber)}
+ }
+
+ def showDistribution(heading: String, dOpt: Option[Distribution], format: String) {
+ def f(d: Double): String = format.format(d)
+ showDistribution(heading, dOpt, f _)
+ }
+
+ def showDistribution(
+ heading: String,
+ format: String,
+ getMetric: (TaskInfo, TaskMetrics) => Option[Double],
+ taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
+ showDistribution(heading, extractDoubleDistribution(taskInfoMetrics, getMetric), format)
+ }
+
+ def showBytesDistribution(
+ heading: String,
+ getMetric: (TaskInfo, TaskMetrics) => Option[Long],
+ taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
+ showBytesDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric))
+ }
+
+ def showBytesDistribution(heading: String, dOpt: Option[Distribution]) {
+ dOpt.foreach { dist => showBytesDistribution(heading, dist) }
+ }
+
+ def showBytesDistribution(heading: String, dist: Distribution) {
+ showDistribution(heading, dist, (d => Utils.bytesToString(d.toLong)): Double => String)
+ }
+
+ def showMillisDistribution(heading: String, dOpt: Option[Distribution]) {
+ showDistribution(heading, dOpt,
+ (d => StatsReportListener.millisToString(d.toLong)): Double => String)
+ }
+
+ def showMillisDistribution(
+ heading: String,
+ getMetric: (TaskInfo, TaskMetrics) => Option[Long],
+ taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
+ showMillisDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric))
+ }
+
+ val seconds = 1000L
+ val minutes = seconds * 60
+ val hours = minutes * 60
+
+ /**
+ * Reformat a time interval in milliseconds to a prettier format for output
+ */
+ def millisToString(ms: Long): String = {
+ val (size, units) =
+ if (ms > hours) {
+ (ms.toDouble / hours, "hours")
+ } else if (ms > minutes) {
+ (ms.toDouble / minutes, "min")
+ } else if (ms > seconds) {
+ (ms.toDouble / seconds, "s")
+ } else {
+ (ms.toDouble, "ms")
+ }
+ "%.1f %s".format(size, units)
+ }
+}
+
+private case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
+
+private object RuntimePercentage {
+ def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
+ val denom = totalTime.toDouble
+ val fetchTime = metrics.shuffleReadMetrics.map(_.fetchWaitTime)
+ val fetch = fetchTime.map(_ / denom)
+ val exec = (metrics.executorRunTime - fetchTime.getOrElse(0L)) / denom
+ val other = 1.0 - (exec + fetch.getOrElse(0d))
+ RuntimePercentage(exec, fetch, other)
+ }
+}
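StatsReportListener itself is moved here verbatim from SparkListener.scala. As a usage note (not part of this diff, and hedged accordingly), it can be attached either declaratively through the spark.extraListeners configuration or programmatically on a live context; the application name, master, and sample job below are illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.StatsReportListener

object StatsReportDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("stats-report-demo")   // illustrative values
      .setMaster("local[2]")
      // Option 1: have Spark instantiate the listener from configuration at startup
      .set("spark.extraListeners", "org.apache.spark.scheduler.StatsReportListener")

    val sc = new SparkContext(conf)
    // Option 2: register an instance explicitly on a running context
    sc.addSparkListener(new StatsReportListener)

    // Run a small job so at least one stage completes and statistics get logged
    sc.parallelize(1 to 1000).map(_ * 2).count()
    sc.stop()
  }
}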
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 2be490b942..9f245afd50 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -66,7 +66,16 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache")
) ++ Seq(
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"),
- ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory")
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory"),
+ // SPARK-14358 SparkListener from trait to abstract class
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.JavaSparkListener"),
+ ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.SparkFirehoseListener"),
+ ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.scheduler.SparkListener"),
+ ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ui.jobs.JobProgressListener"),
+ ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ui.exec.ExecutorsListener"),
+ ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ui.env.EnvironmentListener"),
+ ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ui.storage.StorageListener"),
+ ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.storage.StorageStatusListener")
) ++
Seq(
// SPARK-3369 Fix Iterable/Iterator in Java API
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 6985c37f71..c086df47d9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -28,7 +28,7 @@ import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.scheduler._
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
- extends StreamingListener with SparkListener {
+ extends SparkListener with StreamingListener {
private val waitingBatchUIData = new HashMap[Time, BatchUIData]
private val runningBatchUIData = new HashMap[Time, BatchUIData]