diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/metrics')
9 files changed, 560 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala new file mode 100644 index 0000000000..0f9c4e00b1 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics + +import java.util.Properties +import java.io.{File, FileInputStream, InputStream, IOException} + +import scala.collection.mutable +import scala.util.matching.Regex + +import org.apache.spark.Logging + +private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging { + initLogging() + + val DEFAULT_PREFIX = "*" + val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r + val METRICS_CONF = "metrics.properties" + + val properties = new Properties() + var propertyCategories: mutable.HashMap[String, Properties] = null + + private def setDefaultProperties(prop: Properties) { + prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet") + prop.setProperty("*.sink.servlet.uri", "/metrics/json") + prop.setProperty("*.sink.servlet.sample", "false") + prop.setProperty("master.sink.servlet.uri", "/metrics/master/json") + prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json") + } + + def initialize() { + //Add default properties in case there's no properties file + setDefaultProperties(properties) + + // If spark.metrics.conf is not set, try to get file in class path + var is: InputStream = null + try { + is = configFile match { + case Some(f) => new FileInputStream(f) + case None => getClass.getClassLoader.getResourceAsStream(METRICS_CONF) + } + + if (is != null) { + properties.load(is) + } + } catch { + case e: Exception => logError("Error loading configure file", e) + } finally { + if (is != null) is.close() + } + + propertyCategories = subProperties(properties, INSTANCE_REGEX) + if (propertyCategories.contains(DEFAULT_PREFIX)) { + import scala.collection.JavaConversions._ + + val defaultProperty = propertyCategories(DEFAULT_PREFIX) + for { (inst, prop) <- propertyCategories + if (inst != DEFAULT_PREFIX) + (k, v) <- defaultProperty + if (prop.getProperty(k) == null) } { + prop.setProperty(k, v) + } + } + } + + def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = { + val subProperties = new mutable.HashMap[String, Properties] + import scala.collection.JavaConversions._ + prop.foreach { kv => + if (regex.findPrefixOf(kv._1) != None) { + val regex(prefix, suffix) = kv._1 + subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2) + } + } + subProperties + } + + def getInstance(inst: String): Properties = { + propertyCategories.get(inst) match { + case Some(s) => s + case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties) + } + } +} + diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala new file mode 100644 index 0000000000..bec0c83be8 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics + +import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry} + +import java.util.Properties +import java.util.concurrent.TimeUnit + +import scala.collection.mutable + +import org.apache.spark.Logging +import org.apache.spark.metrics.sink.{MetricsServlet, Sink} +import org.apache.spark.metrics.source.Source + +/** + * Spark Metrics System, created by specific "instance", combined by source, + * sink, periodically poll source metrics data to sink destinations. + * + * "instance" specify "who" (the role) use metrics system. In spark there are several roles + * like master, worker, executor, client driver, these roles will create metrics system + * for monitoring. So instance represents these roles. Currently in Spark, several instances + * have already implemented: master, worker, executor, driver, applications. + * + * "source" specify "where" (source) to collect metrics data. In metrics system, there exists + * two kinds of source: + * 1. Spark internal source, like MasterSource, WorkerSource, etc, which will collect + * Spark component's internal state, these sources are related to instance and will be + * added after specific metrics system is created. + * 2. Common source, like JvmSource, which will collect low level state, is configured by + * configuration and loaded through reflection. + * + * "sink" specify "where" (destination) to output metrics data to. Several sinks can be + * coexisted and flush metrics to all these sinks. + * + * Metrics configuration format is like below: + * [instance].[sink|source].[name].[options] = xxxx + * + * [instance] can be "master", "worker", "executor", "driver", "applications" which means only + * the specified instance has this property. + * wild card "*" can be used to replace instance name, which means all the instances will have + * this property. + * + * [sink|source] means this property belongs to source or sink. This field can only be source or sink. + * + * [name] specify the name of sink or source, it is custom defined. + * + * [options] is the specific property of this source or sink. + */ +private[spark] class MetricsSystem private (val instance: String) extends Logging { + initLogging() + + val confFile = System.getProperty("spark.metrics.conf") + val metricsConfig = new MetricsConfig(Option(confFile)) + + val sinks = new mutable.ArrayBuffer[Sink] + val sources = new mutable.ArrayBuffer[Source] + val registry = new MetricRegistry() + + // Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui + private var metricsServlet: Option[MetricsServlet] = None + + /** Get any UI handlers used by this metrics system. */ + def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array()) + + metricsConfig.initialize() + registerSources() + registerSinks() + + def start() { + sinks.foreach(_.start) + } + + def stop() { + sinks.foreach(_.stop) + } + + def registerSource(source: Source) { + sources += source + try { + registry.register(source.sourceName, source.metricRegistry) + } catch { + case e: IllegalArgumentException => logInfo("Metrics already registered", e) + } + } + + def removeSource(source: Source) { + sources -= source + registry.removeMatching(new MetricFilter { + def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName) + }) + } + + def registerSources() { + val instConfig = metricsConfig.getInstance(instance) + val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX) + + // Register all the sources related to instance + sourceConfigs.foreach { kv => + val classPath = kv._2.getProperty("class") + try { + val source = Class.forName(classPath).newInstance() + registerSource(source.asInstanceOf[Source]) + } catch { + case e: Exception => logError("Source class " + classPath + " cannot be instantialized", e) + } + } + } + + def registerSinks() { + val instConfig = metricsConfig.getInstance(instance) + val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX) + + sinkConfigs.foreach { kv => + val classPath = kv._2.getProperty("class") + try { + val sink = Class.forName(classPath) + .getConstructor(classOf[Properties], classOf[MetricRegistry]) + .newInstance(kv._2, registry) + if (kv._1 == "servlet") { + metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) + } else { + sinks += sink.asInstanceOf[Sink] + } + } catch { + case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e) + } + } + } +} + +private[spark] object MetricsSystem { + val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r + val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r + + val MINIMAL_POLL_UNIT = TimeUnit.SECONDS + val MINIMAL_POLL_PERIOD = 1 + + def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) { + val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit) + if (period < MINIMAL_POLL_PERIOD) { + throw new IllegalArgumentException("Polling period " + pollPeriod + " " + pollUnit + + " below than minimal polling period ") + } + } + + def createMetricsSystem(instance: String): MetricsSystem = new MetricsSystem(instance) +} diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala new file mode 100644 index 0000000000..bce257d6e6 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.sink + +import com.codahale.metrics.{ConsoleReporter, MetricRegistry} + +import java.util.Properties +import java.util.concurrent.TimeUnit + +import org.apache.spark.metrics.MetricsSystem + +class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink { + val CONSOLE_DEFAULT_PERIOD = 10 + val CONSOLE_DEFAULT_UNIT = "SECONDS" + + val CONSOLE_KEY_PERIOD = "period" + val CONSOLE_KEY_UNIT = "unit" + + val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match { + case Some(s) => s.toInt + case None => CONSOLE_DEFAULT_PERIOD + } + + val pollUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match { + case Some(s) => TimeUnit.valueOf(s.toUpperCase()) + case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT) + } + + MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) + + val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .build() + + override def start() { + reporter.start(pollPeriod, pollUnit) + } + + override def stop() { + reporter.stop() + } +} + diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala new file mode 100644 index 0000000000..3d1a06a395 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.sink + +import com.codahale.metrics.{CsvReporter, MetricRegistry} + +import java.io.File +import java.util.{Locale, Properties} +import java.util.concurrent.TimeUnit + +import org.apache.spark.metrics.MetricsSystem + +class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink { + val CSV_KEY_PERIOD = "period" + val CSV_KEY_UNIT = "unit" + val CSV_KEY_DIR = "directory" + + val CSV_DEFAULT_PERIOD = 10 + val CSV_DEFAULT_UNIT = "SECONDS" + val CSV_DEFAULT_DIR = "/tmp/" + + val pollPeriod = Option(property.getProperty(CSV_KEY_PERIOD)) match { + case Some(s) => s.toInt + case None => CSV_DEFAULT_PERIOD + } + + val pollUnit = Option(property.getProperty(CSV_KEY_UNIT)) match { + case Some(s) => TimeUnit.valueOf(s.toUpperCase()) + case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT) + } + + MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) + + val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match { + case Some(s) => s + case None => CSV_DEFAULT_DIR + } + + val reporter: CsvReporter = CsvReporter.forRegistry(registry) + .formatFor(Locale.US) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .build(new File(pollDir)) + + override def start() { + reporter.start(pollPeriod, pollUnit) + } + + override def stop() { + reporter.stop() + } +} + diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala new file mode 100644 index 0000000000..621d086d41 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.sink + +import com.codahale.metrics.{JmxReporter, MetricRegistry} + +import java.util.Properties + +class JmxSink(val property: Properties, val registry: MetricRegistry) extends Sink { + val reporter: JmxReporter = JmxReporter.forRegistry(registry).build() + + override def start() { + reporter.start() + } + + override def stop() { + reporter.stop() + } + +} diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala new file mode 100644 index 0000000000..4e90dd4323 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.sink + +import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.json.MetricsModule + +import com.fasterxml.jackson.databind.ObjectMapper + +import java.util.Properties +import java.util.concurrent.TimeUnit +import javax.servlet.http.HttpServletRequest + +import org.eclipse.jetty.server.Handler + +import org.apache.spark.ui.JettyUtils + +class MetricsServlet(val property: Properties, val registry: MetricRegistry) extends Sink { + val SERVLET_KEY_URI = "uri" + val SERVLET_KEY_SAMPLE = "sample" + + val servletURI = property.getProperty(SERVLET_KEY_URI) + + val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean + + val mapper = new ObjectMapper().registerModule( + new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample)) + + def getHandlers = Array[(String, Handler)]( + (servletURI, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json")) + ) + + def getMetricsSnapshot(request: HttpServletRequest): String = { + mapper.writeValueAsString(registry) + } + + override def start() { } + + override def stop() { } +} diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala new file mode 100644 index 0000000000..3a739aa563 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.sink + +trait Sink { + def start: Unit + def stop: Unit +} diff --git a/core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala b/core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala new file mode 100644 index 0000000000..75cb2b8973 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.source + +import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.jvm.{GarbageCollectorMetricSet, MemoryUsageGaugeSet} + +class JvmSource extends Source { + val sourceName = "jvm" + val metricRegistry = new MetricRegistry() + + val gcMetricSet = new GarbageCollectorMetricSet + val memGaugeSet = new MemoryUsageGaugeSet + + metricRegistry.registerAll(gcMetricSet) + metricRegistry.registerAll(memGaugeSet) +} diff --git a/core/src/main/scala/org/apache/spark/metrics/source/Source.scala b/core/src/main/scala/org/apache/spark/metrics/source/Source.scala new file mode 100644 index 0000000000..3fee55cc6d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/source/Source.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.source + +import com.codahale.metrics.MetricRegistry + +trait Source { + def sourceName: String + def metricRegistry: MetricRegistry +} |