diff options
author | Neelesh Srinivas Salian <nsalian@cloudera.com> | 2015-07-13 15:46:51 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-07-13 15:46:51 -0700 |
commit | b7bcbe25f90ba4e78b548465bc80d4de1d2c4a4a (patch) | |
tree | 4a729a2e03af1f5ca1cb0d1b2dd997995cd87bf4 /streaming/src/main | |
parent | 0aed38e4498b24d372bfdc7001959e78536369a1 (diff) | |
download | spark-b7bcbe25f90ba4e78b548465bc80d4de1d2c4a4a.tar.gz spark-b7bcbe25f90ba4e78b548465bc80d4de1d2c4a4a.tar.bz2 spark-b7bcbe25f90ba4e78b548465bc80d4de1d2c4a4a.zip |
[SPARK-8743] [STREAMING] Deregister Codahale metrics for streaming when StreamingContext is closed
The issue link: https://issues.apache.org/jira/browse/SPARK-8743
Deregister Codahale metrics for streaming when StreamingContext is closed
Design:
Adding the method calls in the appropriate start() and stop () methods for the StreamingContext
Actions in the PullRequest:
1) Added the registerSource method call to the start method for the Streaming Context.
2) Added the removeSource method to the stop method.
3) Added comments for both 1 and 2 and comment to show initialization of the StreamingSource
4) Added a test case to check for both registration and de-registration of metrics
Previous closed PR for reference: https://github.com/apache/spark/pull/7250
Author: Neelesh Srinivas Salian <nsalian@cloudera.com>
Closes #7362 from nssalian/branch-SPARK-8743 and squashes the following commits:
7d998a3 [Neelesh Srinivas Salian] Removed the Thread.sleep() call
8b26397 [Neelesh Srinivas Salian] Moved the scalatest.{} import
0e8007a [Neelesh Srinivas Salian] moved import org.apache.spark{} to correct place
daedaa5 [Neelesh Srinivas Salian] Corrected Ordering of imports
8873180 [Neelesh Srinivas Salian] Removed redundancy in imports
59227a4 [Neelesh Srinivas Salian] Changed the ordering of the imports to classify scala and spark imports
d8cb577 [Neelesh Srinivas Salian] Added registerSource to start() and removeSource to stop(). Wrote a test to check the registration and de-registration
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 10 |
1 files changed, 6 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ec49d0f42d..6b78a82e68 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -192,11 +192,8 @@ class StreamingContext private[streaming] ( None } - /** Register streaming source to metrics system */ + /* Initializing a streamingSource to register metrics */ private val streamingSource = new StreamingSource(this) - assert(env != null) - assert(env.metricsSystem != null) - env.metricsSystem.registerSource(streamingSource) private var state: StreamingContextState = INITIALIZED @@ -606,6 +603,9 @@ class StreamingContext private[streaming] ( } shutdownHookRef = Utils.addShutdownHook( StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown) + // Registering Streaming Metrics at the start of the StreamingContext + assert(env.metricsSystem != null) + env.metricsSystem.registerSource(streamingSource) uiTab.foreach(_.attach()) logInfo("StreamingContext started") case ACTIVE => @@ -682,6 +682,8 @@ class StreamingContext private[streaming] ( logWarning("StreamingContext has already been stopped") case ACTIVE => scheduler.stop(stopGracefully) + // Removing the streamingSource to de-register the metrics on stop() + env.metricsSystem.removeSource(streamingSource) uiTab.foreach(_.detach()) StreamingContext.setActiveContext(null) waiter.notifyStop() |