aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-08-01 17:41:58 -0700
committerPatrick Wendell <pwendell@gmail.com>2013-08-01 17:41:58 -0700
commit9d7dfd2d5a28c3e329fe14cf10f5184af1263dd6 (patch)
tree368414824e9d24eafba488e74a880394653e92b0 /core
parent6d7afd7ced7207467395f586a683d7f803f72a55 (diff)
parentd3c37ff120f42f4291a5970fb842af79e0f4a866 (diff)
downloadspark-9d7dfd2d5a28c3e329fe14cf10f5184af1263dd6.tar.gz
spark-9d7dfd2d5a28c3e329fe14cf10f5184af1263dd6.tar.bz2
spark-9d7dfd2d5a28c3e329fe14cf10f5184af1263dd6.zip
Merge pull request #743 from pwendell/app-metrics
Add application metrics to standalone master
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/deploy/master/ApplicationInfo.scala1
-rw-r--r--core/src/main/scala/spark/deploy/master/ApplicationSource.scala24
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala22
-rw-r--r--core/src/main/scala/spark/metrics/MetricsSystem.scala9
4 files changed, 50 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
index 15ff919738..79687df614 100644
--- a/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
@@ -34,6 +34,7 @@ private[spark] class ApplicationInfo(
var executors = new mutable.HashMap[Int, ExecutorInfo]
var coresGranted = 0
var endTime = -1L
+ val appSource = new ApplicationSource(this)
private var nextExecutorId = 0
diff --git a/core/src/main/scala/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/spark/deploy/master/ApplicationSource.scala
new file mode 100644
index 0000000000..4df2b6bfdd
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/ApplicationSource.scala
@@ -0,0 +1,24 @@
+package spark.deploy.master
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+class ApplicationSource(val application: ApplicationInfo) extends Source {
+ val metricRegistry = new MetricRegistry()
+ val sourceName = "%s.%s.%s".format("application", application.desc.name,
+ System.currentTimeMillis())
+
+ metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] {
+ override def getValue: String = application.state.toString
+ })
+
+ metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] {
+ override def getValue: Long = application.duration
+ })
+
+ metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
+ override def getValue: Int = application.coresGranted
+ })
+
+}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 202d5bcdb7..0aed4b9802 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -38,6 +38,7 @@ import spark.util.AkkaUtils
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
+ val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
@@ -59,7 +60,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
Utils.checkHost(host, "Expected hostname")
- val metricsSystem = MetricsSystem.createMetricsSystem("master")
+ val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
+ val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
val masterSource = new MasterSource(this)
val masterPublicAddress = {
@@ -79,13 +81,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
webUi.start()
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
- metricsSystem.registerSource(masterSource)
- metricsSystem.start()
+ masterMetricsSystem.registerSource(masterSource)
+ masterMetricsSystem.start()
+ applicationMetricsSystem.start()
}
override def postStop() {
webUi.stop()
- metricsSystem.stop()
+ masterMetricsSystem.stop()
+ applicationMetricsSystem.stop()
}
override def receive = {
@@ -275,6 +279,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val now = System.currentTimeMillis()
val date = new Date(now)
val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+ applicationMetricsSystem.registerSource(app.appSource)
apps += app
idToApp(app.id) = app
actorToApp(driver) = app
@@ -300,7 +305,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
idToApp -= app.id
actorToApp -= app.driver
addressToApp -= app.driver.path.address
- completedApps += app // Remember it in our history
+ if (completedApps.size >= RETAINED_APPLICATIONS) {
+ val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
+ completedApps.take(toRemove).foreach( a => {
+ applicationMetricsSystem.removeSource(a.appSource)
+ })
+ completedApps.trimStart(toRemove)
+ }
+ completedApps += app // Remember it in our history
waitingApps -= app
for (exec <- app.executors.values) {
exec.worker.removeExecutor(exec)
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
index fabddfb947..1dacafa135 100644
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -17,7 +17,7 @@
package spark.metrics
-import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
+import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
import java.util.Properties
import java.util.concurrent.TimeUnit
@@ -93,6 +93,13 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
}
}
+ def removeSource(source: Source) {
+ sources -= source
+ registry.removeMatching(new MetricFilter {
+ def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName)
+ })
+ }
+
def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)