aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2013-06-28 14:46:24 +0800
committerjerryshao <saisai.shao@intel.com>2013-07-24 14:57:46 +0800
commit871bc1687eaeb59df24b4778c5992a5f7f105cc8 (patch)
treeee161cc74d5228b0c76b700e6e13b3b98a7ec7ef
parent576528f0f916cc0434972cbbc9321e4dcec45628 (diff)
downloadspark-871bc1687eaeb59df24b4778c5992a5f7f105cc8.tar.gz
spark-871bc1687eaeb59df24b4778c5992a5f7f105cc8.tar.bz2
spark-871bc1687eaeb59df24b4778c5992a5f7f105cc8.zip
Add Executor instrumentation
-rw-r--r--core/src/main/scala/spark/executor/ExecutorInstrumentation.scala35
-rw-r--r--core/src/main/scala/spark/executor/MesosExecutorBackend.scala10
-rw-r--r--core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala12
3 files changed, 56 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala b/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala
new file mode 100644
index 0000000000..80aadb66b0
--- /dev/null
+++ b/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala
@@ -0,0 +1,35 @@
+package spark.executor
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+class ExecutorInstrumentation(val executor: Option[Executor]) extends Source{
+ val metricRegistry = new MetricRegistry()
+ val sourceName = "executor"
+
+ // Gauge for executor thread pool's actively executing task counts
+ metricRegistry.register(MetricRegistry.name("threadpool", "active_task", "number"),
+ new Gauge[Int] {
+ override def getValue: Int = executor.map(_.threadPool.getActiveCount()).getOrElse(0)
+ })
+
+ // Gauge for executor thread pool's approximate total number of tasks that have been completed
+ metricRegistry.register(MetricRegistry.name("threadpool", "complete_task", "count"),
+ new Gauge[Long] {
+ override def getValue: Long = executor.map(_.threadPool.getCompletedTaskCount()).getOrElse(0)
+ })
+
+ // Gauge for executor thread pool's current number of threads
+ metricRegistry.register(MetricRegistry.name("threadpool", "current_pool", "size"),
+ new Gauge[Int] {
+ override def getValue: Int = executor.map(_.threadPool.getPoolSize()).getOrElse(0)
+ })
+
+ // Gauge got executor thread pool's largest number of threads that have ever simultaneously been in th pool
+ metricRegistry.register(MetricRegistry.name("threadpool", "max_pool", "size"),
+ new Gauge[Int] {
+ override def getValue: Int = executor.map(_.threadPool.getMaximumPoolSize()).getOrElse(0)
+ })
+
+} \ No newline at end of file
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index 4961c42fad..8b6ab0c391 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -24,6 +24,7 @@ import spark.TaskState.TaskState
import com.google.protobuf.ByteString
import spark.{Utils, Logging}
import spark.TaskState
+import spark.metrics.MetricsSystem
private[spark] class MesosExecutorBackend
extends MesosExecutor
@@ -32,6 +33,9 @@ private[spark] class MesosExecutorBackend
var executor: Executor = null
var driver: ExecutorDriver = null
+
+ val executorInstrumentation = new ExecutorInstrumentation(Option(executor))
+ MesosExecutorBackend.metricsSystem.start()
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
val mesosTaskId = TaskID.newBuilder().setValue(taskId.toString).build()
@@ -79,13 +83,17 @@ private[spark] class MesosExecutorBackend
override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {}
- override def shutdown(d: ExecutorDriver) {}
+ override def shutdown(d: ExecutorDriver) {
+ MesosExecutorBackend.metricsSystem.stop()
+ }
}
/**
* Entry point for Mesos executor.
*/
private[spark] object MesosExecutorBackend {
+ private val metricsSystem = MetricsSystem.createMetricsSystem("executor")
+
def main(args: Array[String]) {
MesosNativeLibrary.load()
// Create a new Executor and start it running
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index f4003da732..6ef74cd2ff 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -24,6 +24,7 @@ import spark.util.AkkaUtils
import akka.actor.{ActorRef, Actor, Props, Terminated}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
+import spark.metrics.MetricsSystem
import spark.scheduler.cluster._
import spark.scheduler.cluster.RegisteredExecutor
import spark.scheduler.cluster.LaunchTask
@@ -45,6 +46,8 @@ private[spark] class StandaloneExecutorBackend(
var executor: Executor = null
var driver: ActorRef = null
+
+ val executorInstrumentation = new ExecutorInstrumentation(Option(executor))
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
@@ -52,6 +55,9 @@ private[spark] class StandaloneExecutorBackend(
driver ! RegisterExecutor(executorId, hostPort, cores)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(driver) // Doesn't work with remote actors, but useful for testing
+
+ StandaloneExecutorBackend.metricsSystem.registerSource(executorInstrumentation)
+ StandaloneExecutorBackend.metricsSystem.start()
}
override def receive = {
@@ -81,9 +87,15 @@ private[spark] class StandaloneExecutorBackend(
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
driver ! StatusUpdate(executorId, taskId, state, data)
}
+
+ override def postStop() {
+ StandaloneExecutorBackend.metricsSystem.stop()
+ }
}
private[spark] object StandaloneExecutorBackend {
+ private val metricsSystem = MetricsSystem.createMetricsSystem("executor")
+
def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
SparkHadoopUtil.runAsUser(run0, Tuple4[Any, Any, Any, Any] (driverUrl, executorId, hostname, cores))
}