aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author jerryshao <saisai.shao@intel.com> 2013-06-27 09:48:41 +0800
committer jerryshao <saisai.shao@intel.com> 2013-07-24 14:57:46 +0800
commit 9dec8c73e6f0c3b6b55a11ff92cc9bff18dadd24 (patch)
tree fc4fe1df3f4d1296e74214ca18e66f14338363ec /core
parent 503acd3a379a3686d343fdf072fc231b8fba78f9 (diff)
download spark-9dec8c73e6f0c3b6b55a11ff92cc9bff18dadd24.tar.gz
spark-9dec8c73e6f0c3b6b55a11ff92cc9bff18dadd24.tar.bz2
spark-9dec8c73e6f0c3b6b55a11ff92cc9bff18dadd24.zip
Add Master and Worker instrumentation support
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/deploy/master/Master.scala 9
-rw-r--r-- core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala 44
-rw-r--r-- core/src/main/scala/spark/deploy/worker/Worker.scala 11
-rw-r--r-- core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala 55
4 files changed, 115 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index eddcafd84d..3a7c4e5a52 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -33,7 +33,8 @@ import spark.util.AkkaUtils
import ui.MasterWebUI
-private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
+private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor
+with Logging with MasterInstrumentation {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
@@ -73,6 +74,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
webUi.start()
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
+
+ initialize(this)
}
override def postStop() {
@@ -316,6 +319,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
removeWorker(worker)
}
}
+
+ override def postStop() {
+ uninitialize()
+ }
}
private[spark] object Master {
diff --git a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
new file mode 100644
index 0000000000..13088189a4
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
@@ -0,0 +1,44 @@
+package spark.deploy.master
+
+import com.codahale.metrics.{Gauge, JmxReporter, MetricRegistry}
+
+import spark.metrics.AbstractInstrumentation
+
+private[spark] trait MasterInstrumentation extends AbstractInstrumentation {
+ var masterInst: Option[Master] = None
+ val metricRegistry = new MetricRegistry()
+
+ override def registryHandler = metricRegistry
+
+ override def instance = "master"
+
+ def initialize(master: Master) {
+ masterInst = Some(master)
+
+ // Register and start all the sinks
+ registerSinks
+ }
+
+ def uninitialize() {
+ unregisterSinks
+ }
+
+ // Gauge for the number of workers in the cluster
+ metricRegistry.register(MetricRegistry.name(classOf[Master], "workers", "number"),
+ new Gauge[Int] {
+ override def getValue: Int = masterInst.map(_.workers.size).getOrElse(0)
+ })
+
+ // Gauge for the number of applications in the cluster
+ metricRegistry.register(MetricRegistry.name(classOf[Master], "apps", "number"),
+ new Gauge[Int] {
+ override def getValue: Int = masterInst.map(_.apps.size).getOrElse(0)
+ })
+
+ // Gauge for the number of waiting applications in the cluster
+ metricRegistry.register(MetricRegistry.name(classOf[Master], "waiting_apps", "number"),
+ new Gauge[Int] {
+ override def getValue: Int = masterInst.map(_.waitingApps.size).getOrElse(0)
+ })
+
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 0bd88ea253..b64bdb8d28 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -41,7 +41,7 @@ private[spark] class Worker(
memory: Int,
masterUrl: String,
workDirPath: String = null)
- extends Actor with Logging {
+ extends Actor with Logging with WorkerInstrumentation {
Utils.checkHost(host, "Expected hostname")
assert (port > 0)
@@ -97,6 +97,9 @@ private[spark] class Worker(
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.start()
connectToMaster()
+ startWebUi()
+
+ initialize(this)
}
def connectToMaster() {
@@ -155,10 +158,10 @@ private[spark] class Worker(
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
masterDisconnected()
-
+
case RequestWorkerState => {
sender ! WorkerState(host, port, workerId, executors.values.toList,
- finishedExecutors.values.toList, masterUrl, cores, memory,
+ finishedExecutors.values.toList, masterUrl, cores, memory,
coresUsed, memoryUsed, masterWebUiUrl)
}
}
@@ -178,6 +181,8 @@ private[spark] class Worker(
override def postStop() {
executors.values.foreach(_.kill())
webUi.stop()
+
+ uninitialize()
}
}
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
new file mode 100644
index 0000000000..04c43ce33b
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
@@ -0,0 +1,55 @@
+package spark.deploy.worker
+
+import com.codahale.metrics.{JmxReporter, Gauge, MetricRegistry}
+
+import spark.metrics.AbstractInstrumentation
+
+private[spark] trait WorkerInstrumentation extends AbstractInstrumentation {
+ var workerInst: Option[Worker] = None
+ val metricRegistry = new MetricRegistry()
+
+ override def registryHandler = metricRegistry
+
+ override def instance = "worker"
+
+ def initialize(worker: Worker) {
+ workerInst = Some(worker)
+
+ // Register and start all the sinks
+ registerSinks()
+ }
+
+ def uninitialize() {
+ unregisterSinks()
+ }
+
+ // Gauge for the number of executors
+ metricRegistry.register(MetricRegistry.name(classOf[Worker], "executor", "number"),
+ new Gauge[Int] {
+ override def getValue: Int = workerInst.map(_.executors.size).getOrElse(0)
+ })
+
+ // Gauge for the number of cores used by this worker
+ metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_used", "number"),
+ new Gauge[Int] {
+ override def getValue: Int = workerInst.map(_.coresUsed).getOrElse(0)
+ })
+
+ // Gauge for the memory used by this worker
+ metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "Mbytes"),
+ new Gauge[Int] {
+ override def getValue: Int = workerInst.map(_.memoryUsed).getOrElse(0)
+ })
+
+ // Gauge for the number of free cores on this worker
+ metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_free", "number"),
+ new Gauge[Int] {
+ override def getValue: Int = workerInst.map(_.coresFree).getOrElse(0)
+ })
+
+ // Gauge for the free memory of this worker
+ metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_free", "MBytes"),
+ new Gauge[Int] {
+ override def getValue: Int = workerInst.map(_.memoryFree).getOrElse(0)
+ })
+} \ No newline at end of file