aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-11-19 14:53:58 -0800
committerAndrew Or <andrew@databricks.com>2015-11-19 14:53:58 -0800
commitf7135ed7194d4f936f6f58e14f02b1ed93f68ad1 (patch)
tree612597c17af101d3395e60bb78f7c33d94238a77
parent3bd77b213a9cd177c3ea3c61d37e5098e55f75a5 (diff)
downloadspark-f7135ed7194d4f936f6f58e14f02b1ed93f68ad1.tar.gz
spark-f7135ed7194d4f936f6f58e14f02b1ed93f68ad1.tar.bz2
spark-f7135ed7194d4f936f6f58e14f02b1ed93f68ad1.zip
[SPARK-11828][CORE] Register DAGScheduler metrics source after app id is known.
Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #9820 from vanzin/SPARK-11828.
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala4
2 files changed, 2 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ab374cb712..af4456c05b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -581,6 +581,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Post init
_taskScheduler.postStartHook()
+ _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4a9518fff4..ae725b467d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -130,7 +130,7 @@ class DAGScheduler(
def this(sc: SparkContext) = this(sc, sc.taskScheduler)
- private[scheduler] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)
+ private[spark] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)
private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] def numTotalJobs: Int = nextJobId.get()
@@ -1580,8 +1580,6 @@ class DAGScheduler(
taskScheduler.stop()
}
- // Start the event thread and register the metrics source at the end of the constructor
- env.metricsSystem.registerSource(metricsSource)
eventProcessLoop.start()
}