aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2013-06-27 18:29:54 +0800
committerjerryshao <saisai.shao@intel.com>2013-07-24 14:57:46 +0800
commit03f9871116801abbdd7b4c7892c8d6affb1c4d9e (patch)
tree63987eae85d3ec812dfae99b76d9e9f021d4f80b /core
parentc3daad3f65630eb4ed536d06c0d467cde57a8142 (diff)
downloadspark-03f9871116801abbdd7b4c7892c8d6affb1c4d9e.tar.gz
spark-03f9871116801abbdd7b4c7892c8d6affb1c4d9e.tar.bz2
spark-03f9871116801abbdd7b4c7892c8d6affb1c4d9e.zip
MetricsSystem refactor
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala17
-rw-r--r--core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala64
-rw-r--r--core/src/main/scala/spark/deploy/worker/Worker.scala12
-rw-r--r--core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala131
-rw-r--r--core/src/main/scala/spark/metrics/MetricsSystem.scala (renamed from core/src/main/scala/spark/metrics/AbstractInstrumentation.scala)54
-rw-r--r--core/src/main/scala/spark/metrics/sink/ConsoleSink.scala10
-rw-r--r--core/src/main/scala/spark/metrics/sink/CsvSink.scala10
-rw-r--r--core/src/main/scala/spark/metrics/sink/JmxSink.scala4
-rw-r--r--core/src/main/scala/spark/metrics/sink/Sink.scala4
-rw-r--r--core/src/main/scala/spark/metrics/source/JvmSource.scala30
-rw-r--r--core/src/main/scala/spark/metrics/source/Source.scala6
11 files changed, 193 insertions, 149 deletions
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 3a7c4e5a52..e44f5e3168 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -29,12 +29,12 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import spark.deploy._
import spark.{Logging, SparkException, Utils}
+import spark.metrics.MetricsSystem
import spark.util.AkkaUtils
import ui.MasterWebUI
-private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor
-with Logging with MasterInstrumentation {
+private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
@@ -57,6 +57,8 @@ with Logging with MasterInstrumentation {
val webUi = new MasterWebUI(self, webUiPort)
Utils.checkHost(host, "Expected hostname")
+
+ val masterInstrumentation = new MasterInstrumentation(this)
val masterPublicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
@@ -75,7 +77,7 @@ with Logging with MasterInstrumentation {
webUi.start()
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
- initialize(this)
+ Master.metricsSystem.registerSource(masterInstrumentation)
}
override def postStop() {
@@ -319,21 +321,22 @@ with Logging with MasterInstrumentation {
removeWorker(worker)
}
}
-
- override def postStop() {
- uninitialize()
- }
}
private[spark] object Master {
private val systemName = "sparkMaster"
private val actorName = "Master"
private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
+
+ private val metricsSystem = MetricsSystem.createMetricsSystem("master")
def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
+
+ metricsSystem.start()
actorSystem.awaitTermination()
+ metricsSystem.stop()
}
/** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
diff --git a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
index c295e725d7..5ea9a90319 100644
--- a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
@@ -1,47 +1,35 @@
package spark.deploy.master
-import com.codahale.metrics.{Gauge, JmxReporter, MetricRegistry}
+import java.util.{Map, HashMap => JHashMap}
-import spark.metrics.AbstractInstrumentation
+import com.codahale.metrics.{Gauge, Metric}
-private[spark] trait MasterInstrumentation extends AbstractInstrumentation {
- var masterInst: Option[Master] = None
- val metricRegistry = new MetricRegistry()
-
- override def registryHandler = metricRegistry
-
- override def instance = "master"
+import spark.metrics.source.Source
+
+private[spark] class MasterInstrumentation(val master: Master) extends Source {
+ val className = classOf[Master].getName()
+ val instrumentationName = "master"
+
+ override def sourceName = instrumentationName
- def initialize(master: Master) {
- masterInst = Some(master)
+ override def getMetrics(): Map[String, Metric] = {
+ val gauges = new JHashMap[String, Metric]
- // Register all the sources
- registerSources()
+ // Gauge for worker numbers in cluster
+ gauges.put(className + ".workers.number", new Gauge[Int] {
+ override def getValue: Int = master.workers.size
+ })
- // Register and start all the sinks
- registerSinks()
- }
-
- def uninitialize() {
- unregisterSinks()
+ // Gauge for application numbers in cluster
+ gauges.put(className + ".apps.number", new Gauge[Int] {
+ override def getValue: Int = master.apps.size
+ })
+
+ // Gauge for waiting application numbers in cluster
+ gauges.put(className + ".waiting_apps.number", new Gauge[Int] {
+ override def getValue: Int = master.waitingApps.size
+ })
+
+ gauges
}
-
- // Gauge for worker numbers in cluster
- metricRegistry.register(MetricRegistry.name(classOf[Master], "workers", "number"),
- new Gauge[Int] {
- override def getValue: Int = masterInst.map(_.workers.size).getOrElse(0)
- })
-
- // Gauge for application numbers in cluster
- metricRegistry.register(MetricRegistry.name(classOf[Master], "apps", "number"),
- new Gauge[Int] {
- override def getValue: Int = masterInst.map(_.apps.size).getOrElse(0)
- })
-
- // Gauge for waiting application numbers in cluster
- metricRegistry.register(MetricRegistry.name(classOf[Master], "waiting_apps", "number"),
- new Gauge[Int] {
- override def getValue: Int = masterInst.map(_.waitingApps.size).getOrElse(0)
- })
-
} \ No newline at end of file
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index b64bdb8d28..eaa1c1806f 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -23,6 +23,7 @@ import akka.util.duration._
import spark.{Logging, Utils}
import spark.util.AkkaUtils
import spark.deploy._
+import spark.metrics.MetricsSystem
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import java.text.SimpleDateFormat
import java.util.Date
@@ -41,7 +42,7 @@ private[spark] class Worker(
memory: Int,
masterUrl: String,
workDirPath: String = null)
- extends Actor with Logging with WorkerInstrumentation {
+ extends Actor with Logging {
Utils.checkHost(host, "Expected hostname")
assert (port > 0)
@@ -67,6 +68,8 @@ private[spark] class Worker(
var coresUsed = 0
var memoryUsed = 0
+ val workerInstrumentation = new WorkerInstrumentation(this)
+
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed
@@ -99,7 +102,8 @@ private[spark] class Worker(
connectToMaster()
startWebUi()
- initialize(this)
+ Worker.metricsSystem.registerSource(workerInstrumentation)
+ Worker.metricsSystem.start()
}
def connectToMaster() {
@@ -182,11 +186,13 @@ private[spark] class Worker(
executors.values.foreach(_.kill())
webUi.stop()
- uninitialize()
+ Worker.metricsSystem.stop()
}
}
private[spark] object Worker {
+ private val metricsSystem = MetricsSystem.createMetricsSystem("worker")
+
def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
index 2f725300b5..37fd154859 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
@@ -1,58 +1,89 @@
package spark.deploy.worker
-import com.codahale.metrics.{JmxReporter, Gauge, MetricRegistry}
+import com.codahale.metrics.{Gauge, Metric}
-import spark.metrics.AbstractInstrumentation
+import java.util.{Map, HashMap => JHashMap}
-private[spark] trait WorkerInstrumentation extends AbstractInstrumentation {
- var workerInst: Option[Worker] = None
- val metricRegistry = new MetricRegistry()
-
- override def registryHandler = metricRegistry
-
- override def instance = "worker"
+import spark.metrics.source.Source
+
+private[spark] class WorkerInstrumentation(val worker: Worker) extends Source {
+ val className = classOf[Worker].getName()
- def initialize(worker: Worker) {
- workerInst = Some(worker)
+ override def sourceName = "worker"
- // Register all the sources
- registerSources()
+ override def getMetrics: Map[String, Metric] = {
+ val gauges = new JHashMap[String, Metric]
- // Register and start all the sinks
- registerSinks()
- }
-
- def uninitialize() {
- unregisterSinks()
+ // Gauge for executors number
+ gauges.put(className + ".executor.number", new Gauge[Int]{
+ override def getValue: Int = worker.executors.size
+ })
+
+ gauges.put(className + ".core_used.number", new Gauge[Int]{
+ override def getValue: Int = worker.coresUsed
+ })
+
+ gauges.put(className + ".mem_used.MBytes", new Gauge[Int]{
+ override def getValue: Int = worker.memoryUsed
+ })
+
+ gauges.put(className + ".core_free.number", new Gauge[Int]{
+ override def getValue: Int = worker.coresFree
+ })
+
+ gauges.put(className + ".mem_free.MBytes", new Gauge[Int]{
+ override def getValue: Int = worker.memoryFree
+ })
+
+ gauges
}
-
- // Gauge for executors number
- metricRegistry.register(MetricRegistry.name(classOf[Worker], "executor", "number"),
- new Gauge[Int] {
- override def getValue: Int = workerInst.map(_.executors.size).getOrElse(0)
- })
-
- // Gauge for cores used of this worker
- metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_used", "number"),
- new Gauge[Int] {
- override def getValue: Int = workerInst.map(_.coresUsed).getOrElse(0)
- })
-
- // Gauge for memory used of this worker
- metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"),
- new Gauge[Int] {
- override def getValue: Int = workerInst.map(_.memoryUsed).getOrElse(0)
- })
-
- // Gauge for cores free of this worker
- metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_free", "number"),
- new Gauge[Int] {
- override def getValue: Int = workerInst.map(_.coresFree).getOrElse(0)
- })
-
- // Gauge for memory used of this worker
- metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_free", "MBytes"),
- new Gauge[Int] {
- override def getValue: Int = workerInst.map(_.memoryFree).getOrElse(0)
- })
-} \ No newline at end of file
+}
+//private[spark] trait WorkerInstrumentation extends AbstractInstrumentation {
+// var workerInst: Option[Worker] = None
+// val metricRegistry = new MetricRegistry()
+//
+// override def registryHandler = metricRegistry
+//
+// override def instance = "worker"
+//
+// def initialize(worker: Worker) {
+// workerInst = Some(worker)
+//
+// registerSources()
+// registerSinks()
+// }
+//
+// def uninitialize() {
+// unregisterSinks()
+// }
+//
+// // Gauge for executors number
+// metricRegistry.register(MetricRegistry.name(classOf[Worker], "executor", "number"),
+// new Gauge[Int] {
+// override def getValue: Int = workerInst.map(_.executors.size).getOrElse(0)
+// })
+//
+// // Gauge for cores used of this worker
+// metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_used", "number"),
+// new Gauge[Int] {
+// override def getValue: Int = workerInst.map(_.coresUsed).getOrElse(0)
+// })
+//
+// // Gauge for memory used of this worker
+// metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"),
+// new Gauge[Int] {
+// override def getValue: Int = workerInst.map(_.memoryUsed).getOrElse(0)
+// })
+//
+// // Gauge for cores free of this worker
+// metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_free", "number"),
+// new Gauge[Int] {
+// override def getValue: Int = workerInst.map(_.coresFree).getOrElse(0)
+// })
+//
+// // Gauge for memory used of this worker
+// metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_free", "MBytes"),
+// new Gauge[Int] {
+// override def getValue: Int = workerInst.map(_.memoryFree).getOrElse(0)
+// })
+//} \ No newline at end of file
diff --git a/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
index 9cae1e0220..ea1bc490b5 100644
--- a/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -2,7 +2,7 @@ package spark.metrics
import scala.collection.mutable
-import com.codahale.metrics.{JmxReporter, MetricRegistry}
+import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
import java.util.Properties
import java.util.concurrent.TimeUnit
@@ -11,70 +11,76 @@ import spark.Logging
import spark.metrics.sink._
import spark.metrics.source._
-private [spark] trait AbstractInstrumentation extends Logging {
+private[spark] class MetricsSystem private (val instance: String) extends Logging {
initLogging()
- // Get MetricRegistry handler
- def registryHandler: MetricRegistry
- // Get the instance name
- def instance: String
-
val confFile = System.getProperty("spark.metrics.conf.file", MetricsConfig.DEFAULT_CONFIG_FILE)
val metricsConfig = new MetricsConfig(confFile)
val sinks = new mutable.ArrayBuffer[Sink]
val sources = new mutable.ArrayBuffer[Source]
+ def start() {
+ registerSources()
+ registerSinks()
+ }
+
+ def stop() {
+ sinks.foreach(_.stop)
+ }
+
+ def registerSource(source: Source) {
+ sources += source
+ MetricsSystem.registry.registerAll(source.asInstanceOf[MetricSet])
+ }
+
def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
- val sourceConfigs = MetricsConfig.subProperties(instConfig, AbstractInstrumentation.SOURCE_REGEX)
+ val sourceConfigs = MetricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
- // Register all the sources
+ // Register all the sources related to instance
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
try {
- val source = Class.forName(classPath).getConstructor(classOf[MetricRegistry])
- .newInstance(registryHandler)
+ val source = Class.forName(classPath).newInstance()
sources += source.asInstanceOf[Source]
+ MetricsSystem.registry.registerAll(source.asInstanceOf[MetricSet])
} catch {
case e: Exception => logError("source class " + classPath + " cannot be instantialized", e)
}
}
- sources.foreach(_.registerSource)
}
def registerSinks() {
val instConfig = metricsConfig.getInstance(instance)
- val sinkConfigs = MetricsConfig.subProperties(instConfig, AbstractInstrumentation.SINK_REGEX)
+ val sinkConfigs = MetricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
// Register JMX sink as a default sink
- sinks += new JmxSink(registryHandler)
+ sinks += new JmxSink(MetricsSystem.registry)
// Register other sinks according to conf
sinkConfigs.foreach { kv =>
- val classPath = if (AbstractInstrumentation.DEFAULT_SINKS.contains(kv._1)) {
- AbstractInstrumentation.DEFAULT_SINKS(kv._1)
+ val classPath = if (MetricsSystem.DEFAULT_SINKS.contains(kv._1)) {
+ MetricsSystem.DEFAULT_SINKS(kv._1)
} else {
// For non-default sink, a property class should be set and create using reflection
kv._2.getProperty("class")
}
try {
val sink = Class.forName(classPath).getConstructor(classOf[Properties], classOf[MetricRegistry])
- .newInstance(kv._2, registryHandler)
+ .newInstance(kv._2, MetricsSystem.registry)
sinks += sink.asInstanceOf[Sink]
} catch {
case e: Exception => logError("sink class " + classPath + " cannot be instantialized", e)
}
}
- sinks.foreach(_.registerSink)
- }
-
- def unregisterSinks() {
- sinks.foreach(_.unregisterSink)
+ sinks.foreach(_.start)
}
}
-object AbstractInstrumentation {
+private[spark] object MetricsSystem {
+ val registry = new MetricRegistry()
+
val DEFAULT_SINKS = Map(
"console" -> "spark.metrics.sink.ConsoleSink",
"csv" -> "spark.metrics.sink.CsvSink")
@@ -88,4 +94,6 @@ object AbstractInstrumentation {
"minute" -> TimeUnit.MINUTES,
"hour" -> TimeUnit.HOURS,
"day" -> TimeUnit.DAYS)
+
+ def createMetricsSystem(instance: String) = new MetricsSystem(instance)
} \ No newline at end of file
diff --git a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
index 5426af8c4c..b49a211fb3 100644
--- a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
@@ -5,7 +5,7 @@ import java.util.concurrent.TimeUnit
import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
-import spark.metrics.AbstractInstrumentation
+import spark.metrics.MetricsSystem
class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
val pollPeriod = Option(property.getProperty(ConsoleSink.CONSOLE_KEY_PERIOD)) match {
@@ -14,13 +14,13 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry) extend
}
val pollUnit = Option(property.getProperty(ConsoleSink.CONSOLE_KEY_UNIT)) match {
- case Some(s) => AbstractInstrumentation.timeUnits(s)
- case None => AbstractInstrumentation.timeUnits(ConsoleSink.CONSOLE_DEFAULT_UNIT)
+ case Some(s) => MetricsSystem.timeUnits(s)
+ case None => MetricsSystem.timeUnits(ConsoleSink.CONSOLE_DEFAULT_UNIT)
}
var reporter: ConsoleReporter = _
- override def registerSink() {
+ override def start() {
reporter = ConsoleReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
@@ -29,7 +29,7 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry) extend
reporter.start(pollPeriod, pollUnit)
}
- override def unregisterSink() {
+ override def stop() {
reporter.stop()
}
}
diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
index 1d663f6cff..3f572c8e05 100644
--- a/core/src/main/scala/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
@@ -6,7 +6,7 @@ import java.util.concurrent.TimeUnit
import com.codahale.metrics.{CsvReporter, MetricRegistry}
-import spark.metrics.AbstractInstrumentation
+import spark.metrics.MetricsSystem
class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink {
val pollPeriod = Option(property.getProperty(CsvSink.CSV_KEY_PERIOD)) match {
@@ -15,8 +15,8 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si
}
val pollUnit = Option(property.getProperty(CsvSink.CSV_KEY_UNIT)) match {
- case Some(s) => AbstractInstrumentation.timeUnits(s)
- case None => AbstractInstrumentation.timeUnits(CsvSink.CSV_DEFAULT_UNIT)
+ case Some(s) => MetricsSystem.timeUnits(s)
+ case None => MetricsSystem.timeUnits(CsvSink.CSV_DEFAULT_UNIT)
}
val pollDir = Option(property.getProperty(CsvSink.CSV_KEY_DIR)) match {
@@ -26,7 +26,7 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si
var reporter: CsvReporter = _
- override def registerSink() {
+ override def start() {
reporter = CsvReporter.forRegistry(registry)
.formatFor(Locale.US)
.convertDurationsTo(TimeUnit.MILLISECONDS)
@@ -36,7 +36,7 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si
reporter.start(pollPeriod, pollUnit)
}
- override def unregisterSink() {
+ override def stop() {
reporter.stop()
}
}
diff --git a/core/src/main/scala/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
index 56e5677700..e223dc26e9 100644
--- a/core/src/main/scala/spark/metrics/sink/JmxSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
@@ -5,12 +5,12 @@ import com.codahale.metrics.{JmxReporter, MetricRegistry}
class JmxSink(registry: MetricRegistry) extends Sink {
var reporter: JmxReporter = _
- override def registerSink() {
+ override def start() {
reporter = JmxReporter.forRegistry(registry).build()
reporter.start()
}
- override def unregisterSink() {
+ override def stop() {
reporter.stop()
}
diff --git a/core/src/main/scala/spark/metrics/sink/Sink.scala b/core/src/main/scala/spark/metrics/sink/Sink.scala
index 26052b7231..9fef894fde 100644
--- a/core/src/main/scala/spark/metrics/sink/Sink.scala
+++ b/core/src/main/scala/spark/metrics/sink/Sink.scala
@@ -1,7 +1,7 @@
package spark.metrics.sink
trait Sink {
- def registerSink: Unit
+ def start: Unit
- def unregisterSink: Unit
+ def stop: Unit
} \ No newline at end of file
diff --git a/core/src/main/scala/spark/metrics/source/JvmSource.scala b/core/src/main/scala/spark/metrics/source/JvmSource.scala
index 8f6bf48843..7a7c1b6ffb 100644
--- a/core/src/main/scala/spark/metrics/source/JvmSource.scala
+++ b/core/src/main/scala/spark/metrics/source/JvmSource.scala
@@ -1,17 +1,23 @@
package spark.metrics.source
-import com.codahale.metrics.MetricRegistry
-import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet}
+import java.util.{Map, HashMap => JHashMap}
-class JvmSource(registry: MetricRegistry) extends Source {
- // Initialize memory usage gauge for jvm
- val memUsageMetricSet = new MemoryUsageGaugeSet
-
- // Initialize garbage collection usage gauge for jvm
- val gcMetricSet = new GarbageCollectorMetricSet
-
- override def registerSource() {
- registry.registerAll(memUsageMetricSet)
- registry.registerAll(gcMetricSet)
+import com.codahale.metrics.Metric
+import com.codahale.metrics.jvm.{GarbageCollectorMetricSet, MemoryUsageGaugeSet}
+
+class JvmSource extends Source {
+ override def sourceName = "jvm"
+
+ override def getMetrics(): Map[String, Metric] = {
+ val gauges = new JHashMap[String, Metric]
+
+ import scala.collection.JavaConversions._
+ val gcMetricSet = new GarbageCollectorMetricSet
+ gcMetricSet.getMetrics.foreach(kv => gauges.put(kv._1, kv._2))
+
+ val memGaugeSet = new MemoryUsageGaugeSet
+ memGaugeSet.getMetrics.foreach(kv => gauges.put(kv._1, kv._2))
+
+ gauges
}
} \ No newline at end of file
diff --git a/core/src/main/scala/spark/metrics/source/Source.scala b/core/src/main/scala/spark/metrics/source/Source.scala
index 35cfe0c8ff..edd59de46a 100644
--- a/core/src/main/scala/spark/metrics/source/Source.scala
+++ b/core/src/main/scala/spark/metrics/source/Source.scala
@@ -1,5 +1,7 @@
package spark.metrics.source
-trait Source {
- def registerSource: Unit
+import com.codahale.metrics.MetricSet
+
+trait Source extends MetricSet {
+ def sourceName: String
} \ No newline at end of file