aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-09-08 16:06:49 -0700
committerPatrick Wendell <pwendell@gmail.com>2013-09-08 16:06:49 -0700
commitb4e382c210b4987da78421f5de11199e4d74f0e7 (patch)
tree9740951e4e4beb700d48c05f0fdd01b6b06b74df /core
parent8026537597806d4249c4b1362f8954b6a3e51205 (diff)
downloadspark-b4e382c210b4987da78421f5de11199e4d74f0e7.tar.gz
spark-b4e382c210b4987da78421f5de11199e4d74f0e7.tar.bz2
spark-b4e382c210b4987da78421f5de11199e4d74f0e7.zip
Adding sc name in metrics source
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala6
5 files changed, 14 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 89318712a5..4f711a5ea6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -282,8 +282,8 @@ class SparkContext(
// Post init
taskScheduler.postStartHook()
- val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
- val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
+ val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
+ val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
def initDriverMetrics() {
SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d365804994..ceae3b8289 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -98,7 +98,7 @@ private[spark] class Executor(
}
)
- val executorSource = new ExecutorSource(this)
+ val executorSource = new ExecutorSource(this, executorId)
// Initialize Spark environment (using system properties read above)
val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index bf8fb4fd21..18c9dc1c0a 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -27,7 +27,7 @@ import scala.collection.JavaConversions._
import org.apache.spark.metrics.source.Source
-class ExecutorSource(val executor: Executor) extends Source {
+class ExecutorSource(val executor: Executor, executorId: String) extends Source {
private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption
@@ -39,7 +39,8 @@ class ExecutorSource(val executor: Executor) extends Source {
}
val metricRegistry = new MetricRegistry()
- val sourceName = "executor"
+ // TODO: It would be nice to pass the application name here
+ val sourceName = "executor.%s".format(executorId)
// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 22e3723ac8..446d490cc9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -20,10 +20,12 @@ package org.apache.spark.scheduler
import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.metrics.source.Source
+import org.apache.spark.SparkContext
-private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
+private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
+ extends Source {
val metricRegistry = new MetricRegistry()
- val sourceName = "DAGScheduler"
+ val sourceName = "%s.DAGScheduler".format(sc.appName)
metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failed.size
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 3d709cfde4..acc3951088 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -20,11 +20,13 @@ package org.apache.spark.storage
import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.metrics.source.Source
+import org.apache.spark.SparkContext
-private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source {
+private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
+ extends Source {
val metricRegistry = new MetricRegistry()
- val sourceName = "BlockManager"
+ val sourceName = "%s.BlockManager".format(sc.appName)
metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
override def getValue: Long = {