aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2015-05-05 09:43:49 -0700
committerAndrew Or <andrew@databricks.com>2015-05-05 09:43:49 -0700
commit9f1f9b1037ee003a07ff09d60bb360cf32c8a564 (patch)
tree6f1902fdf32dc32870b0b895a0465f6cd5425315 /core
parent57e9f29e17d97ed9d0f110fb2ce5a075b854a841 (diff)
downloadspark-9f1f9b1037ee003a07ff09d60bb360cf32c8a564.tar.gz
spark-9f1f9b1037ee003a07ff09d60bb360cf32c8a564.tar.bz2
spark-9f1f9b1037ee003a07ff09d60bb360cf32c8a564.zip
[SPARK-7007] [CORE] Add a metric source for ExecutorAllocationManager
Add a metric source to expose the internal status of ExecutorAllocationManager to better monitoring the resource usage of executors when dynamic allocation is enable. Please help to review, thanks a lot. Author: jerryshao <saisai.shao@intel.com> Closes #5589 from jerryshao/dynamic-allocation-source and squashes the following commits: 104d155 [jerryshao] rebase and address the comments c501a2c [jerryshao] Address the comments d237ba5 [jerryshao] Address the comments 2c3540f [jerryshao] Add a metric source for ExecutorAllocationManager
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala29
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala3
2 files changed, 32 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 228d9149df..66bda68088 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -21,7 +21,10 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
import org.apache.spark.scheduler._
+import org.apache.spark.metrics.source.Source
import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
/**
@@ -144,6 +147,9 @@ private[spark] class ExecutorAllocationManager(
private val executor =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
+ // Metric source for ExecutorAllocationManager to expose internal status to MetricsSystem.
+ val executorAllocationManagerSource = new ExecutorAllocationManagerSource
+
/**
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
@@ -579,6 +585,29 @@ private[spark] class ExecutorAllocationManager(
}
}
+ /**
+ * Metric source for ExecutorAllocationManager to expose its internal executor allocation
+ * status to MetricsSystem.
+ * Note: These metrics heavily rely on the internal implementation of
+ * ExecutorAllocationManager, metrics or value of metrics will be changed when internal
+ * implementation is changed, so these metrics are not stable across Spark version.
+ */
+ private[spark] class ExecutorAllocationManagerSource extends Source {
+ val sourceName = "ExecutorAllocationManager"
+ val metricRegistry = new MetricRegistry()
+
+ private def registerGauge[T](name: String, value: => T, defaultValue: T): Unit = {
+ metricRegistry.register(MetricRegistry.name("executors", name), new Gauge[T] {
+ override def getValue: T = synchronized { Option(value).getOrElse(defaultValue) }
+ })
+ }
+
+ registerGauge("numberExecutorsToAdd", numExecutorsToAdd, 0)
+ registerGauge("numberExecutorsPendingToRemove", executorsPendingToRemove.size, 0)
+ registerGauge("numberAllExecutors", executorIds.size, 0)
+ registerGauge("numberTargetExecutors", numExecutorsTarget, 0)
+ registerGauge("numberMaxNeededExecutors", maxNumExecutorsNeeded(), 0)
+ }
}
private object ExecutorAllocationManager {
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 00eb432912..2ca6882c8d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -537,6 +537,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_taskScheduler.postStartHook()
_env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
+ _executorAllocationManager.foreach { e =>
+ _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
+ }
// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM