From 7883b8f5798e3de6f55a1182a5d5775c4aaa783b Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Sun, 12 Jan 2014 16:44:07 -0800
Subject: Fixed bugs to ensure better cleanup of JobScheduler, JobGenerator and
 NetworkInputTracker upon close.

---
 .../scala/org/apache/spark/streaming/DStream.scala |  6 +--
 .../spark/streaming/scheduler/JobGenerator.scala   | 43 ++++++++++++++--------
 .../spark/streaming/scheduler/JobScheduler.scala   | 27 +++++++++-----
 .../streaming/scheduler/NetworkInputTracker.scala  | 24 +++++++++---
 .../streaming/scheduler/StreamingListener.scala    |  3 +-
 .../streaming/scheduler/StreamingListenerBus.scala | 21 ++++++++---
 .../spark/streaming/util/RecurringTimer.scala      | 13 +------
 .../spark/streaming/StreamingContextSuite.scala    | 10 +++++
 .../org/apache/spark/streaming/TestSuiteBase.scala |  5 ++-
 9 files changed, 96 insertions(+), 56 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index d59146e069..fa5f0e81dd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -354,17 +354,17 @@ abstract class DStream[T: ClassTag] (
    * this method to save custom checkpoint data.
    */
   private[streaming] def updateCheckpointData(currentTime: Time) {
-    logInfo("Updating checkpoint data for time " + currentTime)
+    logDebug("Updating checkpoint data for time " + currentTime)
     checkpointData.update(currentTime)
     dependencies.foreach(_.updateCheckpointData(currentTime))
     logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
   }
 
   private[streaming] def clearCheckpointData(time: Time) {
-    logInfo("Clearing checkpoint data")
+    logDebug("Clearing checkpoint data")
     checkpointData.cleanup(time)
     dependencies.foreach(_.clearCheckpointData(time))
-    logInfo("Cleared checkpoint data")
+    logDebug("Cleared checkpoint data")
   }
 
   /**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index caed1b3755..b5f11d3440 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -17,9 +17,8 @@
 
 package org.apache.spark.streaming.scheduler
 
-import akka.actor.{Props, Actor}
-import org.apache.spark.SparkEnv
-import org.apache.spark.Logging
+import akka.actor.{ActorRef, ActorSystem, Props, Actor}
+import org.apache.spark.{SparkException, SparkEnv, Logging}
 import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
 import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
 import scala.util.{Failure, Success, Try}
@@ -40,13 +39,6 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
 
   private val ssc = jobScheduler.ssc
   private val graph = ssc.graph
-  private val eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
-    def receive = {
-      case event: JobGeneratorEvent =>
-        logDebug("Got event of type " + event.getClass.getName)
-        processEvent(event)
-    }
-  }))
   val clock = {
     val clockClass = ssc.sc.conf.get(
       "spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
@@ -60,7 +52,23 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     null
   }
 
+  // eventActor is created when the generator starts.
+  // This not being null means the generator has been started and not stopped
+  private var eventActor: ActorRef = null
+
+  /** Start generation of jobs */
   def start() = synchronized {
+    if (eventActor != null) {
+      throw new SparkException("JobGenerator already started")
+    }
+
+    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
+      def receive = {
+        case event: JobGeneratorEvent =>
+          logDebug("Got event of type " + event.getClass.getName)
+          processEvent(event)
+      }
+    }), "JobGenerator")
     if (ssc.isCheckpointPresent) {
       restart()
     } else {
@@ -68,11 +76,15 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     }
   }
 
-  def stop() {
-    timer.stop()
-    if (checkpointWriter != null) checkpointWriter.stop()
-    ssc.graph.stop()
-    logInfo("JobGenerator stopped")
+  /** Stop generation of jobs */
+  def stop() = synchronized {
+    if (eventActor != null) {
+      timer.stop()
+      ssc.env.actorSystem.stop(eventActor)
+      if (checkpointWriter != null) checkpointWriter.stop()
+      ssc.graph.stop()
+      logInfo("JobGenerator stopped")
+    }
   }
 
   /**
@@ -172,4 +184,3 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     }
   }
 }
-
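The JobGenerator change above is the lifecycle pattern this patch applies throughout: a nullable actor reference doubles as the started/stopped flag, start() refuses to run twice, and stop() only tears down what was actually started. A minimal sketch of the same guard outside Spark (the Service class and its worker thread are illustrative, not part of the patch):

    // Nullable handle doubles as the "started" flag: start() rejects a second
    // call, stop() is a no-op if start() never ran. Both are synchronized so
    // the check and the mutation happen atomically, as in JobGenerator.
    class Service {
      private var worker: Thread = null

      def start(): Unit = synchronized {
        if (worker != null) {
          throw new IllegalStateException("Service already started")
        }
        worker = new Thread(new Runnable {
          def run(): Unit = { /* do work */ }
        }, "Service")
        worker.start()
      }

      def stop(): Unit = synchronized {
        if (worker != null) {
          worker.interrupt() // tear down only what start() created
          worker = null      // unlike JobGenerator, also reset the flag
        }
      }
    }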
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 24d57548c3..de675d3c7f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -41,20 +41,26 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
 
   private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
   private val executor = Executors.newFixedThreadPool(numConcurrentJobs)
   private val jobGenerator = new JobGenerator(this)
-  private val eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
-    def receive = {
-      case event: JobSchedulerEvent => processEvent(event)
-    }
-  }))
-  val clock = jobGenerator.clock // used by testsuites
+  val clock = jobGenerator.clock
   val listenerBus = new StreamingListenerBus()
 
+  // These two are created only when the scheduler starts.
+  // eventActor not being null means the scheduler has been started and not stopped
   var networkInputTracker: NetworkInputTracker = null
+  private var eventActor: ActorRef = null
+
   def start() = synchronized {
-    if (networkInputTracker != null) {
-      throw new SparkException("StreamingContext already started")
+    if (eventActor != null) {
+      throw new SparkException("JobScheduler already started")
     }
+
+    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
+      def receive = {
+        case event: JobSchedulerEvent => processEvent(event)
+      }
+    }), "JobScheduler")
+    listenerBus.start()
     networkInputTracker = new NetworkInputTracker(ssc)
     networkInputTracker.start()
     Thread.sleep(1000)
@@ -63,13 +69,15 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   }
 
   def stop() = synchronized {
-    if (networkInputTracker != null) {
+    if (eventActor != null) {
       jobGenerator.stop()
       networkInputTracker.stop()
      executor.shutdown()
       if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
         executor.shutdownNow()
       }
+      listenerBus.stop()
+      ssc.env.actorSystem.stop(eventActor)
       logInfo("JobScheduler stopped")
     }
   }
@@ -104,7 +112,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       case e: Throwable => reportError("Error in job scheduler", e)
     }
-
   }
 
   private def handleJobStart(job: Job) {
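JobScheduler.stop() also drains its job-execution pool in the standard graceful-then-forced order: shutdown() stops intake, awaitTermination() gives in-flight jobs a bounded grace period, and shutdownNow() interrupts whatever remains. The idiom in isolation (the pool size is illustrative; the two-second bound mirrors the patch):

    import java.util.concurrent.{Executors, TimeUnit}

    val executor = Executors.newFixedThreadPool(4)
    // ... submit jobs ...
    executor.shutdown()                                    // stop accepting new jobs
    if (!executor.awaitTermination(2, TimeUnit.SECONDS)) { // bounded wait for in-flight jobs
      executor.shutdownNow()                               // interrupt the stragglers
    }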
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index 34fb158205..0d9733fa69 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -19,8 +19,7 @@ package org.apache.spark.streaming.scheduler
 
 import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
 import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
-import org.apache.spark.Logging
-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkException, Logging, SparkEnv}
 import org.apache.spark.SparkContext._
 
 import scala.collection.mutable.HashMap
@@ -32,6 +31,7 @@
 import akka.pattern.ask
 import akka.dispatch._
 import org.apache.spark.storage.BlockId
 import org.apache.spark.streaming.{Time, StreamingContext}
+import org.apache.spark.util.AkkaUtils
 
 private[streaming] sealed trait NetworkInputTrackerMessage
 private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
@@ -39,7 +39,9 @@ private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], m
 private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
 
 /**
- * This class manages the execution of the receivers of NetworkInputDStreams.
+ * This class manages the execution of the receivers of NetworkInputDStreams. An instance of
+ * this class must be created after all input streams have been added and StreamingContext.start()
+ * has been called, because it needs the final set of input streams at the time of instantiation.
  */
 private[streaming]
 class NetworkInputTracker(ssc: StreamingContext) extends Logging {
@@ -49,23 +51,33 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
   val receiverExecutor = new ReceiverExecutor()
   val receiverInfo = new HashMap[Int, ActorRef]
   val receivedBlockIds = new HashMap[Int, Queue[BlockId]]
-  val timeout = 5000.milliseconds
+  val timeout = AkkaUtils.askTimeout(ssc.conf)
+
+  // actor is created when the tracker starts.
+  // This not being null means the tracker has been started and not stopped
+  var actor: ActorRef = null
 
   var currentTime: Time = null
 
   /** Start the actor and receiver execution thread. */
   def start() {
+    if (actor != null) {
+      throw new SparkException("NetworkInputTracker already started")
+    }
+
     if (!networkInputStreams.isEmpty) {
-      ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker")
+      actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker")
       receiverExecutor.start()
+      logInfo("NetworkInputTracker started")
     }
   }
 
   /** Stop the receiver execution thread. */
   def stop() {
-    if (!networkInputStreams.isEmpty) {
+    if (!networkInputStreams.isEmpty && actor != null) {
       receiverExecutor.interrupt()
       receiverExecutor.stopReceivers()
+      ssc.env.actorSystem.stop(actor)
       logInfo("NetworkInputTracker stopped")
     }
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 36225e190c..461ea35064 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -24,9 +24,10 @@ import org.apache.spark.util.Distribution
 sealed trait StreamingListenerEvent
 
 case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
-
 case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
 
+/** An event used by the listener bus to shut down the listener daemon thread. */
+private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
 /**
  * A listener interface for receiving information about an ongoing streaming
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 110a20f282..6e6e22e1af 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -31,7 +31,7 @@ private[spark] class StreamingListenerBus() extends Logging {
   private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY)
   private var queueFullErrorMessageLogged = false
 
-  new Thread("StreamingListenerBus") {
+  val listenerThread = new Thread("StreamingListenerBus") {
     setDaemon(true)
     override def run() {
       while (true) {
@@ -41,11 +41,18 @@ private[spark] class StreamingListenerBus() extends Logging {
           listeners.foreach(_.onBatchStarted(batchStarted))
         case batchCompleted: StreamingListenerBatchCompleted =>
           listeners.foreach(_.onBatchCompleted(batchCompleted))
+        case StreamingListenerShutdown =>
+          // Get out of the while loop and shut down the daemon thread
+          return
         case _ =>
       }
     }
   }
-  }.start()
+  }
+
+  def start() {
+    listenerThread.start()
+  }
 
   def addListener(listener: StreamingListener) {
     listeners += listener
@@ -54,9 +61,9 @@ private[spark] class StreamingListenerBus() extends Logging {
   def post(event: StreamingListenerEvent) {
     val eventAdded = eventQueue.offer(event)
     if (!eventAdded && !queueFullErrorMessageLogged) {
-      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
-        "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
-        "rate at which tasks are being started by the scheduler.")
+      logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
+        "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
+        "rate at which events are being posted by the scheduler.")
       queueFullErrorMessageLogged = true
     }
   }
@@ -68,7 +75,7 @@ private[spark] class StreamingListenerBus() extends Logging {
    */
   def waitUntilEmpty(timeoutMillis: Int): Boolean = {
     val finishTime = System.currentTimeMillis + timeoutMillis
-    while (!eventQueue.isEmpty()) {
+    while (!eventQueue.isEmpty) {
       if (System.currentTimeMillis > finishTime) {
         return false
       }
@@ -78,4 +85,6 @@ private[spark] class StreamingListenerBus() extends Logging {
     }
     return true
   }
+
+  def stop(): Unit = post(StreamingListenerShutdown)
 }
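Rather than interrupting the listener thread, the bus now shuts it down with a sentinel: stop() posts StreamingListenerShutdown, and the daemon thread returns when it dequeues it, so every event posted before the sentinel is still delivered. A generic sketch of this poison-pill pattern (the Event types and demo object are illustrative, not from the patch):

    import java.util.concurrent.LinkedBlockingQueue

    object ShutdownDemo extends App {
      sealed trait Event
      case class Message(body: String) extends Event
      case object Shutdown extends Event // the poison pill

      val queue = new LinkedBlockingQueue[Event](1000)

      val consumer = new Thread("consumer") {
        setDaemon(true)
        override def run() {
          while (true) {
            queue.take() match {
              case Message(body) => println(body) // handle a real event
              case Shutdown      => return        // exit the loop, ending the thread
            }
          }
        }
      }

      consumer.start()
      queue.put(Message("hello")) // delivered before the pill
      queue.put(Shutdown)         // consumer exits after draining "hello"
    }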
" + + "This likely means one of the StreamingListeners is too slow and cannot keep up with the " + + "rate at which events are being started by the scheduler.") queueFullErrorMessageLogged = true } } @@ -68,7 +75,7 @@ private[spark] class StreamingListenerBus() extends Logging { */ def waitUntilEmpty(timeoutMillis: Int): Boolean = { val finishTime = System.currentTimeMillis + timeoutMillis - while (!eventQueue.isEmpty()) { + while (!eventQueue.isEmpty) { if (System.currentTimeMillis > finishTime) { return false } @@ -78,4 +85,6 @@ private[spark] class StreamingListenerBus() extends Logging { } return true } + + def stop(): Unit = post(StreamingListenerShutdown) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala index d644240405..559c247385 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala @@ -20,17 +20,7 @@ package org.apache.spark.streaming.util private[streaming] class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) { - private val minPollTime = 25L - - private val pollTime = { - if (period / 10.0 > minPollTime) { - (period / 10.0).toLong - } else { - minPollTime - } - } - - private val thread = new Thread() { + private val thread = new Thread("RecurringTimer") { override def run() { loop } } @@ -66,7 +56,6 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => callback(nextTime) nextTime += period } - } catch { case e: InterruptedException => } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 9eb9b3684c..10c18a7f7e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -45,6 +45,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { ssc.stop() ssc = null } + if (sc != null) { + sc.stop() + sc = null + } } test("from no conf constructor") { @@ -124,6 +128,8 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { test("stop multiple times") { ssc = new StreamingContext(master, appName, batchDuration) + addInputStream(ssc).register + ssc.start() ssc.stop() ssc.stop() ssc = null @@ -131,9 +137,13 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts { test("stop only streaming context") { ssc = new StreamingContext(master, appName, batchDuration) + sc = ssc.sparkContext + addInputStream(ssc).register + ssc.start() ssc.stop(false) ssc = null assert(sc.makeRDD(1 to 100).collect().size === 100) + ssc = new StreamingContext(sc, batchDuration) } test("waitForStop") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 3569624d51..a8ff444109 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -273,10 +273,11 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { val startTime = System.currentTimeMillis() while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { 
logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput) - Thread.sleep(10) + ssc.waitForStop(50) } val timeTaken = System.currentTimeMillis() - startTime - + logInfo("Output generated in " + timeTaken + " milliseconds") + output.foreach(x => logInfo("[" + x.mkString(",") + "]")) assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") assert(output.size === numExpectedOutput, "Unexpected number of outputs generated") -- cgit v1.2.3