author     Andrew xia <junluan.xia@intel.com>        2013-07-03 20:11:10 +0800
committer  jerryshao <saisai.shao@intel.com>         2013-07-24 14:57:47 +0800
commit     ed1a3bc206c01974eedd0b1fb1deec183376b5c6 (patch)
tree       e7b12b544c4451702fd3a0fc0cdbafc38a713e2b
parent     5730193e0c8639b654f489c5956e31451d81b7db (diff)
continue to refactor code style and functions
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                         |   6
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala                 |   6
-rw-r--r--  core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala  |  19
-rw-r--r--  core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala  |  35
-rw-r--r--  core/src/main/scala/spark/executor/Executor.scala                    |   8
-rw-r--r--  core/src/main/scala/spark/executor/ExecutorInstrumentation.scala     |  35
-rw-r--r--  core/src/main/scala/spark/metrics/MetricsConfig.scala                |  88
-rw-r--r--  core/src/main/scala/spark/metrics/MetricsSystem.scala                |  40
-rw-r--r--  core/src/main/scala/spark/metrics/sink/ConsoleSink.scala             |  30
-rw-r--r--  core/src/main/scala/spark/metrics/sink/CsvSink.scala                 |  40
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala         |  23
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerSource.scala           |  14
-rw-r--r--  core/src/test/scala/spark/metrics/MetricsConfigSuite.scala           |  32
-rw-r--r--  core/src/test/scala/spark/metrics/MetricsSystemSuite.scala           |  14
14 files changed, 189 insertions(+), 201 deletions(-)
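
The core of this patch is the MetricsConfig/MetricsSystem refactor: configuration loading moves out of the MetricsConfig constructor into an explicit initilize() method, subProperties() and the regex/default constants become instance members instead of companion-object members, and MetricsSystem now calls metricsConfig.initilize() before registering sources and sinks. A minimal sketch of the resulting call flow follows (illustration only, not part of the patch; it assumes code living inside the spark package, since these classes are private[spark], and a hypothetical properties file path):

    import spark.metrics.{MetricsConfig, MetricsSystem}

    // Sketch only: "conf/metrics.properties" is a hypothetical path.
    val config = new MetricsConfig("conf/metrics.properties")
    config.initilize()                               // load the file, or fall back to the built-in defaults

    val masterProps = config.getInstance("master")   // per-instance section, falling back to the "*" defaults
    val sinkProps = config.subProperties(masterProps, MetricsSystem.SINK_REGEX)
    sinkProps.foreach { kv =>
      println("sink " + kv._1 + " -> " + kv._2)
    }
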
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 1e59a4d47d..77cb0ee0cd 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -274,9 +274,9 @@ class SparkContext(
val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
- def initDriverMetrics() = {
- SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
- SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
+ def initDriverMetrics() {
+ SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
+ SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
}
initDriverMetrics()
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index cc0b2d4295..5f67366eb6 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -57,7 +57,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val webUi = new MasterWebUI(self, webUiPort)
Utils.checkHost(host, "Expected hostname")
-
+
val masterInstrumentation = new MasterInstrumentation(this)
val masterPublicAddress = {
@@ -76,7 +76,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
webUi.start()
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
-
+
Master.metricsSystem.registerSource(masterInstrumentation)
Master.metricsSystem.start()
}
@@ -322,7 +322,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
removeWorker(worker)
}
}
-
+
override def postStop() {
Master.metricsSystem.stop()
}
diff --git a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
index 61a561c955..4c3708cc4c 100644
--- a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
@@ -5,24 +5,21 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
import spark.metrics.source.Source
private[spark] class MasterInstrumentation(val master: Master) extends Source {
- val metricRegistry = new MetricRegistry()
+ val metricRegistry = new MetricRegistry()
val sourceName = "master"
- metricRegistry.register(MetricRegistry.name("workers","number"),
- new Gauge[Int] {
+ // Gauge for worker numbers in cluster
+ metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] {
override def getValue: Int = master.workers.size
})
-
+
// Gauge for application numbers in cluster
- metricRegistry.register(MetricRegistry.name("apps", "number"),
- new Gauge[Int] {
- override def getValue: Int = master.apps.size
+ metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
+ override def getValue: Int = master.apps.size
})
// Gauge for waiting application numbers in cluster
- metricRegistry.register(MetricRegistry.name("waiting_apps", "number"),
- new Gauge[Int] {
- override def getValue: Int = master.waitingApps.size
+ metricRegistry.register(MetricRegistry.name("waiting_apps", "number"), new Gauge[Int] {
+ override def getValue: Int = master.waitingApps.size
})
-
}
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
index 94c20a98c1..c76c0b4711 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
@@ -8,32 +8,27 @@ private[spark] class WorkerInstrumentation(val worker: Worker) extends Source {
val sourceName = "worker"
val metricRegistry = new MetricRegistry()
- metricRegistry.register(MetricRegistry.name("executor", "number"),
- new Gauge[Int] {
- override def getValue: Int = worker.executors.size
+ metricRegistry.register(MetricRegistry.name("executor", "number"), new Gauge[Int] {
+ override def getValue: Int = worker.executors.size
})
-
+
// Gauge for cores used of this worker
- metricRegistry.register(MetricRegistry.name("core_used", "number"),
- new Gauge[Int] {
- override def getValue: Int = worker.coresUsed
+ metricRegistry.register(MetricRegistry.name("core_used", "number"), new Gauge[Int] {
+ override def getValue: Int = worker.coresUsed
})
-
+
// Gauge for memory used of this worker
- metricRegistry.register(MetricRegistry.name("mem_used", "MBytes"),
- new Gauge[Int] {
- override def getValue: Int = worker.memoryUsed
+ metricRegistry.register(MetricRegistry.name("mem_used", "MBytes"), new Gauge[Int] {
+ override def getValue: Int = worker.memoryUsed
})
-
+
// Gauge for cores free of this worker
- metricRegistry.register(MetricRegistry.name("core_free", "number"),
- new Gauge[Int] {
- override def getValue: Int = worker.coresFree
+ metricRegistry.register(MetricRegistry.name("core_free", "number"), new Gauge[Int] {
+ override def getValue: Int = worker.coresFree
})
-
+
// Gauge for memory used of this worker
- metricRegistry.register(MetricRegistry.name("mem_free", "MBytes"),
- new Gauge[Int] {
- override def getValue: Int = worker.memoryFree
- })
+ metricRegistry.register(MetricRegistry.name("mem_free", "MBytes"), new Gauge[Int] {
+ override def getValue: Int = worker.memoryFree
+ })
}
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 7179ed84a8..4ea05dec1c 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -69,7 +69,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
override def uncaughtException(thread: Thread, exception: Throwable) {
try {
logError("Uncaught exception in thread " + thread, exception)
-
+
// We may have been called from a shutdown hook. If so, we must not call System.exit().
// (If we do, we will deadlock.)
if (!Utils.inShutdown()) {
@@ -86,14 +86,14 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
}
}
)
-
+
val executorInstrumentation = new ExecutorInstrumentation(this)
// Initialize Spark environment (using system properties read above)
val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
- SparkEnv.set(env)
+ SparkEnv.set(env)
env.metricsSystem.registerSource(executorInstrumentation)
-
+
private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
// Start worker thread pool
diff --git a/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala b/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala
index ebbcbee742..ad406f41b4 100644
--- a/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala
+++ b/core/src/main/scala/spark/executor/ExecutorInstrumentation.scala
@@ -4,32 +4,27 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
import spark.metrics.source.Source
-class ExecutorInstrumentation(val executor: Executor) extends Source{
+class ExecutorInstrumentation(val executor: Executor) extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "executor"
-
+
// Gauge for executor thread pool's actively executing task counts
- metricRegistry.register(MetricRegistry.name("threadpool", "active_task", "number"),
- new Gauge[Int] {
- override def getValue: Int = executor.threadPool.getActiveCount()
+ metricRegistry.register(MetricRegistry.name("threadpool", "active_task", "number"), new Gauge[Int] {
+ override def getValue: Int = executor.threadPool.getActiveCount()
})
-
+
// Gauge for executor thread pool's approximate total number of tasks that have been completed
- metricRegistry.register(MetricRegistry.name("threadpool", "complete_task", "count"),
- new Gauge[Long] {
- override def getValue: Long = executor.threadPool.getCompletedTaskCount()
+ metricRegistry.register(MetricRegistry.name("threadpool", "complete_task", "count"), new Gauge[Long] {
+ override def getValue: Long = executor.threadPool.getCompletedTaskCount()
})
-
+
// Gauge for executor thread pool's current number of threads
- metricRegistry.register(MetricRegistry.name("threadpool", "current_pool", "size"),
- new Gauge[Int] {
- override def getValue: Int = executor.threadPool.getPoolSize()
+ metricRegistry.register(MetricRegistry.name("threadpool", "current_pool", "size"), new Gauge[Int] {
+ override def getValue: Int = executor.threadPool.getPoolSize()
})
-
- // Gauge got executor thread pool's largest number of threads that have ever simultaneously been in th pool
- metricRegistry.register(MetricRegistry.name("threadpool", "max_pool", "size"),
- new Gauge[Int] {
- override def getValue: Int = executor.threadPool.getMaximumPoolSize()
- })
-}
\ No newline at end of file
+ // Gauge got executor thread pool's largest number of threads that have ever simultaneously been in th pool
+ metricRegistry.register(MetricRegistry.name("threadpool", "max_pool", "size"), new Gauge[Int] {
+ override def getValue: Int = executor.threadPool.getMaximumPoolSize()
+ })
+}
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
index 7405192058..b1f6a1e596 100644
--- a/core/src/main/scala/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -3,63 +3,63 @@ package spark.metrics
import java.util.Properties
import java.io.{File, FileInputStream}
-import scala.collection.mutable
+import scala.collection.mutable.HashMap
import scala.util.matching.Regex
-private [spark] class MetricsConfig(val configFile: String) {
+private[spark] class MetricsConfig(val configFile: String) {
val properties = new Properties()
- // Add default properties in case there's no properties file
- MetricsConfig.setDefaultProperties(properties)
-
- val confFile = new File(configFile)
- if (confFile.exists()) {
- var fis: FileInputStream = null
- try {
- fis = new FileInputStream(configFile)
- properties.load(fis)
- } finally {
- fis.close()
- }
- }
-
- val propertyCategories = MetricsConfig.subProperties(properties, MetricsConfig.INSTANCE_REGEX)
- if (propertyCategories.contains(MetricsConfig.DEFAULT_PREFIX)) {
- import scala.collection.JavaConversions._
- val defaultProperty = propertyCategories(MetricsConfig.DEFAULT_PREFIX)
- for ((inst, prop) <- propertyCategories; p <- defaultProperty
- if inst != MetricsConfig.DEFAULT_PREFIX; if prop.getProperty(p._1) == null) {
- prop.setProperty(p._1, p._2)
- }
- }
-
- def getInstance(inst: String) = {
- propertyCategories.get(inst) match {
- case Some(s) => s
- case None => propertyCategories(MetricsConfig.DEFAULT_PREFIX)
- }
- }
-}
-
-private[spark] object MetricsConfig {
val DEFAULT_PREFIX = "*"
val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
-
- def setDefaultProperties(prop: Properties) {
+ var propertyCategories: HashMap[String, Properties] = null
+
+ private def setDefaultProperties(prop: Properties) {
prop.setProperty("*.sink.jmx.enabled", "default")
prop.setProperty("*.source.jvm.class", "spark.metrics.source.JvmSource")
}
-
- def subProperties(prop: Properties, regex: Regex) = {
- val subProperties = new mutable.HashMap[String, Properties]
-
+
+ def initilize() {
+ //Add default properties in case there's no properties file
+ setDefaultProperties(properties)
+
+ val confFile = new File(configFile)
+ if (confFile.exists()) {
+ var fis: FileInputStream = null
+ try {
+ fis = new FileInputStream(configFile)
+ properties.load(fis)
+ } finally {
+ fis.close()
+ }
+ }
+
+ propertyCategories = subProperties(properties, INSTANCE_REGEX)
+ if (propertyCategories.contains(DEFAULT_PREFIX)) {
+ import scala.collection.JavaConversions._
+ val defaultProperty = propertyCategories(DEFAULT_PREFIX)
+ for ((inst, prop) <- propertyCategories; p <- defaultProperty
+ if inst != DEFAULT_PREFIX; if prop.getProperty(p._1) == null) {
+ prop.setProperty(p._1, p._2)
+ }
+ }
+ }
+
+ def subProperties(prop: Properties, regex: Regex): HashMap[String, Properties] = {
+ val subProperties = new HashMap[String, Properties]
import scala.collection.JavaConversions._
- prop.foreach { kv =>
+ prop.foreach { kv =>
if (regex.findPrefixOf(kv._1) != None) {
val regex(prefix, suffix) = kv._1
subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
}
}
-
subProperties
}
-}
\ No newline at end of file
+
+ def getInstance(inst: String): Properties = {
+ propertyCategories.get(inst) match {
+ case Some(s) => s
+ case None => propertyCategories(DEFAULT_PREFIX)
+ }
+ }
+}
+
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
index bf4487e0fc..54f6c6e4da 100644
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -13,34 +13,37 @@ import spark.metrics.source.Source
private[spark] class MetricsSystem private (val instance: String) extends Logging {
initLogging()
-
+
val confFile = System.getProperty("spark.metrics.conf.file", "unsupported")
val metricsConfig = new MetricsConfig(confFile)
-
+
val sinks = new mutable.ArrayBuffer[Sink]
val sources = new mutable.ArrayBuffer[Source]
val registry = new MetricRegistry()
-
+
+ val DEFAULT_SINKS = Map("jmx" -> "spark.metrics.sink.JmxSink")
+
+ metricsConfig.initilize()
registerSources()
registerSinks()
-
+
def start() {
sinks.foreach(_.start)
}
-
+
def stop() {
sinks.foreach(_.stop)
}
-
+
def registerSource(source: Source) {
sources += source
registry.register(source.sourceName, source.metricRegistry)
}
-
+
def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
- val sourceConfigs = MetricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
-
+ val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
+
// Register all the sources related to instance
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
@@ -52,14 +55,14 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
}
}
}
-
+
def registerSinks() {
val instConfig = metricsConfig.getInstance(instance)
- val sinkConfigs = MetricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
-
+ val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
+
sinkConfigs.foreach { kv =>
- val classPath = if (MetricsSystem.DEFAULT_SINKS.contains(kv._1)) {
- MetricsSystem.DEFAULT_SINKS(kv._1)
+ val classPath = if (DEFAULT_SINKS.contains(kv._1)) {
+ DEFAULT_SINKS(kv._1)
} else {
// For non-default sink, a property class should be set and create using reflection
kv._2.getProperty("class")
@@ -76,17 +79,14 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
}
private[spark] object MetricsSystem {
- val DEFAULT_SINKS = Map("jmx" -> "spark.metrics.sink.JmxSink")
-
val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
-
val timeUnits = Map(
- "millisecond" -> TimeUnit.MILLISECONDS,
+ "illisecond" -> TimeUnit.MILLISECONDS,
"second" -> TimeUnit.SECONDS,
"minute" -> TimeUnit.MINUTES,
"hour" -> TimeUnit.HOURS,
"day" -> TimeUnit.DAYS)
-
- def createMetricsSystem(instance: String) = new MetricsSystem(instance)
+
+ def createMetricsSystem(instance: String): MetricsSystem = new MetricsSystem(instance)
}
diff --git a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
index d7b7a9e501..c67c0ee912 100644
--- a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
@@ -8,34 +8,34 @@ import java.util.concurrent.TimeUnit
import spark.metrics.MetricsSystem
class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
- val pollPeriod = Option(property.getProperty(ConsoleSink.CONSOLE_KEY_PERIOD)) match {
+
+ val CONSOLE_DEFAULT_PERIOD = "10"
+ val CONSOLE_DEFAULT_UNIT = "second"
+
+ val CONSOLE_KEY_PERIOD = "period"
+ val CONSOLE_KEY_UNIT = "unit"
+
+ val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
case Some(s) => s.toInt
- case None => ConsoleSink.CONSOLE_DEFAULT_PERIOD.toInt
+ case None => CONSOLE_DEFAULT_PERIOD.toInt
}
-
- val pollUnit = Option(property.getProperty(ConsoleSink.CONSOLE_KEY_UNIT)) match {
+
+ val pollUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
case Some(s) => MetricsSystem.timeUnits(s)
- case None => MetricsSystem.timeUnits(ConsoleSink.CONSOLE_DEFAULT_UNIT)
+ case None => MetricsSystem.timeUnits(CONSOLE_DEFAULT_UNIT)
}
-
+
val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build()
override def start() {
- reporter.start(pollPeriod, pollUnit)
+ reporter.start(pollPeriod, pollUnit)
}
-
+
override def stop() {
reporter.stop()
}
}
-object ConsoleSink {
- val CONSOLE_DEFAULT_PERIOD = "10"
- val CONSOLE_DEFAULT_UNIT = "second"
-
- val CONSOLE_KEY_PERIOD = "period"
- val CONSOLE_KEY_UNIT = "unit"
-}
diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
index e6c5bffd3c..a8ca819e87 100644
--- a/core/src/main/scala/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
@@ -9,21 +9,29 @@ import java.util.concurrent.TimeUnit
import spark.metrics.MetricsSystem
class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink {
- val pollPeriod = Option(property.getProperty(CsvSink.CSV_KEY_PERIOD)) match {
+ val CSV_KEY_PERIOD = "period"
+ val CSV_KEY_UNIT = "unit"
+ val CSV_KEY_DIR = "directory"
+
+ val CSV_DEFAULT_PERIOD = "10"
+ val CSV_DEFAULT_UNIT = "second"
+ val CSV_DEFAULT_DIR = "/tmp/"
+
+ val pollPeriod = Option(property.getProperty(CSV_KEY_PERIOD)) match {
case Some(s) => s.toInt
- case None => CsvSink.CSV_DEFAULT_PERIOD.toInt
+ case None => CSV_DEFAULT_PERIOD.toInt
}
-
- val pollUnit = Option(property.getProperty(CsvSink.CSV_KEY_UNIT)) match {
+
+ val pollUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
case Some(s) => MetricsSystem.timeUnits(s)
- case None => MetricsSystem.timeUnits(CsvSink.CSV_DEFAULT_UNIT)
+ case None => MetricsSystem.timeUnits(CSV_DEFAULT_UNIT)
}
-
- val pollDir = Option(property.getProperty(CsvSink.CSV_KEY_DIR)) match {
+
+ val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match {
case Some(s) => s
- case None => CsvSink.CSV_DEFAULT_DIR
+ case None => CSV_DEFAULT_DIR
}
-
+
val reporter: CsvReporter = CsvReporter.forRegistry(registry)
.formatFor(Locale.US)
.convertDurationsTo(TimeUnit.MILLISECONDS)
@@ -31,21 +39,11 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si
.build(new File(pollDir))
override def start() {
- reporter.start(pollPeriod, pollUnit)
+ reporter.start(pollPeriod, pollUnit)
}
-
+
override def stop() {
reporter.stop()
}
}
-object CsvSink {
- val CSV_KEY_PERIOD = "period"
- val CSV_KEY_UNIT = "unit"
- val CSV_KEY_DIR = "directory"
-
- val CSV_DEFAULT_PERIOD = "10"
- val CSV_DEFAULT_UNIT = "second"
- val CSV_DEFAULT_DIR = "/tmp/"
-}
-
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
index 57aa74512c..38158b8a2b 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
@@ -5,27 +5,26 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
import spark.metrics.source.Source
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
- val metricRegistry = new MetricRegistry()
+ val metricRegistry = new MetricRegistry()
val sourceName = "DAGScheduler"
-
- metricRegistry.register(MetricRegistry.name("stage","failedStage"), new Gauge[Int] {
- override def getValue: Int = dagScheduler.failed.size
+ metricRegistry.register(MetricRegistry.name("stage", "failedStage"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.failed.size
})
- metricRegistry.register(MetricRegistry.name("stage","runningStage"), new Gauge[Int] {
- override def getValue: Int = dagScheduler.running.size
+ metricRegistry.register(MetricRegistry.name("stage", "runningStage"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.running.size
})
- metricRegistry.register(MetricRegistry.name("stage","waitingStage"), new Gauge[Int] {
- override def getValue: Int = dagScheduler.waiting.size
+ metricRegistry.register(MetricRegistry.name("stage", "waitingStage"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.waiting.size
})
- metricRegistry.register(MetricRegistry.name("job","allJobs"), new Gauge[Int] {
- override def getValue: Int = dagScheduler.nextRunId.get()
+ metricRegistry.register(MetricRegistry.name("job", "allJobs"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.nextRunId.get()
})
- metricRegistry.register(MetricRegistry.name("job","ActiveJobs"), new Gauge[Int] {
- override def getValue: Int = dagScheduler.activeJobs.size
+ metricRegistry.register(MetricRegistry.name("job", "ActiveJobs"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.activeJobs.size
})
}
diff --git a/core/src/main/scala/spark/storage/BlockManagerSource.scala b/core/src/main/scala/spark/storage/BlockManagerSource.scala
index c0ce9259c8..f964827102 100644
--- a/core/src/main/scala/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerSource.scala
@@ -6,29 +6,29 @@ import spark.metrics.source.Source
import spark.storage._
private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source {
- val metricRegistry = new MetricRegistry()
+ val metricRegistry = new MetricRegistry()
val sourceName = "BlockManager"
- metricRegistry.register(MetricRegistry.name("memory","maxMem"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("memory", "maxMem"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
- val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
maxMem
}
})
- metricRegistry.register(MetricRegistry.name("memory","remainingMem"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("memory", "remainingMem"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
- val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
remainingMem
}
})
- metricRegistry.register(MetricRegistry.name("disk","diskSpaceUsed"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
- val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).reduceOption(_+_).getOrElse(0L)
+ val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).reduceOption(_+_).getOrElse(0L)
diskSpaceUsed
}
})
diff --git a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
index f4c83cb644..bb1be4f4fc 100644
--- a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
@@ -9,60 +9,64 @@ import spark.metrics._
class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
var filePath: String = _
-
+
before {
filePath = getClass.getClassLoader.getResource("test_metrics_config.properties").getFile()
}
test("MetricsConfig with default properties") {
val conf = new MetricsConfig("dummy-file")
+ conf.initilize()
+
assert(conf.properties.size() === 2)
assert(conf.properties.getProperty("*.sink.jmx.enabled") === "default")
assert(conf.properties.getProperty("*.source.jvm.class") === "spark.metrics.source.JvmSource")
assert(conf.properties.getProperty("test-for-dummy") === null)
-
+
val property = conf.getInstance("random")
assert(property.size() === 2)
assert(property.getProperty("sink.jmx.enabled") === "default")
assert(property.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
}
-
+
test("MetricsConfig with properties set") {
val conf = new MetricsConfig(filePath)
-
+ conf.initilize()
+
val masterProp = conf.getInstance("master")
assert(masterProp.size() === 4)
assert(masterProp.getProperty("sink.console.period") === "20")
assert(masterProp.getProperty("sink.console.unit") === "minute")
assert(masterProp.getProperty("sink.jmx.enabled") === "default")
assert(masterProp.getProperty("source.jvm.class") == "spark.metrics.source.JvmSource")
-
+
val workerProp = conf.getInstance("worker")
assert(workerProp.size() === 4)
assert(workerProp.getProperty("sink.console.period") === "10")
assert(workerProp.getProperty("sink.console.unit") === "second")
}
-
+
test("MetricsConfig with subProperties") {
val conf = new MetricsConfig(filePath)
-
+ conf.initilize()
+
val propCategories = conf.propertyCategories
assert(propCategories.size === 2)
-
+
val masterProp = conf.getInstance("master")
- val sourceProps = MetricsConfig.subProperties(masterProp, MetricsSystem.SOURCE_REGEX)
+ val sourceProps = conf.subProperties(masterProp, MetricsSystem.SOURCE_REGEX)
assert(sourceProps.size === 1)
assert(sourceProps("jvm").getProperty("class") === "spark.metrics.source.JvmSource")
-
- val sinkProps = MetricsConfig.subProperties(masterProp, MetricsSystem.SINK_REGEX)
+
+ val sinkProps = conf.subProperties(masterProp, MetricsSystem.SINK_REGEX)
assert(sinkProps.size === 2)
assert(sinkProps.contains("console"))
assert(sinkProps.contains("jmx"))
-
+
val consoleProps = sinkProps("console")
assert(consoleProps.size() === 2)
-
+
val jmxProps = sinkProps("jmx")
assert(jmxProps.size() === 1)
- }
+ }
}
diff --git a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
index 967be6ec47..f29bb9db67 100644
--- a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
@@ -9,32 +9,32 @@ import spark.metrics._
class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
var filePath: String = _
-
+
before {
filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile()
System.setProperty("spark.metrics.conf.file", filePath)
}
-
+
test("MetricsSystem with default config") {
val metricsSystem = MetricsSystem.createMetricsSystem("default")
val sources = metricsSystem.sources
val sinks = metricsSystem.sinks
-
+
assert(sources.length === 1)
assert(sinks.length === 1)
assert(sources(0).sourceName === "jvm")
}
-
+
test("MetricsSystem with sources add") {
val metricsSystem = MetricsSystem.createMetricsSystem("test")
val sources = metricsSystem.sources
val sinks = metricsSystem.sinks
-
+
assert(sources.length === 1)
assert(sinks.length === 2)
-
+
val source = new spark.deploy.master.MasterInstrumentation(null)
metricsSystem.registerSource(source)
assert(sources.length === 2)
- }
+ }
}
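
For reference, the runtime pattern the suite above exercises is the same one used by SparkContext and Executor in the hunks further up: create a per-instance MetricsSystem, register Source objects with it, then start the configured sinks. A rough sketch (illustration only, not part of the patch; the "driver" instance name, the properties path, and the in-scope dagScheduler are assumptions):

    import spark.metrics.MetricsSystem
    import spark.scheduler.DAGSchedulerSource

    // Sketch only: assumes code inside the spark package with a DAGScheduler in scope.
    System.setProperty("spark.metrics.conf.file", "conf/metrics.properties")  // hypothetical path

    val metricsSystem = MetricsSystem.createMetricsSystem("driver")
    metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
    metricsSystem.start()    // starts every configured sink
    // ... application runs ...
    metricsSystem.stop()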