about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-06-26 00:12:05 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-06-26 00:12:05 -0700
commit9fed6abfdcb7afcf92be56e5ccbed6599fe66bc4 (patch)
tree9b68575cdbbe7e4a7ecc26b327eaaf0e5d680d87 /core
parent1a79f0eb8da7e850c443383b3bb24e0bf8e1e7cb (diff)
downloadspark-9fed6abfdcb7afcf92be56e5ccbed6599fe66bc4.tar.gz
spark-9fed6abfdcb7afcf92be56e5ccbed6599fe66bc4.tar.bz2
spark-9fed6abfdcb7afcf92be56e5ccbed6599fe66bc4.zip
[SPARK-8344] Add message processing time metric to DAGScheduler
This commit adds a new metric, `messageProcessingTime`, to the DAGScheduler metrics source. This metric tracks the time taken to process messages in the scheduler's event processing loop, which is a helpful debugging aid for diagnosing performance issues in the scheduler (such as SPARK-4961). In order to do this, I moved the creation of the DAGSchedulerSource metrics source into DAGScheduler itself, similar to how MasterSource is created and registered in Master. Author: Josh Rosen <joshrosen@databricks.com> Closes #7002 from JoshRosen/SPARK-8344 and squashes the following commits: 57f914b [Josh Rosen] Fix import ordering 7d6bb83 [Josh Rosen] Add message processing time metrics to DAGScheduler
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala18
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala8
3 files changed, 22 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 141276ac90..c7a7436462 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -545,7 +545,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Post init
_taskScheduler.postStartHook()
- _env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index aea6674ed2..b00a5fee09 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -81,6 +81,8 @@ class DAGScheduler(
def this(sc: SparkContext) = this(sc, sc.taskScheduler)
+ private[scheduler] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)
+
private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)
@@ -1438,17 +1440,29 @@ class DAGScheduler(
taskScheduler.stop()
}
- // Start the event thread at the end of the constructor
+ // Start the event thread and register the metrics source at the end of the constructor
+ env.metricsSystem.registerSource(metricsSource)
eventProcessLoop.start()
}
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {
+ private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer
+
/**
* The main event loop of the DAG scheduler.
*/
- override def onReceive(event: DAGSchedulerEvent): Unit = event match {
+ override def onReceive(event: DAGSchedulerEvent): Unit = {
+ val timerContext = timer.time()
+ try {
+ doOnReceive(event)
+ } finally {
+ timerContext.stop()
+ }
+ }
+
+ private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
listener, properties)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 02c67073af..6b667d5d76 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -17,11 +17,11 @@
package org.apache.spark.scheduler
-import com.codahale.metrics.{Gauge, MetricRegistry}
+import com.codahale.metrics.{Gauge, MetricRegistry, Timer}
import org.apache.spark.metrics.source.Source
-private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
+private[scheduler] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
extends Source {
override val metricRegistry = new MetricRegistry()
override val sourceName = "DAGScheduler"
@@ -45,4 +45,8 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
metricRegistry.register(MetricRegistry.name("job", "activeJobs"), new Gauge[Int] {
override def getValue: Int = dagScheduler.activeJobs.size
})
+
+ /** Timer that tracks the time to process messages in the DAGScheduler's event loop */
+ val messageProcessingTimer: Timer =
+ metricRegistry.timer(MetricRegistry.name("messageProcessingTime"))
}