aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMark Grover <mark@apache.org>2016-07-27 10:13:15 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-07-27 10:13:15 -0700
commit70f846a313061e4db6174e0dc6c12c8c806ccf78 (patch)
tree9b8d7aa2edbd6af395b3b7b212252126cc1bf0e6 /core
parent7e8279fde176b08687adf2b410693b35cfbd4b46 (diff)
downloadspark-70f846a313061e4db6174e0dc6c12c8c806ccf78.tar.gz
spark-70f846a313061e4db6174e0dc6c12c8c806ccf78.tar.bz2
spark-70f846a313061e4db6174e0dc6c12c8c806ccf78.zip
[SPARK-5847][CORE] Allow for configuring MetricsSystem's use of app ID to namespace all metrics
## What changes were proposed in this pull request? Adding a new property to SparkConf called spark.metrics.namespace that allows users to set a custom namespace for executor and driver metrics in the metrics systems. By default, the root namespace used for driver or executor metrics is the value of `spark.app.id`. However, often times, users want to be able to track the metrics across apps for driver and executor metrics, which is hard to do with application ID (i.e. `spark.app.id`) since it changes with every invocation of the app. For such use cases, users can set the `spark.metrics.namespace` property to another spark configuration key like `spark.app.name` which is then used to populate the root namespace of the metrics system (with the app name in our example). `spark.metrics.namespace` property can be set to any arbitrary spark property key, whose value would be used to set the root namespace of the metrics system. Non driver and executor metrics are never prefixed with `spark.app.id`, nor does the `spark.metrics.namespace` property have any such affect on such metrics. ## How was this patch tested? Added new unit tests, modified existing unit tests. Author: Mark Grover <mark@apache.org> Closes #14270 from markgrover/spark-5847.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/internal/config/package.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala51
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala41
-rw-r--r--core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala85
5 files changed, 158 insertions, 26 deletions
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index ebb21e9efd..cb75716d10 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -108,4 +108,9 @@ package object config {
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
.intConf
.createWithDefault(10000)
+
+ // This property sets the root namespace for metrics reporting
+ private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace")
+ .stringConf
+ .createOptional
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index 979782ea40..a4056508c1 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -35,7 +35,7 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties"
private[metrics] val properties = new Properties()
- private[metrics] var propertyCategories: mutable.HashMap[String, Properties] = null
+ private[metrics] var perInstanceSubProperties: mutable.HashMap[String, Properties] = null
private def setDefaultProperties(prop: Properties) {
prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
@@ -44,6 +44,10 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")
}
+ /**
+ * Load properties from various places, based on precedence
+ * If the same property is set again latter on in the method, it overwrites the previous value
+ */
def initialize() {
// Add default properties in case there's no properties file
setDefaultProperties(properties)
@@ -58,16 +62,47 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
case _ =>
}
- propertyCategories = subProperties(properties, INSTANCE_REGEX)
- if (propertyCategories.contains(DEFAULT_PREFIX)) {
- val defaultProperty = propertyCategories(DEFAULT_PREFIX).asScala
- for((inst, prop) <- propertyCategories if (inst != DEFAULT_PREFIX);
- (k, v) <- defaultProperty if (prop.get(k) == null)) {
+ // Now, let's populate a list of sub-properties per instance, instance being the prefix that
+ // appears before the first dot in the property name.
+ // Add to the sub-properties per instance, the default properties (those with prefix "*"), if
+ // they don't have that exact same sub-property already defined.
+ //
+ // For example, if properties has ("*.class"->"default_class", "*.path"->"default_path,
+ // "driver.path"->"driver_path"), for driver specific sub-properties, we'd like the output to be
+ // ("driver"->Map("path"->"driver_path", "class"->"default_class")
+ // Note how class got added to based on the default property, but path remained the same
+ // since "driver.path" already existed and took precedence over "*.path"
+ //
+ perInstanceSubProperties = subProperties(properties, INSTANCE_REGEX)
+ if (perInstanceSubProperties.contains(DEFAULT_PREFIX)) {
+ val defaultSubProperties = perInstanceSubProperties(DEFAULT_PREFIX).asScala
+ for ((instance, prop) <- perInstanceSubProperties if (instance != DEFAULT_PREFIX);
+ (k, v) <- defaultSubProperties if (prop.get(k) == null)) {
prop.put(k, v)
}
}
}
+ /**
+ * Take a simple set of properties and a regex that the instance names (part before the first dot)
+ * have to conform to. And, return a map of the first order prefix (before the first dot) to the
+ * sub-properties under that prefix.
+ *
+ * For example, if the properties sent were Properties("*.sink.servlet.class"->"class1",
+ * "*.sink.servlet.path"->"path1"), the returned map would be
+ * Map("*" -> Properties("sink.servlet.class" -> "class1", "sink.servlet.path" -> "path1"))
+ * Note in the subProperties (value of the returned Map), only the suffixes are used as property
+ * keys.
+ * If, in the passed properties, there is only one property with a given prefix, it is still
+ * "unflattened". For example, if the input was Properties("*.sink.servlet.class" -> "class1"
+ * the returned Map would contain one key-value pair
+ * Map("*" -> Properties("sink.servlet.class" -> "class1"))
+ * Any passed in properties, not complying with the regex are ignored.
+ *
+ * @param prop the flat list of properties to "unflatten" based on prefixes
+ * @param regex the regex that the prefix has to comply with
+ * @return an unflatted map, mapping prefix with sub-properties under that prefix
+ */
def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
val subProperties = new mutable.HashMap[String, Properties]
prop.asScala.foreach { kv =>
@@ -80,9 +115,9 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
}
def getInstance(inst: String): Properties = {
- propertyCategories.get(inst) match {
+ perInstanceSubProperties.get(inst) match {
case Some(s) => s
- case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties)
+ case None => perInstanceSubProperties.getOrElse(DEFAULT_PREFIX, new Properties)
}
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 9b16c116ae..1d494500cd 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -26,30 +26,31 @@ import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.internal.config._
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.sink.{MetricsServlet, Sink}
import org.apache.spark.metrics.source.{Source, StaticSources}
import org.apache.spark.util.Utils
/**
- * Spark Metrics System, created by specific "instance", combined by source,
- * sink, periodically poll source metrics data to sink destinations.
+ * Spark Metrics System, created by a specific "instance", combined by source,
+ * sink, periodically polls source metrics data to sink destinations.
*
- * "instance" specify "who" (the role) use metrics system. In spark there are several roles
- * like master, worker, executor, client driver, these roles will create metrics system
- * for monitoring. So instance represents these roles. Currently in Spark, several instances
+ * "instance" specifies "who" (the role) uses the metrics system. In Spark, there are several roles
+ * like master, worker, executor, client driver. These roles will create metrics system
+ * for monitoring. So, "instance" represents these roles. Currently in Spark, several instances
* have already implemented: master, worker, executor, driver, applications.
*
- * "source" specify "where" (source) to collect metrics data. In metrics system, there exists
+ * "source" specifies "where" (source) to collect metrics data from. In metrics system, there exists
* two kinds of source:
* 1. Spark internal source, like MasterSource, WorkerSource, etc, which will collect
* Spark component's internal state, these sources are related to instance and will be
- * added after specific metrics system is created.
+ * added after a specific metrics system is created.
* 2. Common source, like JvmSource, which will collect low level state, is configured by
* configuration and loaded through reflection.
*
- * "sink" specify "where" (destination) to output metrics data to. Several sinks can be
- * coexisted and flush metrics to all these sinks.
+ * "sink" specifies "where" (destination) to output metrics data to. Several sinks can
+ * coexist and metrics can be flushed to all these sinks.
*
* Metrics configuration format is like below:
* [instance].[sink|source].[name].[options] = xxxx
@@ -62,9 +63,9 @@ import org.apache.spark.util.Utils
* [sink|source] means this property belongs to source or sink. This field can only be
* source or sink.
*
- * [name] specify the name of sink or source, it is custom defined.
+ * [name] specify the name of sink or source, if it is custom defined.
*
- * [options] is the specific property of this source or sink.
+ * [options] represent the specific property of this source or sink.
*/
private[spark] class MetricsSystem private (
val instance: String,
@@ -125,19 +126,25 @@ private[spark] class MetricsSystem private (
* application, executor/driver and metric source.
*/
private[spark] def buildRegistryName(source: Source): String = {
- val appId = conf.getOption("spark.app.id")
+ val metricsNamespace = conf.get(METRICS_NAMESPACE).orElse(conf.getOption("spark.app.id"))
+
val executorId = conf.getOption("spark.executor.id")
val defaultName = MetricRegistry.name(source.sourceName)
if (instance == "driver" || instance == "executor") {
- if (appId.isDefined && executorId.isDefined) {
- MetricRegistry.name(appId.get, executorId.get, source.sourceName)
+ if (metricsNamespace.isDefined && executorId.isDefined) {
+ MetricRegistry.name(metricsNamespace.get, executorId.get, source.sourceName)
} else {
// Only Driver and Executor set spark.app.id and spark.executor.id.
// Other instance types, e.g. Master and Worker, are not related to a specific application.
- val warningMsg = s"Using default name $defaultName for source because %s is not set."
- if (appId.isEmpty) { logWarning(warningMsg.format("spark.app.id")) }
- if (executorId.isEmpty) { logWarning(warningMsg.format("spark.executor.id")) }
+ if (metricsNamespace.isEmpty) {
+ logWarning(s"Using default name $defaultName for source because neither " +
+ s"${METRICS_NAMESPACE.key} nor spark.app.id is set.")
+ }
+ if (executorId.isEmpty) {
+ logWarning(s"Using default name $defaultName for source because spark.executor.id is " +
+ s"not set.")
+ }
defaultName
}
} else { defaultName }
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
index b24f5d732f..a85011b42b 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
@@ -139,7 +139,7 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
val conf = new MetricsConfig(sparkConf)
conf.initialize()
- val propCategories = conf.propertyCategories
+ val propCategories = conf.perInstanceSubProperties
assert(propCategories.size === 3)
val masterProp = conf.getInstance("master")
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index 2400832f6e..61db6af830 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.master.MasterSource
+import org.apache.spark.internal.config._
import org.apache.spark.metrics.source.{Source, StaticSources}
class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateMethodTester{
@@ -183,4 +184,88 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
assert(metricName != s"$appId.$executorId.${source.sourceName}")
assert(metricName === source.sourceName)
}
+
+ test("MetricsSystem with Executor instance, with custom namespace") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val appId = "testId"
+ val appName = "testName"
+ val executorId = "1"
+ conf.set("spark.app.id", appId)
+ conf.set("spark.app.name", appName)
+ conf.set("spark.executor.id", executorId)
+ conf.set(METRICS_NAMESPACE, "${spark.app.name}")
+
+ val instanceName = "executor"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+ assert(metricName === s"$appName.$executorId.${source.sourceName}")
+ }
+
+ test("MetricsSystem with Executor instance, custom namespace which is not set") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val executorId = "1"
+ val namespaceToResolve = "${spark.doesnotexist}"
+ conf.set("spark.executor.id", executorId)
+ conf.set(METRICS_NAMESPACE, namespaceToResolve)
+
+ val instanceName = "executor"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+ // If the user set the spark.metrics.namespace property to an expansion of another property
+ // (say ${spark.doesnotexist}, the unresolved name (i.e. literally ${spark.doesnotexist})
+ // is used as the root logger name.
+ assert(metricName === s"$namespaceToResolve.$executorId.${source.sourceName}")
+ }
+
+ test("MetricsSystem with Executor instance, custom namespace, spark.executor.id not set") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val appId = "testId"
+ conf.set("spark.app.name", appId)
+ conf.set(METRICS_NAMESPACE, "${spark.app.name}")
+
+ val instanceName = "executor"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+ assert(metricName === source.sourceName)
+ }
+
+ test("MetricsSystem with non-driver, non-executor instance with custom namespace") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val appId = "testId"
+ val appName = "testName"
+ val executorId = "dummyExecutorId"
+ conf.set("spark.app.id", appId)
+ conf.set("spark.app.name", appName)
+ conf.set(METRICS_NAMESPACE, "${spark.app.name}")
+ conf.set("spark.executor.id", executorId)
+
+ val instanceName = "testInstance"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+
+ // Even if spark.app.id and spark.executor.id are set, they are not used for the metric name.
+ assert(metricName != s"$appId.$executorId.${source.sourceName}")
+ assert(metricName === source.sourceName)
+ }
+
}