diff options
8 files changed, 300 insertions, 235 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 36a6e6338f..be23056e7d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -17,10 +17,9 @@ package org.apache.spark.scheduler -import java.util.concurrent.{LinkedBlockingQueue, Semaphore} +import java.util.concurrent.atomic.AtomicBoolean -import org.apache.spark.Logging -import org.apache.spark.util.Utils +import org.apache.spark.util.AsynchronousListenerBus /** * Asynchronously passes SparkListenerEvents to registered SparkListeners. @@ -29,113 +28,19 @@ import org.apache.spark.util.Utils * has started will events be actually propagated to all attached listeners. This listener bus * is stopped when it receives a SparkListenerShutdown event, which is posted using stop(). */ -private[spark] class LiveListenerBus extends SparkListenerBus with Logging { - - /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than - * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */ - private val EVENT_QUEUE_CAPACITY = 10000 - private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) - private var queueFullErrorMessageLogged = false - private var started = false - - // A counter that represents the number of events produced and consumed in the queue - private val eventLock = new Semaphore(0) - - private val listenerThread = new Thread("SparkListenerBus") { - setDaemon(true) - override def run(): Unit = Utils.logUncaughtExceptions { - while (true) { - eventLock.acquire() - // Atomically remove and process this event - LiveListenerBus.this.synchronized { - val event = eventQueue.poll - if (event == SparkListenerShutdown) { - // Get out of the while loop and shutdown the daemon thread - return - } - Option(event).foreach(postToAll) - } - } - } - } - - /** - * Start sending events to attached listeners. - * - * This first sends out all buffered events posted before this listener bus has started, then - * listens for any additional events asynchronously while the listener bus is still running. - * This should only be called once. - */ - def start() { - if (started) { - throw new IllegalStateException("Listener bus already started!") +private[spark] class LiveListenerBus + extends AsynchronousListenerBus[SparkListener, SparkListenerEvent]("SparkListenerBus") + with SparkListenerBus { + + private val logDroppedEvent = new AtomicBoolean(false) + + override def onDropEvent(event: SparkListenerEvent): Unit = { + if (logDroppedEvent.compareAndSet(false, true)) { + // Only log the following message once to avoid duplicated annoying logs. + logError("Dropping SparkListenerEvent because no remaining room in event queue. " + + "This likely means one of the SparkListeners is too slow and cannot keep up with " + + "the rate at which tasks are being started by the scheduler.") } - listenerThread.start() - started = true } - def post(event: SparkListenerEvent) { - val eventAdded = eventQueue.offer(event) - if (eventAdded) { - eventLock.release() - } else { - logQueueFullErrorMessage() - } - } - - /** - * For testing only. Wait until there are no more events in the queue, or until the specified - * time has elapsed. Return true if the queue has emptied and false is the specified time - * elapsed before the queue emptied. - */ - def waitUntilEmpty(timeoutMillis: Int): Boolean = { - val finishTime = System.currentTimeMillis + timeoutMillis - while (!queueIsEmpty) { - if (System.currentTimeMillis > finishTime) { - return false - } - /* Sleep rather than using wait/notify, because this is used only for testing and - * wait/notify add overhead in the general case. */ - Thread.sleep(10) - } - true - } - - /** - * For testing only. Return whether the listener daemon thread is still alive. - */ - def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive } - - /** - * Return whether the event queue is empty. - * - * The use of synchronized here guarantees that all events that once belonged to this queue - * have already been processed by all attached listeners, if this returns true. - */ - def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty } - - /** - * Log an error message to indicate that the event queue is full. Do this only once. - */ - private def logQueueFullErrorMessage(): Unit = { - if (!queueFullErrorMessageLogged) { - if (listenerThread.isAlive) { - logError("Dropping SparkListenerEvent because no remaining room in event queue. " + - "This likely means one of the SparkListeners is too slow and cannot keep up with" + - "the rate at which tasks are being started by the scheduler.") - } else { - logError("SparkListenerBus thread is dead! This means SparkListenerEvents have not" + - "been (and will no longer be) propagated to listeners for some time.") - } - queueFullErrorMessageLogged = true - } - } - - def stop() { - if (!started) { - throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!") - } - post(SparkListenerShutdown) - listenerThread.join() - } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 8f5ceaa5de..dd28ddb31d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -116,9 +116,6 @@ case class SparkListenerApplicationStart(appName: String, appId: Option[String], @DeveloperApi case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent -/** An event used in the listener to shutdown the listener daemon thread. */ -private[spark] case object SparkListenerShutdown extends SparkListenerEvent - /** * :: DeveloperApi :: diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index e700c6af54..fe8a19a2c0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -17,78 +17,47 @@ package org.apache.spark.scheduler -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer - -import org.apache.spark.Logging -import org.apache.spark.util.Utils +import org.apache.spark.util.ListenerBus /** - * A SparkListenerEvent bus that relays events to its listeners + * A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners */ -private[spark] trait SparkListenerBus extends Logging { - - // SparkListeners attached to this event bus - protected val sparkListeners = new ArrayBuffer[SparkListener] - with mutable.SynchronizedBuffer[SparkListener] - - def addListener(listener: SparkListener) { - sparkListeners += listener - } +private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] { - /** - * Post an event to all attached listeners. - * This does nothing if the event is SparkListenerShutdown. - */ - def postToAll(event: SparkListenerEvent) { + override def onPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = { event match { case stageSubmitted: SparkListenerStageSubmitted => - foreachListener(_.onStageSubmitted(stageSubmitted)) + listener.onStageSubmitted(stageSubmitted) case stageCompleted: SparkListenerStageCompleted => - foreachListener(_.onStageCompleted(stageCompleted)) + listener.onStageCompleted(stageCompleted) case jobStart: SparkListenerJobStart => - foreachListener(_.onJobStart(jobStart)) + listener.onJobStart(jobStart) case jobEnd: SparkListenerJobEnd => - foreachListener(_.onJobEnd(jobEnd)) + listener.onJobEnd(jobEnd) case taskStart: SparkListenerTaskStart => - foreachListener(_.onTaskStart(taskStart)) + listener.onTaskStart(taskStart) case taskGettingResult: SparkListenerTaskGettingResult => - foreachListener(_.onTaskGettingResult(taskGettingResult)) + listener.onTaskGettingResult(taskGettingResult) case taskEnd: SparkListenerTaskEnd => - foreachListener(_.onTaskEnd(taskEnd)) + listener.onTaskEnd(taskEnd) case environmentUpdate: SparkListenerEnvironmentUpdate => - foreachListener(_.onEnvironmentUpdate(environmentUpdate)) + listener.onEnvironmentUpdate(environmentUpdate) case blockManagerAdded: SparkListenerBlockManagerAdded => - foreachListener(_.onBlockManagerAdded(blockManagerAdded)) + listener.onBlockManagerAdded(blockManagerAdded) case blockManagerRemoved: SparkListenerBlockManagerRemoved => - foreachListener(_.onBlockManagerRemoved(blockManagerRemoved)) + listener.onBlockManagerRemoved(blockManagerRemoved) case unpersistRDD: SparkListenerUnpersistRDD => - foreachListener(_.onUnpersistRDD(unpersistRDD)) + listener.onUnpersistRDD(unpersistRDD) case applicationStart: SparkListenerApplicationStart => - foreachListener(_.onApplicationStart(applicationStart)) + listener.onApplicationStart(applicationStart) case applicationEnd: SparkListenerApplicationEnd => - foreachListener(_.onApplicationEnd(applicationEnd)) + listener.onApplicationEnd(applicationEnd) case metricsUpdate: SparkListenerExecutorMetricsUpdate => - foreachListener(_.onExecutorMetricsUpdate(metricsUpdate)) + listener.onExecutorMetricsUpdate(metricsUpdate) case executorAdded: SparkListenerExecutorAdded => - foreachListener(_.onExecutorAdded(executorAdded)) + listener.onExecutorAdded(executorAdded) case executorRemoved: SparkListenerExecutorRemoved => - foreachListener(_.onExecutorRemoved(executorRemoved)) - case SparkListenerShutdown => - } - } - - /** - * Apply the given function to all attached listeners, catching and logging any exception. - */ - private def foreachListener(f: SparkListener => Unit): Unit = { - sparkListeners.foreach { listener => - try { - f(listener) - } catch { - case e: Exception => - logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e) - } + listener.onExecutorRemoved(executorRemoved) } } diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala new file mode 100644 index 0000000000..18c627e8c7 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicBoolean + +import com.google.common.annotations.VisibleForTesting + +/** + * Asynchronously passes events to registered listeners. + * + * Until `start()` is called, all posted events are only buffered. Only after this listener bus + * has started will events be actually propagated to all attached listeners. This listener bus + * is stopped when `stop()` is called, and it will drop further events after stopping. + * + * @param name name of the listener bus, will be the name of the listener thread. + * @tparam L type of listener + * @tparam E type of event + */ +private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: String) + extends ListenerBus[L, E] { + + self => + + /* Cap the capacity of the event queue so we get an explicit error (rather than + * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */ + private val EVENT_QUEUE_CAPACITY = 10000 + private val eventQueue = new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY) + + // Indicate if `start()` is called + private val started = new AtomicBoolean(false) + // Indicate if `stop()` is called + private val stopped = new AtomicBoolean(false) + + // Indicate if we are processing some event + // Guarded by `self` + private var processingEvent = false + + // A counter that represents the number of events produced and consumed in the queue + private val eventLock = new Semaphore(0) + + private val listenerThread = new Thread(name) { + setDaemon(true) + override def run(): Unit = Utils.logUncaughtExceptions { + while (true) { + eventLock.acquire() + self.synchronized { + processingEvent = true + } + try { + val event = eventQueue.poll + if (event == null) { + // Get out of the while loop and shutdown the daemon thread + if (!stopped.get) { + throw new IllegalStateException("Polling `null` from eventQueue means" + + " the listener bus has been stopped. So `stopped` must be true") + } + return + } + postToAll(event) + } finally { + self.synchronized { + processingEvent = false + } + } + } + } + } + + /** + * Start sending events to attached listeners. + * + * This first sends out all buffered events posted before this listener bus has started, then + * listens for any additional events asynchronously while the listener bus is still running. + * This should only be called once. + */ + def start() { + if (started.compareAndSet(false, true)) { + listenerThread.start() + } else { + throw new IllegalStateException(s"$name already started!") + } + } + + def post(event: E) { + if (stopped.get) { + // Drop further events to make `listenerThread` exit ASAP + logError(s"$name has already stopped! Dropping event $event") + return + } + val eventAdded = eventQueue.offer(event) + if (eventAdded) { + eventLock.release() + } else { + onDropEvent(event) + } + } + + /** + * For testing only. Wait until there are no more events in the queue, or until the specified + * time has elapsed. Return true if the queue has emptied and false is the specified time + * elapsed before the queue emptied. + */ + @VisibleForTesting + def waitUntilEmpty(timeoutMillis: Int): Boolean = { + val finishTime = System.currentTimeMillis + timeoutMillis + while (!queueIsEmpty) { + if (System.currentTimeMillis > finishTime) { + return false + } + /* Sleep rather than using wait/notify, because this is used only for testing and + * wait/notify add overhead in the general case. */ + Thread.sleep(10) + } + true + } + + /** + * For testing only. Return whether the listener daemon thread is still alive. + */ + @VisibleForTesting + def listenerThreadIsAlive: Boolean = listenerThread.isAlive + + /** + * Return whether the event queue is empty. + * + * The use of synchronized here guarantees that all events that once belonged to this queue + * have already been processed by all attached listeners, if this returns true. + */ + private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && !processingEvent } + + /** + * Stop the listener bus. It will wait until the queued events have been processed, but drop the + * new events after stopping. + */ + def stop() { + if (!started.get()) { + throw new IllegalStateException(s"Attempted to stop $name that has not yet started!") + } + if (stopped.compareAndSet(false, true)) { + // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know + // `stop` is called. + eventLock.release() + listenerThread.join() + } else { + // Keep quiet + } + } + + /** + * If the event queue exceeds its capacity, the new events will be dropped. The subclasses will be + * notified with the dropped events. + * + * Note: `onDropEvent` can be called in any thread. + */ + def onDropEvent(event: E): Unit +} diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index b5f736dc41..414bc49a57 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -91,7 +91,6 @@ private[spark] object JsonProtocol { case executorRemoved: SparkListenerExecutorRemoved => executorRemovedToJson(executorRemoved) // These aren't used, but keeps compiler happy - case SparkListenerShutdown => JNothing case SparkListenerExecutorMetricsUpdate(_, _) => JNothing } } diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala new file mode 100644 index 0000000000..bd0aa4dc46 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.util.concurrent.CopyOnWriteArrayList + +import scala.util.control.NonFatal + +import org.apache.spark.Logging + +/** + * An event bus which posts events to its listeners. + */ +private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { + + private val listeners = new CopyOnWriteArrayList[L] + + /** + * Add a listener to listen events. This method is thread-safe and can be called in any thread. + */ + final def addListener(listener: L) { + listeners.add(listener) + } + + /** + * Post the event to all registered listeners. The `postToAll` caller should guarantee calling + * `postToAll` in the same thread for all events. + */ + final def postToAll(event: E): Unit = { + // JavaConversions will create a JIterableWrapper if we use some Scala collection functions. + // However, this method will be called frequently. To avoid the wrapper cost, here ewe use + // Java Iterator directly. + val iter = listeners.iterator + while (iter.hasNext) { + val listener = iter.next() + try { + onPostEvent(listener, event) + } catch { + case NonFatal(e) => + logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e) + } + } + } + + /** + * Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same + * thread. + */ + def onPostEvent(listener: L, event: E): Unit + +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index ed1aa114e1..74dbba453f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -50,9 +50,6 @@ case class StreamingListenerReceiverError(receiverInfo: ReceiverInfo) case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo) extends StreamingListenerEvent -/** An event used in the listener to shutdown the listener daemon thread. */ -private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent - /** * :: DeveloperApi :: * A listener interface for receiving information about an ongoing streaming diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index 398724d9e8..b07d6cf347 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -17,83 +17,42 @@ package org.apache.spark.streaming.scheduler +import java.util.concurrent.atomic.AtomicBoolean + import org.apache.spark.Logging -import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} -import java.util.concurrent.LinkedBlockingQueue +import org.apache.spark.util.AsynchronousListenerBus /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */ -private[spark] class StreamingListenerBus() extends Logging { - private val listeners = new ArrayBuffer[StreamingListener]() - with SynchronizedBuffer[StreamingListener] - - /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than - * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */ - private val EVENT_QUEUE_CAPACITY = 10000 - private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY) - private var queueFullErrorMessageLogged = false - - val listenerThread = new Thread("StreamingListenerBus") { - setDaemon(true) - override def run() { - while (true) { - val event = eventQueue.take - event match { - case receiverStarted: StreamingListenerReceiverStarted => - listeners.foreach(_.onReceiverStarted(receiverStarted)) - case receiverError: StreamingListenerReceiverError => - listeners.foreach(_.onReceiverError(receiverError)) - case receiverStopped: StreamingListenerReceiverStopped => - listeners.foreach(_.onReceiverStopped(receiverStopped)) - case batchSubmitted: StreamingListenerBatchSubmitted => - listeners.foreach(_.onBatchSubmitted(batchSubmitted)) - case batchStarted: StreamingListenerBatchStarted => - listeners.foreach(_.onBatchStarted(batchStarted)) - case batchCompleted: StreamingListenerBatchCompleted => - listeners.foreach(_.onBatchCompleted(batchCompleted)) - case StreamingListenerShutdown => - // Get out of the while loop and shutdown the daemon thread - return - case _ => - } - } +private[spark] class StreamingListenerBus + extends AsynchronousListenerBus[StreamingListener, StreamingListenerEvent]("StreamingListenerBus") + with Logging { + + private val logDroppedEvent = new AtomicBoolean(false) + + override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = { + event match { + case receiverStarted: StreamingListenerReceiverStarted => + listener.onReceiverStarted(receiverStarted) + case receiverError: StreamingListenerReceiverError => + listener.onReceiverError(receiverError) + case receiverStopped: StreamingListenerReceiverStopped => + listener.onReceiverStopped(receiverStopped) + case batchSubmitted: StreamingListenerBatchSubmitted => + listener.onBatchSubmitted(batchSubmitted) + case batchStarted: StreamingListenerBatchStarted => + listener.onBatchStarted(batchStarted) + case batchCompleted: StreamingListenerBatchCompleted => + listener.onBatchCompleted(batchCompleted) + case _ => } } - def start() { - listenerThread.start() - } - - def addListener(listener: StreamingListener) { - listeners += listener - } - - def post(event: StreamingListenerEvent) { - val eventAdded = eventQueue.offer(event) - if (!eventAdded && !queueFullErrorMessageLogged) { + override def onDropEvent(event: StreamingListenerEvent): Unit = { + if (logDroppedEvent.compareAndSet(false, true)) { + // Only log the following message once to avoid duplicated annoying logs. logError("Dropping StreamingListenerEvent because no remaining room in event queue. " + "This likely means one of the StreamingListeners is too slow and cannot keep up with the " + "rate at which events are being started by the scheduler.") - queueFullErrorMessageLogged = true } } - - /** - * Waits until there are no more events in the queue, or until the specified time has elapsed. - * Used for testing only. Returns true if the queue has emptied and false is the specified time - * elapsed before the queue emptied. - */ - def waitUntilEmpty(timeoutMillis: Int): Boolean = { - val finishTime = System.currentTimeMillis + timeoutMillis - while (!eventQueue.isEmpty) { - if (System.currentTimeMillis > finishTime) { - return false - } - /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify - * add overhead in the general case. */ - Thread.sleep(10) - } - true - } - - def stop(): Unit = post(StreamingListenerShutdown) } |