aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
author Tathagata Das <tathagata.das1565@gmail.com> 2014-01-12 16:44:07 -0800
committer Tathagata Das <tathagata.das1565@gmail.com> 2014-01-12 16:44:07 -0800
commit 7883b8f5798e3de6f55a1182a5d5775c4aaa783b (patch)
tree 8ca20c15e797c9b81778da986ab022dc9af2d30c /streaming
parent c5921e5c6184ddc99c12c0b1f2646b6bd74a9e98 (diff)
download spark-7883b8f5798e3de6f55a1182a5d5775c4aaa783b.tar.gz
spark-7883b8f5798e3de6f55a1182a5d5775c4aaa783b.tar.bz2
spark-7883b8f5798e3de6f55a1182a5d5775c4aaa783b.zip
Fixed bugs to ensure better cleanup of JobScheduler, JobGenerator and NetworkInputTracker upon close.
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/DStream.scala 6
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala 43
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala 27
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala 24
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala 3
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala 21
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala 13
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala 10
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala 5
9 files changed, 96 insertions, 56 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index d59146e069..fa5f0e81dd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -354,17 +354,17 @@ abstract class DStream[T: ClassTag] (
* this method to save custom checkpoint data.
*/
private[streaming] def updateCheckpointData(currentTime: Time) {
- logInfo("Updating checkpoint data for time " + currentTime)
+ logDebug("Updating checkpoint data for time " + currentTime)
checkpointData.update(currentTime)
dependencies.foreach(_.updateCheckpointData(currentTime))
logDebug("Updated checkpoint data for time " + currentTime + ": " + checkpointData)
}
private[streaming] def clearCheckpointData(time: Time) {
- logInfo("Clearing checkpoint data")
+ logDebug("Clearing checkpoint data")
checkpointData.cleanup(time)
dependencies.foreach(_.clearCheckpointData(time))
- logInfo("Cleared checkpoint data")
+ logDebug("Cleared checkpoint data")
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index caed1b3755..b5f11d3440 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -17,9 +17,8 @@
package org.apache.spark.streaming.scheduler
-import akka.actor.{Props, Actor}
-import org.apache.spark.SparkEnv
-import org.apache.spark.Logging
+import akka.actor.{ActorRef, ActorSystem, Props, Actor}
+import org.apache.spark.{SparkException, SparkEnv, Logging}
import org.apache.spark.streaming.{Checkpoint, Time, CheckpointWriter}
import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
import scala.util.{Failure, Success, Try}
@@ -40,13 +39,6 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
private val ssc = jobScheduler.ssc
private val graph = ssc.graph
- private val eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
- def receive = {
- case event: JobGeneratorEvent =>
- logDebug("Got event of type " + event.getClass.getName)
- processEvent(event)
- }
- }))
val clock = {
val clockClass = ssc.sc.conf.get(
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
@@ -60,7 +52,23 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
null
}
+ // eventActor is created when the generator starts.
+ // This not being null means the generator has been started and not stopped
+ private var eventActor: ActorRef = null
+
+ /** Start generation of jobs */
def start() = synchronized {
+ if (eventActor != null) {
+ throw new SparkException("JobGenerator already started")
+ }
+
+ eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
+ def receive = {
+ case event: JobGeneratorEvent =>
+ logDebug("Got event of type " + event.getClass.getName)
+ processEvent(event)
+ }
+ }), "JobGenerator")
if (ssc.isCheckpointPresent) {
restart()
} else {
@@ -68,11 +76,15 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
}
}
- def stop() {
- timer.stop()
- if (checkpointWriter != null) checkpointWriter.stop()
- ssc.graph.stop()
- logInfo("JobGenerator stopped")
+ /** Stop generation of jobs */
+ def stop() = synchronized {
+ if (eventActor != null) {
+ timer.stop()
+ ssc.env.actorSystem.stop(eventActor)
+ if (checkpointWriter != null) checkpointWriter.stop()
+ ssc.graph.stop()
+ logInfo("JobGenerator stopped")
+ }
}
/**
@@ -172,4 +184,3 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
}
}
}
-
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 24d57548c3..de675d3c7f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -41,20 +41,26 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val executor = Executors.newFixedThreadPool(numConcurrentJobs)
private val jobGenerator = new JobGenerator(this)
- private val eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
- def receive = {
- case event: JobSchedulerEvent => processEvent(event)
- }
- }))
- val clock = jobGenerator.clock // used by testsuites
+ val clock = jobGenerator.clock
val listenerBus = new StreamingListenerBus()
+ // These two are created only when scheduler starts.
+ // eventActor not being null means the scheduler has been started and not stopped
var networkInputTracker: NetworkInputTracker = null
+ private var eventActor: ActorRef = null
+
def start() = synchronized {
- if (networkInputTracker != null) {
- throw new SparkException("StreamingContext already started")
+ if (eventActor != null) {
+ throw new SparkException("JobScheduler already started")
}
+
+ eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
+ def receive = {
+ case event: JobSchedulerEvent => processEvent(event)
+ }
+ }), "JobScheduler")
+ listenerBus.start()
networkInputTracker = new NetworkInputTracker(ssc)
networkInputTracker.start()
Thread.sleep(1000)
@@ -63,13 +69,15 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
def stop() = synchronized {
- if (networkInputTracker != null) {
+ if (eventActor != null) {
jobGenerator.stop()
networkInputTracker.stop()
executor.shutdown()
if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
executor.shutdownNow()
}
+ listenerBus.stop()
+ ssc.env.actorSystem.stop(eventActor)
logInfo("JobScheduler stopped")
}
}
@@ -104,7 +112,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
case e: Throwable =>
reportError("Error in job scheduler", e)
}
-
}
private def handleJobStart(job: Job) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
index 34fb158205..0d9733fa69 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
@@ -19,8 +19,7 @@ package org.apache.spark.streaming.scheduler
import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
-import org.apache.spark.Logging
-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkException, Logging, SparkEnv}
import org.apache.spark.SparkContext._
import scala.collection.mutable.HashMap
@@ -32,6 +31,7 @@ import akka.pattern.ask
import akka.dispatch._
import org.apache.spark.storage.BlockId
import org.apache.spark.streaming.{Time, StreamingContext}
+import org.apache.spark.util.AkkaUtils
private[streaming] sealed trait NetworkInputTrackerMessage
private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
@@ -39,7 +39,9 @@ private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], m
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
/**
- * This class manages the execution of the receivers of NetworkInputDStreams.
+ * This class manages the execution of the receivers of NetworkInputDStreams. An instance of
+ * this class must be created after all input streams have been added and StreamingContext.start()
+ * has been called, because it needs the final set of input streams at the time of instantiation.
*/
private[streaming]
class NetworkInputTracker(ssc: StreamingContext) extends Logging {
@@ -49,23 +51,33 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
val receiverExecutor = new ReceiverExecutor()
val receiverInfo = new HashMap[Int, ActorRef]
val receivedBlockIds = new HashMap[Int, Queue[BlockId]]
- val timeout = 5000.milliseconds
+ val timeout = AkkaUtils.askTimeout(ssc.conf)
+
+ // actor is created when the tracker starts.
+ // This not being null means the tracker has been started and not stopped
+ var actor: ActorRef = null
var currentTime: Time = null
/** Start the actor and receiver execution thread. */
def start() {
+ if (actor != null) {
+ throw new SparkException("NetworkInputTracker already started")
+ }
+
if (!networkInputStreams.isEmpty) {
- ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker")
+ actor = ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker")
receiverExecutor.start()
+ logInfo("NetworkInputTracker started")
}
}
/** Stop the receiver execution thread. */
def stop() {
- if (!networkInputStreams.isEmpty) {
+ if (!networkInputStreams.isEmpty && actor != null) {
receiverExecutor.interrupt()
receiverExecutor.stopReceivers()
+ ssc.env.actorSystem.stop(actor)
logInfo("NetworkInputTracker stopped")
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 36225e190c..461ea35064 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -24,9 +24,10 @@ import org.apache.spark.util.Distribution
sealed trait StreamingListenerEvent
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
-
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
+/** An event used in the listener to shut down the listener daemon thread. */
+private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
/**
* A listener interface for receiving information about an ongoing streaming
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 110a20f282..6e6e22e1af 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -31,7 +31,7 @@ private[spark] class StreamingListenerBus() extends Logging {
private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
- new Thread("StreamingListenerBus") {
+ val listenerThread = new Thread("StreamingListenerBus") {
setDaemon(true)
override def run() {
while (true) {
@@ -41,11 +41,18 @@ private[spark] class StreamingListenerBus() extends Logging {
listeners.foreach(_.onBatchStarted(batchStarted))
case batchCompleted: StreamingListenerBatchCompleted =>
listeners.foreach(_.onBatchCompleted(batchCompleted))
+ case StreamingListenerShutdown =>
+ // Get out of the while loop and shut down the daemon thread
+ return
case _ =>
}
}
}
- }.start()
+ }
+
+ def start() {
+ listenerThread.start()
+ }
def addListener(listener: StreamingListener) {
listeners += listener
@@ -54,9 +61,9 @@ private[spark] class StreamingListenerBus() extends Logging {
def post(event: StreamingListenerEvent) {
val eventAdded = eventQueue.offer(event)
if (!eventAdded && !queueFullErrorMessageLogged) {
- logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
- "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
- "rate at which tasks are being started by the scheduler.")
+ logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
+ "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
+ "rate at which events are being started by the scheduler.")
queueFullErrorMessageLogged = true
}
}
@@ -68,7 +75,7 @@ private[spark] class StreamingListenerBus() extends Logging {
*/
def waitUntilEmpty(timeoutMillis: Int): Boolean = {
val finishTime = System.currentTimeMillis + timeoutMillis
- while (!eventQueue.isEmpty()) {
+ while (!eventQueue.isEmpty) {
if (System.currentTimeMillis > finishTime) {
return false
}
@@ -78,4 +85,6 @@ private[spark] class StreamingListenerBus() extends Logging {
}
return true
}
+
+ def stop(): Unit = post(StreamingListenerShutdown)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
index d644240405..559c247385 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RecurringTimer.scala
@@ -20,17 +20,7 @@ package org.apache.spark.streaming.util
private[streaming]
class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) => Unit) {
- private val minPollTime = 25L
-
- private val pollTime = {
- if (period / 10.0 > minPollTime) {
- (period / 10.0).toLong
- } else {
- minPollTime
- }
- }
-
- private val thread = new Thread() {
+ private val thread = new Thread("RecurringTimer") {
override def run() { loop }
}
@@ -66,7 +56,6 @@ class RecurringTimer(val clock: Clock, val period: Long, val callback: (Long) =>
callback(nextTime)
nextTime += period
}
-
} catch {
case e: InterruptedException =>
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 9eb9b3684c..10c18a7f7e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -45,6 +45,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
ssc.stop()
ssc = null
}
+ if (sc != null) {
+ sc.stop()
+ sc = null
+ }
}
test("from no conf constructor") {
@@ -124,6 +128,8 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
test("stop multiple times") {
ssc = new StreamingContext(master, appName, batchDuration)
+ addInputStream(ssc).register
+ ssc.start()
ssc.stop()
ssc.stop()
ssc = null
@@ -131,9 +137,13 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts {
test("stop only streaming context") {
ssc = new StreamingContext(master, appName, batchDuration)
+ sc = ssc.sparkContext
+ addInputStream(ssc).register
+ ssc.start()
ssc.stop(false)
ssc = null
assert(sc.makeRDD(1 to 100).collect().size === 100)
+ ssc = new StreamingContext(sc, batchDuration)
}
test("waitForStop") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 3569624d51..a8ff444109 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -273,10 +273,11 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
val startTime = System.currentTimeMillis()
while (output.size < numExpectedOutput && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
- Thread.sleep(10)
+ ssc.waitForStop(50)
}
val timeTaken = System.currentTimeMillis() - startTime
-
+ logInfo("Output generated in " + timeTaken + " milliseconds")
+ output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
assert(output.size === numExpectedOutput, "Unexpected number of outputs generated")