author     Andrew xia <junluan.xia@intel.com>    2013-06-28 02:26:55 +0800
committer  jerryshao <saisai.shao@intel.com>     2013-07-24 14:57:46 +0800
commit     4d6dd67fa1f8f031f1ef46a442cec733fa3b1a7a (patch)
tree       b0deb342a86e59634b5eeecc43ad43dc7046d673
parent     03f9871116801abbdd7b4c7892c8d6affb1c4d9e (diff)
refactor metrics system

1. Change the Source abstract class to support MetricRegistry.
2. Change the Master/Worker/JVM source classes accordingly.
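The heart of the change is in the Source trait (last hunk below): instead of implementing MetricSet and hand-building a Map[String, Metric], each source now owns a MetricRegistry and registers its gauges into it. A minimal sketch of a source under the new contract — QueueSource and its gauge are hypothetical, for illustration only:

    import com.codahale.metrics.{Gauge, MetricRegistry}
    import spark.metrics.source.Source

    // Hypothetical source reporting the size of some queue.
    class QueueSource(queue: java.util.Queue[_]) extends Source {
      val sourceName = "queue"
      val metricRegistry = new MetricRegistry()

      // Key is "size" here; MetricsSystem prefixes it with sourceName on registration.
      metricRegistry.register(MetricRegistry.name("size"), new Gauge[Int] {
        override def getValue: Int = queue.size
      })
    }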
-rw-r--r--  core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala  44
-rw-r--r--  core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala  98
-rw-r--r--  core/src/main/scala/spark/metrics/MetricsSystem.scala                23
-rw-r--r--  core/src/main/scala/spark/metrics/sink/ConsoleSink.scala             11
-rw-r--r--  core/src/main/scala/spark/metrics/sink/CsvSink.scala                  9
-rw-r--r--  core/src/main/scala/spark/metrics/sink/JmxSink.scala                  5
-rw-r--r--  core/src/main/scala/spark/metrics/source/JvmSource.scala             22
-rw-r--r--  core/src/main/scala/spark/metrics/source/Source.scala                 6

8 files changed, 84 insertions(+), 134 deletions(-)
diff --git a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
index 5ea9a90319..46c90b94d2 100644
--- a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
@@ -4,32 +4,32 @@ import java.util.{Map, HashMap => JHashMap}
import com.codahale.metrics.{Gauge, Metric}
+import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
+
import spark.metrics.source.Source
+import spark.Logging
private[spark] class MasterInstrumentation(val master: Master) extends Source {
val className = classOf[Master].getName()
val instrumentationName = "master"
-
- override def sourceName = instrumentationName
-
- override def getMetrics(): Map[String, Metric] = {
- val gauges = new JHashMap[String, Metric]
-
- // Gauge for worker numbers in cluster
- gauges.put(className + ".workers.number", new Gauge[Int] {
+ val metricRegistry = new MetricRegistry()
+ val sourceName = instrumentationName
+
+ metricRegistry.register(MetricRegistry.name("workers","number"),
+ new Gauge[Int] {
override def getValue: Int = master.workers.size
- })
-
- // Gauge for application numbers in cluster
- gauges.put(className + ".apps.number", new Gauge[Int] {
- override def getValue: Int = master.apps.size
- })
-
- // Gauge for waiting application numbers in cluster
- gauges.put(className + ".waiting_apps.number", new Gauge[Int] {
+ })
+
+ // Gauge for application numbers in cluster
+ metricRegistry.register(MetricRegistry.name("apps", "number"),
+ new Gauge[Int] {
+ override def getValue: Int = master.apps.size
+ })
+
+ // Gauge for waiting application numbers in cluster
+ metricRegistry.register(MetricRegistry.name("waiting_apps", "number"),
+ new Gauge[Int] {
override def getValue: Int = master.waitingApps.size
- })
-
- gauges
- }
-}
\ No newline at end of file
+ })
+
+}
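MetricRegistry.name joins its arguments with dots, so the gauges above land under short keys like "workers.number"; only the overload taking a Class prepends the fully qualified class name (the worker source below uses that form, which is why the className val kept above no longer appears in any key). A quick sketch of the codahale-metrics 3.x naming helper:

    import com.codahale.metrics.MetricRegistry
    import spark.deploy.master.Master

    MetricRegistry.name("workers", "number")
    // => "workers.number"
    MetricRegistry.name(classOf[Master], "workers", "number")
    // => "spark.deploy.master.Master.workers.number"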
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
index 37fd154859..5ce29cf04c 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
@@ -4,86 +4,42 @@ import com.codahale.metrics.{Gauge, Metric}
import java.util.{Map, HashMap => JHashMap}
+import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
import spark.metrics.source.Source
private[spark] class WorkerInstrumentation(val worker: Worker) extends Source {
val className = classOf[Worker].getName()
- override def sourceName = "worker"
-
- override def getMetrics: Map[String, Metric] = {
- val gauges = new JHashMap[String, Metric]
-
- // Gauge for executors number
- gauges.put(className + ".executor.number", new Gauge[Int]{
+ val sourceName = "worker"
+
+ val metricRegistry = new MetricRegistry()
+
+ metricRegistry.register(MetricRegistry.name(classOf[Worker], "executor", "number"),
+ new Gauge[Int] {
override def getValue: Int = worker.executors.size
- })
-
- gauges.put(className + ".core_used.number", new Gauge[Int]{
+ })
+
+ // Gauge for cores used of this worker
+ metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_used", "number"),
+ new Gauge[Int] {
override def getValue: Int = worker.coresUsed
- })
-
- gauges.put(className + ".mem_used.MBytes", new Gauge[Int]{
+ })
+
+ // Gauge for memory used of this worker
+ metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"),
+ new Gauge[Int] {
override def getValue: Int = worker.memoryUsed
- })
-
- gauges.put(className + ".core_free.number", new Gauge[Int]{
+ })
+
+ // Gauge for cores free of this worker
+ metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_free", "number"),
+ new Gauge[Int] {
override def getValue: Int = worker.coresFree
- })
-
- gauges.put(className + ".mem_free.MBytes", new Gauge[Int]{
+ })
+
+ // Gauge for memory free of this worker
+ metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_free", "MBytes"),
+ new Gauge[Int] {
override def getValue: Int = worker.memoryFree
})
-
- gauges
- }
}
-//private[spark] trait WorkerInstrumentation extends AbstractInstrumentation {
-// var workerInst: Option[Worker] = None
-// val metricRegistry = new MetricRegistry()
-//
-// override def registryHandler = metricRegistry
-//
-// override def instance = "worker"
-//
-// def initialize(worker: Worker) {
-// workerInst = Some(worker)
-//
-// registerSources()
-// registerSinks()
-// }
-//
-// def uninitialize() {
-// unregisterSinks()
-// }
-//
-// // Gauge for executors number
-// metricRegistry.register(MetricRegistry.name(classOf[Worker], "executor", "number"),
-// new Gauge[Int] {
-// override def getValue: Int = workerInst.map(_.executors.size).getOrElse(0)
-// })
-//
-// // Gauge for cores used of this worker
-// metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_used", "number"),
-// new Gauge[Int] {
-// override def getValue: Int = workerInst.map(_.coresUsed).getOrElse(0)
-// })
-//
-// // Gauge for memory used of this worker
-// metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"),
-// new Gauge[Int] {
-// override def getValue: Int = workerInst.map(_.memoryUsed).getOrElse(0)
-// })
-//
-// // Gauge for cores free of this worker
-// metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_free", "number"),
-// new Gauge[Int] {
-// override def getValue: Int = workerInst.map(_.coresFree).getOrElse(0)
-// })
-//
-// // Gauge for memory used of this worker
-// metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_free", "MBytes"),
-// new Gauge[Int] {
-// override def getValue: Int = workerInst.map(_.memoryFree).getOrElse(0)
-// })
-//}
\ No newline at end of file
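Sinks read these gauges back out of the registry, and the same lookup is handy for spot-checking key names. A sketch, assuming a live Worker instance is in scope:

    import com.codahale.metrics.Gauge

    val inst = new WorkerInstrumentation(worker)   // worker: spark.deploy.worker.Worker, assumed
    val g = inst.metricRegistry.getGauges()
      .get("spark.deploy.worker.Worker.executor.number")
      .asInstanceOf[Gauge[Int]]
    println(g.getValue)   // current executor count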
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
index ea1bc490b5..a23ccd2692 100644
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -5,6 +5,7 @@ import scala.collection.mutable
import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
import java.util.Properties
+//import java.util._
import java.util.concurrent.TimeUnit
import spark.Logging
@@ -19,10 +20,13 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
val sinks = new mutable.ArrayBuffer[Sink]
val sources = new mutable.ArrayBuffer[Source]
+ var registry = new MetricRegistry()
+
+ registerSources()
+ registerSinks()
def start() {
- registerSources()
- registerSinks()
+ sinks.foreach(_.start)
}
def stop() {
@@ -31,20 +35,20 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
def registerSource(source: Source) {
sources += source
- MetricsSystem.registry.registerAll(source.asInstanceOf[MetricSet])
+ registry.register(source.sourceName, source.metricRegistry)
}
def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = MetricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
-
+
// Register all the sources related to instance
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
try {
val source = Class.forName(classPath).newInstance()
sources += source.asInstanceOf[Source]
- MetricsSystem.registry.registerAll(source.asInstanceOf[MetricSet])
+ registerSource(source.asInstanceOf[Source])
} catch {
case e: Exception => logError("source class " + classPath + " cannot be instantialized", e)
}
@@ -56,7 +60,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
val sinkConfigs = MetricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
// Register JMX sink as a default sink
- sinks += new JmxSink(MetricsSystem.registry)
+ sinks += new JmxSink(registry)
// Register other sinks according to conf
sinkConfigs.foreach { kv =>
@@ -68,19 +72,16 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
}
try {
val sink = Class.forName(classPath).getConstructor(classOf[Properties], classOf[MetricRegistry])
- .newInstance(kv._2, MetricsSystem.registry)
+ .newInstance(kv._2, registry)
sinks += sink.asInstanceOf[Sink]
} catch {
case e: Exception => logError("sink class " + classPath + " cannot be instantialized", e)
}
}
- sinks.foreach(_.start)
}
}
private[spark] object MetricsSystem {
- val registry = new MetricRegistry()
-
val DEFAULT_SINKS = Map(
"console" -> "spark.metrics.sink.ConsoleSink",
"csv" -> "spark.metrics.sink.CsvSink")
@@ -96,4 +97,4 @@ private[spark] object MetricsSystem {
"day" -> TimeUnit.DAYS)
def createMetricsSystem(instance: String) = new MetricsSystem(instance)
-}
\ No newline at end of file
+}
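With the registry now owned per instance, sources and sinks are wired at construction and start() only arms the sinks. A hedged sketch of the resulting flow, using only names from this diff (the master value is assumed in scope):

    import spark.metrics.MetricsSystem
    import spark.deploy.master.MasterInstrumentation

    val metricsSystem = MetricsSystem.createMetricsSystem("master")
    metricsSystem.registerSource(new MasterInstrumentation(master))
    metricsSystem.start()   // every configured sink begins reporting
    // ...
    metricsSystem.stop()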
diff --git a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
index b49a211fb3..9cd17556fa 100644
--- a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
@@ -18,14 +18,13 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry) extend
case None => MetricsSystem.timeUnits(ConsoleSink.CONSOLE_DEFAULT_UNIT)
}
- var reporter: ConsoleReporter = _
-
- override def start() {
- reporter = ConsoleReporter.forRegistry(registry)
+ var reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build()
-
+
+
+ override def start() {
reporter.start(pollPeriod, pollUnit)
}
@@ -40,4 +39,4 @@ object ConsoleSink {
val CONSOLE_KEY_PERIOD = "period"
val CONSOLE_KEY_UNIT = "unit"
-}
\ No newline at end of file
+}
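The reporter is now built eagerly at construction time, so start() merely begins polling; CsvSink below gets the identical treatment. The builder chain is the stock codahale-metrics ConsoleReporter API; standalone it looks like this:

    import java.util.concurrent.TimeUnit
    import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

    val registry = new MetricRegistry()
    val reporter = ConsoleReporter.forRegistry(registry)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .convertRatesTo(TimeUnit.SECONDS)
      .build()
    reporter.start(10, TimeUnit.SECONDS)   // dump all metrics to stdout every 10 seconds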
diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
index 3f572c8e05..62e51be0dc 100644
--- a/core/src/main/scala/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
@@ -24,15 +24,14 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si
case None => CsvSink.CSV_DEFAULT_DIR
}
- var reporter: CsvReporter = _
-
- override def start() {
- reporter = CsvReporter.forRegistry(registry)
+ var reporter: CsvReporter = CsvReporter.forRegistry(registry)
.formatFor(Locale.US)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build(new File(pollDir))
-
+
+
+ override def start() {
reporter.start(pollPeriod, pollUnit)
}
diff --git a/core/src/main/scala/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
index e223dc26e9..98b55f7b7f 100644
--- a/core/src/main/scala/spark/metrics/sink/JmxSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
@@ -3,10 +3,9 @@ package spark.metrics.sink
import com.codahale.metrics.{JmxReporter, MetricRegistry}
class JmxSink(registry: MetricRegistry) extends Sink {
- var reporter: JmxReporter = _
+ var reporter: JmxReporter = JmxReporter.forRegistry(registry).build()
override def start() {
- reporter = JmxReporter.forRegistry(registry).build()
reporter.start()
}
@@ -14,4 +13,4 @@ class JmxSink(registry: MetricRegistry) extends Sink {
reporter.stop()
}
-}
\ No newline at end of file
+}
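JmxSink follows the same eager-construction pattern. Once started, a JmxReporter exposes every metric in its registry as a JMX MBean, visible in jconsole or VisualVM:

    import com.codahale.metrics.{JmxReporter, MetricRegistry}

    val registry = new MetricRegistry()
    val reporter = JmxReporter.forRegistry(registry).build()
    reporter.start()   // registers one MBean per metric
    // ...
    reporter.stop()    // unregisters them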
diff --git a/core/src/main/scala/spark/metrics/source/JvmSource.scala b/core/src/main/scala/spark/metrics/source/JvmSource.scala
index 7a7c1b6ffb..13270dae3c 100644
--- a/core/src/main/scala/spark/metrics/source/JvmSource.scala
+++ b/core/src/main/scala/spark/metrics/source/JvmSource.scala
@@ -2,22 +2,16 @@ package spark.metrics.source
import java.util.{Map, HashMap => JHashMap}
-import com.codahale.metrics.Metric
+import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.jvm.{GarbageCollectorMetricSet, MemoryUsageGaugeSet}
class JvmSource extends Source {
- override def sourceName = "jvm"
-
- override def getMetrics(): Map[String, Metric] = {
- val gauges = new JHashMap[String, Metric]
-
- import scala.collection.JavaConversions._
+ val sourceName = "jvm"
+ val metricRegistry = new MetricRegistry()
+
val gcMetricSet = new GarbageCollectorMetricSet
- gcMetricSet.getMetrics.foreach(kv => gauges.put(kv._1, kv._2))
-
val memGaugeSet = new MemoryUsageGaugeSet
- memGaugeSet.getMetrics.foreach(kv => gauges.put(kv._1, kv._2))
-
- gauges
- }
-}
\ No newline at end of file
+
+ metricRegistry.registerAll(gcMetricSet)
+ metricRegistry.registerAll(memGaugeSet)
+}
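registerAll flattens a whole MetricSet into the registry under the set's own key names, which is why the manual copying into a HashMap could be deleted. A sketch of inspecting what ends up registered (the exact names come from the codahale JVM metric sets and vary by JVM and collector):

    import scala.collection.JavaConversions._   // pre-Scala-2.10-style conversions, as used elsewhere in this codebase
    import spark.metrics.source.JvmSource

    val jvm = new JvmSource
    jvm.metricRegistry.getNames.foreach(println)
    // e.g. heap.used, heap.max, non-heap.used, PS-Scavenge.count, PS-Scavenge.time, ...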
diff --git a/core/src/main/scala/spark/metrics/source/Source.scala b/core/src/main/scala/spark/metrics/source/Source.scala
index edd59de46a..17cbe2f85a 100644
--- a/core/src/main/scala/spark/metrics/source/Source.scala
+++ b/core/src/main/scala/spark/metrics/source/Source.scala
@@ -1,7 +1,9 @@
package spark.metrics.source
import com.codahale.metrics.MetricSet
+import com.codahale.metrics.MetricRegistry
-trait Source extends MetricSet {
+trait Source {
def sourceName: String
-}
\ No newline at end of file
+ def metricRegistry: MetricRegistry
+}
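This trait works end to end because, in codahale-metrics 3.x, MetricRegistry itself implements MetricSet (which extends Metric): MetricsSystem.registerSource can therefore hand a source's whole registry to the top-level one, and every key gets prefixed with sourceName. A sketch of that nesting:

    import com.codahale.metrics.{Gauge, MetricRegistry}

    val top = new MetricRegistry()
    val src = new MetricRegistry()
    src.register("workers.number", new Gauge[Int] { override def getValue: Int = 3 })

    top.register("master", src)   // a MetricRegistry is a MetricSet, so its entries nest
    println(top.getNames)         // [master.workers.number]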