aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala18
1 files changed, 16 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index aea6674ed2..b00a5fee09 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -81,6 +81,8 @@ class DAGScheduler(
def this(sc: SparkContext) = this(sc, sc.taskScheduler)
+ private[scheduler] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)
+
private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)
@@ -1438,17 +1440,29 @@ class DAGScheduler(
taskScheduler.stop()
}
- // Start the event thread at the end of the constructor
+ // Start the event thread and register the metrics source at the end of the constructor
+ env.metricsSystem.registerSource(metricsSource)
eventProcessLoop.start()
}
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {
+ private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer
+
/**
* The main event loop of the DAG scheduler.
*/
- override def onReceive(event: DAGSchedulerEvent): Unit = event match {
+ override def onReceive(event: DAGSchedulerEvent): Unit = {
+ val timerContext = timer.time()
+ try {
+ doOnReceive(event)
+ } finally {
+ timerContext.stop()
+ }
+ }
+
+ private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
listener, properties)