diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala | 3 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala | 95 |
2 files changed, 27 insertions, 71 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index ed1aa114e1..74dbba453f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -50,9 +50,6 @@ case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo) case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo) extends StreamingListenerEvent -/** An event used in the listener to shutdown the listener daemon thread. */ -private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent - /** * :: DeveloperApi :: * A listener interface for receiving information about an ongoing streaming diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index 398724d9e8..b07d6cf347 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -17,83 +17,42 @@ package org.apache.spark.streaming.scheduler +import java.util.concurrent.atomic.AtomicBoolean + import org.apache.spark.Logging -import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} -import java.util.concurrent.LinkedBlockingQueue +import org.apache.spark.util.AsynchronousListenerBus /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */ -private[spark] class StreamingListenerBus() extends Logging { - private val listeners = new ArrayBuffer[StreamingListener]() - with SynchronizedBuffer[StreamingListener] - - /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than - * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */ - private val EVENT_QUEUE_CAPACITY = 10000 - private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY) - private var queueFullErrorMessageLogged = false - - val listenerThread = new Thread("StreamingListenerBus") { - setDaemon(true) - override def run() { - while (true) { - val event = eventQueue.take - event match { - case receiverStarted: StreamingListenerReceiverStarted => - listeners.foreach(_.onReceiverStarted(receiverStarted)) - case receiverError: StreamingListenerReceiverError => - listeners.foreach(_.onReceiverError(receiverError)) - case receiverStopped: StreamingListenerReceiverStopped => - listeners.foreach(_.onReceiverStopped(receiverStopped)) - case batchSubmitted: StreamingListenerBatchSubmitted => - listeners.foreach(_.onBatchSubmitted(batchSubmitted)) - case batchStarted: StreamingListenerBatchStarted => - listeners.foreach(_.onBatchStarted(batchStarted)) - case batchCompleted: StreamingListenerBatchCompleted => - listeners.foreach(_.onBatchCompleted(batchCompleted)) - case StreamingListenerShutdown => - // Get out of the while loop and shutdown the daemon thread - return - case _ => - } - } +private[spark] class StreamingListenerBus + extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus") + with Logging { + + private val logDroppedEvent = new AtomicBoolean(false) + + override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = { + event match { + case receiverStarted: StreamingListenerReceiverStarted => + listener.onReceiverStarted(receiverStarted) + case receiverError: StreamingListenerReceiverError => + listener.onReceiverError(receiverError) + case receiverStopped: StreamingListenerReceiverStopped => + listener.onReceiverStopped(receiverStopped) + case batchSubmitted: StreamingListenerBatchSubmitted => + listener.onBatchSubmitted(batchSubmitted) + case batchStarted: StreamingListenerBatchStarted => + listener.onBatchStarted(batchStarted) + case batchCompleted: StreamingListenerBatchCompleted => + listener.onBatchCompleted(batchCompleted) + case _ => } } - def start() { - listenerThread.start() - } - - def addListener(listener: StreamingListener) { - listeners += listener - } - - def post(event: StreamingListenerEvent) { - val eventAdded = eventQueue.offer(event) - if (!eventAdded && !queueFullErrorMessageLogged) { + override def onDropEvent(event: StreamingListenerEvent): Unit = { + if (logDroppedEvent.compareAndSet(false, true)) { + // Only log the following message once to avoid duplicated annoying logs. logError("Dropping StreamingListenerEvent because no remaining room in event queue. " + "This likely means one of the StreamingListeners is too slow and cannot keep up with the " + "rate at which events are being started by the scheduler.") - queueFullErrorMessageLogged = true } } - - /** - * Waits until there are no more events in the queue, or until the specified time has elapsed. - * Used for testing only. Returns true if the queue has emptied and false is the specified time - * elapsed before the queue emptied. - */ - def waitUntilEmpty(timeoutMillis: Int): Boolean = { - val finishTime = System.currentTimeMillis + timeoutMillis - while (!eventQueue.isEmpty) { - if (System.currentTimeMillis > finishTime) { - return false - } - /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify - * add overhead in the general case. */ - Thread.sleep(10) - } - true - } - - def stop(): Unit = post(StreamingListenerShutdown) } |