From 9dec8c73e6f0c3b6b55a11ff92cc9bff18dadd24 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 27 Jun 2013 09:48:41 +0800 Subject: Add Master and Worker instrumentation support --- .../main/scala/spark/deploy/master/Master.scala | 9 +++- .../deploy/master/MasterInstrumentation.scala | 44 +++++++++++++++++ .../main/scala/spark/deploy/worker/Worker.scala | 11 +++-- .../deploy/worker/WorkerInstrumentation.scala | 55 ++++++++++++++++++++++ 4 files changed, 115 insertions(+), 4 deletions(-) create mode 100644 core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala create mode 100644 core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala (limited to 'core') diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index eddcafd84d..3a7c4e5a52 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -33,7 +33,8 @@ import spark.util.AkkaUtils import ui.MasterWebUI -private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging { +private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor +with Logging with MasterInstrumentation { val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000 @@ -73,6 +74,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) webUi.start() context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers()) + + initialize(this) } override def postStop() { @@ -316,6 +319,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act removeWorker(worker) } } + + override def postStop() { + uninitialize() + } } private[spark] object Master { diff --git a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala new file mode 100644 index 0000000000..13088189a4 --- /dev/null +++ b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala @@ -0,0 +1,44 @@ +package spark.deploy.master + +import com.codahale.metrics.{Gauge, JmxReporter, MetricRegistry} + +import spark.metrics.AbstractInstrumentation + +private[spark] trait MasterInstrumentation extends AbstractInstrumentation { + var masterInst: Option[Master] = None + val metricRegistry = new MetricRegistry() + + override def registryHandler = metricRegistry + + override def instance = "master" + + def initialize(master: Master) { + masterInst = Some(master) + + // Register and start all the sinks + registerSinks + } + + def uninitialize() { + unregisterSinks + } + + // Gauge for worker numbers in cluster + metricRegistry.register(MetricRegistry.name(classOf[Master], "workers", "number"), + new Gauge[Int] { + override def getValue: Int = masterInst.map(_.workers.size).getOrElse(0) + }) + + // Gauge for application numbers in cluster + metricRegistry.register(MetricRegistry.name(classOf[Master], "apps", "number"), + new Gauge[Int] { + override def getValue: Int = masterInst.map(_.apps.size).getOrElse(0) + }) + + // Gauge for waiting application numbers in cluster + metricRegistry.register(MetricRegistry.name(classOf[Master], "waiting_apps", "number"), + new Gauge[Int] { + override def getValue: Int = masterInst.map(_.waitingApps.size).getOrElse(0) + }) + +} \ No newline at end of file diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 0bd88ea253..b64bdb8d28 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -41,7 +41,7 @@ private[spark] class Worker( memory: Int, masterUrl: String, workDirPath: String = null) - extends Actor with Logging { + extends Actor with Logging with WorkerInstrumentation { Utils.checkHost(host, "Expected hostname") assert (port > 0) @@ -97,6 +97,9 @@ private[spark] class Worker( webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) webUi.start() connectToMaster() + startWebUi() + + initialize(this) } def connectToMaster() { @@ -155,10 +158,10 @@ private[spark] class Worker( case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => masterDisconnected() - + case RequestWorkerState => { sender ! WorkerState(host, port, workerId, executors.values.toList, - finishedExecutors.values.toList, masterUrl, cores, memory, + finishedExecutors.values.toList, masterUrl, cores, memory, coresUsed, memoryUsed, masterWebUiUrl) } } @@ -178,6 +181,8 @@ private[spark] class Worker( override def postStop() { executors.values.foreach(_.kill()) webUi.stop() + + uninitialize() } } diff --git a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala new file mode 100644 index 0000000000..04c43ce33b --- /dev/null +++ b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala @@ -0,0 +1,55 @@ +package spark.deploy.worker + +import com.codahale.metrics.{JmxReporter, Gauge, MetricRegistry} + +import spark.metrics.AbstractInstrumentation + +private[spark] trait WorkerInstrumentation extends AbstractInstrumentation { + var workerInst: Option[Worker] = None + val metricRegistry = new MetricRegistry() + + override def registryHandler = metricRegistry + + override def instance = "worker" + + def initialize(worker: Worker) { + workerInst = Some(worker) + + // Register and start all the sinks + registerSinks() + } + + def uninitialize() { + unregisterSinks() + } + + // Gauge for executors number + metricRegistry.register(MetricRegistry.name(classOf[Worker], "executor", "number"), + new Gauge[Int] { + override def getValue: Int = workerInst.map(_.executors.size).getOrElse(0) + }) + + // Gauge for cores used of this worker + metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_used", "number"), + new Gauge[Int] { + override def getValue: Int = workerInst.map(_.coresUsed).getOrElse(0) + }) + + // Gauge for memory used of this worker + metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "Mbytes"), + new Gauge[Int] { + override def getValue: Int = workerInst.map(_.memoryUsed).getOrElse(0) + }) + + // Gauge for cores free of this worker + metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_free", "number"), + new Gauge[Int] { + override def getValue: Int = workerInst.map(_.coresFree).getOrElse(0) + }) + + // Gauge for memory used of this worker + metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_free", "MBytes"), + new Gauge[Int] { + override def getValue: Int = workerInst.map(_.memoryFree).getOrElse(0) + }) +} \ No newline at end of file -- cgit v1.2.3