diff options
author | zsxwing <zsxwing@gmail.com> | 2015-04-19 20:48:36 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-04-19 20:48:36 -0700 |
commit | c776ee8a6fdcdc463746a815b7686e4e33a874a9 (patch) | |
tree | 0960f5dcd8e31f205e1b89f292caa5408c0b562c /streaming | |
parent | d8e1b7b06c499289ff3ce5ec91ff354493a17c48 (diff) | |
download | spark-c776ee8a6fdcdc463746a815b7686e4e33a874a9.tar.gz spark-c776ee8a6fdcdc463746a815b7686e4e33a874a9.tar.bz2 spark-c776ee8a6fdcdc463746a815b7686e4e33a874a9.zip |
[SPARK-6979][Streaming] Replace JobScheduler.eventActor and JobGenerator.eventActor with EventLoop
Title says it all.
cc rxin tdas
Author: zsxwing <zsxwing@gmail.com>
Closes #5554 from zsxwing/SPARK-6979 and squashes the following commits:
5304350 [zsxwing] Fix NotSerializableException
e9d3479 [zsxwing] Add blank lines
633e279 [zsxwing] Fix NotSerializableException
e496ace [zsxwing] Replace JobGenerator.eventActor with EventLoop
ec6ec58 [zsxwing] Fix the import order
ce0fa73 [zsxwing] Replace JobScheduler.eventActor with EventLoop
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 38 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala | 40 |
2 files changed, 41 insertions, 37 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 58e56638a2..2467d50839 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -19,12 +19,10 @@ package org.apache.spark.streaming.scheduler import scala.util.{Failure, Success, Try} -import akka.actor.{ActorRef, Props, Actor} - import org.apache.spark.{SparkEnv, Logging} import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time} import org.apache.spark.streaming.util.RecurringTimer -import org.apache.spark.util.{Clock, ManualClock, Utils} +import org.apache.spark.util.{Clock, EventLoop, ManualClock} /** Event classes for JobGenerator */ private[scheduler] sealed trait JobGeneratorEvent @@ -58,7 +56,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { } private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, - longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator") + longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator") // This is marked lazy so that this is initialized after checkpoint duration has been set // in the context and the generator has been started. @@ -70,22 +68,26 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { null } - // eventActor is created when generator starts. + // eventLoop is created when generator starts. // This not being null means the scheduler has been started and not stopped - private var eventActor: ActorRef = null + private var eventLoop: EventLoop[JobGeneratorEvent] = null // last batch whose completion,checkpointing and metadata cleanup has been completed private var lastProcessedBatch: Time = null /** Start generation of jobs */ def start(): Unit = synchronized { - if (eventActor != null) return // generator has already been started + if (eventLoop != null) return // generator has already been started + + eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") { + override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) - eventActor = ssc.env.actorSystem.actorOf(Props(new Actor { - override def receive: PartialFunction[Any, Unit] = { - case event: JobGeneratorEvent => processEvent(event) + override protected def onError(e: Throwable): Unit = { + jobScheduler.reportError("Error in job generator", e) } - }), "JobGenerator") + } + eventLoop.start() + if (ssc.isCheckpointPresent) { restart() } else { @@ -99,7 +101,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { * checkpoints written. */ def stop(processReceivedData: Boolean): Unit = synchronized { - if (eventActor == null) return // generator has already been stopped + if (eventLoop == null) return // generator has already been stopped if (processReceivedData) { logInfo("Stopping JobGenerator gracefully") @@ -146,9 +148,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { graph.stop() } - // Stop the actor and checkpoint writer + // Stop the event loop and checkpoint writer if (shouldCheckpoint) checkpointWriter.stop() - ssc.env.actorSystem.stop(eventActor) + eventLoop.stop() logInfo("Stopped JobGenerator") } @@ -156,7 +158,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { * Callback called when a batch has been completely processed. */ def onBatchCompletion(time: Time) { - eventActor ! ClearMetadata(time) + eventLoop.post(ClearMetadata(time)) } /** @@ -164,7 +166,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { */ def onCheckpointCompletion(time: Time, clearCheckpointDataLater: Boolean) { if (clearCheckpointDataLater) { - eventActor ! ClearCheckpointData(time) + eventLoop.post(ClearCheckpointData(time)) } } @@ -247,7 +249,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } - eventActor ! DoCheckpoint(time, clearCheckpointDataLater = false) + eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) } /** Clear DStream metadata for the given `time`. */ @@ -257,7 +259,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { // If checkpointing is enabled, then checkpoint, // else mark batch to be fully processed if (shouldCheckpoint) { - eventActor ! DoCheckpoint(time, clearCheckpointDataLater = true) + eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true)) } else { // If checkpointing is not enabled, then delete metadata information about // received blocks (block data not saved in any case). Otherwise, wait for diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 95f1857b4c..508b89278d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -17,13 +17,15 @@ package org.apache.spark.streaming.scheduler -import scala.util.{Failure, Success, Try} -import scala.collection.JavaConversions._ import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors} -import akka.actor.{ActorRef, Actor, Props} -import org.apache.spark.{SparkException, Logging, SparkEnv} + +import scala.collection.JavaConversions._ +import scala.util.{Failure, Success} + +import org.apache.spark.Logging import org.apache.spark.rdd.PairRDDFunctions import org.apache.spark.streaming._ +import org.apache.spark.util.EventLoop private[scheduler] sealed trait JobSchedulerEvent @@ -46,20 +48,20 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { val listenerBus = new StreamingListenerBus() // These two are created only when scheduler starts. - // eventActor not being null means the scheduler has been started and not stopped + // eventLoop not being null means the scheduler has been started and not stopped var receiverTracker: ReceiverTracker = null - private var eventActor: ActorRef = null - + private var eventLoop: EventLoop[JobSchedulerEvent] = null def start(): Unit = synchronized { - if (eventActor != null) return // scheduler has already been started + if (eventLoop != null) return // scheduler has already been started logDebug("Starting JobScheduler") - eventActor = ssc.env.actorSystem.actorOf(Props(new Actor { - override def receive: PartialFunction[Any, Unit] = { - case event: JobSchedulerEvent => processEvent(event) - } - }), "JobScheduler") + eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") { + override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event) + + override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e) + } + eventLoop.start() listenerBus.start(ssc.sparkContext) receiverTracker = new ReceiverTracker(ssc) @@ -69,7 +71,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { } def stop(processAllReceivedData: Boolean): Unit = synchronized { - if (eventActor == null) return // scheduler has already been stopped + if (eventLoop == null) return // scheduler has already been stopped logDebug("Stopping JobScheduler") // First, stop receiving @@ -96,8 +98,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { // Stop everything else listenerBus.stop() - ssc.env.actorSystem.stop(eventActor) - eventActor = null + eventLoop.stop() + eventLoop = null logInfo("Stopped JobScheduler") } @@ -117,7 +119,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { } def reportError(msg: String, e: Throwable) { - eventActor ! ErrorReported(msg, e) + eventLoop.post(ErrorReported(msg, e)) } private def processEvent(event: JobSchedulerEvent) { @@ -172,14 +174,14 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { private class JobHandler(job: Job) extends Runnable { def run() { - eventActor ! JobStarted(job) + eventLoop.post(JobStarted(job)) // Disable checks for existing output directories in jobs launched by the streaming scheduler, // since we may need to write output to an existing directory during checkpoint recovery; // see SPARK-4835 for more details. PairRDDFunctions.disableOutputSpecValidation.withValue(true) { job.run() } - eventActor ! JobCompleted(job) + eventLoop.post(JobCompleted(job)) } } } |