diff options
author | Reynold Xin <rxin@apache.org> | 2014-08-14 11:22:41 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-08-14 11:22:41 -0700 |
commit | eaeb0f76fa0f103c7db0f3975cb8562715410973 (patch) | |
tree | 959c0d7e8f82a727c0762289be1122b3c72ba9ad | |
parent | 267fdffe2743bc2dc706c8ac8af0ae33a358a5d3 (diff) | |
download | spark-eaeb0f76fa0f103c7db0f3975cb8562715410973.tar.gz spark-eaeb0f76fa0f103c7db0f3975cb8562715410973.tar.bz2 spark-eaeb0f76fa0f103c7db0f3975cb8562715410973.zip |
Minor cleanup of metrics.Source
- Added override.
- Marked some variables as private.
Author: Reynold Xin <rxin@apache.org>
Closes #1943 from rxin/metricsSource and squashes the following commits:
fbfa943 [Reynold Xin] Minor cleanup of metrics.Source. - Added override. - Marked some variables as private.
8 files changed, 20 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala index c87b66f047..38db02cd24 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala @@ -22,8 +22,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.metrics.source.Source class ApplicationSource(val application: ApplicationInfo) extends Source { - val metricRegistry = new MetricRegistry() - val sourceName = "%s.%s.%s".format("application", application.desc.name, + override val metricRegistry = new MetricRegistry() + override val sourceName = "%s.%s.%s".format("application", application.desc.name, System.currentTimeMillis()) metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala index 36c1b87b7f..9c3f79f124 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala @@ -22,8 +22,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.metrics.source.Source private[spark] class MasterSource(val master: Master) extends Source { - val metricRegistry = new MetricRegistry() - val sourceName = "master" + override val metricRegistry = new MetricRegistry() + override val sourceName = "master" // Gauge for worker numbers in cluster metricRegistry.register(MetricRegistry.name("workers"), new Gauge[Int] { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala index b7ddd8c816..df1e01b23b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala @@ -22,8 +22,8 @@ import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.metrics.source.Source private[spark] class WorkerSource(val worker: Worker) extends Source { - val sourceName = "worker" - val metricRegistry = new MetricRegistry() + override val sourceName = "worker" + override val metricRegistry = new MetricRegistry() metricRegistry.register(MetricRegistry.name("executors"), new Gauge[Int] { override def getValue: Int = worker.executors.size diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index 0ed52cfe9d..d672158656 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -35,9 +35,10 @@ private[spark] class ExecutorSource(val executor: Executor, executorId: String) }) } - val metricRegistry = new MetricRegistry() + override val metricRegistry = new MetricRegistry() + // TODO: It would be nice to pass the application name here - val sourceName = "executor.%s".format(executorId) + override val sourceName = "executor.%s".format(executorId) // Gauge for executor thread pool's actively executing task counts metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] { diff --git a/core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala b/core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala index f865f9648a..635bff2cd7 100644 --- a/core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala +++ b/core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala @@ -21,12 +21,9 @@ import com.codahale.metrics.MetricRegistry import com.codahale.metrics.jvm.{GarbageCollectorMetricSet, MemoryUsageGaugeSet} private[spark] class JvmSource extends Source { - val sourceName = "jvm" - val metricRegistry = new MetricRegistry() + override val sourceName = "jvm" + override val metricRegistry = new MetricRegistry() - val gcMetricSet = new GarbageCollectorMetricSet - val memGaugeSet = new MemoryUsageGaugeSet - - metricRegistry.registerAll(gcMetricSet) - metricRegistry.registerAll(memGaugeSet) + metricRegistry.registerAll(new GarbageCollectorMetricSet) + metricRegistry.registerAll(new MemoryUsageGaugeSet) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala index 5878e73390..94944399b1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala @@ -24,8 +24,8 @@ import org.apache.spark.metrics.source.Source private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext) extends Source { - val metricRegistry = new MetricRegistry() - val sourceName = "%s.DAGScheduler".format(sc.appName) + override val metricRegistry = new MetricRegistry() + override val sourceName = "%s.DAGScheduler".format(sc.appName) metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] { override def getValue: Int = dagScheduler.failedStages.size diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index 3f14c40ec6..49fea6d9e2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -24,8 +24,8 @@ import org.apache.spark.metrics.source.Source private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext) extends Source { - val metricRegistry = new MetricRegistry() - val sourceName = "%s.BlockManager".format(sc.appName) + override val metricRegistry = new MetricRegistry() + override val sourceName = "%s.BlockManager".format(sc.appName) metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] { override def getValue: Long = { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala index 774adc3c23..75f0e8716d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala @@ -23,10 +23,10 @@ import org.apache.spark.metrics.source.Source import org.apache.spark.streaming.ui.StreamingJobProgressListener private[streaming] class StreamingSource(ssc: StreamingContext) extends Source { - val metricRegistry = new MetricRegistry - val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName) + override val metricRegistry = new MetricRegistry + override val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName) - val streamingListener = ssc.uiTab.listener + private val streamingListener = ssc.uiTab.listener private def registerGauge[T](name: String, f: StreamingJobProgressListener => T, defaultValue: T) { |