From 4b68be5f3c0a251453c184b233b3ca490812dafd Mon Sep 17 00:00:00 2001 From: Grace Huang Date: Fri, 27 Sep 2013 14:47:38 +0800 Subject: SPARK-900 Use coarser grained naming for metrics --- .../spark/deploy/master/ApplicationSource.scala | 5 +- .../apache/spark/deploy/master/MasterSource.scala | 7 +-- .../apache/spark/deploy/worker/WorkerSource.scala | 11 ++-- .../org/apache/spark/executor/ExecutorSource.scala | 19 +++---- .../spark/scheduler/DAGSchedulerSource.scala | 11 ++-- .../apache/spark/storage/BlockManagerSource.scala | 9 ++-- .../org/apache/spark/util/NamingConventions.scala | 62 ++++++++++++++++++++++ 7 files changed, 96 insertions(+), 28 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/util/NamingConventions.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala index 5a24042e14..c72322e9c2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala @@ -20,6 +20,7 @@ package org.apache.spark.deploy.master import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.metrics.source.Source +import org.apache.spark.util.NamingConventions class ApplicationSource(val application: ApplicationInfo) extends Source { val metricRegistry = new MetricRegistry() @@ -30,11 +31,11 @@ class ApplicationSource(val application: ApplicationInfo) extends Source { override def getValue: String = application.state.toString }) - metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] { + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("runtime", "ms")), new Gauge[Long] { override def getValue: Long = application.duration }) - metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("cores", "number")), new Gauge[Int] { override def getValue: Int = application.coresGranted }) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala index 23d1cb77da..de3939836b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala @@ -20,23 +20,24 @@ package org.apache.spark.deploy.master import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.metrics.source.Source +import org.apache.spark.util.NamingConventions private[spark] class MasterSource(val master: Master) extends Source { val metricRegistry = new MetricRegistry() val sourceName = "master" // Gauge for worker numbers in cluster - metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("workers","number")), new Gauge[Int] { override def getValue: Int = master.workers.size }) // Gauge for application numbers in cluster - metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("apps", "number")), new Gauge[Int] { override def getValue: Int = master.apps.size }) // Gauge for waiting application numbers in cluster - metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] { + 
metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("waitingApps", "number")), new Gauge[Int] { override def getValue: Int = master.waitingApps.size }) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala index df269fd047..fc4f4ae99c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala @@ -20,32 +20,33 @@ package org.apache.spark.deploy.worker import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.metrics.source.Source +import org.apache.spark.util.NamingConventions private[spark] class WorkerSource(val worker: Worker) extends Source { val sourceName = "worker" val metricRegistry = new MetricRegistry() - metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("executors", "number")), new Gauge[Int] { override def getValue: Int = worker.executors.size }) // Gauge for cores used of this worker - metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresUsed", "number")), new Gauge[Int] { override def getValue: Int = worker.coresUsed }) // Gauge for memory used of this worker - metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Int] { override def getValue: Int = worker.memoryUsed }) // Gauge for cores free of this worker - metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresFree", "number")), new Gauge[Int] { override def getValue: Int = worker.coresFree }) // Gauge for memory free of this worker - metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memFree", "MBytes")), new Gauge[Int] { override def getValue: Int = worker.memoryFree }) } diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index 18c9dc1c0a..6cbd154603 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.LocalFileSystem import scala.collection.JavaConversions._ import org.apache.spark.metrics.source.Source +import org.apache.spark.util.NamingConventions class ExecutorSource(val executor: Executor, executorId: String) extends Source { private def fileStats(scheme: String) : Option[FileSystem.Statistics] = @@ -43,31 +44,31 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source val sourceName = "executor.%s".format(executorId) // Gauge for executor thread pool's actively executing task counts - metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("activeTask", "count")), new Gauge[Int] { override def getValue: Int = executor.threadPool.getActiveCount() }) // Gauge for executor thread pool's 
approximate total number of tasks that have been completed - metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] { + metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("completeTask", "count")), new Gauge[Long] { override def getValue: Long = executor.threadPool.getCompletedTaskCount() }) // Gauge for executor thread pool's current number of threads - metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("currentPool", "size")), new Gauge[Int] { override def getValue: Int = executor.threadPool.getPoolSize() }) // Gauge for executor thread pool's largest number of threads that have ever simultaneously been in the pool - metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("maxPool", "size")), new Gauge[Int] { override def getValue: Int = executor.threadPool.getMaximumPoolSize() }) // Gauge for file system stats of this executor for (scheme <- Array("hdfs", "file")) { - registerFileSystemStat(scheme, "bytesRead", _.getBytesRead(), 0L) - registerFileSystemStat(scheme, "bytesWritten", _.getBytesWritten(), 0L) - registerFileSystemStat(scheme, "readOps", _.getReadOps(), 0) - registerFileSystemStat(scheme, "largeReadOps", _.getLargeReadOps(), 0) - registerFileSystemStat(scheme, "writeOps", _.getWriteOps(), 0) + registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "bytes"), _.getBytesRead(), 0L) + registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "bytes"), _.getBytesWritten(), 0L) + registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "ops"), _.getReadOps(), 0) + registerFileSystemStat(scheme, NamingConventions.makeMetricName("largeRead", "ops"), _.getLargeReadOps(), 0) + registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "ops"), _.getWriteOps(), 0) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala index 446d490cc9..9e90a08411 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala @@ -21,29 +21,30 @@ import com.codahale.metrics.{Gauge,MetricRegistry} import org.apache.spark.metrics.source.Source import org.apache.spark.SparkContext +import org.apache.spark.util.NamingConventions private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext) extends Source { val metricRegistry = new MetricRegistry() val sourceName = "%s.DAGScheduler".format(sc.appName) - metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("failedStages", "number")), new Gauge[Int] { override def getValue: Int = dagScheduler.failed.size }) - metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("runningStages", "number")), new Gauge[Int] { override def getValue: Int = dagScheduler.running.size }) - metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] { 
+ metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("waitingStages", "number")), new Gauge[Int] { override def getValue: Int = dagScheduler.waiting.size }) - metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("allJobs", "number")), new Gauge[Int] { override def getValue: Int = dagScheduler.nextJobId.get() }) - metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("activeJobs", "number")), new Gauge[Int] { override def getValue: Int = dagScheduler.activeJobs.size }) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index acc3951088..4312250240 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -21,6 +21,7 @@ import com.codahale.metrics.{Gauge,MetricRegistry} import org.apache.spark.metrics.source.Source import org.apache.spark.SparkContext +import org.apache.spark.util.NamingConventions private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext) @@ -28,7 +29,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar val metricRegistry = new MetricRegistry() val sourceName = "%s.BlockManager".format(sc.appName) - metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] { + metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("maxMem", "MBytes")), new Gauge[Long] { override def getValue: Long = { val storageStatusList = blockManager.master.getStorageStatus val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _) @@ -36,7 +37,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar } }) - metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] { + metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("remainingMem", "MBytes")), new Gauge[Long] { override def getValue: Long = { val storageStatusList = blockManager.master.getStorageStatus val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _) @@ -44,7 +45,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar } }) - metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] { + metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Long] { override def getValue: Long = { val storageStatusList = blockManager.master.getStorageStatus val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _) @@ -53,7 +54,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar } }) - metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] { + metricRegistry.register(MetricRegistry.name("disk", NamingConventions.makeMetricName("diskSpaceUsed", "MBytes")), new Gauge[Long] { override def getValue: Long = { val storageStatusList = blockManager.master.getStorageStatus val diskSpaceUsed = storageStatusList diff --git a/core/src/main/scala/org/apache/spark/util/NamingConventions.scala 
b/core/src/main/scala/org/apache/spark/util/NamingConventions.scala new file mode 100644 index 0000000000..7263361beb --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/NamingConventions.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +/** + * All utilities related to naming conventions + */ +private[spark] object NamingConventions { + + /** + * Lower camelCase, which converts the phrases into camelCase style with the first letter lowercased + */ + def lowerCamelCaseConversion(phrases: Seq[String]): Seq[String] = { + var first = true + + for (elem <- phrases) yield { + if (first) { + first = false + elem + } + else { + elem.capitalize + } + } + } + + /** + * The standard camelCase style + */ + def camelCaseConversion(phrases: Seq[String]): Seq[String] = { + phrases.map(_.capitalize) + } + + def noConversion = { x: Seq[String] => x } + + /** + * Concatenate the words using a certain naming style. + * The default style is lowerCamelCase with empty connector. 
+ */ + def makeIdentifier(phrases: Seq[String], namingConversion: (Seq[String]) => Seq[String] = lowerCamelCaseConversion) (implicit connector: String = "" ): String = { + namingConversion(phrases.filter(_.nonEmpty)).mkString(connector) + } + + def makeMetricName(phrases: String*): String = { + makeIdentifier(phrases, noConversion)("_") + } +} \ No newline at end of file -- cgit v1.2.3 From 892fb8ffa85016a63d7d00dd6f1abc58ccf034a2 Mon Sep 17 00:00:00 2001 From: Grace Huang Date: Mon, 30 Sep 2013 20:12:55 +0800 Subject: remedy the line-wrap while exceeding 100 chars --- .../spark/deploy/master/ApplicationSource.scala | 13 +++-- .../apache/spark/deploy/master/MasterSource.scala | 19 ++++--- .../apache/spark/deploy/worker/WorkerSource.scala | 31 +++++----- .../org/apache/spark/executor/ExecutorSource.scala | 45 +++++++++------ .../spark/scheduler/DAGSchedulerSource.scala | 31 +++++----- .../apache/spark/storage/BlockManagerSource.scala | 66 ++++++++++++---------- 6 files changed, 114 insertions(+), 91 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala index c72322e9c2..f0b1f777fd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala @@ -31,12 +31,13 @@ class ApplicationSource(val application: ApplicationInfo) extends Source { override def getValue: String = application.state.toString }) - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("runtime", "ms")), new Gauge[Long] { - override def getValue: Long = application.duration - }) + metricRegistry.register( + MetricRegistry.name(NamingConventions.makeMetricName("runtime", "ms")), + new Gauge[Long] { override def getValue: Long = application.duration }) - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("cores", "number")), new Gauge[Int] { - override def getValue: Int = application.coresGranted - }) + metricRegistry.register( + MetricRegistry.name(NamingConventions.makeMetricName("cores", "number")), + new Gauge[Int] { override def getValue: Int = application.coresGranted }) } + diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala index de3939836b..8a88fef330 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala @@ -27,17 +27,18 @@ private[spark] class MasterSource(val master: Master) extends Source { val sourceName = "master" // Gauge for worker numbers in cluster - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("workers","number")), new Gauge[Int] { - override def getValue: Int = master.workers.size - }) + metricRegistry.register( + MetricRegistry.name(NamingConventions.makeMetricName("workers","number")), + new Gauge[Int] { override def getValue: Int = master.workers.size }) // Gauge for application numbers in cluster - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("apps", "number")), new Gauge[Int] { - override def getValue: Int = master.apps.size - }) + metricRegistry.register( + MetricRegistry.name(NamingConventions.makeMetricName("apps", "number")), + new Gauge[Int] { override def getValue: Int = master.apps.size }) // Gauge for waiting application numbers in cluster - 
metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("waitingApps", "number")), new Gauge[Int] { - override def getValue: Int = master.waitingApps.size - }) + metricRegistry.register( + MetricRegistry.name(NamingConventions.makeMetricName("waitingApps", "number")), + new Gauge[Int] { override def getValue: Int = master.waitingApps.size }) } + diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala index fc4f4ae99c..0596f14355 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala @@ -26,27 +26,28 @@ private[spark] class WorkerSource(val worker: Worker) extends Source { val sourceName = "worker" val metricRegistry = new MetricRegistry() - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("executors", "number")), new Gauge[Int] { - override def getValue: Int = worker.executors.size - }) + metricRegistry.register( + MetricRegistry.name(NamingConventions.makeMetricName("executors", "number")), + new Gauge[Int] { override def getValue: Int = worker.executors.size }) // Gauge for cores used of this worker - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresUsed", "number")), new Gauge[Int] { - override def getValue: Int = worker.coresUsed - }) + metricRegistry.register( + MetricRegistry.name(NamingConventions.makeMetricName("coresUsed", "number")), + new Gauge[Int] { override def getValue: Int = worker.coresUsed }) // Gauge for memory used of this worker - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Int] { - override def getValue: Int = worker.memoryUsed - }) + metricRegistry.register( + MetricRegistry.name(NamingConventions.makeMetricName("memUsed", "MBytes")), + new Gauge[Int] { override def getValue: Int = worker.memoryUsed }) // Gauge for cores free of this worker - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresFree", "number")), new Gauge[Int] { - override def getValue: Int = worker.coresFree - }) + metricRegistry.register( + MetricRegistry.name(NamingConventions.makeMetricName("coresFree", "number")), + new Gauge[Int] { override def getValue: Int = worker.coresFree }) // Gauge for memory free of this worker - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memFree", "MBytes")), new Gauge[Int] { - override def getValue: Int = worker.memoryFree - }) + metricRegistry.register( + MetricRegistry.name(NamingConventions.makeMetricName("memFree", "MBytes")), + new Gauge[Int] { override def getValue: Int = worker.memoryFree }) } + diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index 6cbd154603..d063e4ad0b 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -44,31 +44,42 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source val sourceName = "executor.%s".format(executorId) // Gauge for executor thread pool's actively executing task counts - metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("activeTask", "count")), new Gauge[Int] { - override def getValue: Int = executor.threadPool.getActiveCount() - }) + 
metricRegistry.register( + MetricRegistry.name("threadpool", NamingConventions.makeMetricName("activeTask", "count")), + new Gauge[Int] { override def getValue: Int = executor.threadPool.getActiveCount() }) // Gauge for executor thread pool's approximate total number of tasks that have been completed - metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("completeTask", "count")), new Gauge[Long] { - override def getValue: Long = executor.threadPool.getCompletedTaskCount() - }) + metricRegistry.register( + MetricRegistry.name("threadpool", NamingConventions.makeMetricName("completeTask", "count")), + new Gauge[Long] { override def getValue: Long = executor.threadPool.getCompletedTaskCount() }) // Gauge for executor thread pool's current number of threads - metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("currentPool", "size")), new Gauge[Int] { - override def getValue: Int = executor.threadPool.getPoolSize() - }) + metricRegistry.register( + MetricRegistry.name("threadpool", NamingConventions.makeMetricName("currentPool", "size")), + new Gauge[Int] { override def getValue: Int = executor.threadPool.getPoolSize() }) // Gauge for executor thread pool's largest number of threads that have ever simultaneously been in the pool - metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("maxPool", "size")), new Gauge[Int] { - override def getValue: Int = executor.threadPool.getMaximumPoolSize() - }) + metricRegistry.register( + MetricRegistry.name("threadpool", NamingConventions.makeMetricName("maxPool", "size")), + new Gauge[Int] { override def getValue: Int = executor.threadPool.getMaximumPoolSize() }) // Gauge for file system stats of this executor for (scheme <- Array("hdfs", "file")) { - registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "bytes"), _.getBytesRead(), 0L) - registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "bytes"), _.getBytesWritten(), 0L) - registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "ops"), _.getReadOps(), 0) - registerFileSystemStat(scheme, NamingConventions.makeMetricName("largeRead", "ops"), _.getLargeReadOps(), 0) - registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "ops"), _.getWriteOps(), 0) + registerFileSystemStat(scheme, + NamingConventions.makeMetricName("read", "bytes"), + _.getBytesRead(), 0L) + registerFileSystemStat(scheme, + NamingConventions.makeMetricName("write", "bytes"), + _.getBytesWritten(), 0L) + registerFileSystemStat(scheme, + NamingConventions.makeMetricName("read", "ops"), + _.getReadOps(), 0) + registerFileSystemStat(scheme, + NamingConventions.makeMetricName("largeRead", "ops"), + _.getLargeReadOps(), 0) + registerFileSystemStat(scheme, + NamingConventions.makeMetricName("write", "ops"), + _.getWriteOps(), 0) } } + diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala index 9e90a08411..02fb80760f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala @@ -28,24 +28,23 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: Spar val metricRegistry = new MetricRegistry() val sourceName = "%s.DAGScheduler".format(sc.appName) - metricRegistry.register(MetricRegistry.name("stage", 
NamingConventions.makeMetricName("failedStages", "number")), new Gauge[Int] { - override def getValue: Int = dagScheduler.failed.size - }) + metricRegistry.register( + MetricRegistry.name("stage", NamingConventions.makeMetricName("failedStages", "number")), + new Gauge[Int] { override def getValue: Int = dagScheduler.failed.size }) - metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("runningStages", "number")), new Gauge[Int] { - override def getValue: Int = dagScheduler.running.size - }) + metricRegistry.register( + MetricRegistry.name("stage", NamingConventions.makeMetricName("runningStages", "number")), + new Gauge[Int] { override def getValue: Int = dagScheduler.running.size }) - metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("waitingStages", "number")), new Gauge[Int] { - override def getValue: Int = dagScheduler.waiting.size - }) + metricRegistry.register( + MetricRegistry.name("stage", NamingConventions.makeMetricName("waitingStages", "number")), + new Gauge[Int] { override def getValue: Int = dagScheduler.waiting.size }) - metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("allJobs", "number")), new Gauge[Int] { - override def getValue: Int = dagScheduler.nextJobId.get() - }) + metricRegistry.register( + MetricRegistry.name("job", NamingConventions.makeMetricName("allJobs", "number")), + new Gauge[Int] { override def getValue: Int = dagScheduler.nextJobId.get() }) - metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("activeJobs", "number")), new Gauge[Int] { - override def getValue: Int = dagScheduler.activeJobs.size - }) + metricRegistry.register( + MetricRegistry.name("job", NamingConventions.makeMetricName("activeJobs", "number")), + new Gauge[Int] { override def getValue: Int = dagScheduler.activeJobs.size }) } + diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index 4312250240..fcf9da481b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -29,40 +29,48 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar val metricRegistry = new MetricRegistry() val sourceName = "%s.BlockManager".format(sc.appName) - metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("maxMem", "MBytes")), new Gauge[Long] { - override def getValue: Long = { - val storageStatusList = blockManager.master.getStorageStatus - val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _) - maxMem / 1024 / 1024 - } + metricRegistry.register( + MetricRegistry.name("memory", NamingConventions.makeMetricName("maxMem", "MBytes")), + new Gauge[Long] { + override def getValue: Long = { + val storageStatusList = blockManager.master.getStorageStatus + val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _) + maxMem / 1024 / 1024 + } }) - metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("remainingMem", "MBytes")), new Gauge[Long] { - override def getValue: Long = { - val storageStatusList = blockManager.master.getStorageStatus - val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _) - remainingMem / 1024 / 1024 - } + metricRegistry.register( + MetricRegistry.name("memory", NamingConventions.makeMetricName("remainingMem", "MBytes")), + new Gauge[Long] { + 
override def getValue: Long = { + val storageStatusList = blockManager.master.getStorageStatus + val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _) + remainingMem / 1024 / 1024 + } }) - metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Long] { - override def getValue: Long = { - val storageStatusList = blockManager.master.getStorageStatus - val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _) - val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _) - (maxMem - remainingMem) / 1024 / 1024 - } + metricRegistry.register( + MetricRegistry.name("memory", NamingConventions.makeMetricName("memUsed", "MBytes")), + new Gauge[Long] { + override def getValue: Long = { + val storageStatusList = blockManager.master.getStorageStatus + val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _) + val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _) + (maxMem - remainingMem) / 1024 / 1024 + } }) - metricRegistry.register(MetricRegistry.name("disk", NamingConventions.makeMetricName("diskSpaceUsed", "MBytes")), new Gauge[Long] { - override def getValue: Long = { - val storageStatusList = blockManager.master.getStorageStatus - val diskSpaceUsed = storageStatusList - .flatMap(_.blocks.values.map(_.diskSize)) - .reduceOption(_ + _) - .getOrElse(0L) - - diskSpaceUsed / 1024 / 1024 - } + metricRegistry.register( + MetricRegistry.name("disk", NamingConventions.makeMetricName("diskSpaceUsed", "MBytes")), new Gauge[Long] { + override def getValue: Long = { + val storageStatusList = blockManager.master.getStorageStatus + val diskSpaceUsed = storageStatusList + .flatMap(_.blocks.values.map(_.diskSize)) + .reduceOption(_ + _) + .getOrElse(0L) + + diskSpaceUsed / 1024 / 1024 + } }) } + -- cgit v1.2.3 From a2af6b543a0a70d94a451c9022deea181d04f8e8 Mon Sep 17 00:00:00 2001 From: Grace Huang Date: Tue, 8 Oct 2013 17:44:56 +0800 Subject: Revert "remedy the line-wrap while exceeding 100 chars" This reverts commit 892fb8ffa85016a63d7d00dd6f1abc58ccf034a2. 
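Note: the NamingConventions helper introduced by the first patch in this series (and removed again by the final revert below) boils down, for metric names, to joining its non-empty arguments with an underscore and applying no case conversion. A minimal sketch of the observable behaviour, assuming the helper exactly as defined in that patch; the sample calls below are illustrative only and are not part of any commit:

    // makeMetricName delegates to makeIdentifier(phrases, noConversion)("_"):
    // empty phrases are dropped, the rest are joined with "_".
    NamingConventions.makeMetricName("runtime", "ms")           // "runtime_ms"
    NamingConventions.makeMetricName("waitingApps", "number")   // "waitingApps_number"

    // makeIdentifier itself defaults to lowerCamelCaseConversion with an empty connector.
    NamingConventions.makeIdentifier(Seq("active", "task", "count"))  // "activeTaskCount"

Since com.codahale.metrics.MetricRegistry.name joins its arguments with dots, a single "waitingApps_number" argument yields one name segment where the earlier ("waitingApps", "number") form produced two, which is the coarser-grained naming SPARK-900 asks for.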
--- .../spark/deploy/master/ApplicationSource.scala | 13 ++--- .../apache/spark/deploy/master/MasterSource.scala | 19 +++---- .../apache/spark/deploy/worker/WorkerSource.scala | 31 +++++----- .../org/apache/spark/executor/ExecutorSource.scala | 45 ++++++--------- .../spark/scheduler/DAGSchedulerSource.scala | 31 +++++----- .../apache/spark/storage/BlockManagerSource.scala | 66 ++++++++++------------ 6 files changed, 91 insertions(+), 114 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala index f0b1f777fd..c72322e9c2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala @@ -31,13 +31,12 @@ class ApplicationSource(val application: ApplicationInfo) extends Source { override def getValue: String = application.state.toString }) - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("runtime", "ms")), - new Gauge[Long] { override def getValue: Long = application.duration }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("runtime", "ms")), new Gauge[Long] { + override def getValue: Long = application.duration + }) - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("cores", "number")), - new Gauge[Int] { override def getValue: Int = application.coresGranted }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("cores", "number")), new Gauge[Int] { + override def getValue: Int = application.coresGranted + }) } - diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala index 8a88fef330..de3939836b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala @@ -27,18 +27,17 @@ private[spark] class MasterSource(val master: Master) extends Source { val sourceName = "master" // Gauge for worker numbers in cluster - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("workers","number")), - new Gauge[Int] { override def getValue: Int = master.workers.size }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("workers","number")), new Gauge[Int] { + override def getValue: Int = master.workers.size + }) // Gauge for application numbers in cluster - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("apps", "number")), - new Gauge[Int] { override def getValue: Int = master.apps.size }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("apps", "number")), new Gauge[Int] { + override def getValue: Int = master.apps.size + }) // Gauge for waiting application numbers in cluster - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("waitingApps", "number")), - new Gauge[Int] { override def getValue: Int = master.waitingApps.size }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("waitingApps", "number")), new Gauge[Int] { + override def getValue: Int = master.waitingApps.size + }) } - diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala index 0596f14355..fc4f4ae99c 100644 --- 
a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala @@ -26,28 +26,27 @@ private[spark] class WorkerSource(val worker: Worker) extends Source { val sourceName = "worker" val metricRegistry = new MetricRegistry() - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("executors", "number")), - new Gauge[Int] { override def getValue: Int = worker.executors.size }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("executors", "number")), new Gauge[Int] { + override def getValue: Int = worker.executors.size + }) // Gauge for cores used of this worker - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("coresUsed", "number")), - new Gauge[Int] { override def getValue: Int = worker.coresUsed }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresUsed", "number")), new Gauge[Int] { + override def getValue: Int = worker.coresUsed + }) // Gauge for memory used of this worker - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("memUsed", "MBytes")), - new Gauge[Int] { override def getValue: Int = worker.memoryUsed }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Int] { + override def getValue: Int = worker.memoryUsed + }) // Gauge for cores free of this worker - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("coresFree", "number")), - new Gauge[Int] { override def getValue: Int = worker.coresFree }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresFree", "number")), new Gauge[Int] { + override def getValue: Int = worker.coresFree + }) // Gauge for memory free of this worker - metricRegistry.register( - MetricRegistry.name(NamingConventions.makeMetricName("memFree", "MBytes")), - new Gauge[Int] { override def getValue: Int = worker.memoryFree }) + metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memFree", "MBytes")), new Gauge[Int] { + override def getValue: Int = worker.memoryFree + }) } - diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index d063e4ad0b..6cbd154603 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -44,42 +44,31 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source val sourceName = "executor.%s".format(executorId) // Gauge for executor thread pool's actively executing task counts - metricRegistry.register( - MetricRegistry.name("threadpool", NamingConventions.makeMetricName("activeTask", "count")), - new Gauge[Int] { override def getValue: Int = executor.threadPool.getActiveCount() }) + metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("activeTask", "count")), new Gauge[Int] { + override def getValue: Int = executor.threadPool.getActiveCount() + }) // Gauge for executor thread pool's approximate total number of tasks that have been completed - metricRegistry.register( - MetricRegistry.name("threadpool", NamingConventions.makeMetricName("completeTask", "count")), - new Gauge[Long] { override def getValue: Long = executor.threadPool.getCompletedTaskCount() }) + 
metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("completeTask", "count")), new Gauge[Long] { + override def getValue: Long = executor.threadPool.getCompletedTaskCount() + }) // Gauge for executor thread pool's current number of threads - metricRegistry.register( - MetricRegistry.name("threadpool", NamingConventions.makeMetricName("currentPool", "size")), - new Gauge[Int] { override def getValue: Int = executor.threadPool.getPoolSize() }) + metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("currentPool", "size")), new Gauge[Int] { + override def getValue: Int = executor.threadPool.getPoolSize() + }) // Gauge for executor thread pool's largest number of threads that have ever simultaneously been in the pool - metricRegistry.register( - MetricRegistry.name("threadpool", NamingConventions.makeMetricName("maxPool", "size")), - new Gauge[Int] { override def getValue: Int = executor.threadPool.getMaximumPoolSize() }) + metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("maxPool", "size")), new Gauge[Int] { + override def getValue: Int = executor.threadPool.getMaximumPoolSize() + }) // Gauge for file system stats of this executor for (scheme <- Array("hdfs", "file")) { - registerFileSystemStat(scheme, - NamingConventions.makeMetricName("read", "bytes"), - _.getBytesRead(), 0L) - registerFileSystemStat(scheme, - NamingConventions.makeMetricName("write", "bytes"), - _.getBytesWritten(), 0L) - registerFileSystemStat(scheme, - NamingConventions.makeMetricName("read", "ops"), - _.getReadOps(), 0) - registerFileSystemStat(scheme, - NamingConventions.makeMetricName("largeRead", "ops"), - _.getLargeReadOps(), 0) - registerFileSystemStat(scheme, - NamingConventions.makeMetricName("write", "ops"), - _.getWriteOps(), 0) + registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "bytes"), _.getBytesRead(), 0L) + registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "bytes"), _.getBytesWritten(), 0L) + registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "ops"), _.getReadOps(), 0) + registerFileSystemStat(scheme, NamingConventions.makeMetricName("largeRead", "ops"), _.getLargeReadOps(), 0) + registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "ops"), _.getWriteOps(), 0) } } - diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala index 02fb80760f..9e90a08411 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala @@ -28,24 +28,23 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: Spar val metricRegistry = new MetricRegistry() val sourceName = "%s.DAGScheduler".format(sc.appName) - metricRegistry.register( - MetricRegistry.name("stage", NamingConventions.makeMetricName("failedStages", "number")), - new Gauge[Int] { override def getValue: Int = dagScheduler.failed.size }) + metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("failedStages", "number")), new Gauge[Int] { + override def getValue: Int = dagScheduler.failed.size + }) - 
metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("runningStages", "number")), new Gauge[Int] { + override def getValue: Int = dagScheduler.running.size + }) - metricRegistry.register( - MetricRegistry.name("stage", NamingConventions.makeMetricName("waitingStages", "number")), - new Gauge[Int] { override def getValue: Int = dagScheduler.waiting.size }) + metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("waitingStages", "number")), new Gauge[Int] { + override def getValue: Int = dagScheduler.waiting.size + }) - metricRegistry.register( - MetricRegistry.name("job", NamingConventions.makeMetricName("allJobs", "number")), - new Gauge[Int] { override def getValue: Int = dagScheduler.nextJobId.get() }) + metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("allJobs", "number")), new Gauge[Int] { + override def getValue: Int = dagScheduler.nextJobId.get() + }) - metricRegistry.register( - MetricRegistry.name("job", NamingConventions.makeMetricName("activeJobs", "number")), - new Gauge[Int] { override def getValue: Int = dagScheduler.activeJobs.size }) + metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("activeJobs", "number")), new Gauge[Int] { + override def getValue: Int = dagScheduler.activeJobs.size + }) } - diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index fcf9da481b..4312250240 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -29,48 +29,40 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar val metricRegistry = new MetricRegistry() val sourceName = "%s.BlockManager".format(sc.appName) - metricRegistry.register( - MetricRegistry.name("memory", NamingConventions.makeMetricName("maxMem", "MBytes")), - new Gauge[Long] { - override def getValue: Long = { - val storageStatusList = blockManager.master.getStorageStatus - val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _) - maxMem / 1024 / 1024 - } + metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("maxMem", "MBytes")), new Gauge[Long] { + override def getValue: Long = { + val storageStatusList = blockManager.master.getStorageStatus + val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _) + maxMem / 1024 / 1024 + } }) - metricRegistry.register( - MetricRegistry.name("memory", NamingConventions.makeMetricName("remainingMem", "MBytes")), - new Gauge[Long] { - override def getValue: Long = { - val storageStatusList = blockManager.master.getStorageStatus - val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _) - remainingMem / 1024 / 1024 - } + metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("remainingMem", "MBytes")), new Gauge[Long] { + override def getValue: Long = { + val storageStatusList = blockManager.master.getStorageStatus + val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _) + remainingMem / 1024 / 1024 + } }) - metricRegistry.register( - MetricRegistry.name("memory", NamingConventions.makeMetricName("memUsed", "MBytes")), - new Gauge[Long] { - override def getValue: Long = { - val storageStatusList = blockManager.master.getStorageStatus - val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _) - val remainingMem = 
storageStatusList.map(_.memRemaining).reduce(_ + _) - (maxMem - remainingMem) / 1024 / 1024 - } + metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Long] { + override def getValue: Long = { + val storageStatusList = blockManager.master.getStorageStatus + val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _) + val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _) + (maxMem - remainingMem) / 1024 / 1024 + } }) - metricRegistry.register( - MetricRegistry.name("disk", NamingConventions.makeMetricName("diskSpaceUsed", "MBytes")), new Gauge[Long] { - override def getValue: Long = { - val storageStatusList = blockManager.master.getStorageStatus - val diskSpaceUsed = storageStatusList - .flatMap(_.blocks.values.map(_.diskSize)) - .reduceOption(_ + _) - .getOrElse(0L) - - diskSpaceUsed / 1024 / 1024 - } + metricRegistry.register(MetricRegistry.name("disk", NamingConventions.makeMetricName("diskSpaceUsed", "MBytes")), new Gauge[Long] { + override def getValue: Long = { + val storageStatusList = blockManager.master.getStorageStatus + val diskSpaceUsed = storageStatusList + .flatMap(_.blocks.values.map(_.diskSize)) + .reduceOption(_ + _) + .getOrElse(0L) + + diskSpaceUsed / 1024 / 1024 + } }) } - -- cgit v1.2.3 From 188abbf8f1765d3f52eff8b852309352630ab8fb Mon Sep 17 00:00:00 2001 From: Grace Huang Date: Tue, 8 Oct 2013 17:45:14 +0800 Subject: Revert "SPARK-900 Use coarser grained naming for metrics" This reverts commit 4b68be5f3c0a251453c184b233b3ca490812dafd. --- .../spark/deploy/master/ApplicationSource.scala | 5 +- .../apache/spark/deploy/master/MasterSource.scala | 7 ++- .../apache/spark/deploy/worker/WorkerSource.scala | 11 ++-- .../org/apache/spark/executor/ExecutorSource.scala | 19 ++++--- .../spark/scheduler/DAGSchedulerSource.scala | 11 ++-- .../apache/spark/storage/BlockManagerSource.scala | 9 ++-- .../org/apache/spark/util/NamingConventions.scala | 62 ---------------------- 7 files changed, 28 insertions(+), 96 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/util/NamingConventions.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala index c72322e9c2..5a24042e14 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala @@ -20,7 +20,6 @@ package org.apache.spark.deploy.master import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.metrics.source.Source -import org.apache.spark.util.NamingConventions class ApplicationSource(val application: ApplicationInfo) extends Source { val metricRegistry = new MetricRegistry() @@ -31,11 +30,11 @@ class ApplicationSource(val application: ApplicationInfo) extends Source { override def getValue: String = application.state.toString }) - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("runtime", "ms")), new Gauge[Long] { + metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] { override def getValue: Long = application.duration }) - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("cores", "number")), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] { override def getValue: Int = application.coresGranted }) diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala index de3939836b..23d1cb77da 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala @@ -20,24 +20,23 @@ package org.apache.spark.deploy.master import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.metrics.source.Source -import org.apache.spark.util.NamingConventions private[spark] class MasterSource(val master: Master) extends Source { val metricRegistry = new MetricRegistry() val sourceName = "master" // Gauge for worker numbers in cluster - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("workers","number")), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] { override def getValue: Int = master.workers.size }) // Gauge for application numbers in cluster - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("apps", "number")), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] { override def getValue: Int = master.apps.size }) // Gauge for waiting application numbers in cluster - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("waitingApps", "number")), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] { override def getValue: Int = master.waitingApps.size }) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala index fc4f4ae99c..df269fd047 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala @@ -20,33 +20,32 @@ package org.apache.spark.deploy.worker import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.metrics.source.Source -import org.apache.spark.util.NamingConventions private[spark] class WorkerSource(val worker: Worker) extends Source { val sourceName = "worker" val metricRegistry = new MetricRegistry() - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("executors", "number")), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] { override def getValue: Int = worker.executors.size }) // Gauge for cores used of this worker - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresUsed", "number")), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] { override def getValue: Int = worker.coresUsed }) // Gauge for memory used of this worker - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] { override def getValue: Int = worker.memoryUsed }) // Gauge for cores free of this worker - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresFree", "number")), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] { override def getValue: Int = worker.coresFree }) // Gauge for memory free of this worker - metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memFree", 
"MBytes")), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] { override def getValue: Int = worker.memoryFree }) } diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index 6cbd154603..18c9dc1c0a 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.LocalFileSystem import scala.collection.JavaConversions._ import org.apache.spark.metrics.source.Source -import org.apache.spark.util.NamingConventions class ExecutorSource(val executor: Executor, executorId: String) extends Source { private def fileStats(scheme: String) : Option[FileSystem.Statistics] = @@ -44,31 +43,31 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source val sourceName = "executor.%s".format(executorId) // Gauge for executor thread pool's actively executing task counts - metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("activeTask", "count")), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] { override def getValue: Int = executor.threadPool.getActiveCount() }) // Gauge for executor thread pool's approximate total number of tasks that have been completed - metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("completeTask", "count")), new Gauge[Long] { + metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] { override def getValue: Long = executor.threadPool.getCompletedTaskCount() }) // Gauge for executor thread pool's current number of threads - metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("currentPool", "size")), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] { override def getValue: Int = executor.threadPool.getPoolSize() }) // Gauge for executor thread pool's largest number of threads that have ever simultaneously been in the pool - metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("maxPool", "size")), new Gauge[Int] { + metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] { override def getValue: Int = executor.threadPool.getMaximumPoolSize() }) // Gauge for file system stats of this executor for (scheme <- Array("hdfs", "file")) { - registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "bytes"), _.getBytesRead(), 0L) - registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "bytes"), _.getBytesWritten(), 0L) - registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "ops"), _.getReadOps(), 0) - registerFileSystemStat(scheme, NamingConventions.makeMetricName("largeRead", "ops"), _.getLargeReadOps(), 0) - registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "ops"), _.getWriteOps(), 0) + registerFileSystemStat(scheme, "bytesRead", _.getBytesRead(), 0L) + registerFileSystemStat(scheme, "bytesWritten", _.getBytesWritten(), 0L) + registerFileSystemStat(scheme, "readOps", _.getReadOps(), 0) + registerFileSystemStat(scheme, "largeReadOps", _.getLargeReadOps(), 0) + registerFileSystemStat(scheme, "writeOps", _.getWriteOps(), 0) } } diff --git 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 9e90a08411..446d490cc9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -21,30 +21,29 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
 
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.SparkContext
-import org.apache.spark.util.NamingConventions
 
 private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
     extends Source {
   val metricRegistry = new MetricRegistry()
   val sourceName = "%s.DAGScheduler".format(sc.appName)
 
-  metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("failedStages", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.failed.size
   })
 
-  metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("runningStages", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.running.size
   })
 
-  metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("waitingStages", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.waiting.size
   })
 
-  metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("allJobs", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.nextJobId.get()
   })
 
-  metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("activeJobs", "number")), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.activeJobs.size
   })
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 4312250240..acc3951088 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -21,7 +21,6 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
 
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.SparkContext
-import org.apache.spark.util.NamingConventions
 
 
 private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
@@ -29,7 +28,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
   val metricRegistry = new MetricRegistry()
   val sourceName = "%s.BlockManager".format(sc.appName)
 
-  metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("maxMem", "MBytes")), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
@@ -37,7 +36,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
     }
   })
 
-  metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("remainingMem", "MBytes")), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
@@ -45,7 +44,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
     }
   })
 
-  metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
@@ -54,7 +53,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
     }
   })
 
-  metricRegistry.register(MetricRegistry.name("disk", NamingConventions.makeMetricName("diskSpaceUsed", "MBytes")), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val diskSpaceUsed = storageStatusList
diff --git a/core/src/main/scala/org/apache/spark/util/NamingConventions.scala b/core/src/main/scala/org/apache/spark/util/NamingConventions.scala
deleted file mode 100644
index 7263361beb..0000000000
--- a/core/src/main/scala/org/apache/spark/util/NamingConventions.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-/**
- * all utilities related to naming conventions
- */
-private[spark] object NamingConventions {
-
-  /**
-   * Lower camelCase which convert the phrases into camelCase style with the first letter lowercase
-   */
-  def lowerCamelCaseConversion(phrases: Seq[String]): Seq[String] = {
-    var first = true
-
-    for (elem <- phrases) yield {
-      if (first) {
-        first = false
-        elem
-      }
-      else {
-        elem.capitalize
-      }
-    }
-  }
-
-  /**
-   * The standard camelCase style
-   */
-  def camelCaseConversion(phrases: Seq[String]): Seq[String] = {
-    phrases.map(_.capitalize)
-  }
-
-  def noConversion = { x: Seq[String] => x }
-
-  /**
-   * Concatenate the words using certain naming style.
-   * The default style is lowerCamelCase with empty connector.
-   */
-  def makeIdentifier(phrases: Seq[String], namingConversion: (Seq[String]) => Seq[String] = lowerCamelCaseConversion) (implicit connector: String = "" ): String = {
-    namingConversion(phrases.filter(_.nonEmpty)).mkString(connector)
-  }
-
-  def makeMetricName(phrases: String*): String = {
-    makeIdentifier(phrases, noConversion)("_")
-  }
-}
\ No newline at end of file
--
cgit v1.2.3
From 22bed59d2de15d89ab5a778bea7439711e3cb13b Mon Sep 17 00:00:00 2001
From: Grace Huang
Date: Tue, 8 Oct 2013 18:01:11 +0800
Subject: create metrics name manually.

---
 .../apache/spark/deploy/master/ApplicationSource.scala |  2 +-
 .../org/apache/spark/deploy/master/MasterSource.scala  |  6 +++---
 .../org/apache/spark/deploy/worker/WorkerSource.scala  | 10 +++++-----
 .../org/apache/spark/executor/ExecutorSource.scala     | 18 +++++++++---------
 .../apache/spark/scheduler/DAGSchedulerSource.scala    | 10 +++++-----
 .../org/apache/spark/storage/BlockManagerSource.scala  |  8 ++++----
 6 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
index 5a24042e14..01dbb3d0a6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
@@ -34,7 +34,7 @@ class ApplicationSource(val application: ApplicationInfo) extends Source {
     override def getValue: Long = application.duration
   })
 
-  metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("cores_number"), new Gauge[Int] {
     override def getValue: Int = application.coresGranted
   })
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
index 23d1cb77da..a0f6fc4b20 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
@@ -26,17 +26,17 @@ private[spark] class MasterSource(val master: Master) extends Source {
   val sourceName = "master"
 
   // Gauge for worker numbers in cluster
-  metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("workers_number"), new Gauge[Int] {
     override def getValue: Int = master.workers.size
   })
 
   // Gauge for application numbers in cluster
-  metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("apps_number"), new Gauge[Int] {
     override def getValue: Int = master.apps.size
   })
 
   // Gauge for waiting application numbers in cluster
-  metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("waitingApps_number"), new Gauge[Int] {
     override def getValue: Int = master.waitingApps.size
   })
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
index df269fd047..5c2d3ee8d7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
@@ -25,27 +25,27 @@ private[spark] class WorkerSource(val worker: Worker) extends Source {
   val sourceName = "worker"
   val metricRegistry = new MetricRegistry()
 
-  metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("executors_number"), new Gauge[Int] {
     override def getValue: Int = worker.executors.size
   })
 
   // Gauge for cores used of this worker
-  metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("coresUsed_number"), new Gauge[Int] {
     override def getValue: Int = worker.coresUsed
   })
 
   // Gauge for memory used of this worker
-  metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("memUsed_MBytes"), new Gauge[Int] {
     override def getValue: Int = worker.memoryUsed
   })
 
   // Gauge for cores free of this worker
-  metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("coresFree_number"), new Gauge[Int] {
     override def getValue: Int = worker.coresFree
   })
 
   // Gauge for memory free of this worker
-  metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("memFree_MBytes"), new Gauge[Int] {
     override def getValue: Int = worker.memoryFree
   })
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index 18c9dc1c0a..212390436c 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -43,31 +43,31 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source
   val sourceName = "executor.%s".format(executorId)
 
   // Gauge for executor thread pool's actively executing task counts
-  metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "activeTask_count"), new Gauge[Int] {
     override def getValue: Int = executor.threadPool.getActiveCount()
   })
 
   // Gauge for executor thread pool's approximate total number of tasks that have been completed
-  metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "completeTask_count"), new Gauge[Long] {
     override def getValue: Long = executor.threadPool.getCompletedTaskCount()
   })
 
   // Gauge for executor thread pool's current number of threads
-  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
     override def getValue: Int = executor.threadPool.getPoolSize()
   })
 
   // Gauge for executor thread pool's largest number of threads that have ever simultaneously been in the pool
-  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
     override def getValue: Int = executor.threadPool.getMaximumPoolSize()
   })
 
   // Gauge for file system stats of this executor
   for (scheme <- Array("hdfs", "file")) {
-    registerFileSystemStat(scheme, "bytesRead", _.getBytesRead(), 0L)
-    registerFileSystemStat(scheme, "bytesWritten", _.getBytesWritten(), 0L)
-    registerFileSystemStat(scheme, "readOps", _.getReadOps(), 0)
-    registerFileSystemStat(scheme, "largeReadOps", _.getLargeReadOps(), 0)
-    registerFileSystemStat(scheme, "writeOps", _.getWriteOps(), 0)
+    registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
+    registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
+    registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
+    registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
+    registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 446d490cc9..474f3e8730 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -27,23 +27,23 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
   val metricRegistry = new MetricRegistry()
   val sourceName = "%s.DAGScheduler".format(sc.appName)
 
-  metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("stage", "failedStages_number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.failed.size
   })
 
-  metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("stage", "runningStages_number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.running.size
   })
 
-  metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("stage", "waitingStages_number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.waiting.size
   })
 
-  metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("job", "allJobs_number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.nextJobId.get()
   })
 
-  metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("job", "activeJobs_number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.activeJobs.size
   })
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index acc3951088..e5068d5587 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -28,7 +28,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
   val metricRegistry = new MetricRegistry()
   val sourceName = "%s.BlockManager".format(sc.appName)
 
-  metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("memory", "maxMem_MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
@@ -36,7 +36,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
     }
   })
 
-  metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("memory", "remainingMem_MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
@@ -44,7 +44,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
     }
   })
 
-  metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("memory", "memUsed_MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
@@ -53,7 +53,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
     }
   })
 
-  metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed_MBytes"), new Gauge[Long] {
     override def getValue: Long = {
      val storageStatusList = blockManager.master.getStorageStatus
      val diskSpaceUsed = storageStatusList
--
cgit v1.2.3
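The commit that follows settles the convention: a gauge already reports a count, so suffixes like _number and _count add nothing and are dropped, while unit suffixes such as _MBytes are kept. A hypothetical source written to that final convention might look like this (QueueSourceSketch and pendingItems are invented for illustration, not part of the patch):

    import com.codahale.metrics.{Gauge, MetricRegistry}

    class QueueSourceSketch(queue: java.util.Queue[String]) {
      val metricRegistry = new MetricRegistry()
      val sourceName = "queue"

      // Bare camelCase metric name; no redundant _count suffix on a plain count.
      metricRegistry.register(MetricRegistry.name("pendingItems"), new Gauge[Int] {
        override def getValue: Int = queue.size()
      })
    }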
From f7628e40330a1721e3d69ccb2390dfa75f839768 Mon Sep 17 00:00:00 2001
From: Grace Huang
Date: Wed, 9 Oct 2013 08:36:41 +0800
Subject: remove those futile suffixes like number/count

---
 .../org/apache/spark/deploy/master/ApplicationSource.scala    |  2 +-
 .../scala/org/apache/spark/deploy/master/MasterSource.scala   |  6 +++---
 .../scala/org/apache/spark/deploy/worker/WorkerSource.scala   |  6 +++---
 .../main/scala/org/apache/spark/executor/ExecutorSource.scala |  4 ++--
 .../scala/org/apache/spark/scheduler/DAGSchedulerSource.scala | 10 +++++-----
 5 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
index 01dbb3d0a6..c87b66f047 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
@@ -34,7 +34,7 @@ class ApplicationSource(val application: ApplicationInfo) extends Source {
     override def getValue: Long = application.duration
   })
 
-  metricRegistry.register(MetricRegistry.name("cores_number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("cores"), new Gauge[Int] {
     override def getValue: Int = application.coresGranted
   })
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
index a0f6fc4b20..36c1b87b7f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
@@ -26,17 +26,17 @@ private[spark] class MasterSource(val master: Master) extends Source {
   val sourceName = "master"
 
   // Gauge for worker numbers in cluster
-  metricRegistry.register(MetricRegistry.name("workers_number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("workers"), new Gauge[Int] {
     override def getValue: Int = master.workers.size
   })
 
   // Gauge for application numbers in cluster
-  metricRegistry.register(MetricRegistry.name("apps_number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("apps"), new Gauge[Int] {
     override def getValue: Int = master.apps.size
   })
 
   // Gauge for waiting application numbers in cluster
-  metricRegistry.register(MetricRegistry.name("waitingApps_number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("waitingApps"), new Gauge[Int] {
     override def getValue: Int = master.waitingApps.size
   })
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
index 5c2d3ee8d7..b24e936b1f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
@@ -25,12 +25,12 @@ private[spark] class WorkerSource(val worker: Worker) extends Source {
   val sourceName = "worker"
   val metricRegistry = new MetricRegistry()
 
-  metricRegistry.register(MetricRegistry.name("executors_number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("executors"), new Gauge[Int] {
     override def getValue: Int = worker.executors.size
   })
 
   // Gauge for cores used of this worker
-  metricRegistry.register(MetricRegistry.name("coresUsed_number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("coresUsed"), new Gauge[Int] {
     override def getValue: Int = worker.coresUsed
   })
 
@@ -40,7 +40,7 @@ private[spark] class WorkerSource(val worker: Worker) extends Source {
   })
 
   // Gauge for cores free of this worker
-  metricRegistry.register(MetricRegistry.name("coresFree_number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("coresFree"), new Gauge[Int] {
     override def getValue: Int = worker.coresFree
   })
 
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index 212390436c..34ed9c8f73 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -43,12 +43,12 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source
   val sourceName = "executor.%s".format(executorId)
 
   // Gauge for executor thread pool's actively executing task counts
-  metricRegistry.register(MetricRegistry.name("threadpool", "activeTask_count"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
     override def getValue: Int = executor.threadPool.getActiveCount()
   })
 
   // Gauge for executor thread pool's approximate total number of tasks that have been completed
-  metricRegistry.register(MetricRegistry.name("threadpool", "completeTask_count"), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
     override def getValue: Long = executor.threadPool.getCompletedTaskCount()
   })
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 474f3e8730..151514896f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -27,23 +27,23 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
   val metricRegistry = new MetricRegistry()
   val sourceName = "%s.DAGScheduler".format(sc.appName)
 
-  metricRegistry.register(MetricRegistry.name("stage", "failedStages_number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.failed.size
   })
 
-  metricRegistry.register(MetricRegistry.name("stage", "runningStages_number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("stage", "runningStages"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.running.size
   })
 
-  metricRegistry.register(MetricRegistry.name("stage", "waitingStages_number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("stage", "waitingStages"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.waiting.size
   })
 
-  metricRegistry.register(MetricRegistry.name("job", "allJobs_number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("job", "allJobs"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.nextJobId.get()
   })
 
-  metricRegistry.register(MetricRegistry.name("job", "activeJobs_number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("job", "activeJobs"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.activeJobs.size
   })
 }
--
cgit v1.2.3