aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-02-01 17:47:51 -0800
committerAndrew Or <andrew@databricks.com>2015-02-01 17:48:41 -0800
commit883bc88d520b27bdeb74a1837b45ef0b59753568 (patch)
tree76e1e33a5373af8b3636385e6f555fced822d2dc /streaming
parent4a171225ba628192a5ae43a99dc50508cf12491c (diff)
downloadspark-883bc88d520b27bdeb74a1837b45ef0b59753568.tar.gz
spark-883bc88d520b27bdeb74a1837b45ef0b59753568.tar.bz2
spark-883bc88d520b27bdeb74a1837b45ef0b59753568.zip
[SPARK-4859][Core][Streaming] Refactor LiveListenerBus and StreamingListenerBus
This PR refactors LiveListenerBus and StreamingListenerBus and extracts their common code into a parent class `ListenerBus`. It also includes the bug fixes from #3710: 1. Fix the race condition on queueFullErrorMessageLogged in LiveListenerBus and StreamingListenerBus to avoid outputting `queue-full-error` logs multiple times. 2. Make sure the SHUTDOWN message is always delivered to listenerThread, so that listenerThread is always able to exit. 3. Log the error from a listener rather than crashing listenerThread in StreamingListenerBus. While fixing the above bugs, we found it better to make LiveListenerBus and StreamingListenerBus have the same behaviors. That would leave a lot of duplicated code in LiveListenerBus and StreamingListenerBus. Therefore, I extracted their common code to `ListenerBus` as a parent class: LiveListenerBus and StreamingListenerBus only need to extend `ListenerBus` and implement `onPostEvent` (how to process an event) and `onDropEvent` (what to do when dropping an event). Author: zsxwing <zsxwing@gmail.com> Closes #4006 from zsxwing/SPARK-4859-refactor and squashes the following commits: c8dade2 [zsxwing] Fix the code style after renaming 5715061 [zsxwing] Rename ListenerHelper to ListenerBus and the original ListenerBus to AsynchronousListenerBus f0ef647 [zsxwing] Fix the code style 4e85ffc [zsxwing] Merge branch 'master' into SPARK-4859-refactor d2ef990 [zsxwing] Add private[spark] 4539f91 [zsxwing] Remove final to pass MiMa tests a9dccd3 [zsxwing] Remove SparkListenerShutdown 7cc04c3 [zsxwing] Refactor LiveListenerBus and StreamingListenerBus and make them share same code base
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala95
2 files changed, 27 insertions, 71 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index ed1aa114e1..74dbba453f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -50,9 +50,6 @@ case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo)
case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo)
extends StreamingListenerEvent
-/** An event used in the listener to shutdown the listener daemon thread. */
-private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
-
/**
* :: DeveloperApi ::
* A listener interface for receiving information about an ongoing streaming
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 398724d9e8..b07d6cf347 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -17,83 +17,42 @@
package org.apache.spark.streaming.scheduler
+import java.util.concurrent.atomic.AtomicBoolean
+
import org.apache.spark.Logging
-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
-import java.util.concurrent.LinkedBlockingQueue
+import org.apache.spark.util.AsynchronousListenerBus
/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
-private[spark] class StreamingListenerBus() extends Logging {
- private val listeners = new ArrayBuffer[StreamingListener]()
- with SynchronizedBuffer[StreamingListener]
-
- /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
- * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
- private val EVENT_QUEUE_CAPACITY = 10000
- private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY)
- private var queueFullErrorMessageLogged = false
-
- val listenerThread = new Thread("StreamingListenerBus") {
- setDaemon(true)
- override def run() {
- while (true) {
- val event = eventQueue.take
- event match {
- case receiverStarted: StreamingListenerReceiverStarted =>
- listeners.foreach(_.onReceiverStarted(receiverStarted))
- case receiverError: StreamingListenerReceiverError =>
- listeners.foreach(_.onReceiverError(receiverError))
- case receiverStopped: StreamingListenerReceiverStopped =>
- listeners.foreach(_.onReceiverStopped(receiverStopped))
- case batchSubmitted: StreamingListenerBatchSubmitted =>
- listeners.foreach(_.onBatchSubmitted(batchSubmitted))
- case batchStarted: StreamingListenerBatchStarted =>
- listeners.foreach(_.onBatchStarted(batchStarted))
- case batchCompleted: StreamingListenerBatchCompleted =>
- listeners.foreach(_.onBatchCompleted(batchCompleted))
- case StreamingListenerShutdown =>
- // Get out of the while loop and shutdown the daemon thread
- return
- case _ =>
- }
- }
+private[spark] class StreamingListenerBus
+ extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
+ with Logging {
+
+ private val logDroppedEvent = new AtomicBoolean(false)
+
+ override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
+ event match {
+ case receiverStarted: StreamingListenerReceiverStarted =>
+ listener.onReceiverStarted(receiverStarted)
+ case receiverError: StreamingListenerReceiverError =>
+ listener.onReceiverError(receiverError)
+ case receiverStopped: StreamingListenerReceiverStopped =>
+ listener.onReceiverStopped(receiverStopped)
+ case batchSubmitted: StreamingListenerBatchSubmitted =>
+ listener.onBatchSubmitted(batchSubmitted)
+ case batchStarted: StreamingListenerBatchStarted =>
+ listener.onBatchStarted(batchStarted)
+ case batchCompleted: StreamingListenerBatchCompleted =>
+ listener.onBatchCompleted(batchCompleted)
+ case _ =>
}
}
- def start() {
- listenerThread.start()
- }
-
- def addListener(listener: StreamingListener) {
- listeners += listener
- }
-
- def post(event: StreamingListenerEvent) {
- val eventAdded = eventQueue.offer(event)
- if (!eventAdded && !queueFullErrorMessageLogged) {
+ override def onDropEvent(event: StreamingListenerEvent): Unit = {
+ if (logDroppedEvent.compareAndSet(false, true)) {
+ // Only log the following message once to avoid duplicated annoying logs.
logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
"This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
"rate at which events are being started by the scheduler.")
- queueFullErrorMessageLogged = true
}
}
-
- /**
- * Waits until there are no more events in the queue, or until the specified time has elapsed.
- * Used for testing only. Returns true if the queue has emptied and false is the specified time
- * elapsed before the queue emptied.
- */
- def waitUntilEmpty(timeoutMillis: Int): Boolean = {
- val finishTime = System.currentTimeMillis + timeoutMillis
- while (!eventQueue.isEmpty) {
- if (System.currentTimeMillis > finishTime) {
- return false
- }
- /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
- * add overhead in the general case. */
- Thread.sleep(10)
- }
- true
- }
-
- def stop(): Unit = post(StreamingListenerShutdown)
}