aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2013-06-28 14:48:21 +0800
committerjerryshao <saisai.shao@intel.com>2013-07-24 14:57:47 +0800
commit5ce5dc9fcd7acf5c58dd3d456a629b01d57514e4 (patch)
treeb07fcd17d5957ab6b87ee3c3f833b41de36ff2a0 /core
parent871bc1687eaeb59df24b4778c5992a5f7f105cc8 (diff)
downloadspark-5ce5dc9fcd7acf5c58dd3d456a629b01d57514e4.tar.gz
spark-5ce5dc9fcd7acf5c58dd3d456a629b01d57514e4.tar.bz2
spark-5ce5dc9fcd7acf5c58dd3d456a629b01d57514e4.zip
Add default properties to deal with no configure file situation
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/metrics/MetricsConfig.scala28
-rw-r--r--core/src/main/scala/spark/metrics/MetricsSystem.scala9
-rw-r--r--core/src/main/scala/spark/metrics/sink/ConsoleSink.scala6
-rw-r--r--core/src/main/scala/spark/metrics/sink/CsvSink.scala6
-rw-r--r--core/src/main/scala/spark/metrics/sink/JmxSink.scala6
5 files changed, 30 insertions, 25 deletions
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
index be4f670918..7405192058 100644
--- a/core/src/main/scala/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -1,20 +1,25 @@
package spark.metrics
import java.util.Properties
-import java.io.FileInputStream
+import java.io.{File, FileInputStream}
import scala.collection.mutable
import scala.util.matching.Regex
private [spark] class MetricsConfig(val configFile: String) {
val properties = new Properties()
- var fis: FileInputStream = _
+ // Add default properties in case there's no properties file
+ MetricsConfig.setDefaultProperties(properties)
- try {
- fis = new FileInputStream(configFile)
- properties.load(fis)
- } finally {
- fis.close()
+ val confFile = new File(configFile)
+ if (confFile.exists()) {
+ var fis: FileInputStream = null
+ try {
+ fis = new FileInputStream(configFile)
+ properties.load(fis)
+ } finally {
+ fis.close()
+ }
}
val propertyCategories = MetricsConfig.subProperties(properties, MetricsConfig.INSTANCE_REGEX)
@@ -35,11 +40,15 @@ private [spark] class MetricsConfig(val configFile: String) {
}
}
-object MetricsConfig {
- val DEFAULT_CONFIG_FILE = "conf/metrics.properties"
+private[spark] object MetricsConfig {
val DEFAULT_PREFIX = "*"
val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
+ def setDefaultProperties(prop: Properties) {
+ prop.setProperty("*.sink.jmx.enabled", "default")
+ prop.setProperty("*.source.jvm.class", "spark.metrics.source.JvmSource")
+ }
+
def subProperties(prop: Properties, regex: Regex) = {
val subProperties = new mutable.HashMap[String, Properties]
@@ -48,7 +57,6 @@ object MetricsConfig {
if (regex.findPrefixOf(kv._1) != None) {
val regex(prefix, suffix) = kv._1
subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
- println(">>>>>subProperties added " + prefix + " " + suffix + " " + kv._2)
}
}
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
index 5bfdc00eaf..6e448cb2a5 100644
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -14,7 +14,7 @@ import spark.metrics.source._
private[spark] class MetricsSystem private (val instance: String) extends Logging {
initLogging()
- val confFile = System.getProperty("spark.metrics.conf.file", MetricsConfig.DEFAULT_CONFIG_FILE)
+ val confFile = System.getProperty("spark.metrics.conf.file", "unsupported")
val metricsConfig = new MetricsConfig(confFile)
val sinks = new mutable.ArrayBuffer[Sink]
@@ -58,9 +58,6 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
val instConfig = metricsConfig.getInstance(instance)
val sinkConfigs = MetricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
- // Register JMX sink as a default sink
- sinks += new JmxSink(registry)
-
// Register other sinks according to conf
sinkConfigs.foreach { kv =>
val classPath = if (MetricsSystem.DEFAULT_SINKS.contains(kv._1)) {
@@ -81,9 +78,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
}
private[spark] object MetricsSystem {
- val DEFAULT_SINKS = Map(
- "console" -> "spark.metrics.sink.ConsoleSink",
- "csv" -> "spark.metrics.sink.CsvSink")
+ val DEFAULT_SINKS = Map("jmx" -> "spark.metrics.sink.JmxSink")
val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
diff --git a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
index e2e4197d1d..d7b7a9e501 100644
--- a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
@@ -1,10 +1,10 @@
package spark.metrics.sink
+import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
+
import java.util.Properties
import java.util.concurrent.TimeUnit
-import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
-
import spark.metrics.MetricsSystem
class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
@@ -18,7 +18,7 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry) extend
case None => MetricsSystem.timeUnits(ConsoleSink.CONSOLE_DEFAULT_UNIT)
}
- var reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
+ val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build()
diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
index c2d645331c..e6c5bffd3c 100644
--- a/core/src/main/scala/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
@@ -1,11 +1,11 @@
package spark.metrics.sink
+import com.codahale.metrics.{CsvReporter, MetricRegistry}
+
import java.io.File
import java.util.{Locale, Properties}
import java.util.concurrent.TimeUnit
-import com.codahale.metrics.{CsvReporter, MetricRegistry}
-
import spark.metrics.MetricsSystem
class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink {
@@ -24,7 +24,7 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si
case None => CsvSink.CSV_DEFAULT_DIR
}
- var reporter: CsvReporter = CsvReporter.forRegistry(registry)
+ val reporter: CsvReporter = CsvReporter.forRegistry(registry)
.formatFor(Locale.US)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
diff --git a/core/src/main/scala/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
index 98b55f7b7f..f097a631c0 100644
--- a/core/src/main/scala/spark/metrics/sink/JmxSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
@@ -1,9 +1,11 @@
package spark.metrics.sink
+import java.util.Properties
+
import com.codahale.metrics.{JmxReporter, MetricRegistry}
-class JmxSink(registry: MetricRegistry) extends Sink {
- var reporter: JmxReporter = JmxReporter.forRegistry(registry).build()
+class JmxSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+ val reporter: JmxReporter = JmxReporter.forRegistry(registry).build()
override def start() {
reporter.start()