author    Grace Huang <jie.huang@intel.com>  2013-09-27 14:47:38 +0800
committer Grace Huang <jie.huang@intel.com>  2013-09-27 14:47:38 +0800
commit    4b68be5f3c0a251453c184b233b3ca490812dafd (patch)
tree      394a4d3fa8c89b0192f2c148e0dbfe9644ac52e4 /core
parent    714fdabd99bbff3a0cdec5dcf06b021a3a3f2da8 (diff)
SPARK-900 Use coarser grained naming for metrics
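
Replace literal metric names such as "runtime_ms" with names built by the
new NamingConventions helper, so that the value/unit suffix ("number",
"ms", "MBytes", "count", ...) is attached in one consistent style across
all metric sources. For example, from the ApplicationSource change below:

    // before
    metricRegistry.register(MetricRegistry.name("runtime_ms"), ...)

    // after
    metricRegistry.register(
      MetricRegistry.name(NamingConventions.makeMetricName("runtime", "ms")), ...)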
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala   5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala         7
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala        11
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala           19
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala      11
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala         9
-rw-r--r--  core/src/main/scala/org/apache/spark/util/NamingConventions.scala            62
7 files changed, 96 insertions, 28 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
index 5a24042e14..c72322e9c2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source
+import org.apache.spark.util.NamingConventions
class ApplicationSource(val application: ApplicationInfo) extends Source {
val metricRegistry = new MetricRegistry()
@@ -30,11 +31,11 @@ class ApplicationSource(val application: ApplicationInfo) extends Source {
override def getValue: String = application.state.toString
})
- metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("runtime", "ms")), new Gauge[Long] {
override def getValue: Long = application.duration
})
- metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("cores", "number")), new Gauge[Int] {
override def getValue: Int = application.coresGranted
})
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
index 23d1cb77da..de3939836b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
@@ -20,23 +20,24 @@ package org.apache.spark.deploy.master
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source
+import org.apache.spark.util.NamingConventions
private[spark] class MasterSource(val master: Master) extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "master"
// Gauge for the number of workers in the cluster
- metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("workers","number")), new Gauge[Int] {
override def getValue: Int = master.workers.size
})
// Gauge for the number of applications in the cluster
- metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("apps", "number")), new Gauge[Int] {
override def getValue: Int = master.apps.size
})
// Gauge for the number of waiting applications in the cluster
- metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("waitingApps", "number")), new Gauge[Int] {
override def getValue: Int = master.waitingApps.size
})
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
index df269fd047..fc4f4ae99c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
@@ -20,32 +20,33 @@ package org.apache.spark.deploy.worker
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source
+import org.apache.spark.util.NamingConventions
private[spark] class WorkerSource(val worker: Worker) extends Source {
val sourceName = "worker"
val metricRegistry = new MetricRegistry()
- metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("executors", "number")), new Gauge[Int] {
override def getValue: Int = worker.executors.size
})
// Gauge for cores used by this worker
- metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresUsed", "number")), new Gauge[Int] {
override def getValue: Int = worker.coresUsed
})
// Gauge for memory used by this worker
- metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Int] {
override def getValue: Int = worker.memoryUsed
})
// Gauge for free cores on this worker
- metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresFree", "number")), new Gauge[Int] {
override def getValue: Int = worker.coresFree
})
// Gauge for free memory on this worker
- metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memFree", "MBytes")), new Gauge[Int] {
override def getValue: Int = worker.memoryFree
})
}
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index 18c9dc1c0a..6cbd154603 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.LocalFileSystem
import scala.collection.JavaConversions._
import org.apache.spark.metrics.source.Source
+import org.apache.spark.util.NamingConventions
class ExecutorSource(val executor: Executor, executorId: String) extends Source {
private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
@@ -43,31 +44,31 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source
val sourceName = "executor.%s".format(executorId)
// Gauge for executor thread pool's count of actively executing tasks
- metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("activeTask", "count")), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getActiveCount()
})
// Gauge for executor thread pool's approximate total number of tasks that have been completed
- metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("completeTask", "count")), new Gauge[Long] {
override def getValue: Long = executor.threadPool.getCompletedTaskCount()
})
// Gauge for executor thread pool's current number of threads
- metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("currentPool", "size")), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getPoolSize()
})
// Gauge for executor thread pool's largest number of threads that have ever simultaneously been in the pool
- metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("maxPool", "size")), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getMaximumPoolSize()
})
// Gauge for file system stats of this executor
for (scheme <- Array("hdfs", "file")) {
- registerFileSystemStat(scheme, "bytesRead", _.getBytesRead(), 0L)
- registerFileSystemStat(scheme, "bytesWritten", _.getBytesWritten(), 0L)
- registerFileSystemStat(scheme, "readOps", _.getReadOps(), 0)
- registerFileSystemStat(scheme, "largeReadOps", _.getLargeReadOps(), 0)
- registerFileSystemStat(scheme, "writeOps", _.getWriteOps(), 0)
+ registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "bytes"), _.getBytesRead(), 0L)
+ registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "bytes"), _.getBytesWritten(), 0L)
+ registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "ops"), _.getReadOps(), 0)
+ registerFileSystemStat(scheme, NamingConventions.makeMetricName("largeRead", "ops"), _.getLargeReadOps(), 0)
+ registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "ops"), _.getWriteOps(), 0)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 446d490cc9..9e90a08411 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -21,29 +21,30 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.metrics.source.Source
import org.apache.spark.SparkContext
+import org.apache.spark.util.NamingConventions
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "%s.DAGScheduler".format(sc.appName)
- metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("failedStages", "number")), new Gauge[Int] {
override def getValue: Int = dagScheduler.failed.size
})
- metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("runningStages", "number")), new Gauge[Int] {
override def getValue: Int = dagScheduler.running.size
})
- metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("waitingStages", "number")), new Gauge[Int] {
override def getValue: Int = dagScheduler.waiting.size
})
- metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("allJobs", "number")), new Gauge[Int] {
override def getValue: Int = dagScheduler.nextJobId.get()
})
- metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("activeJobs", "number")), new Gauge[Int] {
override def getValue: Int = dagScheduler.activeJobs.size
})
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index acc3951088..4312250240 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -21,6 +21,7 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.metrics.source.Source
import org.apache.spark.SparkContext
+import org.apache.spark.util.NamingConventions
private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
@@ -28,7 +29,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
val metricRegistry = new MetricRegistry()
val sourceName = "%s.BlockManager".format(sc.appName)
- metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("maxMem", "MBytes")), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
@@ -36,7 +37,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
}
})
- metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("remainingMem", "MBytes")), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
@@ -44,7 +45,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
}
})
- metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
@@ -53,7 +54,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
}
})
- metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("disk", NamingConventions.makeMetricName("diskSpaceUsed", "MBytes")), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val diskSpaceUsed = storageStatusList
diff --git a/core/src/main/scala/org/apache/spark/util/NamingConventions.scala b/core/src/main/scala/org/apache/spark/util/NamingConventions.scala
new file mode 100644
index 0000000000..7263361beb
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/NamingConventions.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * Utilities related to naming conventions.
+ */
+private[spark] object NamingConventions {
+
+ /**
+ * Converts the phrases into lower camelCase: every phrase is capitalized except the first, which is left as-is
+ */
+ def lowerCamelCaseConversion(phrases: Seq[String]): Seq[String] = {
+ var first = true
+
+ for (elem <- phrases) yield {
+ if (first) {
+ first = false
+ elem
+ }
+ else {
+ elem.capitalize
+ }
+ }
+ }
+
+ /**
+ * The standard camelCase style
+ */
+ def camelCaseConversion(phrases: Seq[String]): Seq[String] = {
+ phrases.map(_.capitalize)
+ }
+
+ def noConversion = { x: Seq[String] => x }
+
+ /**
+ * Concatenates the phrases using the given naming conversion and connector.
+ * The default is lower camelCase with an empty connector.
+ */
+ def makeIdentifier(phrases: Seq[String], namingConversion: (Seq[String]) => Seq[String] = lowerCamelCaseConversion)(implicit connector: String = ""): String = {
+ namingConversion(phrases.filter(_.nonEmpty)).mkString(connector)
+ }
+
+ def makeMetricName(phrases: String*): String = {
+ makeIdentifier(phrases, noConversion)("_")
+ }
+}
\ No newline at end of file
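
A minimal usage sketch (not part of the commit), following directly from
the definitions in NamingConventions.scala above:

    import org.apache.spark.util.NamingConventions

    // makeMetricName applies noConversion and joins the phrases with "_"
    NamingConventions.makeMetricName("runtime", "ms")         // "runtime_ms"

    // makeIdentifier defaults to lowerCamelCaseConversion and an empty connector
    NamingConventions.makeIdentifier(Seq("waiting", "apps"))  // "waitingApps"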