aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala95
2 files changed, 27 insertions, 71 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index ed1aa114e1..74dbba453f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -50,9 +50,6 @@ case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo)
case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo)
extends StreamingListenerEvent
-/** An event used in the listener to shutdown the listener daemon thread. */
-private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
-
/**
* :: DeveloperApi ::
* A listener interface for receiving information about an ongoing streaming
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 398724d9e8..b07d6cf347 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -17,83 +17,42 @@
package org.apache.spark.streaming.scheduler
+import java.util.concurrent.atomic.AtomicBoolean
+
import org.apache.spark.Logging
-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
-import java.util.concurrent.LinkedBlockingQueue
+import org.apache.spark.util.AsynchronousListenerBus
/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
-private[spark] class StreamingListenerBus() extends Logging {
- private val listeners = new ArrayBuffer[StreamingListener]()
- with SynchronizedBuffer[StreamingListener]
-
- /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
- * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
- private val EVENT_QUEUE_CAPACITY = 10000
- private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY)
- private var queueFullErrorMessageLogged = false
-
- val listenerThread = new Thread("StreamingListenerBus") {
- setDaemon(true)
- override def run() {
- while (true) {
- val event = eventQueue.take
- event match {
- case receiverStarted: StreamingListenerReceiverStarted =>
- listeners.foreach(_.onReceiverStarted(receiverStarted))
- case receiverError: StreamingListenerReceiverError =>
- listeners.foreach(_.onReceiverError(receiverError))
- case receiverStopped: StreamingListenerReceiverStopped =>
- listeners.foreach(_.onReceiverStopped(receiverStopped))
- case batchSubmitted: StreamingListenerBatchSubmitted =>
- listeners.foreach(_.onBatchSubmitted(batchSubmitted))
- case batchStarted: StreamingListenerBatchStarted =>
- listeners.foreach(_.onBatchStarted(batchStarted))
- case batchCompleted: StreamingListenerBatchCompleted =>
- listeners.foreach(_.onBatchCompleted(batchCompleted))
- case StreamingListenerShutdown =>
- // Get out of the while loop and shutdown the daemon thread
- return
- case _ =>
- }
- }
+private[spark] class StreamingListenerBus
+ extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
+ with Logging {
+
+ private val logDroppedEvent = new AtomicBoolean(false)
+
+ override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
+ event match {
+ case receiverStarted: StreamingListenerReceiverStarted =>
+ listener.onReceiverStarted(receiverStarted)
+ case receiverError: StreamingListenerReceiverError =>
+ listener.onReceiverError(receiverError)
+ case receiverStopped: StreamingListenerReceiverStopped =>
+ listener.onReceiverStopped(receiverStopped)
+ case batchSubmitted: StreamingListenerBatchSubmitted =>
+ listener.onBatchSubmitted(batchSubmitted)
+ case batchStarted: StreamingListenerBatchStarted =>
+ listener.onBatchStarted(batchStarted)
+ case batchCompleted: StreamingListenerBatchCompleted =>
+ listener.onBatchCompleted(batchCompleted)
+ case _ =>
}
}
- def start() {
- listenerThread.start()
- }
-
- def addListener(listener: StreamingListener) {
- listeners += listener
- }
-
- def post(event: StreamingListenerEvent) {
- val eventAdded = eventQueue.offer(event)
- if (!eventAdded && !queueFullErrorMessageLogged) {
+ override def onDropEvent(event: StreamingListenerEvent): Unit = {
+ if (logDroppedEvent.compareAndSet(false, true)) {
+ // Only log the following message once to avoid duplicated annoying logs.
logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
"This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
"rate at which events are being started by the scheduler.")
- queueFullErrorMessageLogged = true
}
}
-
- /**
- * Waits until there are no more events in the queue, or until the specified time has elapsed.
- * Used for testing only. Returns true if the queue has emptied and false is the specified time
- * elapsed before the queue emptied.
- */
- def waitUntilEmpty(timeoutMillis: Int): Boolean = {
- val finishTime = System.currentTimeMillis + timeoutMillis
- while (!eventQueue.isEmpty) {
- if (System.currentTimeMillis > finishTime) {
- return false
- }
- /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
- * add overhead in the general case. */
- Thread.sleep(10)
- }
- true
- }
-
- def stop(): Unit = post(StreamingListenerShutdown)
}