author    jerryshao <saisai.shao@intel.com>    2013-06-27 09:47:27 +0800
committer jerryshao <saisai.shao@intel.com>    2013-07-24 14:57:46 +0800
commit    503acd3a379a3686d343fdf072fc231b8fba78f9 (patch)
tree      90c62b415e8d28ea287efb1852b3a5d469017444 /core
parent    b0113290404205fad3d227923cc83dbefdd00202 (diff)
Build metrics system framework
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/metrics/AbstractInstrumentation.scala   68
-rw-r--r--  core/src/main/scala/spark/metrics/MetricsConfig.scala             55
-rw-r--r--  core/src/main/scala/spark/metrics/sink/ConsoleSink.scala          43
-rw-r--r--  core/src/main/scala/spark/metrics/sink/CsvSink.scala              53
-rw-r--r--  core/src/main/scala/spark/metrics/sink/JmxSink.scala              17
-rw-r--r--  core/src/main/scala/spark/metrics/sink/Sink.scala                   6
6 files changed, 242 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala b/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala
new file mode 100644
index 0000000000..0fed608488
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala
@@ -0,0 +1,68 @@
+package spark.metrics
+
+import scala.collection.mutable
+
+import com.codahale.metrics.MetricRegistry
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import spark.Logging
+import spark.metrics.sink._
+
+trait AbstractInstrumentation extends Logging {
+ initLogging()
+
+ def registryHandler: MetricRegistry
+ def instance: String
+
+ val confFile = System.getProperty("spark.metrics.conf.file", MetricsConfig.DEFAULT_CONFIG_FILE)
+ val metricsConfig = new MetricsConfig(confFile)
+
+ val sinks = new mutable.ArrayBuffer[Sink]
+
+ def registerSinks() {
+ val instConfig = metricsConfig.getInstance(instance)
+ val sinkConfigs = MetricsConfig.subProperties(instConfig, AbstractInstrumentation.SINK_REGEX)
+
+ // Register JMX sink as a default sink
+ sinks += new JmxSink(registryHandler)
+
+ // Register other sinks according to conf
+ sinkConfigs.foreach { kv =>
+ val classPath = if (AbstractInstrumentation.DEFAULT_SINKS.contains(kv._1)) {
+ AbstractInstrumentation.DEFAULT_SINKS(kv._1)
+ } else {
+ kv._2.getProperty("class")
+ }
+ try {
+ val sink = Class.forName(classPath).getConstructor(classOf[Properties], classOf[MetricRegistry])
+ .newInstance(kv._2, registryHandler)
+ sinks += sink.asInstanceOf[Sink]
+ } catch {
+ case e: Exception => logError("class " + classPath + " cannot be instantiated", e)
+ }
+ }
+
+ sinks.foreach(_.registerSink)
+ }
+
+ def unregisterSinks() {
+ sinks.foreach(_.unregisterSink)
+ }
+}
+
+object AbstractInstrumentation {
+ val DEFAULT_SINKS = Map(
+ "console" -> "spark.metrics.sink.ConsoleSink",
+ "csv" -> "spark.metrics.sink.CsvSink")
+
+ val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
+
+ val timeUnits = Map(
+ "millisecond" -> TimeUnit.MILLISECONDS,
+ "second" -> TimeUnit.SECONDS,
+ "minute" -> TimeUnit.MINUTES,
+ "hour" -> TimeUnit.HOURS,
+ "day" -> TimeUnit.DAYS)
+} \ No newline at end of file
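A minimal usage sketch, not part of this patch: a component plugs into the framework by mixing in AbstractInstrumentation and supplying a registry and an instance name. DummyMasterInstrumentation and the "master" instance name are hypothetical, and a readable config file is assumed at the path given by spark.metrics.conf.file.

package spark.metrics

import com.codahale.metrics.MetricRegistry

class DummyMasterInstrumentation extends AbstractInstrumentation {
  val registryHandler = new MetricRegistry()
  val instance = "master"
}

object DummyMasterInstrumentation {
  def main(args: Array[String]) {
    val inst = new DummyMasterInstrumentation
    inst.registerSinks()   // default JMX sink plus any sinks configured under "master"
    inst.registryHandler.counter("dummy.counter").inc()
    inst.unregisterSinks() // stops every registered reporter
  }
}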
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
new file mode 100644
index 0000000000..0fec1988ea
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -0,0 +1,55 @@
+package spark.metrics
+
+import java.util.Properties
+import java.io.FileInputStream
+
+import scala.collection.mutable
+import scala.util.matching.Regex
+
+class MetricsConfig(val configFile: String) {
+ val properties = new Properties()
+ var fis: FileInputStream = _
+
+ try {
+ fis = new FileInputStream(configFile)
+ properties.load(fis)
+ } finally {
+ if (fis != null) fis.close()
+ }
+
+ val propertyCategories = MetricsConfig.subProperties(properties, MetricsConfig.INSTANCE_REGEX)
+ if (propertyCategories.contains(MetricsConfig.DEFAULT_PREFIX)) {
+ import scala.collection.JavaConversions._
+ val defaultProperty = propertyCategories(MetricsConfig.DEFAULT_PREFIX)
+ for ((inst, prop) <- propertyCategories; p <- defaultProperty
+ if inst != MetricsConfig.DEFAULT_PREFIX; if prop.getProperty(p._1) == null) {
+ prop.setProperty(p._1, p._2)
+ }
+ }
+
+ def getInstance(inst: String) = {
+ propertyCategories.get(inst) match {
+ case Some(s) => s
+ case None => propertyCategories.getOrElse(MetricsConfig.DEFAULT_PREFIX, new Properties)
+ }
+ }
+}
+
+object MetricsConfig {
+ val DEFAULT_CONFIG_FILE = "conf/metrics.properties"
+ val DEFAULT_PREFIX = "*"
+ val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
+
+ def subProperties(prop: Properties, regex: Regex) = {
+ val subProperties = new mutable.HashMap[String, Properties]
+
+ import scala.collection.JavaConversions._
+ prop.foreach { kv =>
+ kv._1 match {
+ case regex(prefix, suffix) =>
+ subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
+ case _ => // skip keys that do not match the expected pattern
+ }
+ }
+
+ subProperties
+ }
+} \ No newline at end of file
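A sketch of what subProperties produces for a couple of flat sink keys; the key names and values below are illustrative only.

import java.util.Properties

import spark.metrics.{AbstractInstrumentation, MetricsConfig}

object SubPropertiesExample {
  def main(args: Array[String]) {
    val props = new Properties()
    props.setProperty("sink.console.period", "20")
    props.setProperty("sink.csv.directory", "/tmp/csv")

    // Grouped by the first capture of SINK_REGEX:
    // "console" -> {period=20}, "csv" -> {directory=/tmp/csv}
    val bySink = MetricsConfig.subProperties(props, AbstractInstrumentation.SINK_REGEX)
    println(bySink("console").getProperty("period")) // prints 20
  }
}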
diff --git a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
new file mode 100644
index 0000000000..5426af8c4c
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
@@ -0,0 +1,43 @@
+package spark.metrics.sink
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
+
+import spark.metrics.AbstractInstrumentation
+
+class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+ val pollPeriod = Option(property.getProperty(ConsoleSink.CONSOLE_KEY_PERIOD)) match {
+ case Some(s) => s.toInt
+ case None => ConsoleSink.CONSOLE_DEFAULT_PERIOD.toInt
+ }
+
+ val pollUnit = Option(property.getProperty(ConsoleSink.CONSOLE_KEY_UNIT)) match {
+ case Some(s) => AbstractInstrumentation.timeUnits(s)
+ case None => AbstractInstrumentation.timeUnits(ConsoleSink.CONSOLE_DEFAULT_UNIT)
+ }
+
+ var reporter: ConsoleReporter = _
+
+ override def registerSink() {
+ reporter = ConsoleReporter.forRegistry(registry)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .build()
+
+ reporter.start(pollPeriod, pollUnit)
+ }
+
+ override def unregisterSink() {
+ reporter.stop()
+ }
+}
+
+object ConsoleSink {
+ val CONSOLE_DEFAULT_PERIOD = "10"
+ val CONSOLE_DEFAULT_UNIT = "second"
+
+ val CONSOLE_KEY_PERIOD = "period"
+ val CONSOLE_KEY_UNIT = "unit"
+} \ No newline at end of file
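A sketch of driving ConsoleSink by hand; in the framework the sink is instantiated reflectively from the config, so the explicit construction and the metric name here are illustrative only.

import java.util.Properties

import com.codahale.metrics.MetricRegistry

import spark.metrics.sink.ConsoleSink

object ConsoleSinkExample {
  def main(args: Array[String]) {
    val props = new Properties()
    props.setProperty("period", "5")     // override the default period of 10
    props.setProperty("unit", "second")  // any key of AbstractInstrumentation.timeUnits

    val registry = new MetricRegistry()
    registry.counter("example.counter").inc()

    val sink = new ConsoleSink(props, registry)
    sink.registerSink()   // starts the underlying ConsoleReporter
    Thread.sleep(11000)   // let at least two reports reach stdout
    sink.unregisterSink() // stops the reporter
  }
}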
diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
new file mode 100644
index 0000000000..3a80c36901
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
@@ -0,0 +1,53 @@
+package spark.metrics.sink
+
+import java.io.File
+import java.util.{Locale, Properties}
+import java.util.concurrent.TimeUnit
+
+import com.codahale.metrics.{CsvReporter, MetricRegistry}
+
+import spark.metrics.AbstractInstrumentation
+
+class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+ val pollPeriod = Option(property.getProperty(CsvSink.CSV_KEY_PERIOD)) match {
+ case Some(s) => s.toInt
+ case None => CsvSink.CSV_DEFAULT_PERIOD.toInt
+ }
+
+ val pollUnit = Option(property.getProperty(CsvSink.CSV_KEY_UNIT)) match {
+ case Some(s) => AbstractInstrumentation.timeUnits(s)
+ case None => AbstractInstrumentation.timeUnits(CsvSink.CSV_DEFAULT_UNIT)
+ }
+
+ val pollDir = Option(property.getProperty(CsvSink.CSV_KEY_DIR)) match {
+ case Some(s) => s
+ case None => CsvSink.CSV_DEFAULT_DIR
+ }
+
+ var reporter: CsvReporter = _
+
+ override def registerSink() {
+ reporter = CsvReporter.forRegistry(registry)
+ .formatFor(Locale.US)
+ .convertDurationsTo(TimeUnit.MILLISECONDS)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .build(new File(pollDir))
+
+ reporter.start(pollPeriod, pollUnit)
+ }
+
+ override def unregisterSink() {
+ reporter.stop()
+ }
+}
+
+object CsvSink {
+ val CSV_KEY_PERIOD = "period"
+ val CSV_KEY_UNIT = "unit"
+ val CSV_KEY_DIR = "directory"
+
+ val CSV_DEFAULT_PERIOD = "1"
+ val CSV_DEFAULT_UNIT = "minute"
+ val CSV_DEFAULT_DIR = "/tmp/"
+}
+
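For context, a hypothetical conf/metrics.properties wiring up these sinks; the key layout is inferred from INSTANCE_REGEX and SINK_REGEX, and every value is illustrative. Keys under the "*" prefix apply to all instances unless overridden.

  *.sink.console.period = 10
  *.sink.console.unit = second
  master.sink.csv.period = 1
  master.sink.csv.unit = minute
  master.sink.csv.directory = /tmp/spark-metrics/

With this file, registerSinks() on a "master" instance would start the default JMX sink, a ConsoleSink inherited from the "*" section, and a CsvSink writing CSV files under /tmp/spark-metrics/.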
diff --git a/core/src/main/scala/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
new file mode 100644
index 0000000000..56e5677700
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
@@ -0,0 +1,17 @@
+package spark.metrics.sink
+
+import com.codahale.metrics.{JmxReporter, MetricRegistry}
+
+class JmxSink(registry: MetricRegistry) extends Sink {
+ var reporter: JmxReporter = _
+
+ override def registerSink() {
+ reporter = JmxReporter.forRegistry(registry).build()
+ reporter.start()
+ }
+
+ override def unregisterSink() {
+ reporter.stop()
+ }
+
+} \ No newline at end of file
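A sketch of JmxSink on its own; registerSinks() already adds it unconditionally, so the direct construction below is illustrative, and the metric name is hypothetical.

import com.codahale.metrics.MetricRegistry

import spark.metrics.sink.JmxSink

object JmxSinkExample {
  def main(args: Array[String]) {
    val registry = new MetricRegistry()
    registry.counter("example.counter").inc()

    val sink = new JmxSink(registry)
    sink.registerSink()  // exposes the counter as a JMX MBean
    Thread.sleep(60000)  // keep the JVM alive long enough to inspect with jconsole
    sink.unregisterSink()
  }
}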
diff --git a/core/src/main/scala/spark/metrics/sink/Sink.scala b/core/src/main/scala/spark/metrics/sink/Sink.scala
new file mode 100644
index 0000000000..65ebcb4eac
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/sink/Sink.scala
@@ -0,0 +1,6 @@
+package spark.metrics.sink
+
+trait Sink {
+ def registerSink(): Unit
+ def unregisterSink(): Unit
+} \ No newline at end of file
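A sketch of a user-defined sink, assuming the codahale Slf4jReporter is on the classpath; Slf4jSink is hypothetical and not part of this patch. Because registerSinks() instantiates configured sinks reflectively, a custom sink must expose the (Properties, MetricRegistry) constructor, after which it could be enabled with a key such as master.sink.slf4j.class=spark.metrics.sink.Slf4jSink.

package spark.metrics.sink

import java.util.Properties
import java.util.concurrent.TimeUnit

import com.codahale.metrics.{MetricRegistry, Slf4jReporter}

class Slf4jSink(val property: Properties, val registry: MetricRegistry) extends Sink {
  var reporter: Slf4jReporter = _

  override def registerSink() {
    reporter = Slf4jReporter.forRegistry(registry)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .convertRatesTo(TimeUnit.SECONDS)
      .build()
    reporter.start(1, TimeUnit.MINUTES)
  }

  override def unregisterSink() {
    reporter.stop()
  }
}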