-rw-r--r--  conf/metrics.properties.template                                73
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala            11
-rw-r--r--  core/src/main/scala/spark/deploy/master/MasterSource.scala       2
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala            11
-rw-r--r--  core/src/main/scala/spark/deploy/worker/WorkerSource.scala      12
-rw-r--r--  core/src/main/scala/spark/executor/ExecutorSource.scala          8
-rw-r--r--  core/src/main/scala/spark/metrics/MetricsConfig.scala           50
-rw-r--r--  core/src/main/scala/spark/metrics/MetricsSystem.scala           83
-rw-r--r--  core/src/main/scala/spark/metrics/sink/ConsoleSink.scala        12
-rw-r--r--  core/src/main/scala/spark/metrics/sink/CsvSink.scala            12
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala    10
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerSource.scala      31
-rw-r--r--  core/src/test/resources/test_metrics_config.properties           6
-rw-r--r--  core/src/test/resources/test_metrics_system.properties           4
-rw-r--r--  core/src/test/scala/spark/metrics/MetricsConfigSuite.scala      38
-rw-r--r--  core/src/test/scala/spark/metrics/MetricsSystemSuite.scala      13
16 files changed, 250 insertions(+), 126 deletions(-)
diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index c7e24aa36c..0486ca4c79 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -1,11 +1,82 @@
# syntax: [instance].[sink|source].[name].[options]
+# "instance" specify "who" (the role) use metrics system. In spark there are
+# several roles like master, worker, executor, driver, these roles will
+# create metrics system for monitoring. So instance represents these roles.
+# Currently in Spark, several instances have already implemented: master,
+# worker, executor, driver.
+#
+# The [instance] field can be "master", "worker", "executor" or "driver", which
+# means only the specified instance has this property.
+# A wildcard "*" can be used in place of the instance name, which means all
+# instances will have this property.
+#
+# "source" specify "where" (source) to collect metrics data. In metrics system,
+# there exists two kinds of source:
+# 1. Spark internal source, like MasterSource, WorkerSource, etc, which will
+# collect Spark component's internal state, these sources are related to
+# instance and will be added after specific metrics system is created.
+# 2. Common source, like JvmSource, which will collect low level state, is
+# configured by configuration and loaded through reflection.
+#
+# "sink" specify "where" (destination) to output metrics data to. Several sinks
+# can be coexisted and flush metrics to all these sinks.
+#
+# The [sink|source] field specifies whether the property applies to a source or
+# a sink; it can only be "source" or "sink".
+#
+# The [name] field specifies the name of the source or sink; it is user defined.
+#
+# The [options] field is a property specific to that source or sink, and the
+# source or sink itself is responsible for parsing it.
+#
+# Notes:
+# 1. Sinks are added through configuration: for a sink such as the console
+#    sink, the fully qualified class name must be given by the "class" property.
+# 2. Some sinks accept a polling period (the console sink defaults to 10
+#    seconds). Note that the minimal polling period is 1 second; any period
+#    below 1 second is rejected.
+# 3. A wildcard property can be overridden by an instance-specific property;
+#    for example, *.sink.console.period can be overridden by
+#    master.sink.console.period.
+# 4. To customize the metrics system, a metrics-specific configuration file
+#    such as "${SPARK_HOME}/conf/metrics.properties" should be passed as a
+#    Java property using -Dspark.metrics.conf=xxx, or the file can be put in
+#    ${SPARK_HOME}/conf, where the metrics system will find and load it automatically.
+
+# Enable JmxSink for all instances by class name
+#*.sink.jmx.class=spark.metrics.sink.JmxSink
+
+# Enable ConsoleSink for all instances by class name
#*.sink.console.class=spark.metrics.sink.ConsoleSink
+# Polling period for ConsoleSink
#*.sink.console.period=10
-#*.sink.console.unit=second
+#*.sink.console.unit=seconds
+
+# Master instance overrides the wildcard polling period
+#master.sink.console.period=15
+
+#master.sink.console.unit=seconds
+
+# Enable CsvSink for all instances
+#*.sink.csv.class=spark.metrics.sink.CsvSink
+
+# Polling period for CsvSink
+#*.sink.csv.period=1
+
+#*.sink.csv.unit=minutes
+
+# Polling directory for CsvSink
+#*.sink.csv.directory=/tmp/
+
+# Worker instance overrides the wildcard polling period
+#worker.sink.csv.period=10
+
+#worker.sink.csv.unit=minutes
+# Enable the JVM source for the master, worker, driver and executor instances
#master.source.jvm.class=spark.metrics.source.JvmSource
#worker.source.jvm.class=spark.metrics.source.JvmSource
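
As a minimal wiring sketch of how the configuration above is picked up at runtime (the file path and object name below are hypothetical): MetricsSystem reads the spark.metrics.conf system property, and MetricsConfig falls back to a metrics.properties resource on the classpath when the property is absent. Since MetricsSystem is private[spark], such code has to live under the spark package.

package spark

import spark.metrics.MetricsSystem

// Wiring sketch: point the metrics system at a custom properties file, then
// create the per-instance system. Sinks and sources named under "*" or
// "master" in the file are instantiated when the system is created;
// start() begins reporting and stop() shuts it down.
object MetricsConfSketch {
  def main(args: Array[String]) {
    System.setProperty("spark.metrics.conf", "/path/to/metrics.properties")  // hypothetical path

    val metricsSystem = MetricsSystem.createMetricsSystem("master")
    metricsSystem.start()
    // ... the role runs ...
    metricsSystem.stop()
  }
}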
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 1d592206c0..9692af5295 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -58,6 +58,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
Utils.checkHost(host, "Expected hostname")
+ val metricsSystem = MetricsSystem.createMetricsSystem("master")
val masterSource = new MasterSource(this)
val masterPublicAddress = {
@@ -77,12 +78,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
webUi.start()
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
- Master.metricsSystem.registerSource(masterSource)
- Master.metricsSystem.start()
+ metricsSystem.registerSource(masterSource)
+ metricsSystem.start()
}
override def postStop() {
webUi.stop()
+ metricsSystem.stop()
}
override def receive = {
@@ -322,17 +324,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
removeWorker(worker)
}
}
-
- override def postStop() {
- Master.metricsSystem.stop()
- }
}
private[spark] object Master {
private val systemName = "sparkMaster"
private val actorName = "Master"
private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
- private val metricsSystem = MetricsSystem.createMetricsSystem("master")
def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings)
diff --git a/core/src/main/scala/spark/deploy/master/MasterSource.scala b/core/src/main/scala/spark/deploy/master/MasterSource.scala
index 65c22320d6..b8cfa6a773 100644
--- a/core/src/main/scala/spark/deploy/master/MasterSource.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterSource.scala
@@ -19,7 +19,7 @@ private[spark] class MasterSource(val master: Master) extends Source {
})
// Gauge for waiting application numbers in cluster
- metricRegistry.register(MetricRegistry.name("waiting_apps", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
override def getValue: Int = master.waitingApps.size
})
}
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 5c0f77fd75..8fa0d12b82 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -68,6 +68,7 @@ private[spark] class Worker(
var coresUsed = 0
var memoryUsed = 0
+ val metricsSystem = MetricsSystem.createMetricsSystem("worker")
val workerSource = new WorkerSource(this)
def coresFree: Int = cores - coresUsed
@@ -100,10 +101,9 @@ private[spark] class Worker(
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.start()
connectToMaster()
- startWebUi()
- Worker.metricsSystem.registerSource(workerSource)
- Worker.metricsSystem.start()
+ metricsSystem.registerSource(workerSource)
+ metricsSystem.start()
}
def connectToMaster() {
@@ -185,14 +185,11 @@ private[spark] class Worker(
override def postStop() {
executors.values.foreach(_.kill())
webUi.stop()
-
- Worker.metricsSystem.stop()
+ metricsSystem.stop()
}
}
private[spark] object Worker {
- private val metricsSystem = MetricsSystem.createMetricsSystem("worker")
-
def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/spark/deploy/worker/WorkerSource.scala
index 539eac71bd..39cb8e5690 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerSource.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerSource.scala
@@ -8,27 +8,27 @@ private[spark] class WorkerSource(val worker: Worker) extends Source {
val sourceName = "worker"
val metricRegistry = new MetricRegistry()
- metricRegistry.register(MetricRegistry.name("executor", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] {
override def getValue: Int = worker.executors.size
})
// Gauge for cores used of this worker
- metricRegistry.register(MetricRegistry.name("core_used", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] {
override def getValue: Int = worker.coresUsed
})
// Gauge for memory used of this worker
- metricRegistry.register(MetricRegistry.name("mem_used", "MBytes"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] {
override def getValue: Int = worker.memoryUsed
})
// Gauge for cores free of this worker
- metricRegistry.register(MetricRegistry.name("core_free", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] {
override def getValue: Int = worker.coresFree
})
- // Gauge for memory used of this worker
- metricRegistry.register(MetricRegistry.name("mem_free", "MBytes"), new Gauge[Int] {
+ // Gauge for memory free of this worker
+ metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] {
override def getValue: Int = worker.memoryFree
})
}
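
For context on what these register calls produce: com.codahale.metrics.MetricRegistry.name simply joins its arguments with dots, so the renamed gauges end up under keys like "coresUsed.number" inside the "worker" source's registry. A small sketch (the object name and the constant value are illustrative):

import com.codahale.metrics.{Gauge, MetricRegistry}

// MetricRegistry.name joins its parts with dots, so this registers a metric
// under the key "coresUsed.number".
object GaugeNamingSketch {
  def main(args: Array[String]) {
    val registry = new MetricRegistry()
    registry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] {
      override def getValue: Int = 4  // placeholder value for illustration
    })
    println(registry.getGauges().keySet())  // [coresUsed.number]
  }
}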
diff --git a/core/src/main/scala/spark/executor/ExecutorSource.scala b/core/src/main/scala/spark/executor/ExecutorSource.scala
index d8b531cb58..94116edfcf 100644
--- a/core/src/main/scala/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/spark/executor/ExecutorSource.scala
@@ -9,22 +9,22 @@ class ExecutorSource(val executor: Executor) extends Source {
val sourceName = "executor"
// Gauge for executor thread pool's actively executing task counts
- metricRegistry.register(MetricRegistry.name("threadpool", "active_task", "number"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getActiveCount()
})
// Gauge for executor thread pool's approximate total number of tasks that have been completed
- metricRegistry.register(MetricRegistry.name("threadpool", "complete_task", "count"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] {
override def getValue: Long = executor.threadPool.getCompletedTaskCount()
})
// Gauge for executor thread pool's current number of threads
- metricRegistry.register(MetricRegistry.name("threadpool", "current_pool", "size"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getPoolSize()
})
// Gauge for the executor thread pool's largest number of threads that have ever simultaneously been in the pool
- metricRegistry.register(MetricRegistry.name("threadpool", "max_pool", "size"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getMaximumPoolSize()
})
}
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
index 5066b7ac22..ed505b0aa7 100644
--- a/core/src/main/scala/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -1,44 +1,58 @@
package spark.metrics
import java.util.Properties
-import java.io.{File, FileInputStream}
+import java.io.{File, FileInputStream, InputStream, IOException}
import scala.collection.mutable
import scala.util.matching.Regex
-private[spark] class MetricsConfig(val configFile: String) {
- val properties = new Properties()
+import spark.Logging
+
+private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
+ initLogging()
+
val DEFAULT_PREFIX = "*"
val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
+ val METRICS_CONF = "metrics.properties"
+
+ val properties = new Properties()
var propertyCategories: mutable.HashMap[String, Properties] = null
private def setDefaultProperties(prop: Properties) {
- prop.setProperty("*.sink.jmx.enabled", "default")
- prop.setProperty("*.source.jvm.class", "spark.metrics.source.JvmSource")
+ // Empty by default; any default properties can be set here
}
- def initilize() {
+ def initialize() {
//Add default properties in case there's no properties file
setDefaultProperties(properties)
- val confFile = new File(configFile)
- if (confFile.exists()) {
- var fis: FileInputStream = null
- try {
- fis = new FileInputStream(configFile)
- properties.load(fis)
- } finally {
- fis.close()
+ // If spark.metrics.conf is not set, fall back to the metrics.properties file on the classpath
+ var is: InputStream = null
+ try {
+ is = configFile match {
+ case Some(f) => new FileInputStream(f)
+ case None => getClass.getClassLoader.getResourceAsStream(METRICS_CONF)
+ }
+
+ if (is != null) {
+ properties.load(is)
}
+ } catch {
+ case e: Exception => logError("Error loading configure file", e)
+ } finally {
+ if (is != null) is.close()
}
propertyCategories = subProperties(properties, INSTANCE_REGEX)
if (propertyCategories.contains(DEFAULT_PREFIX)) {
import scala.collection.JavaConversions._
+
val defaultProperty = propertyCategories(DEFAULT_PREFIX)
- for ((inst, prop) <- propertyCategories; p <- defaultProperty
- if inst != DEFAULT_PREFIX; if prop.getProperty(p._1) == null) {
- prop.setProperty(p._1, p._2)
+ for { (inst, prop) <- propertyCategories
+ if (inst != DEFAULT_PREFIX)
+ (k, v) <- defaultProperty
+ if (prop.getProperty(k) == null) } {
+ prop.setProperty(k, v)
}
}
}
@@ -58,7 +72,7 @@ private[spark] class MetricsConfig(val configFile: String) {
def getInstance(inst: String): Properties = {
propertyCategories.get(inst) match {
case Some(s) => s
- case None => propertyCategories(DEFAULT_PREFIX)
+ case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties)
}
}
}
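
The wildcard merge inside initialize() is easy to miss in the diff, so here is a standalone illustration (not Spark code; the names are made up) of the rule it implements: instance-specific properties win, and "*" properties only fill in keys the instance did not set.

import java.util.Properties
import scala.collection.JavaConversions._

// Sketch of the "*" default merge performed in MetricsConfig.initialize().
object WildcardMergeSketch {
  def main(args: Array[String]) {
    val wildcard = new Properties()
    wildcard.setProperty("sink.console.period", "10")
    wildcard.setProperty("sink.console.unit", "seconds")

    val master = new Properties()
    master.setProperty("sink.console.period", "20")  // instance-specific value

    // Copy every wildcard key the instance has not already set.
    for ((k, v) <- wildcard if master.getProperty(k) == null) {
      master.setProperty(k, v)
    }

    println(master.getProperty("sink.console.period"))  // 20      (instance-specific wins)
    println(master.getProperty("sink.console.unit"))    // seconds (filled in from "*")
  }
}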
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
index 54f6c6e4da..2f87577ff3 100644
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -11,19 +11,51 @@ import spark.Logging
import spark.metrics.sink.Sink
import spark.metrics.source.Source
+/**
+ * Spark Metrics System, created for a specific "instance". It combines sources and sinks
+ * and periodically polls metrics data from the sources to the sink destinations.
+ *
+ * "instance" specifies "who" (the role) uses the metrics system. In Spark there are several
+ * roles, such as master, worker, executor and client driver, and each of them creates its
+ * own metrics system for monitoring, so an instance represents one of these roles. Currently
+ * Spark implements the following instances: master, worker, executor, driver.
+ *
+ * "source" specifies "where" (the source) to collect metrics data from. The metrics system
+ * has two kinds of sources:
+ *   1. Spark internal sources, like MasterSource, WorkerSource, etc., which collect a
+ *   Spark component's internal state. These sources are tied to an instance and are
+ *   added after the specific metrics system is created.
+ *   2. Common sources, like JvmSource, which collect low-level state and are configured
+ *   in the configuration file and loaded through reflection.
+ *
+ * "sink" specifies "where" (the destination) to output metrics data to. Several sinks can
+ * coexist, and metrics are flushed to all of them.
+ *
+ * The metrics configuration format is:
+ * [instance].[sink|source].[name].[options] = xxxx
+ *
+ * [instance] can be "master", "worker", "executor" or "driver", which means only the
+ * specified instance has this property.
+ * A wildcard "*" can be used in place of the instance name, which means all instances
+ * will have this property.
+ *
+ * [sink|source] indicates whether the property belongs to a source or a sink; it can only be "source" or "sink".
+ *
+ * [name] specifies the name of the sink or source; it is user defined.
+ *
+ * [options] is a property specific to this source or sink.
+ */
private[spark] class MetricsSystem private (val instance: String) extends Logging {
initLogging()
- val confFile = System.getProperty("spark.metrics.conf.file", "unsupported")
- val metricsConfig = new MetricsConfig(confFile)
+ val confFile = System.getProperty("spark.metrics.conf")
+ val metricsConfig = new MetricsConfig(Option(confFile))
val sinks = new mutable.ArrayBuffer[Sink]
val sources = new mutable.ArrayBuffer[Source]
val registry = new MetricRegistry()
- val DEFAULT_SINKS = Map("jmx" -> "spark.metrics.sink.JmxSink")
-
- metricsConfig.initilize()
+ metricsConfig.initialize()
registerSources()
registerSinks()
@@ -37,7 +69,11 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
def registerSource(source: Source) {
sources += source
- registry.register(source.sourceName, source.metricRegistry)
+ try {
+ registry.register(source.sourceName, source.metricRegistry)
+ } catch {
+ case e: IllegalArgumentException => logInfo("Metrics already registered", e)
+ }
}
def registerSources() {
@@ -51,7 +87,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
val source = Class.forName(classPath).newInstance()
registerSource(source.asInstanceOf[Source])
} catch {
- case e: Exception => logError("source class " + classPath + " cannot be instantialized", e)
+ case e: Exception => logError("Source class " + classPath + " cannot be instantialized", e)
}
}
}
@@ -61,18 +97,14 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
sinkConfigs.foreach { kv =>
- val classPath = if (DEFAULT_SINKS.contains(kv._1)) {
- DEFAULT_SINKS(kv._1)
- } else {
- // For non-default sink, a property class should be set and create using reflection
- kv._2.getProperty("class")
- }
+ val classPath = kv._2.getProperty("class")
try {
- val sink = Class.forName(classPath).getConstructor(classOf[Properties], classOf[MetricRegistry])
+ val sink = Class.forName(classPath)
+ .getConstructor(classOf[Properties], classOf[MetricRegistry])
.newInstance(kv._2, registry)
sinks += sink.asInstanceOf[Sink]
} catch {
- case e: Exception => logError("sink class " + classPath + " cannot be instantialized", e)
+ case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
}
}
}
@@ -81,12 +113,17 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
private[spark] object MetricsSystem {
val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
- val timeUnits = Map(
- "illisecond" -> TimeUnit.MILLISECONDS,
- "second" -> TimeUnit.SECONDS,
- "minute" -> TimeUnit.MINUTES,
- "hour" -> TimeUnit.HOURS,
- "day" -> TimeUnit.DAYS)
-
- def createMetricsSystem(instance: String): MetricsSystem = new MetricsSystem(instance)
+
+ val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
+ val MINIMAL_POLL_PERIOD = 1
+
+ def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) {
+ val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit)
+ if (period < MINIMAL_POLL_PERIOD) {
+ throw new IllegalArgumentException("Polling period " + pollPeriod + " " + pollUnit +
+ " is below the minimal polling period")
+ }
+ }
+
+ def createMetricsSystem(instance: String): MetricsSystem = new MetricsSystem(instance)
}
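
checkMinimalPollingPeriod normalizes whatever period/unit pair a sink is configured with to seconds before comparing it against the 1-second floor. A quick worked example of the TimeUnit conversion it relies on (the object name is illustrative):

import java.util.concurrent.TimeUnit

// The conversion used by checkMinimalPollingPeriod: periods are normalized
// to SECONDS, and sub-second values truncate to 0, so they fail the check.
object PollingPeriodSketch {
  def main(args: Array[String]) {
    println(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES))         // 60 -> allowed
    println(TimeUnit.SECONDS.convert(10, TimeUnit.SECONDS))        // 10 -> allowed
    println(TimeUnit.SECONDS.convert(500, TimeUnit.MILLISECONDS))  // 0  -> rejected
  }
}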
diff --git a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
index 437f24a575..eaaac5d153 100644
--- a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
@@ -8,22 +8,24 @@ import java.util.concurrent.TimeUnit
import spark.metrics.MetricsSystem
class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
- val CONSOLE_DEFAULT_PERIOD = "10"
- val CONSOLE_DEFAULT_UNIT = "second"
+ val CONSOLE_DEFAULT_PERIOD = 10
+ val CONSOLE_DEFAULT_UNIT = "SECONDS"
val CONSOLE_KEY_PERIOD = "period"
val CONSOLE_KEY_UNIT = "unit"
val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
case Some(s) => s.toInt
- case None => CONSOLE_DEFAULT_PERIOD.toInt
+ case None => CONSOLE_DEFAULT_PERIOD
}
val pollUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
- case Some(s) => MetricsSystem.timeUnits(s)
- case None => MetricsSystem.timeUnits(CONSOLE_DEFAULT_UNIT)
+ case Some(s) => TimeUnit.valueOf(s.toUpperCase())
+ case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
}
+ MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
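
The sinks now map the "unit" option straight onto java.util.concurrent.TimeUnit via valueOf on the upper-cased string, which is why the template and test properties switch from "second"/"minute" to the plural enum names. A small sketch of that parsing (the object name is illustrative):

import java.util.concurrent.TimeUnit

// TimeUnit constants are plural (SECONDS, MINUTES, ...), so only the plural
// spellings parse; the old singular spellings would throw.
object UnitParsingSketch {
  def main(args: Array[String]) {
    println(TimeUnit.valueOf("seconds".toUpperCase()))  // SECONDS
    println(TimeUnit.valueOf("minutes".toUpperCase()))  // MINUTES
    // TimeUnit.valueOf("second".toUpperCase()) throws IllegalArgumentException
  }
}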
diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
index a8ca819e87..aa5bff0d34 100644
--- a/core/src/main/scala/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
@@ -13,19 +13,21 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si
val CSV_KEY_UNIT = "unit"
val CSV_KEY_DIR = "directory"
- val CSV_DEFAULT_PERIOD = "10"
- val CSV_DEFAULT_UNIT = "second"
+ val CSV_DEFAULT_PERIOD = 10
+ val CSV_DEFAULT_UNIT = "SECONDS"
val CSV_DEFAULT_DIR = "/tmp/"
val pollPeriod = Option(property.getProperty(CSV_KEY_PERIOD)) match {
case Some(s) => s.toInt
- case None => CSV_DEFAULT_PERIOD.toInt
+ case None => CSV_DEFAULT_PERIOD
}
val pollUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
- case Some(s) => MetricsSystem.timeUnits(s)
- case None => MetricsSystem.timeUnits(CSV_DEFAULT_UNIT)
+ case Some(s) => TimeUnit.valueOf(s.toUpperCase())
+ case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
}
+
+ MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match {
case Some(s) => s
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
index 38158b8a2b..87d27cc70d 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
@@ -8,23 +8,23 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends
val metricRegistry = new MetricRegistry()
val sourceName = "DAGScheduler"
- metricRegistry.register(MetricRegistry.name("stage", "failedStage"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failed.size
})
- metricRegistry.register(MetricRegistry.name("stage", "runningStage"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.running.size
})
- metricRegistry.register(MetricRegistry.name("stage", "waitingStage"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.waiting.size
})
- metricRegistry.register(MetricRegistry.name("job", "allJobs"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.nextRunId.get()
})
- metricRegistry.register(MetricRegistry.name("job", "ActiveJobs"), new Gauge[Int] {
+ metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.activeJobs.size
})
}
diff --git a/core/src/main/scala/spark/storage/BlockManagerSource.scala b/core/src/main/scala/spark/storage/BlockManagerSource.scala
index f964827102..4faa715c94 100644
--- a/core/src/main/scala/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerSource.scala
@@ -9,27 +9,40 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager) extends
val metricRegistry = new MetricRegistry()
val sourceName = "BlockManager"
- metricRegistry.register(MetricRegistry.name("memory", "maxMem"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
- val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
- maxMem
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+ maxMem / 1024 / 1024
}
})
- metricRegistry.register(MetricRegistry.name("memory", "remainingMem"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
- val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
- remainingMem
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+ remainingMem / 1024 / 1024
}
})
- metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
- val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).reduceOption(_+_).getOrElse(0L)
- diskSpaceUsed
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+ (maxMem - remainingMem) / 1024 / 1024
+ }
+ })
+
+ metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] {
+ override def getValue: Long = {
+ val storageStatusList = blockManager.master.getStorageStatus
+ val diskSpaceUsed = storageStatusList
+ .flatMap(_.blocks.values.map(_.diskSize))
+ .reduceOption(_ + _)
+ .getOrElse(0L)
+
+ diskSpaceUsed / 1024 / 1024
}
})
}
diff --git a/core/src/test/resources/test_metrics_config.properties b/core/src/test/resources/test_metrics_config.properties
index 2011940003..2b31ddf2eb 100644
--- a/core/src/test/resources/test_metrics_config.properties
+++ b/core/src/test/resources/test_metrics_config.properties
@@ -1,6 +1,6 @@
*.sink.console.period = 10
-*.sink.console.unit = second
+*.sink.console.unit = seconds
*.source.jvm.class = spark.metrics.source.JvmSource
master.sink.console.period = 20
-master.sink.console.unit = minute
-
+master.sink.console.unit = minutes
+
diff --git a/core/src/test/resources/test_metrics_system.properties b/core/src/test/resources/test_metrics_system.properties
index 06afbc6625..d5479f0298 100644
--- a/core/src/test/resources/test_metrics_system.properties
+++ b/core/src/test/resources/test_metrics_system.properties
@@ -1,7 +1,7 @@
*.sink.console.period = 10
-*.sink.console.unit = second
+*.sink.console.unit = seconds
test.sink.console.class = spark.metrics.sink.ConsoleSink
test.sink.dummy.class = spark.metrics.sink.DummySink
test.source.dummy.class = spark.metrics.source.DummySource
test.sink.console.period = 20
-test.sink.console.unit = minute
+test.sink.console.unit = minutes
diff --git a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
index bb1be4f4fc..87cd2ffad2 100644
--- a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
@@ -15,40 +15,36 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
}
test("MetricsConfig with default properties") {
- val conf = new MetricsConfig("dummy-file")
- conf.initilize()
+ val conf = new MetricsConfig(Option("dummy-file"))
+ conf.initialize()
- assert(conf.properties.size() === 2)
- assert(conf.properties.getProperty("*.sink.jmx.enabled") === "default")
- assert(conf.properties.getProperty("*.source.jvm.class") === "spark.metrics.source.JvmSource")
+ assert(conf.properties.size() === 0)
assert(conf.properties.getProperty("test-for-dummy") === null)
val property = conf.getInstance("random")
- assert(property.size() === 2)
- assert(property.getProperty("sink.jmx.enabled") === "default")
- assert(property.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+ assert(property.size() === 0)
}
test("MetricsConfig with properties set") {
- val conf = new MetricsConfig(filePath)
- conf.initilize()
+ val conf = new MetricsConfig(Option(filePath))
+ conf.initialize()
val masterProp = conf.getInstance("master")
- assert(masterProp.size() === 4)
+ assert(masterProp.size() === 3)
assert(masterProp.getProperty("sink.console.period") === "20")
- assert(masterProp.getProperty("sink.console.unit") === "minute")
- assert(masterProp.getProperty("sink.jmx.enabled") === "default")
- assert(masterProp.getProperty("source.jvm.class") == "spark.metrics.source.JvmSource")
+ assert(masterProp.getProperty("sink.console.unit") === "minutes")
+ assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
val workerProp = conf.getInstance("worker")
- assert(workerProp.size() === 4)
+ assert(workerProp.size() === 3)
assert(workerProp.getProperty("sink.console.period") === "10")
- assert(workerProp.getProperty("sink.console.unit") === "second")
+ assert(workerProp.getProperty("sink.console.unit") === "seconds")
+ assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
}
test("MetricsConfig with subProperties") {
- val conf = new MetricsConfig(filePath)
- conf.initilize()
+ val conf = new MetricsConfig(Option(filePath))
+ conf.initialize()
val propCategories = conf.propertyCategories
assert(propCategories.size === 2)
@@ -59,14 +55,10 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
assert(sourceProps("jvm").getProperty("class") === "spark.metrics.source.JvmSource")
val sinkProps = conf.subProperties(masterProp, MetricsSystem.SINK_REGEX)
- assert(sinkProps.size === 2)
+ assert(sinkProps.size === 1)
assert(sinkProps.contains("console"))
- assert(sinkProps.contains("jmx"))
val consoleProps = sinkProps("console")
assert(consoleProps.size() === 2)
-
- val jmxProps = sinkProps("jmx")
- assert(jmxProps.size() === 1)
}
}
diff --git a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
index 462c28e894..c189996417 100644
--- a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
@@ -12,7 +12,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
before {
filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile()
- System.setProperty("spark.metrics.conf.file", filePath)
+ System.setProperty("spark.metrics.conf", filePath)
}
test("MetricsSystem with default config") {
@@ -20,9 +20,8 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
val sources = metricsSystem.sources
val sinks = metricsSystem.sinks
- assert(sources.length === 1)
- assert(sinks.length === 1)
- assert(sources(0).sourceName === "jvm")
+ assert(sources.length === 0)
+ assert(sinks.length === 0)
}
test("MetricsSystem with sources add") {
@@ -30,11 +29,11 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
val sources = metricsSystem.sources
val sinks = metricsSystem.sinks
- assert(sources.length === 1)
- assert(sinks.length === 2)
+ assert(sources.length === 0)
+ assert(sinks.length === 1)
val source = new spark.deploy.master.MasterSource(null)
metricsSystem.registerSource(source)
- assert(sources.length === 2)
+ assert(sources.length === 1)
}
}