aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala2
-rwxr-xr-xcore/src/main/scala/org/apache/spark/deploy/worker/Worker.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala1
-rw-r--r--extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala4
13 files changed, 34 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b25f081761..f5a0549834 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -990,6 +990,7 @@ class SparkContext(config: SparkConf) extends Logging {
val dagSchedulerCopy = dagScheduler
dagScheduler = null
if (dagSchedulerCopy != null) {
+ env.metricsSystem.report()
metadataCleaner.cancel()
cleaner.foreach(_.stop())
dagSchedulerCopy.stop()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 21f8667819..a70ecdb375 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -154,6 +154,8 @@ private[spark] class Master(
}
override def postStop() {
+ masterMetricsSystem.report()
+ applicationMetricsSystem.report()
// prevent the CompleteRecovery message sending to restarted master
if (recoveryCompletionTask != null) {
recoveryCompletionTask.cancel()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index ce42544305..fb5252da96 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -357,6 +357,7 @@ private[spark] class Worker(
}
override def postStop() {
+ metricsSystem.report()
registrationRetryTimer.foreach(_.cancel())
executors.values.foreach(_.kill())
drivers.values.foreach(_.kill())
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 860b47e056..af736de405 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -88,6 +88,7 @@ private[spark] class CoarseGrainedExecutorBackend(
case StopExecutor =>
logInfo("Driver commanded a shutdown")
+ executor.stop()
context.stop(self)
context.system.shutdown()
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 3b69bc4ca4..99d650a363 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -121,6 +121,10 @@ private[spark] class Executor(
}
}
+ def stop(): Unit = {
+ env.metricsSystem.report()
+ }
+
/** Get the Yarn approved local directories. */
private def getYarnLocalDirs(): String = {
// Hadoop 0.23 and 2.x have different Environment variable names for the
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 651511da1b..6ef817d0e5 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -91,6 +91,10 @@ private[spark] class MetricsSystem private (val instance: String,
sinks.foreach(_.stop)
}
+ def report(): Unit = {
+ sinks.foreach(_.report())
+ }
+
def registerSource(source: Source) {
sources += source
try {
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
index 05852f1f98..81b9056b40 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala
@@ -57,5 +57,9 @@ private[spark] class ConsoleSink(val property: Properties, val registry: MetricR
override def stop() {
reporter.stop()
}
+
+ override def report() {
+ reporter.report()
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
index 542dce6536..9d5f2ae932 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/CsvSink.scala
@@ -66,5 +66,9 @@ private[spark] class CsvSink(val property: Properties, val registry: MetricRegis
override def stop() {
reporter.stop()
}
+
+ override def report() {
+ reporter.report()
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
index aeb4ad44a0..d7b5f5c40e 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -81,4 +81,8 @@ private[spark] class GraphiteSink(val property: Properties, val registry: Metric
override def stop() {
reporter.stop()
}
+
+ override def report() {
+ reporter.report()
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
index ed27234b4e..2588fe2c9e 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/JmxSink.scala
@@ -35,4 +35,6 @@ private[spark] class JmxSink(val property: Properties, val registry: MetricRegis
reporter.stop()
}
+ override def report() { }
+
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
index 571539ba5e..2f65bc8b46 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
@@ -57,4 +57,6 @@ private[spark] class MetricsServlet(val property: Properties, val registry: Metr
override def start() { }
override def stop() { }
+
+ override def report() { }
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala
index 6f2b5a0602..0d83d8c425 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala
@@ -20,4 +20,5 @@ package org.apache.spark.metrics.sink
private[spark] trait Sink {
def start: Unit
def stop: Unit
+ def report(): Unit
}
diff --git a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
index d03d7774e8..3b1880e143 100644
--- a/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
+++ b/extras/spark-ganglia-lgpl/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala
@@ -82,5 +82,9 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry,
override def stop() {
reporter.stop()
}
+
+ override def report() {
+ reporter.report()
+ }
}