author    Marcelo Vanzin <vanzin@cloudera.com>    2015-06-06 21:08:36 -0700
committer Andrew Or <andrew@databricks.com>       2015-06-06 21:08:36 -0700
commit    18c4fcebbeecc3b26476a728bc9db62f5c0a6f87 (patch)
tree      07fa3c61b2d304335a1d2d56dc5be78b426c2029 /core
parent    5aa804f3c6485670937a658ce8207c2317c6a506 (diff)
[SPARK-7169] [CORE] Allow metrics system to be configured through SparkConf.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: Jacek Lewandowski <lewandowski.jacek@gmail.com>

Closes #6560 from vanzin/SPARK-7169 and squashes the following commits:

737266f [Marcelo Vanzin] Feedback.
702d5a3 [Marcelo Vanzin] Scalastyle.
ce66e7e [Marcelo Vanzin] Remove metrics config handling from SparkConf.
439938a [Jacek Lewandowski] SPARK-7169: Metrics can be additionally configured from Spark configuration
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala      | 55
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala      |  3
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala | 82
3 files changed, 115 insertions(+), 25 deletions(-)
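
In practice, this change lets the metrics system be configured inline through SparkConf instead of (or in addition to) a metrics.properties file. A minimal sketch of the new style, grounded in the keys exercised by MetricsConfigSuite below (ConsoleSink is Spark's standard console sink; the particular sink chosen here is illustrative):

import org.apache.spark.SparkConf

// Any key under the "spark.metrics.conf." prefix is copied into the metrics
// properties with the prefix stripped, so this sets "*.sink.console.period".
val conf = new SparkConf()
  .set("spark.metrics.conf.*.sink.console.class",
    "org.apache.spark.metrics.sink.ConsoleSink")
  .set("spark.metrics.conf.*.sink.console.period", "10")
  .set("spark.metrics.conf.*.sink.console.unit", "seconds")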
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index 8edf493780..d7495551ad 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -23,10 +23,10 @@ import java.util.Properties
import scala.collection.mutable
import scala.util.matching.Regex
-import org.apache.spark.Logging
import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkConf}
-private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
+private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
private val DEFAULT_PREFIX = "*"
private val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
@@ -46,23 +46,14 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
// Add default properties in case there's no properties file
setDefaultProperties(properties)
- // If spark.metrics.conf is not set, try to get file in class path
- val isOpt: Option[InputStream] = configFile.map(new FileInputStream(_)).orElse {
- try {
- Option(Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME))
- } catch {
- case e: Exception =>
- logError("Error loading default configuration file", e)
- None
- }
- }
+ loadPropertiesFromFile(conf.getOption("spark.metrics.conf"))
- isOpt.foreach { is =>
- try {
- properties.load(is)
- } finally {
- is.close()
- }
+ // Also look for the properties in provided Spark configuration
+ val prefix = "spark.metrics.conf."
+ conf.getAll.foreach {
+ case (k, v) if k.startsWith(prefix) =>
+ properties.setProperty(k.substring(prefix.length()), v)
+ case _ =>
}
propertyCategories = subProperties(properties, INSTANCE_REGEX)
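
For context, subProperties (left untouched by this patch) is what groups the flattened keys by the instance name that INSTANCE_REGEX captures. A self-contained sketch of that grouping, written here as an assumption about how the private helper behaves rather than a copy of it:

import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.matching.Regex

// Split keys such as "master.sink.console.period" into an instance ("master"
// or "*") and the remaining sub-key ("sink.console.period").
def groupByInstance(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
  val grouped = new mutable.HashMap[String, Properties]
  prop.asScala.foreach { case (key, value) =>
    if (regex.findPrefixOf(key).isDefined) {
      val regex(instance, suffix) = key
      grouped.getOrElseUpdate(instance, new Properties).setProperty(suffix, value)
    }
  }
  grouped
}

val p = new Properties
p.setProperty("*.sink.console.period", "10")
p.setProperty("master.sink.console.period", "20")
val byInstance = groupByInstance(p, "^(\\*|[a-zA-Z]+)\\.(.+)".r)
// byInstance("master").getProperty("sink.console.period") == "20"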
@@ -97,5 +88,31 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties)
}
}
-}
+ /**
+ * Loads configuration from a config file. If no config file is provided, try to get file
+ * in class path.
+ */
+ private[this] def loadPropertiesFromFile(path: Option[String]): Unit = {
+ var is: InputStream = null
+ try {
+ is = path match {
+ case Some(f) => new FileInputStream(f)
+ case None => Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME)
+ }
+
+ if (is != null) {
+ properties.load(is)
+ }
+ } catch {
+ case e: Exception =>
+ val file = path.getOrElse(DEFAULT_METRICS_CONF_FILENAME)
+ logError(s"Error loading configuration file $file", e)
+ } finally {
+ if (is != null) {
+ is.close()
+ }
+ }
+ }
+
+}
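
The ordering inside initialize() is what gives SparkConf precedence: the file (or the classpath default) is loaded first, and the prefixed SparkConf entries are applied afterwards, overwriting any colliding keys. A tiny sketch of that overwrite semantics, using the values from the combined test below (where 50 from the configuration beats the file's 20):

import java.util.Properties

// File values land in the Properties object first...
val properties = new Properties
properties.setProperty("master.sink.console.period", "20")

// ...then prefixed SparkConf entries are applied on top, so they win.
val prefix = "spark.metrics.conf."
val fromSparkConf = Seq("spark.metrics.conf.master.sink.console.period" -> "50")
fromSparkConf.foreach { case (k, v) =>
  if (k.startsWith(prefix)) properties.setProperty(k.substring(prefix.length), v)
}
assert(properties.getProperty("master.sink.console.period") == "50")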
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 9150ad3571..ed5131c79f 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -70,8 +70,7 @@ private[spark] class MetricsSystem private (
securityMgr: SecurityManager)
extends Logging {
- private[this] val confFile = conf.get("spark.metrics.conf", null)
- private[this] val metricsConfig = new MetricsConfig(Option(confFile))
+ private[this] val metricsConfig = new MetricsConfig(conf)
private val sinks = new mutable.ArrayBuffer[Sink]
private val sources = new mutable.ArrayBuffer[Source]
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
index a901a069d9..41f2ff725a 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.metrics
+import org.apache.spark.SparkConf
+
import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkFunSuite
@@ -29,7 +31,9 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
}
test("MetricsConfig with default properties") {
- val conf = new MetricsConfig(None)
+ val sparkConf = new SparkConf(loadDefaults = false)
+ sparkConf.set("spark.metrics.conf", "dummy-file")
+ val conf = new MetricsConfig(sparkConf)
conf.initialize()
assert(conf.properties.size() === 4)
@@ -42,8 +46,41 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
assert(property.getProperty("sink.servlet.path") === "/metrics/json")
}
- test("MetricsConfig with properties set") {
- val conf = new MetricsConfig(Option(filePath))
+ test("MetricsConfig with properties set from a file") {
+ val sparkConf = new SparkConf(loadDefaults = false)
+ sparkConf.set("spark.metrics.conf", filePath)
+ val conf = new MetricsConfig(sparkConf)
+ conf.initialize()
+
+ val masterProp = conf.getInstance("master")
+ assert(masterProp.size() === 5)
+ assert(masterProp.getProperty("sink.console.period") === "20")
+ assert(masterProp.getProperty("sink.console.unit") === "minutes")
+ assert(masterProp.getProperty("source.jvm.class") ===
+ "org.apache.spark.metrics.source.JvmSource")
+ assert(masterProp.getProperty("sink.servlet.class") ===
+ "org.apache.spark.metrics.sink.MetricsServlet")
+ assert(masterProp.getProperty("sink.servlet.path") === "/metrics/master/json")
+
+ val workerProp = conf.getInstance("worker")
+ assert(workerProp.size() === 5)
+ assert(workerProp.getProperty("sink.console.period") === "10")
+ assert(workerProp.getProperty("sink.console.unit") === "seconds")
+ assert(workerProp.getProperty("source.jvm.class") ===
+ "org.apache.spark.metrics.source.JvmSource")
+ assert(workerProp.getProperty("sink.servlet.class") ===
+ "org.apache.spark.metrics.sink.MetricsServlet")
+ assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
+ }
+
+ test("MetricsConfig with properties set from a Spark configuration") {
+ val sparkConf = new SparkConf(loadDefaults = false)
+ setMetricsProperty(sparkConf, "*.sink.console.period", "10")
+ setMetricsProperty(sparkConf, "*.sink.console.unit", "seconds")
+ setMetricsProperty(sparkConf, "*.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")
+ setMetricsProperty(sparkConf, "master.sink.console.period", "20")
+ setMetricsProperty(sparkConf, "master.sink.console.unit", "minutes")
+ val conf = new MetricsConfig(sparkConf)
conf.initialize()
val masterProp = conf.getInstance("master")
@@ -67,8 +104,40 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
}
+ test("MetricsConfig with properties set from a file and a Spark configuration") {
+ val sparkConf = new SparkConf(loadDefaults = false)
+ setMetricsProperty(sparkConf, "*.sink.console.period", "10")
+ setMetricsProperty(sparkConf, "*.sink.console.unit", "seconds")
+ setMetricsProperty(sparkConf, "*.source.jvm.class", "org.apache.spark.SomeOtherSource")
+ setMetricsProperty(sparkConf, "master.sink.console.period", "50")
+ setMetricsProperty(sparkConf, "master.sink.console.unit", "seconds")
+ sparkConf.set("spark.metrics.conf", filePath)
+ val conf = new MetricsConfig(sparkConf)
+ conf.initialize()
+
+ val masterProp = conf.getInstance("master")
+ assert(masterProp.size() === 5)
+ assert(masterProp.getProperty("sink.console.period") === "50")
+ assert(masterProp.getProperty("sink.console.unit") === "seconds")
+ assert(masterProp.getProperty("source.jvm.class") === "org.apache.spark.SomeOtherSource")
+ assert(masterProp.getProperty("sink.servlet.class") ===
+ "org.apache.spark.metrics.sink.MetricsServlet")
+ assert(masterProp.getProperty("sink.servlet.path") === "/metrics/master/json")
+
+ val workerProp = conf.getInstance("worker")
+ assert(workerProp.size() === 5)
+ assert(workerProp.getProperty("sink.console.period") === "10")
+ assert(workerProp.getProperty("sink.console.unit") === "seconds")
+ assert(workerProp.getProperty("source.jvm.class") === "org.apache.spark.SomeOtherSource")
+ assert(workerProp.getProperty("sink.servlet.class") ===
+ "org.apache.spark.metrics.sink.MetricsServlet")
+ assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
+ }
+
test("MetricsConfig with subProperties") {
- val conf = new MetricsConfig(Option(filePath))
+ val sparkConf = new SparkConf(loadDefaults = false)
+ sparkConf.set("spark.metrics.conf", filePath)
+ val conf = new MetricsConfig(sparkConf)
conf.initialize()
val propCategories = conf.propertyCategories
@@ -90,4 +159,9 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
val servletProps = sinkProps("servlet")
assert(servletProps.size() === 2)
}
+
+ private def setMetricsProperty(conf: SparkConf, name: String, value: String): Unit = {
+ conf.set(s"spark.metrics.conf.$name", value)
+ }
+
}