aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala123
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala71
-rw-r--r--core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala173
-rw-r--r--core/src/main/scala/org/apache/spark/util/JsonProtocol.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/util/ListenerBus.scala66
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala95
8 files changed, 300 insertions, 235 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 36a6e6338f..be23056e7d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -17,10 +17,9 @@
package org.apache.spark.scheduler
-import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
+import java.util.concurrent.atomic.AtomicBoolean
-import org.apache.spark.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.AsynchronousListenerBus
/**
* Asynchronously passes SparkListenerEvents to registered SparkListeners.
@@ -29,113 +28,19 @@ import org.apache.spark.util.Utils
* has started will events be actually propagated to all attached listeners. This listener bus
* is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
*/
-private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
-
- /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
- * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
- private val EVENT_QUEUE_CAPACITY = 10000
- private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
- private var queueFullErrorMessageLogged = false
- private var started = false
-
- // A counter that represents the number of events produced and consumed in the queue
- private val eventLock = new Semaphore(0)
-
- private val listenerThread = new Thread("SparkListenerBus") {
- setDaemon(true)
- override def run(): Unit = Utils.logUncaughtExceptions {
- while (true) {
- eventLock.acquire()
- // Atomically remove and process this event
- LiveListenerBus.this.synchronized {
- val event = eventQueue.poll
- if (event == SparkListenerShutdown) {
- // Get out of the while loop and shutdown the daemon thread
- return
- }
- Option(event).foreach(postToAll)
- }
- }
- }
- }
-
- /**
- * Start sending events to attached listeners.
- *
- * This first sends out all buffered events posted before this listener bus has started, then
- * listens for any additional events asynchronously while the listener bus is still running.
- * This should only be called once.
- */
- def start() {
- if (started) {
- throw new IllegalStateException("Listener bus already started!")
+private[spark] class LiveListenerBus
+ extends AsynchronousListenerBus[SparkListener, SparkListenerEvent]("SparkListenerBus")
+ with SparkListenerBus {
+
+ private val logDroppedEvent = new AtomicBoolean(false)
+
+ override def onDropEvent(event: SparkListenerEvent): Unit = {
+ if (logDroppedEvent.compareAndSet(false, true)) {
+ // Only log the following message once to avoid duplicated annoying logs.
+ logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
+ "This likely means one of the SparkListeners is too slow and cannot keep up with " +
+ "the rate at which tasks are being started by the scheduler.")
}
- listenerThread.start()
- started = true
}
- def post(event: SparkListenerEvent) {
- val eventAdded = eventQueue.offer(event)
- if (eventAdded) {
- eventLock.release()
- } else {
- logQueueFullErrorMessage()
- }
- }
-
- /**
- * For testing only. Wait until there are no more events in the queue, or until the specified
- * time has elapsed. Return true if the queue has emptied and false is the specified time
- * elapsed before the queue emptied.
- */
- def waitUntilEmpty(timeoutMillis: Int): Boolean = {
- val finishTime = System.currentTimeMillis + timeoutMillis
- while (!queueIsEmpty) {
- if (System.currentTimeMillis > finishTime) {
- return false
- }
- /* Sleep rather than using wait/notify, because this is used only for testing and
- * wait/notify add overhead in the general case. */
- Thread.sleep(10)
- }
- true
- }
-
- /**
- * For testing only. Return whether the listener daemon thread is still alive.
- */
- def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive }
-
- /**
- * Return whether the event queue is empty.
- *
- * The use of synchronized here guarantees that all events that once belonged to this queue
- * have already been processed by all attached listeners, if this returns true.
- */
- def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }
-
- /**
- * Log an error message to indicate that the event queue is full. Do this only once.
- */
- private def logQueueFullErrorMessage(): Unit = {
- if (!queueFullErrorMessageLogged) {
- if (listenerThread.isAlive) {
- logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
- "This likely means one of the SparkListeners is too slow and cannot keep up with" +
- "the rate at which tasks are being started by the scheduler.")
- } else {
- logError("SparkListenerBus thread is dead! This means SparkListenerEvents have not" +
- "been (and will no longer be) propagated to listeners for some time.")
- }
- queueFullErrorMessageLogged = true
- }
- }
-
- def stop() {
- if (!started) {
- throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
- }
- post(SparkListenerShutdown)
- listenerThread.join()
- }
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 8f5ceaa5de..dd28ddb31d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -116,9 +116,6 @@ case class SparkListenerApplicationStart(appName: String, appId: Option[String],
@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
-/** An event used in the listener to shutdown the listener daemon thread. */
-private[spark] case object SparkListenerShutdown extends SparkListenerEvent
-
/**
* :: DeveloperApi ::
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index e700c6af54..fe8a19a2c0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -17,78 +17,47 @@
package org.apache.spark.scheduler
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.ListenerBus
/**
- * A SparkListenerEvent bus that relays events to its listeners
+ * A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners
*/
-private[spark] trait SparkListenerBus extends Logging {
-
- // SparkListeners attached to this event bus
- protected val sparkListeners = new ArrayBuffer[SparkListener]
- with mutable.SynchronizedBuffer[SparkListener]
-
- def addListener(listener: SparkListener) {
- sparkListeners += listener
- }
+private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] {
- /**
- * Post an event to all attached listeners.
- * This does nothing if the event is SparkListenerShutdown.
- */
- def postToAll(event: SparkListenerEvent) {
+ override def onPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = {
event match {
case stageSubmitted: SparkListenerStageSubmitted =>
- foreachListener(_.onStageSubmitted(stageSubmitted))
+ listener.onStageSubmitted(stageSubmitted)
case stageCompleted: SparkListenerStageCompleted =>
- foreachListener(_.onStageCompleted(stageCompleted))
+ listener.onStageCompleted(stageCompleted)
case jobStart: SparkListenerJobStart =>
- foreachListener(_.onJobStart(jobStart))
+ listener.onJobStart(jobStart)
case jobEnd: SparkListenerJobEnd =>
- foreachListener(_.onJobEnd(jobEnd))
+ listener.onJobEnd(jobEnd)
case taskStart: SparkListenerTaskStart =>
- foreachListener(_.onTaskStart(taskStart))
+ listener.onTaskStart(taskStart)
case taskGettingResult: SparkListenerTaskGettingResult =>
- foreachListener(_.onTaskGettingResult(taskGettingResult))
+ listener.onTaskGettingResult(taskGettingResult)
case taskEnd: SparkListenerTaskEnd =>
- foreachListener(_.onTaskEnd(taskEnd))
+ listener.onTaskEnd(taskEnd)
case environmentUpdate: SparkListenerEnvironmentUpdate =>
- foreachListener(_.onEnvironmentUpdate(environmentUpdate))
+ listener.onEnvironmentUpdate(environmentUpdate)
case blockManagerAdded: SparkListenerBlockManagerAdded =>
- foreachListener(_.onBlockManagerAdded(blockManagerAdded))
+ listener.onBlockManagerAdded(blockManagerAdded)
case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
- foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))
+ listener.onBlockManagerRemoved(blockManagerRemoved)
case unpersistRDD: SparkListenerUnpersistRDD =>
- foreachListener(_.onUnpersistRDD(unpersistRDD))
+ listener.onUnpersistRDD(unpersistRDD)
case applicationStart: SparkListenerApplicationStart =>
- foreachListener(_.onApplicationStart(applicationStart))
+ listener.onApplicationStart(applicationStart)
case applicationEnd: SparkListenerApplicationEnd =>
- foreachListener(_.onApplicationEnd(applicationEnd))
+ listener.onApplicationEnd(applicationEnd)
case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
- foreachListener(_.onExecutorMetricsUpdate(metricsUpdate))
+ listener.onExecutorMetricsUpdate(metricsUpdate)
case executorAdded: SparkListenerExecutorAdded =>
- foreachListener(_.onExecutorAdded(executorAdded))
+ listener.onExecutorAdded(executorAdded)
case executorRemoved: SparkListenerExecutorRemoved =>
- foreachListener(_.onExecutorRemoved(executorRemoved))
- case SparkListenerShutdown =>
- }
- }
-
- /**
- * Apply the given function to all attached listeners, catching and logging any exception.
- */
- private def foreachListener(f: SparkListener => Unit): Unit = {
- sparkListeners.foreach { listener =>
- try {
- f(listener)
- } catch {
- case e: Exception =>
- logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
- }
+ listener.onExecutorRemoved(executorRemoved)
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
new file mode 100644
index 0000000000..18c627e8c7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicBoolean
+
+import com.google.common.annotations.VisibleForTesting
+
+/**
+ * Asynchronously passes events to registered listeners.
+ *
+ * Until `start()` is called, all posted events are only buffered. Only after this listener bus
+ * has started will events be actually propagated to all attached listeners. This listener bus
+ * is stopped when `stop()` is called, and it will drop further events after stopping.
+ *
+ * @param name name of the listener bus, will be the name of the listener thread.
+ * @tparam L type of listener
+ * @tparam E type of event
+ */
+private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: String)
+ extends ListenerBus[L, E] {
+
+ self =>
+
+ /* Cap the capacity of the event queue so we get an explicit error (rather than
+ * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
+ private val EVENT_QUEUE_CAPACITY = 10000
+ private val eventQueue = new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY)
+
+ // Indicate if `start()` is called
+ private val started = new AtomicBoolean(false)
+ // Indicate if `stop()` is called
+ private val stopped = new AtomicBoolean(false)
+
+ // Indicate if we are processing some event
+ // Guarded by `self`
+ private var processingEvent = false
+
+ // A counter that represents the number of events produced and consumed in the queue
+ private val eventLock = new Semaphore(0)
+
+ private val listenerThread = new Thread(name) {
+ setDaemon(true)
+ override def run(): Unit = Utils.logUncaughtExceptions {
+ while (true) {
+ eventLock.acquire()
+ self.synchronized {
+ processingEvent = true
+ }
+ try {
+ val event = eventQueue.poll
+ if (event == null) {
+ // Get out of the while loop and shutdown the daemon thread
+ if (!stopped.get) {
+ throw new IllegalStateException("Polling `null` from eventQueue means" +
+ " the listener bus has been stopped. So `stopped` must be true")
+ }
+ return
+ }
+ postToAll(event)
+ } finally {
+ self.synchronized {
+ processingEvent = false
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Start sending events to attached listeners.
+ *
+ * This first sends out all buffered events posted before this listener bus has started, then
+ * listens for any additional events asynchronously while the listener bus is still running.
+ * This should only be called once.
+ */
+ def start() {
+ if (started.compareAndSet(false, true)) {
+ listenerThread.start()
+ } else {
+ throw new IllegalStateException(s"$name already started!")
+ }
+ }
+
+ def post(event: E) {
+ if (stopped.get) {
+ // Drop further events to make `listenerThread` exit ASAP
+ logError(s"$name has already stopped! Dropping event $event")
+ return
+ }
+ val eventAdded = eventQueue.offer(event)
+ if (eventAdded) {
+ eventLock.release()
+ } else {
+ onDropEvent(event)
+ }
+ }
+
+ /**
+ * For testing only. Wait until there are no more events in the queue, or until the specified
+ * time has elapsed. Return true if the queue has emptied and false is the specified time
+ * elapsed before the queue emptied.
+ */
+ @VisibleForTesting
+ def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+ val finishTime = System.currentTimeMillis + timeoutMillis
+ while (!queueIsEmpty) {
+ if (System.currentTimeMillis > finishTime) {
+ return false
+ }
+ /* Sleep rather than using wait/notify, because this is used only for testing and
+ * wait/notify add overhead in the general case. */
+ Thread.sleep(10)
+ }
+ true
+ }
+
+ /**
+ * For testing only. Return whether the listener daemon thread is still alive.
+ */
+ @VisibleForTesting
+ def listenerThreadIsAlive: Boolean = listenerThread.isAlive
+
+ /**
+ * Return whether the event queue is empty.
+ *
+ * The use of synchronized here guarantees that all events that once belonged to this queue
+ * have already been processed by all attached listeners, if this returns true.
+ */
+ private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && !processingEvent }
+
+ /**
+ * Stop the listener bus. It will wait until the queued events have been processed, but drop the
+ * new events after stopping.
+ */
+ def stop() {
+ if (!started.get()) {
+ throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
+ }
+ if (stopped.compareAndSet(false, true)) {
+ // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know
+ // `stop` is called.
+ eventLock.release()
+ listenerThread.join()
+ } else {
+ // Keep quiet
+ }
+ }
+
+ /**
+ * If the event queue exceeds its capacity, the new events will be dropped. The subclasses will be
+ * notified with the dropped events.
+ *
+ * Note: `onDropEvent` can be called in any thread.
+ */
+ def onDropEvent(event: E): Unit
+}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index b5f736dc41..414bc49a57 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -91,7 +91,6 @@ private[spark] object JsonProtocol {
case executorRemoved: SparkListenerExecutorRemoved =>
executorRemovedToJson(executorRemoved)
// These aren't used, but keeps compiler happy
- case SparkListenerShutdown => JNothing
case SparkListenerExecutorMetricsUpdate(_, _) => JNothing
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
new file mode 100644
index 0000000000..bd0aa4dc46
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.util.concurrent.CopyOnWriteArrayList
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.Logging
+
+/**
+ * An event bus which posts events to its listeners.
+ */
+private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
+
+ private val listeners = new CopyOnWriteArrayList[L]
+
+ /**
+ * Add a listener to listen events. This method is thread-safe and can be called in any thread.
+ */
+ final def addListener(listener: L) {
+ listeners.add(listener)
+ }
+
+ /**
+ * Post the event to all registered listeners. The `postToAll` caller should guarantee calling
+ * `postToAll` in the same thread for all events.
+ */
+ final def postToAll(event: E): Unit = {
+ // JavaConversions will create a JIterableWrapper if we use some Scala collection functions.
+ // However, this method will be called frequently. To avoid the wrapper cost, here ewe use
+ // Java Iterator directly.
+ val iter = listeners.iterator
+ while (iter.hasNext) {
+ val listener = iter.next()
+ try {
+ onPostEvent(listener, event)
+ } catch {
+ case NonFatal(e) =>
+ logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
+ }
+ }
+ }
+
+ /**
+ * Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same
+ * thread.
+ */
+ def onPostEvent(listener: L, event: E): Unit
+
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index ed1aa114e1..74dbba453f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -50,9 +50,6 @@ case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo)
case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo)
extends StreamingListenerEvent
-/** An event used in the listener to shutdown the listener daemon thread. */
-private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
-
/**
* :: DeveloperApi ::
* A listener interface for receiving information about an ongoing streaming
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 398724d9e8..b07d6cf347 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -17,83 +17,42 @@
package org.apache.spark.streaming.scheduler
+import java.util.concurrent.atomic.AtomicBoolean
+
import org.apache.spark.Logging
-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
-import java.util.concurrent.LinkedBlockingQueue
+import org.apache.spark.util.AsynchronousListenerBus
/** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
-private[spark] class StreamingListenerBus() extends Logging {
- private val listeners = new ArrayBuffer[StreamingListener]()
- with SynchronizedBuffer[StreamingListener]
-
- /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
- * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
- private val EVENT_QUEUE_CAPACITY = 10000
- private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY)
- private var queueFullErrorMessageLogged = false
-
- val listenerThread = new Thread("StreamingListenerBus") {
- setDaemon(true)
- override def run() {
- while (true) {
- val event = eventQueue.take
- event match {
- case receiverStarted: StreamingListenerReceiverStarted =>
- listeners.foreach(_.onReceiverStarted(receiverStarted))
- case receiverError: StreamingListenerReceiverError =>
- listeners.foreach(_.onReceiverError(receiverError))
- case receiverStopped: StreamingListenerReceiverStopped =>
- listeners.foreach(_.onReceiverStopped(receiverStopped))
- case batchSubmitted: StreamingListenerBatchSubmitted =>
- listeners.foreach(_.onBatchSubmitted(batchSubmitted))
- case batchStarted: StreamingListenerBatchStarted =>
- listeners.foreach(_.onBatchStarted(batchStarted))
- case batchCompleted: StreamingListenerBatchCompleted =>
- listeners.foreach(_.onBatchCompleted(batchCompleted))
- case StreamingListenerShutdown =>
- // Get out of the while loop and shutdown the daemon thread
- return
- case _ =>
- }
- }
+private[spark] class StreamingListenerBus
+ extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus")
+ with Logging {
+
+ private val logDroppedEvent = new AtomicBoolean(false)
+
+ override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
+ event match {
+ case receiverStarted: StreamingListenerReceiverStarted =>
+ listener.onReceiverStarted(receiverStarted)
+ case receiverError: StreamingListenerReceiverError =>
+ listener.onReceiverError(receiverError)
+ case receiverStopped: StreamingListenerReceiverStopped =>
+ listener.onReceiverStopped(receiverStopped)
+ case batchSubmitted: StreamingListenerBatchSubmitted =>
+ listener.onBatchSubmitted(batchSubmitted)
+ case batchStarted: StreamingListenerBatchStarted =>
+ listener.onBatchStarted(batchStarted)
+ case batchCompleted: StreamingListenerBatchCompleted =>
+ listener.onBatchCompleted(batchCompleted)
+ case _ =>
}
}
- def start() {
- listenerThread.start()
- }
-
- def addListener(listener: StreamingListener) {
- listeners += listener
- }
-
- def post(event: StreamingListenerEvent) {
- val eventAdded = eventQueue.offer(event)
- if (!eventAdded && !queueFullErrorMessageLogged) {
+ override def onDropEvent(event: StreamingListenerEvent): Unit = {
+ if (logDroppedEvent.compareAndSet(false, true)) {
+ // Only log the following message once to avoid duplicated annoying logs.
logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
"This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
"rate at which events are being started by the scheduler.")
- queueFullErrorMessageLogged = true
}
}
-
- /**
- * Waits until there are no more events in the queue, or until the specified time has elapsed.
- * Used for testing only. Returns true if the queue has emptied and false is the specified time
- * elapsed before the queue emptied.
- */
- def waitUntilEmpty(timeoutMillis: Int): Boolean = {
- val finishTime = System.currentTimeMillis + timeoutMillis
- while (!eventQueue.isEmpty) {
- if (System.currentTimeMillis > finishTime) {
- return false
- }
- /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
- * add overhead in the general case. */
- Thread.sleep(10)
- }
- true
- }
-
- def stop(): Unit = post(StreamingListenerShutdown)
}