aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
author Neelesh Srinivas Salian <nsalian@cloudera.com> 2015-07-13 15:46:51 -0700
committer Tathagata Das <tathagata.das1565@gmail.com> 2015-07-13 15:46:51 -0700
commit b7bcbe25f90ba4e78b548465bc80d4de1d2c4a4a (patch)
tree 4a729a2e03af1f5ca1cb0d1b2dd997995cd87bf4 /streaming
parent 0aed38e4498b24d372bfdc7001959e78536369a1 (diff)
download spark-b7bcbe25f90ba4e78b548465bc80d4de1d2c4a4a.tar.gz
spark-b7bcbe25f90ba4e78b548465bc80d4de1d2c4a4a.tar.bz2
spark-b7bcbe25f90ba4e78b548465bc80d4de1d2c4a4a.zip
[SPARK-8743] [STREAMING] Deregister Codahale metrics for streaming when StreamingContext is closed
The issue link: https://issues.apache.org/jira/browse/SPARK-8743
Deregister Codahale metrics for streaming when StreamingContext is closed

Design: Adding the method calls in the appropriate start() and stop() methods for the StreamingContext.

Actions in the pull request:
1) Added the registerSource method call to the start method for the StreamingContext.
2) Added the removeSource method call to the stop method.
3) Added comments for both 1 and 2, and a comment to show initialization of the StreamingSource.
4) Added a test case to check for both registration and de-registration of metrics.

Previous closed PR for reference: https://github.com/apache/spark/pull/7250

Author: Neelesh Srinivas Salian <nsalian@cloudera.com>

Closes #7362 from nssalian/branch-SPARK-8743 and squashes the following commits:

7d998a3 [Neelesh Srinivas Salian] Removed the Thread.sleep() call
8b26397 [Neelesh Srinivas Salian] Moved the scalatest.{} import
0e8007a [Neelesh Srinivas Salian] moved import org.apache.spark{} to correct place
daedaa5 [Neelesh Srinivas Salian] Corrected Ordering of imports
8873180 [Neelesh Srinivas Salian] Removed redundancy in imports
59227a4 [Neelesh Srinivas Salian] Changed the ordering of the imports to classify scala and spark imports
d8cb577 [Neelesh Srinivas Salian] Added registerSource to start() and removeSource to stop(). Wrote a test to check the registration and de-registration
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 10
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala 41
2 files changed, 45 insertions, 6 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ec49d0f42d..6b78a82e68 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -192,11 +192,8 @@ class StreamingContext private[streaming] (
None
}
- /** Register streaming source to metrics system */
+ /* Initializing a streamingSource to register metrics */
private val streamingSource = new StreamingSource(this)
- assert(env != null)
- assert(env.metricsSystem != null)
- env.metricsSystem.registerSource(streamingSource)
private var state: StreamingContextState = INITIALIZED
@@ -606,6 +603,9 @@ class StreamingContext private[streaming] (
}
shutdownHookRef = Utils.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
+ // Registering Streaming Metrics at the start of the StreamingContext
+ assert(env.metricsSystem != null)
+ env.metricsSystem.registerSource(streamingSource)
uiTab.foreach(_.attach())
logInfo("StreamingContext started")
case ACTIVE =>
@@ -682,6 +682,8 @@ class StreamingContext private[streaming] (
logWarning("StreamingContext has already been stopped")
case ACTIVE =>
scheduler.stop(stopGracefully)
+ // Removing the streamingSource to de-register the metrics on stop()
+ env.metricsSystem.removeSource(streamingSource)
uiTab.foreach(_.detach())
StreamingContext.setActiveContext(null)
waiter.notifyStop()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 56b4ce5638..289a159d89 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -20,20 +20,23 @@ package org.apache.spark.streaming
import java.io.{File, NotSerializableException}
import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Queue
import org.apache.commons.io.FileUtils
+import org.scalatest.{Assertions, BeforeAndAfter, PrivateMethodTester}
import org.scalatest.concurrent.Eventually._
import org.scalatest.concurrent.Timeouts
import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.time.SpanSugar._
-import org.scalatest.{Assertions, BeforeAndAfter}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.source.Source
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.util.Utils
-import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, SparkFunSuite}
class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging {
@@ -299,6 +302,25 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
Thread.sleep(100)
}
+ test ("registering and de-registering of streamingSource") {
+ val conf = new SparkConf().setMaster(master).setAppName(appName)
+ ssc = new StreamingContext(conf, batchDuration)
+ assert(ssc.getState() === StreamingContextState.INITIALIZED)
+ addInputStream(ssc).register()
+ ssc.start()
+
+ val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem)
+ val streamingSource = StreamingContextSuite.getStreamingSource(ssc)
+ assert(sources.contains(streamingSource))
+ assert(ssc.getState() === StreamingContextState.ACTIVE)
+
+ ssc.stop()
+ val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem)
+ val streamingSourceAfterStop = StreamingContextSuite.getStreamingSource(ssc)
+ assert(ssc.getState() === StreamingContextState.STOPPED)
+ assert(!sourcesAfterStop.contains(streamingSourceAfterStop))
+ }
+
test("awaitTermination") {
ssc = new StreamingContext(master, appName, batchDuration)
val inputStream = addInputStream(ssc)
@@ -811,3 +833,18 @@ package object testPackage extends Assertions {
}
}
}
+
+/**
+ * Helper methods for testing StreamingContextSuite
+ * This includes methods to access private methods and fields in StreamingContext and MetricsSystem
+ */
+private object StreamingContextSuite extends PrivateMethodTester {
+ private val _sources = PrivateMethod[ArrayBuffer[Source]]('sources)
+ private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = {
+ metricsSystem.invokePrivate(_sources())
+ }
+ private val _streamingSource = PrivateMethod[StreamingSource]('streamingSource)
+ private def getStreamingSource(streamingContext: StreamingContext): StreamingSource = {
+ streamingContext.invokePrivate(_streamingSource())
+ }
+}