aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-01-20 11:57:53 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2016-01-20 11:57:53 -0800
commit944fdadf77523570f6b33544ad0b388031498952 (patch)
tree4cb10ba280d004567897904d73f29c310b52028b /streaming
parente3727c409fe7d1fb6e27a14faddd0602f963745e (diff)
downloadspark-944fdadf77523570f6b33544ad0b388031498952.tar.gz
spark-944fdadf77523570f6b33544ad0b388031498952.tar.bz2
spark-944fdadf77523570f6b33544ad0b388031498952.zip
[SPARK-12847][CORE][STREAMING] Remove StreamingListenerBus and post all Streaming events to the same thread as Spark events
Including the following changes: 1. Add StreamingListenerForwardingBus to WrappedStreamingListenerEvent process events in `onOtherEvent` to StreamingListener 2. Remove StreamingListenerBus 3. Merge AsynchronousListenerBus and LiveListenerBus to the same class LiveListenerBus 4. Add `logEvent` method to SparkListenerEvent so that EventLoggingListener can use it to ignore WrappedStreamingListenerEvents Author: Shixiong Zhu <shixiong@databricks.com> Closes #10779 from zsxwing/streaming-listener.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala9
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala69
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala2
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala22
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala2
6 files changed, 83 insertions, 25 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 157ee92fd7..b7070dda99 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -37,6 +37,7 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.input.FixedLengthBinaryInputFormat
import org.apache.spark.rdd.{RDD, RDDOperationScope}
+import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.SerializationDebugger
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContextState._
@@ -44,7 +45,7 @@ import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorReceiverSupervisor, ActorSupervisorStrategy, Receiver}
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.{AsynchronousListenerBus, CallSite, ShutdownHookManager, ThreadUtils, Utils}
+import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, Utils}
/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -694,9 +695,9 @@ class StreamingContext private[streaming] (
*/
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
var shutdownHookRefToRemove: AnyRef = null
- if (AsynchronousListenerBus.withinListenerThread.value) {
- throw new SparkException("Cannot stop StreamingContext within listener thread of" +
- " AsynchronousListenerBus")
+ if (LiveListenerBus.withinListenerThread.value) {
+ throw new SparkException(
+ s"Cannot stop StreamingContext within listener thread of ${LiveListenerBus.name}")
}
synchronized {
// The state should always be Stopped after calling `stop()`, even if we haven't started yet
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 1ed6fb0aa9..9535c8e5b7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -49,7 +49,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
private val jobGenerator = new JobGenerator(this)
val clock = jobGenerator.clock
- val listenerBus = new StreamingListenerBus()
+ val listenerBus = new StreamingListenerBus(ssc.sparkContext.listenerBus)
// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
@@ -76,7 +76,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)
- listenerBus.start(ssc.sparkContext)
+ listenerBus.start()
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
receiverTracker.start()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index ca111bb636..39f6e711a6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -17,19 +17,37 @@
package org.apache.spark.streaming.scheduler
-import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
+import org.apache.spark.util.ListenerBus
-import org.apache.spark.Logging
-import org.apache.spark.util.AsynchronousListenerBus
+/**
+ * A Streaming listener bus to forward events to StreamingListeners. This one will wrap received
+ * Streaming events as WrappedStreamingListenerEvent and send them to Spark listener bus. It also
+ * registers itself with Spark listener bus, so that it can receive WrappedStreamingListenerEvents,
+ * unwrap them as StreamingListenerEvent and dispatch them to StreamingListeners.
+ */
+private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus)
+ extends SparkListener with ListenerBus[StreamingListener, StreamingListenerEvent] {
-/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
-private[spark] class StreamingListenerBus
- extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
- with Logging {
+ /**
+ * Post a StreamingListenerEvent to the Spark listener bus asynchronously. This event will be
+ * dispatched to all StreamingListeners in the thread of the Spark listener bus.
+ */
+ def post(event: StreamingListenerEvent) {
+ sparkListenerBus.post(new WrappedStreamingListenerEvent(event))
+ }
- private val logDroppedEvent = new AtomicBoolean(false)
+ override def onOtherEvent(event: SparkListenerEvent): Unit = {
+ event match {
+ case WrappedStreamingListenerEvent(e) =>
+ postToAll(e)
+ case _ =>
+ }
+ }
- override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
+ protected override def doPostEvent(
+ listener: StreamingListener,
+ event: StreamingListenerEvent): Unit = {
event match {
case receiverStarted: StreamingListenerReceiverStarted =>
listener.onReceiverStarted(receiverStarted)
@@ -51,12 +69,31 @@ private[spark] class StreamingListenerBus
}
}
- override def onDropEvent(event: StreamingListenerEvent): Unit = {
- if (logDroppedEvent.compareAndSet(false, true)) {
- // Only log the following message once to avoid duplicated annoying logs.
- logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
- "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
- "rate at which events are being started by the scheduler.")
- }
+ /**
+ * Register this one with the Spark listener bus so that it can receive Streaming events and
+ * forward them to StreamingListeners.
+ */
+ def start(): Unit = {
+ sparkListenerBus.addListener(this) // for getting callbacks on spark events
+ }
+
+ /**
+ * Unregister this one with the Spark listener bus and all StreamingListeners won't receive any
+ * events after that.
+ */
+ def stop(): Unit = {
+ sparkListenerBus.removeListener(this)
+ }
+
+ /**
+ * Wrapper for StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark
+ * listener bus.
+ */
+ private case class WrappedStreamingListenerEvent(streamingListenerEvent: StreamingListenerEvent)
+ extends SparkListenerEvent {
+
+ // Do not log streaming events in event log as history server does not support streaming
+ // events (SPARK-12140). TODO Once SPARK-12140 is resolved we should set it to true.
+ protected[spark] override def logEvent: Boolean = false
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 2e231601c3..75591f04ca 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -77,7 +77,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
// Ensure progress listener has been notified of all events
- ssc.scheduler.listenerBus.waitUntilEmpty(500)
+ ssc.sparkContext.listenerBus.waitUntilEmpty(500)
// Verify all "InputInfo"s have been reported
assert(ssc.progressListener.numTotalReceivedRecords === input.size)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 628a508207..1ed68c74db 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer, Synch
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
+import org.mockito.Mockito.{mock, reset, verifyNoMoreInteractions}
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
@@ -216,6 +217,27 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
assert(failureReasons(1).contains("This is another failed job"))
}
+ test("StreamingListener receives no events after stopping StreamingListenerBus") {
+ val streamingListener = mock(classOf[StreamingListener])
+
+ ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+ ssc.addStreamingListener(streamingListener)
+ val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+ inputStream.foreachRDD(_.count)
+ ssc.start()
+ ssc.stop()
+
+ // Because "streamingListener" has already received some events, let's clear that.
+ reset(streamingListener)
+
+ // Post a Streaming event after stopping StreamingContext
+ val receiverInfoStopped = ReceiverInfo(0, "test", false, "localhost", "0")
+ ssc.scheduler.listenerBus.post(StreamingListenerReceiverStopped(receiverInfoStopped))
+ ssc.sparkContext.listenerBus.waitUntilEmpty(1000)
+ // The StreamingListener should not receive any event
+ verifyNoMoreInteractions(streamingListener)
+ }
+
private def startStreamingContextAndCallStop(_ssc: StreamingContext): Unit = {
val contextStoppingCollector = new StreamingContextStoppingCollector(_ssc)
_ssc.addStreamingListener(contextStoppingCollector)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index b67189fbd7..cfd7f86f84 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -34,8 +34,6 @@ class ReceiverTrackerSuite extends TestSuiteBase {
test("send rate update to receivers") {
withStreamingContext(new StreamingContext(conf, Milliseconds(100))) { ssc =>
- ssc.scheduler.listenerBus.start(ssc.sc)
-
val newRateLimit = 100L
val inputDStream = new RateTestInputDStream(ssc)
val tracker = new ReceiverTracker(ssc)