author     jerryshao <saisai.shao@intel.com>    2013-06-27 12:00:19 +0800
committer  jerryshao <saisai.shao@intel.com>    2013-07-24 14:57:46 +0800
commit     c3daad3f65630eb4ed536d06c0d467cde57a8142 (patch)
tree       fa26bf7f013f1c80731ec4fe58f9c243f6ddc1f2 /core
parent     9dec8c73e6f0c3b6b55a11ff92cc9bff18dadd24 (diff)
Update metric source support for instrumentation
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala    7
-rw-r--r--  core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala    5
-rw-r--r--  core/src/main/scala/spark/metrics/AbstractInstrumentation.scala       29
-rw-r--r--  core/src/main/scala/spark/metrics/MetricsConfig.scala                 12
-rw-r--r--  core/src/main/scala/spark/metrics/sink/CsvSink.scala                    4
-rw-r--r--  core/src/main/scala/spark/metrics/sink/Sink.scala                       1
-rw-r--r--  core/src/main/scala/spark/metrics/source/JvmSource.scala               17
-rw-r--r--  core/src/main/scala/spark/metrics/source/Source.scala                   5
8 files changed, 67 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
index 13088189a4..c295e725d7 100644
--- a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
@@ -15,12 +15,15 @@ private[spark] trait MasterInstrumentation extends AbstractInstrumentation {
def initialize(master: Master) {
masterInst = Some(master)
+ // Register all the sources
+ registerSources()
+
// Register and start all the sinks
- registerSinks
+ registerSinks()
}
def uninitialize() {
- unregisterSinks
+ unregisterSinks()
}
// Gauge for worker numbers in cluster
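
The master-side change establishes the lifecycle that AbstractInstrumentation expects: sources are registered before sinks on initialize, and sinks are stopped on uninitialize. A minimal sketch of that ordering, with placeholder bodies (DummyInstrumentation and its members are illustrative, not part of this patch):

    import com.codahale.metrics.MetricRegistry

    // Illustrative sketch only: shows the initialize/uninitialize ordering used above
    trait DummyInstrumentation {
      val metricRegistry = new MetricRegistry()

      def registerSources() {}   // instantiate configured sources, register their metrics
      def registerSinks() {}     // instantiate configured sinks, start their reporters
      def unregisterSinks() {}   // stop the reporters

      def initialize() {
        // Register sources first, so sinks report their metrics too
        registerSources()
        registerSinks()
      }

      def uninitialize() {
        unregisterSinks()
      }
    }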
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
index 04c43ce33b..2f725300b5 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
@@ -15,6 +15,9 @@ private[spark] trait WorkerInstrumentation extends AbstractInstrumentation {
def initialize(worker: Worker) {
workerInst = Some(worker)
+ // Register all the sources
+ registerSources()
+
// Register and start all the sinks
registerSinks()
}
@@ -36,7 +39,7 @@ private[spark] trait WorkerInstrumentation extends AbstractInstrumentation {
})
// Gauge for memory used of this worker
- metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "Mbytes"),
+ metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"),
new Gauge[Int] {
override def getValue: Int = workerInst.map(_.memoryUsed).getOrElse(0)
})
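
The renamed gauge above is plain codahale-metrics API. A self-contained sketch of the same registration pattern (the Worker class and its memoryUsed value here are stand-ins for the real spark.deploy.worker.Worker):

    import com.codahale.metrics.{Gauge, MetricRegistry}

    object GaugeExample {
      class Worker { def memoryUsed: Int = 512 } // stand-in for spark.deploy.worker.Worker

      def main(args: Array[String]) {
        val registry = new MetricRegistry()
        val workerInst: Option[Worker] = Some(new Worker)

        // Same pattern as the patch: the metric name is built from the class plus
        // segments, and the gauge falls back to 0 when no worker instance is attached
        registry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"),
          new Gauge[Int] {
            override def getValue: Int = workerInst.map(_.memoryUsed).getOrElse(0)
          })

        println(registry.getGauges.keySet)
      }
    }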
diff --git a/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala b/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala
index 0fed608488..9cae1e0220 100644
--- a/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala
+++ b/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala
@@ -9,17 +9,39 @@ import java.util.concurrent.TimeUnit
import spark.Logging
import spark.metrics.sink._
+import spark.metrics.source._
-trait AbstractInstrumentation extends Logging {
+private [spark] trait AbstractInstrumentation extends Logging {
initLogging()
+ // Get MetricRegistry handler
def registryHandler: MetricRegistry
+ // Get the instance name
def instance: String
val confFile = System.getProperty("spark.metrics.conf.file", MetricsConfig.DEFAULT_CONFIG_FILE)
val metricsConfig = new MetricsConfig(confFile)
val sinks = new mutable.ArrayBuffer[Sink]
+ val sources = new mutable.ArrayBuffer[Source]
+
+ def registerSources() {
+ val instConfig = metricsConfig.getInstance(instance)
+ val sourceConfigs = MetricsConfig.subProperties(instConfig, AbstractInstrumentation.SOURCE_REGEX)
+
+ // Register all the sources
+ sourceConfigs.foreach { kv =>
+ val classPath = kv._2.getProperty("class")
+ try {
+ val source = Class.forName(classPath).getConstructor(classOf[MetricRegistry])
+ .newInstance(registryHandler)
+ sources += source.asInstanceOf[Source]
+ } catch {
+ case e: Exception => logError("source class " + classPath + " cannot be instantialized", e)
+ }
+ }
+ sources.foreach(_.registerSource)
+ }
def registerSinks() {
val instConfig = metricsConfig.getInstance(instance)
@@ -33,6 +55,7 @@ trait AbstractInstrumentation extends Logging {
val classPath = if (AbstractInstrumentation.DEFAULT_SINKS.contains(kv._1)) {
AbstractInstrumentation.DEFAULT_SINKS(kv._1)
} else {
+ // For a non-default sink, a "class" property must be set so the sink can be created via reflection
kv._2.getProperty("class")
}
try {
@@ -40,10 +63,9 @@ trait AbstractInstrumentation extends Logging {
.newInstance(kv._2, registryHandler)
sinks += sink.asInstanceOf[Sink]
} catch {
- case e: Exception => logError("class " + classPath + "cannot be instantialize", e)
+ case e: Exception => logError("sink class " + classPath + " cannot be instantialized", e)
}
}
-
sinks.foreach(_.registerSink)
}
@@ -58,6 +80,7 @@ object AbstractInstrumentation {
"csv" -> "spark.metrics.sink.CsvSink")
val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
+ val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
val timeUnits = Map(
"millisecond" -> TimeUnit.MILLISECONDS,
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
index 0fec1988ea..be4f670918 100644
--- a/core/src/main/scala/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -6,7 +6,7 @@ import java.io.FileInputStream
import scala.collection.mutable
import scala.util.matching.Regex
-class MetricsConfig(val configFile: String) {
+private [spark] class MetricsConfig(val configFile: String) {
val properties = new Properties()
var fis: FileInputStream = _
@@ -36,7 +36,7 @@ class MetricsConfig(val configFile: String) {
}
object MetricsConfig {
- val DEFAULT_CONFIG_FILE = "/home/jerryshao/project/sotc_cloud-spark/conf/metrics.properties"
+ val DEFAULT_CONFIG_FILE = "conf/metrics.properties"
val DEFAULT_PREFIX = "*"
val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
@@ -45,9 +45,11 @@ object MetricsConfig {
import scala.collection.JavaConversions._
prop.foreach { kv =>
- val regex(a, b) = kv._1
- subProperties.getOrElseUpdate(a, new Properties).setProperty(b, kv._2)
- println(">>>>>subProperties added " + a + " " + b + " " + kv._2)
+ if (regex.findPrefixOf(kv._1) != None) {
+ val regex(prefix, suffix) = kv._1
+ subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
+ println(">>>>>subProperties added " + prefix + " " + suffix + " " + kv._2)
+ }
}
subProperties
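
The findPrefixOf guard matters because the extractor pattern throws a MatchError on keys that do not match the regex, which the removed unconditional "val regex(a, b) = kv._1" would have done. A sketch of the guarded behavior:

    object PrefixGuardExample {
      val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r

      def main(args: Array[String]) {
        for (key <- Seq("master.sink.console.period", "malformed_key")) {
          // Guarded extraction, as in the patched code: non-matching keys are skipped
          if (INSTANCE_REGEX.findPrefixOf(key) != None) {
            val INSTANCE_REGEX(prefix, suffix) = key
            println(prefix + " -> " + suffix)
          } else {
            println("skipped: " + key)
          }
        }
      }
    }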
diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
index 3a80c36901..1d663f6cff 100644
--- a/core/src/main/scala/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
@@ -46,8 +46,8 @@ object CsvSink {
val CSV_KEY_UNIT = "unit"
val CSV_KEY_DIR = "directory"
- val CSV_DEFAULT_PERIOD = "1"
- val CSV_DEFAULT_UNIT = "minute"
+ val CSV_DEFAULT_PERIOD = "10"
+ val CSV_DEFAULT_UNIT = "second"
val CSV_DEFAULT_DIR = "/tmp/"
}
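
The defaults move from one CSV report per minute to one every ten seconds. CsvSink wraps codahale's CsvReporter; a hedged sketch of a reporter configured with the new defaults (the registry and output directory are placeholders, and the builder chain assumes the metrics 3.x API):

    import java.io.File
    import java.util.Locale
    import java.util.concurrent.TimeUnit

    import com.codahale.metrics.{CsvReporter, MetricRegistry}

    object CsvReporterExample {
      def main(args: Array[String]) {
        val registry = new MetricRegistry()

        // Matches the new defaults: period = 10, unit = second, directory = /tmp/
        val reporter = CsvReporter.forRegistry(registry)
          .formatFor(Locale.US)
          .convertDurationsTo(TimeUnit.MILLISECONDS)
          .convertRatesTo(TimeUnit.SECONDS)
          .build(new File("/tmp/"))

        reporter.start(10, TimeUnit.SECONDS)
      }
    }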
diff --git a/core/src/main/scala/spark/metrics/sink/Sink.scala b/core/src/main/scala/spark/metrics/sink/Sink.scala
index 65ebcb4eac..26052b7231 100644
--- a/core/src/main/scala/spark/metrics/sink/Sink.scala
+++ b/core/src/main/scala/spark/metrics/sink/Sink.scala
@@ -2,5 +2,6 @@ package spark.metrics.sink
trait Sink {
def registerSink: Unit
+
def unregisterSink: Unit
}
\ No newline at end of file
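
For reference, the reflective construction in AbstractInstrumentation (newInstance(kv._2, registryHandler)) implies a concrete Sink exposes a (Properties, MetricRegistry) constructor. A minimal console-backed sketch under that assumption (ExampleConsoleSink and its "period" property are illustrative):

    import java.util.Properties
    import java.util.concurrent.TimeUnit

    import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
    import spark.metrics.sink.Sink

    // Sketch of a Sink; the (Properties, MetricRegistry) constructor shape is
    // what AbstractInstrumentation's reflection expects
    class ExampleConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
      val period = property.getProperty("period", "10").toInt

      val reporter = ConsoleReporter.forRegistry(registry)
        .convertDurationsTo(TimeUnit.MILLISECONDS)
        .convertRatesTo(TimeUnit.SECONDS)
        .build()

      override def registerSink {
        reporter.start(period, TimeUnit.SECONDS)
      }

      override def unregisterSink {
        reporter.stop()
      }
    }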
diff --git a/core/src/main/scala/spark/metrics/source/JvmSource.scala b/core/src/main/scala/spark/metrics/source/JvmSource.scala
new file mode 100644
index 0000000000..8f6bf48843
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/source/JvmSource.scala
@@ -0,0 +1,17 @@
+package spark.metrics.source
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet}
+
+class JvmSource(registry: MetricRegistry) extends Source {
+ // Initialize memory usage gauge for jvm
+ val memUsageMetricSet = new MemoryUsageGaugeSet
+
+ // Initialize garbage collection usage gauge for jvm
+ val gcMetricSet = new GarbageCollectorMetricSet
+
+ override def registerSource() {
+ registry.registerAll(memUsageMetricSet)
+ registry.registerAll(gcMetricSet)
+ }
+}
\ No newline at end of file
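
JvmSource can be exercised directly through the same one-argument constructor that the reflective registration uses; a quick usage sketch:

    import com.codahale.metrics.MetricRegistry
    import spark.metrics.source.JvmSource

    object JvmSourceExample {
      def main(args: Array[String]) {
        val registry = new MetricRegistry()

        // Same path the reflective registration takes: construct with the
        // registry, then register all JVM memory and GC gauges onto it
        val source = new JvmSource(registry)
        source.registerSource()

        println(registry.getGauges.size + " JVM gauges registered")
      }
    }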
diff --git a/core/src/main/scala/spark/metrics/source/Source.scala b/core/src/main/scala/spark/metrics/source/Source.scala
new file mode 100644
index 0000000000..35cfe0c8ff
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/source/Source.scala
@@ -0,0 +1,5 @@
+package spark.metrics.source
+
+trait Source {
+ def registerSource: Unit
+}
\ No newline at end of file
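
A third-party source follows the same shape as JvmSource: a MetricRegistry constructor argument plus registerSource. A hypothetical sketch (ExampleSource and its gauge are illustrative only); it would be wired up with a source.example.class property matched by SOURCE_REGEX:

    import java.lang.management.ManagementFactory

    import com.codahale.metrics.{Gauge, MetricRegistry}
    import spark.metrics.source.Source

    // Hypothetical custom source; the single MetricRegistry constructor argument
    // is what registerSources()'s reflection requires
    class ExampleSource(registry: MetricRegistry) extends Source {
      override def registerSource {
        registry.register(MetricRegistry.name(classOf[ExampleSource], "uptime", "ms"),
          new Gauge[Long] {
            override def getValue: Long = ManagementFactory.getRuntimeMXBean.getUptime
          })
      }
    }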