-rw-r--r--  core/src/main/scala/org/apache/spark/internal/config/package.scala    |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala      | 51
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala      | 41
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala | 85
-rw-r--r--  docs/monitoring.md                                                     | 12
6 files changed, 170 insertions, 26 deletions
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index ebb21e9efd..cb75716d10 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -108,4 +108,9 @@ package object config {
ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
.intConf
.createWithDefault(10000)
+
+ // This property sets the root namespace for metrics reporting
+ private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace")
+ .stringConf
+ .createOptional
}
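
As a quick illustration of how this new optional entry behaves when read (a sketch, not part of the patch; it assumes code living inside the org.apache.spark package, since both the entry and this overload of SparkConf.get are private[spark]):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

// spark.metrics.namespace is left unset here, so the Option is empty and the caller
// falls back to spark.app.id, the same fallback buildRegistryName uses further down.
val conf = new SparkConf(false).set("spark.app.id", "app-20160101000000-0000")
val namespace: Option[String] =
  conf.get(METRICS_NAMESPACE).orElse(conf.getOption("spark.app.id"))
// namespace == Some("app-20160101000000-0000")
```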
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index 979782ea40..a4056508c1 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -35,7 +35,7 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
private val DEFAULT_METRICS_CONF_FILENAME = "metrics.properties"
private[metrics] val properties = new Properties()
- private[metrics] var propertyCategories: mutable.HashMap[String, Properties] = null
+ private[metrics] var perInstanceSubProperties: mutable.HashMap[String, Properties] = null
private def setDefaultProperties(prop: Properties) {
prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
@@ -44,6 +44,10 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")
}
+ /**
+ * Load properties from various places, based on precedence.
+ * If the same property is set again later in the method, it overwrites the previous value.
+ */
def initialize() {
// Add default properties in case there's no properties file
setDefaultProperties(properties)
@@ -58,16 +62,47 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
case _ =>
}
- propertyCategories = subProperties(properties, INSTANCE_REGEX)
- if (propertyCategories.contains(DEFAULT_PREFIX)) {
- val defaultProperty = propertyCategories(DEFAULT_PREFIX).asScala
- for((inst, prop) <- propertyCategories if (inst != DEFAULT_PREFIX);
- (k, v) <- defaultProperty if (prop.get(k) == null)) {
+ // Now, let's populate a list of sub-properties per instance, instance being the prefix that
+ // appears before the first dot in the property name.
+ // Then, add the default properties (those with the prefix "*") to each instance's
+ // sub-properties, unless that exact sub-property is already defined for the instance.
+ //
+ // For example, if properties has ("*.class"->"default_class", "*.path"->"default_path",
+ // "driver.path"->"driver_path"), then for driver-specific sub-properties we'd like the output
+ // to be ("driver"->Map("path"->"driver_path", "class"->"default_class")).
+ // Note how class got added based on the default property, but path remained the same
+ // since "driver.path" already existed and took precedence over "*.path".
+ //
+ perInstanceSubProperties = subProperties(properties, INSTANCE_REGEX)
+ if (perInstanceSubProperties.contains(DEFAULT_PREFIX)) {
+ val defaultSubProperties = perInstanceSubProperties(DEFAULT_PREFIX).asScala
+ for ((instance, prop) <- perInstanceSubProperties if (instance != DEFAULT_PREFIX);
+ (k, v) <- defaultSubProperties if (prop.get(k) == null)) {
prop.put(k, v)
}
}
}
+ /**
+ * Take a simple set of properties and a regex that the instance names (the part before the first
+ * dot) have to conform to, and return a map of the first-order prefix (before the first dot) to the
+ * sub-properties under that prefix.
+ *
+ * For example, if the properties sent were Properties("*.sink.servlet.class"->"class1",
+ * "*.sink.servlet.path"->"path1"), the returned map would be
+ * Map("*" -> Properties("sink.servlet.class" -> "class1", "sink.servlet.path" -> "path1"))
+ * Note that in the sub-properties (the values of the returned Map), only the suffixes are used as
+ * property keys.
+ * If, in the passed properties, there is only one property with a given prefix, it is still
+ * "unflattened". For example, if the input was Properties("*.sink.servlet.class" -> "class1"),
+ * the returned Map would contain one key-value pair:
+ * Map("*" -> Properties("sink.servlet.class" -> "class1"))
+ * Any passed-in properties not complying with the regex are ignored.
+ *
+ * @param prop the flat list of properties to "unflatten" based on prefixes
+ * @param regex the regex that the prefix has to comply with
+ * @return an unflattened map, mapping each prefix to the sub-properties under that prefix
+ */
def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
val subProperties = new mutable.HashMap[String, Properties]
prop.asScala.foreach { kv =>
@@ -80,9 +115,9 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
}
def getInstance(inst: String): Properties = {
- propertyCategories.get(inst) match {
+ perInstanceSubProperties.get(inst) match {
case Some(s) => s
- case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties)
+ case None => perInstanceSubProperties.getOrElse(DEFAULT_PREFIX, new Properties)
}
}
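
To make the prefix splitting and default merging described in the comments above concrete, here is a small self-contained sketch (not part of the patch; the regex and the split helper are illustrative stand-ins for INSTANCE_REGEX and the logic in subProperties/initialize):

```scala
import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.matching.Regex

// Illustrative stand-in for INSTANCE_REGEX: an instance prefix is "*" or a word,
// followed by a dot and the rest of the key.
val instanceRegex: Regex = "^(\\*|[a-zA-Z]+)\\.(.+)".r

// Split flat properties into per-prefix sub-properties; keys not matching are ignored.
def split(prop: Properties): mutable.HashMap[String, Properties] = {
  val out = new mutable.HashMap[String, Properties]
  prop.asScala.foreach { case (k, v) =>
    k match {
      case instanceRegex(prefix, suffix) =>
        out.getOrElseUpdate(prefix, new Properties).setProperty(suffix, v)
      case _ => // ignore keys that don't look like <instance>.<sub-key>
    }
  }
  out
}

val props = new Properties()
props.setProperty("*.class", "default_class")
props.setProperty("*.path", "default_path")
props.setProperty("driver.path", "driver_path")

val perInstance = split(props)
// Merge the "*" defaults into every other instance without overriding explicit keys.
for {
  defaults <- perInstance.get("*").map(_.asScala)
  (inst, p) <- perInstance if inst != "*"
  (k, v) <- defaults if p.get(k) == null
} p.put(k, v)
// perInstance("driver") now holds path -> driver_path and class -> default_class.
```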
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 9b16c116ae..1d494500cd 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -26,30 +26,31 @@ import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.internal.config._
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.sink.{MetricsServlet, Sink}
import org.apache.spark.metrics.source.{Source, StaticSources}
import org.apache.spark.util.Utils
/**
- * Spark Metrics System, created by specific "instance", combined by source,
- * sink, periodically poll source metrics data to sink destinations.
+ * Spark Metrics System, created by a specific "instance" and composed of sources and
+ * sinks, periodically polls source metrics data to sink destinations.
*
- * "instance" specify "who" (the role) use metrics system. In spark there are several roles
- * like master, worker, executor, client driver, these roles will create metrics system
- * for monitoring. So instance represents these roles. Currently in Spark, several instances
+ * "instance" specifies "who" (the role) uses the metrics system. In Spark, there are several roles
+ * like master, worker, executor, client driver. These roles create a metrics system
+ * for monitoring. So, "instance" represents these roles. Currently in Spark, several instances
* have already implemented: master, worker, executor, driver, applications.
*
- * "source" specify "where" (source) to collect metrics data. In metrics system, there exists
+ * "source" specifies "where" (source) to collect metrics data from. In metrics system, there exists
* two kinds of source:
* 1. Spark internal source, like MasterSource, WorkerSource, etc, which will collect
* Spark component's internal state, these sources are related to instance and will be
- * added after specific metrics system is created.
+ * added after a specific metrics system is created.
* 2. Common source, like JvmSource, which will collect low level state, is configured by
* configuration and loaded through reflection.
*
- * "sink" specify "where" (destination) to output metrics data to. Several sinks can be
- * coexisted and flush metrics to all these sinks.
+ * "sink" specifies "where" (destination) to output metrics data to. Several sinks can
+ * coexist and metrics can be flushed to all these sinks.
*
* Metrics configuration format is like below:
* [instance].[sink|source].[name].[options] = xxxx
@@ -62,9 +63,9 @@ import org.apache.spark.util.Utils
* [sink|source] means this property belongs to source or sink. This field can only be
* source or sink.
*
- * [name] specify the name of sink or source, it is custom defined.
+ * [name] specifies the name of the sink or source, if it is custom defined.
*
- * [options] is the specific property of this source or sink.
+ * [options] represents the specific property of this source or sink.
*/
private[spark] class MetricsSystem private (
val instance: String,
@@ -125,19 +126,25 @@ private[spark] class MetricsSystem private (
* application, executor/driver and metric source.
*/
private[spark] def buildRegistryName(source: Source): String = {
- val appId = conf.getOption("spark.app.id")
+ val metricsNamespace = conf.get(METRICS_NAMESPACE).orElse(conf.getOption("spark.app.id"))
+
val executorId = conf.getOption("spark.executor.id")
val defaultName = MetricRegistry.name(source.sourceName)
if (instance == "driver" || instance == "executor") {
- if (appId.isDefined && executorId.isDefined) {
- MetricRegistry.name(appId.get, executorId.get, source.sourceName)
+ if (metricsNamespace.isDefined && executorId.isDefined) {
+ MetricRegistry.name(metricsNamespace.get, executorId.get, source.sourceName)
} else {
// Only Driver and Executor set spark.app.id and spark.executor.id.
// Other instance types, e.g. Master and Worker, are not related to a specific application.
- val warningMsg = s"Using default name $defaultName for source because %s is not set."
- if (appId.isEmpty) { logWarning(warningMsg.format("spark.app.id")) }
- if (executorId.isEmpty) { logWarning(warningMsg.format("spark.executor.id")) }
+ if (metricsNamespace.isEmpty) {
+ logWarning(s"Using default name $defaultName for source because neither " +
+ s"${METRICS_NAMESPACE.key} nor spark.app.id is set.")
+ }
+ if (executorId.isEmpty) {
+ logWarning(s"Using default name $defaultName for source because spark.executor.id is " +
+ s"not set.")
+ }
defaultName
}
} else { defaultName }
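
For reference, the registry names this logic produces look like the following (a sketch with made-up values, not part of the patch; Dropwizard's MetricRegistry.name simply joins its non-empty arguments with dots):

```scala
import com.codahale.metrics.MetricRegistry

// Assumed example values: a namespace resolved from spark.metrics.namespace (or spark.app.id),
// an executor id, and a source name.
val namespace = "myApp"
val executorId = "1"
val sourceName = "jvm"

val qualified = MetricRegistry.name(namespace, executorId, sourceName) // "myApp.1.jvm"
val default = MetricRegistry.name(sourceName)                          // "jvm"
```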
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
index b24f5d732f..a85011b42b 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala
@@ -139,7 +139,7 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
val conf = new MetricsConfig(sparkConf)
conf.initialize()
- val propCategories = conf.propertyCategories
+ val propCategories = conf.perInstanceSubProperties
assert(propCategories.size === 3)
val masterProp = conf.getInstance("master")
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index 2400832f6e..61db6af830 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.master.MasterSource
+import org.apache.spark.internal.config._
import org.apache.spark.metrics.source.{Source, StaticSources}
class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateMethodTester{
@@ -183,4 +184,88 @@ class MetricsSystemSuite extends SparkFunSuite with BeforeAndAfter with PrivateM
assert(metricName != s"$appId.$executorId.${source.sourceName}")
assert(metricName === source.sourceName)
}
+
+ test("MetricsSystem with Executor instance, with custom namespace") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val appId = "testId"
+ val appName = "testName"
+ val executorId = "1"
+ conf.set("spark.app.id", appId)
+ conf.set("spark.app.name", appName)
+ conf.set("spark.executor.id", executorId)
+ conf.set(METRICS_NAMESPACE, "${spark.app.name}")
+
+ val instanceName = "executor"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+ assert(metricName === s"$appName.$executorId.${source.sourceName}")
+ }
+
+ test("MetricsSystem with Executor instance, custom namespace which is not set") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val executorId = "1"
+ val namespaceToResolve = "${spark.doesnotexist}"
+ conf.set("spark.executor.id", executorId)
+ conf.set(METRICS_NAMESPACE, namespaceToResolve)
+
+ val instanceName = "executor"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+ // If the user sets the spark.metrics.namespace property to an expansion of another property
+ // (say ${spark.doesnotexist}), the unresolved name (i.e. literally ${spark.doesnotexist})
+ // is used as the root metrics namespace.
+ assert(metricName === s"$namespaceToResolve.$executorId.${source.sourceName}")
+ }
+
+ test("MetricsSystem with Executor instance, custom namespace, spark.executor.id not set") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val appId = "testId"
+ conf.set("spark.app.name", appId)
+ conf.set(METRICS_NAMESPACE, "${spark.app.name}")
+
+ val instanceName = "executor"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+ assert(metricName === source.sourceName)
+ }
+
+ test("MetricsSystem with non-driver, non-executor instance with custom namespace") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val appId = "testId"
+ val appName = "testName"
+ val executorId = "dummyExecutorId"
+ conf.set("spark.app.id", appId)
+ conf.set("spark.app.name", appName)
+ conf.set(METRICS_NAMESPACE, "${spark.app.name}")
+ conf.set("spark.executor.id", executorId)
+
+ val instanceName = "testInstance"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+
+ // Even if spark.app.id and spark.executor.id are set, they are not used for the metric name.
+ assert(metricName != s"$appId.$executorId.${source.sourceName}")
+ assert(metricName === source.sourceName)
+ }
+
}
diff --git a/docs/monitoring.md b/docs/monitoring.md
index c8694762ff..6fdf87b4be 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -346,6 +346,18 @@ This allows users to report Spark metrics to a variety of sinks including HTTP,
files. The metrics system is configured via a configuration file that Spark expects to be present
at `$SPARK_HOME/conf/metrics.properties`. A custom file location can be specified via the
`spark.metrics.conf` [configuration property](configuration.html#spark-properties).
+By default, the root namespace used for driver or executor metrics is
+the value of `spark.app.id`. However, users often want to be able to track driver and executor
+metrics across applications, which is hard to do with the application ID
+(i.e. `spark.app.id`), since it changes with every invocation of the app. For such use cases,
+a custom namespace can be specified for metrics reporting using the `spark.metrics.namespace`
+configuration property.
+If, say, users want to set the metrics namespace to the name of the application, they
+can set the `spark.metrics.namespace` property to a value like `${spark.app.name}`. This value is
+then expanded appropriately by Spark and is used as the root namespace of the metrics system.
+Metrics for instances other than the driver and executors are never prefixed with `spark.app.id`,
+nor does the `spark.metrics.namespace` property have any effect on such metrics.
+
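
For example, an application could opt into name-based namespacing as sketched below (illustrative only; the same property can also be supplied through `--conf` on spark-submit or in `spark-defaults.conf`):

```scala
import org.apache.spark.SparkConf

// Namespace driver/executor metrics by application name rather than by the
// per-run application id.
val conf = new SparkConf()
  .setAppName("myApp")
  .set("spark.metrics.namespace", "${spark.app.name}")
// Driver/executor metric names then look like myApp.<executorId>.<sourceName>
// instead of <appId>.<executorId>.<sourceName>.
```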
Spark's metrics are decoupled into different
_instances_ corresponding to Spark components. Within each instance, you can configure a
set of sinks to which metrics are reported. The following instances are currently supported: