From 05637de8423da85c5934cdfa8f07254133a58474 Mon Sep 17 00:00:00 2001
From: Andrew xia
Date: Wed, 3 Jul 2013 21:16:39 +0800
Subject: Change class xxxInstrumentation to class xxxSource

---
 .../main/scala/spark/deploy/master/Master.scala    |  4 +--
 .../deploy/master/MasterInstrumentation.scala      | 25 ----------------
 .../scala/spark/deploy/master/MasterSource.scala   | 25 ++++++++++++++++
 .../main/scala/spark/deploy/worker/Worker.scala    |  4 +--
 .../deploy/worker/WorkerInstrumentation.scala      | 34 ----------------------
 .../scala/spark/deploy/worker/WorkerSource.scala   | 34 ++++++++++++++++++++++
 core/src/main/scala/spark/executor/Executor.scala  |  4 +--
 .../spark/executor/ExecutorInstrumentation.scala   | 30 -------------------
 .../main/scala/spark/executor/ExecutorSource.scala | 30 +++++++++++++++++++
 .../scala/spark/metrics/MetricsSystemSuite.scala   |  2 +-
 10 files changed, 96 insertions(+), 96 deletions(-)
 delete mode 100644 core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
 create mode 100644 core/src/main/scala/spark/deploy/master/MasterSource.scala
 delete mode 100644 core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
 create mode 100644 core/src/main/scala/spark/deploy/worker/WorkerSource.scala
 delete mode 100644 core/src/main/scala/spark/executor/ExecutorInstrumentation.scala
 create mode 100644 core/src/main/scala/spark/executor/ExecutorSource.scala

diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 5f67366eb6..1d592206c0 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -58,7 +58,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
   Utils.checkHost(host, "Expected hostname")
 
-  val masterInstrumentation = new MasterInstrumentation(this)
+  val masterSource = new MasterSource(this)
 
   val masterPublicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
@@ -77,7 +77,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     webUi.start()
     context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
 
-    Master.metricsSystem.registerSource(masterInstrumentation)
+    Master.metricsSystem.registerSource(masterSource)
     Master.metricsSystem.start()
   }
 
diff --git a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
deleted file mode 100644
index 4c3708cc4c..0000000000
--- a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-package spark.deploy.master
-
-import com.codahale.metrics.{Gauge,MetricRegistry}
-
-import spark.metrics.source.Source
-
-private[spark] class MasterInstrumentation(val master: Master) extends Source {
-  val metricRegistry = new MetricRegistry()
-  val sourceName = "master"
-
-  // Gauge for worker numbers in cluster
-  metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] {
-    override def getValue: Int = master.workers.size
-  })
-
-  // Gauge for application numbers in cluster
-  metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
-    override def getValue: Int = master.apps.size
-  })
-
-  // Gauge for waiting application numbers in cluster
-  metricRegistry.register(MetricRegistry.name("waiting_apps", "number"), new Gauge[Int] {
-    override def getValue: Int = master.waitingApps.size
-  })
-}
diff --git a/core/src/main/scala/spark/deploy/master/MasterSource.scala b/core/src/main/scala/spark/deploy/master/MasterSource.scala
new file mode 100644
index 0000000000..f94e5b2c34
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/MasterSource.scala
@@ -0,0 +1,25 @@
+package spark.deploy.master
+
+import com.codahale.metrics.{Gauge,MetricRegistry}
+
+import spark.metrics.source.Source
+
+private[spark] class MasterSource(val master: Master) extends Source {
+  val metricRegistry = new MetricRegistry()
+  val sourceName = "master"
+
+  // Gauge for worker numbers in cluster
+  metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] {
+    override def getValue: Int = master.workers.size
+  })
+
+  // Gauge for application numbers in cluster
+  metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
+    override def getValue: Int = master.apps.size
+  })
+
+  // Gauge for waiting application numbers in cluster
+  metricRegistry.register(MetricRegistry.name("waiting_apps", "number"), new Gauge[Int] {
+    override def getValue: Int = master.waitingApps.size
+  })
+}
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index eaa1c1806f..5c0f77fd75 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -68,7 +68,7 @@ private[spark] class Worker(
   var coresUsed = 0
   var memoryUsed = 0
 
-  val workerInstrumentation = new WorkerInstrumentation(this)
+  val workerSource = new WorkerSource(this)
 
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
@@ -102,7 +102,7 @@ private[spark] class Worker(
     connectToMaster()
     startWebUi()
 
-    Worker.metricsSystem.registerSource(workerInstrumentation)
+    Worker.metricsSystem.registerSource(workerSource)
     Worker.metricsSystem.start()
   }
 
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
deleted file mode 100644
index c76c0b4711..0000000000
--- a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-package spark.deploy.worker
-
-import com.codahale.metrics.{Gauge, MetricRegistry}
-
-import spark.metrics.source.Source
-
-private[spark] class WorkerInstrumentation(val worker: Worker) extends Source {
-  val sourceName = "worker"
-  val metricRegistry = new MetricRegistry()
-
-  metricRegistry.register(MetricRegistry.name("executor", "number"), new Gauge[Int] {
-    override def getValue: Int = worker.executors.size
-  })
-
-  // Gauge for cores used of this worker
-  metricRegistry.register(MetricRegistry.name("core_used", "number"), new Gauge[Int] {
-    override def getValue: Int = worker.coresUsed
-  })
-
-  // Gauge for memory used of this worker
-  metricRegistry.register(MetricRegistry.name("mem_used", "MBytes"), new Gauge[Int] {
-    override def getValue: Int = worker.memoryUsed
-  })
-
-  // Gauge for cores free of this worker
-  metricRegistry.register(MetricRegistry.name("core_free", "number"), new Gauge[Int] {
-    override def getValue: Int = worker.coresFree
-  })
-
-  // Gauge for memory used of this worker
-  metricRegistry.register(MetricRegistry.name("mem_free", "MBytes"), new Gauge[Int] {
-    override def getValue: Int = worker.memoryFree
-  })
-}
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/spark/deploy/worker/WorkerSource.scala
new file mode 100644
index 0000000000..539eac71bd
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/worker/WorkerSource.scala
@@ -0,0 +1,34 @@
+package spark.deploy.worker
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+private[spark] class WorkerSource(val worker: Worker) extends Source {
+  val sourceName = "worker"
+  val metricRegistry = new MetricRegistry()
+
+  metricRegistry.register(MetricRegistry.name("executor", "number"), new Gauge[Int] {
+    override def getValue: Int = worker.executors.size
+  })
+
+  // Gauge for cores used by this worker
+  metricRegistry.register(MetricRegistry.name("core_used", "number"), new Gauge[Int] {
+    override def getValue: Int = worker.coresUsed
+  })
+
+  // Gauge for memory used by this worker
+  metricRegistry.register(MetricRegistry.name("mem_used", "MBytes"), new Gauge[Int] {
+    override def getValue: Int = worker.memoryUsed
+  })
+
+  // Gauge for cores free on this worker
+  metricRegistry.register(MetricRegistry.name("core_free", "number"), new Gauge[Int] {
+    override def getValue: Int = worker.coresFree
+  })
+
+  // Gauge for memory free on this worker
+  metricRegistry.register(MetricRegistry.name("mem_free", "MBytes"), new Gauge[Int] {
+    override def getValue: Int = worker.memoryFree
+  })
+}
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 4ea05dec1c..8a74a8d853 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -87,12 +87,12 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
     }
   )
 
-  val executorInstrumentation = new ExecutorInstrumentation(this)
+  val executorSource = new ExecutorSource(this)
 
   // Initialize Spark environment (using system properties read above)
   val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
   SparkEnv.set(env)
-  env.metricsSystem.registerSource(executorInstrumentation)
+  env.metricsSystem.registerSource(executorSource)
 
   private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
 
diff --git a/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala b/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala
deleted file mode 100644
index ad406f41b4..0000000000
--- a/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-package spark.executor
-
-import com.codahale.metrics.{Gauge, MetricRegistry}
-
-import spark.metrics.source.Source
-
-class ExecutorInstrumentation(val executor: Executor) extends Source {
-  val metricRegistry = new MetricRegistry()
-  val sourceName = "executor"
-
-  // Gauge for executor thread pool's actively executing task counts
-  metricRegistry.register(MetricRegistry.name("threadpool", "active_task", "number"), new Gauge[Int] {
-    override def getValue: Int = executor.threadPool.getActiveCount()
-  })
-
-  // Gauge for executor thread pool's approximate total number of tasks that have been completed
-  metricRegistry.register(MetricRegistry.name("threadpool", "complete_task", "count"), new Gauge[Long] {
-    override def getValue: Long = executor.threadPool.getCompletedTaskCount()
-  })
-
-  // Gauge for executor thread pool's current number of threads
-  metricRegistry.register(MetricRegistry.name("threadpool", "current_pool", "size"), new Gauge[Int] {
-    override def getValue: Int = executor.threadPool.getPoolSize()
-  })
-
-  // Gauge got executor thread pool's largest number of threads that have ever simultaneously been in th pool
-  metricRegistry.register(MetricRegistry.name("threadpool", "max_pool", "size"), new Gauge[Int] {
-    override def getValue: Int = executor.threadPool.getMaximumPoolSize()
-  })
-}
diff --git a/core/src/main/scala/spark/executor/ExecutorSource.scala b/core/src/main/scala/spark/executor/ExecutorSource.scala
new file mode 100644
index 0000000000..d8b531cb58
--- /dev/null
+++ b/core/src/main/scala/spark/executor/ExecutorSource.scala
@@ -0,0 +1,30 @@
+package spark.executor
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+class ExecutorSource(val executor: Executor) extends Source {
+  val metricRegistry = new MetricRegistry()
+  val sourceName = "executor"
+
+  // Gauge for the executor thread pool's approximate count of actively executing tasks
+  metricRegistry.register(MetricRegistry.name("threadpool", "active_task", "number"), new Gauge[Int] {
+    override def getValue: Int = executor.threadPool.getActiveCount()
+  })
+
+  // Gauge for the executor thread pool's approximate total count of completed tasks
+  metricRegistry.register(MetricRegistry.name("threadpool", "complete_task", "count"), new Gauge[Long] {
+    override def getValue: Long = executor.threadPool.getCompletedTaskCount()
+  })
+
+  // Gauge for the executor thread pool's current number of threads
+  metricRegistry.register(MetricRegistry.name("threadpool", "current_pool", "size"), new Gauge[Int] {
+    override def getValue: Int = executor.threadPool.getPoolSize()
+  })
+
+  // Gauge for the executor thread pool's maximum allowed number of threads
+  metricRegistry.register(MetricRegistry.name("threadpool", "max_pool", "size"), new Gauge[Int] {
+    override def getValue: Int = executor.threadPool.getMaximumPoolSize()
+  })
+}
diff --git a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
index f29bb9db67..462c28e894 100644
--- a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
@@ -33,7 +33,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
     assert(sources.length === 1)
     assert(sinks.length === 2)
 
-    val source = new spark.deploy.master.MasterInstrumentation(null)
+    val source = new spark.deploy.master.MasterSource(null)
     metricsSystem.registerSource(source)
    assert(sources.length === 2)
   }
-- 
cgit v1.2.3
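
Note: all three new classes follow one pattern — implement spark.metrics.source.Source by exposing a sourceName and a MetricRegistry populated with gauges, then hand the instance to a metrics system via registerSource(). The sketch below is a hypothetical illustration, not part of this commit: QueueSource and its queueSize callback are invented names, and the code assumes only the two Source members exercised by the patch above.

package spark.metrics.source

import com.codahale.metrics.{Gauge, MetricRegistry}

// Hypothetical source: exposes one gauge reporting a queue length
// supplied by the owning component as a callback.
class QueueSource(queueSize: () => Int) extends Source {
  val metricRegistry = new MetricRegistry()
  val sourceName = "queue"

  // Gauges are polled lazily: getValue re-runs the callback each time a
  // sink reads the registry, so the reported value is always current.
  metricRegistry.register(MetricRegistry.name("pending", "number"), new Gauge[Int] {
    override def getValue: Int = queueSize()
  })
}

Registration then mirrors the calls added in Master, Worker, and Executor, e.g. metricsSystem.registerSource(new QueueSource(() => pendingTasks.size)).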