author     zsxwing <zsxwing@gmail.com>          2015-04-19 20:48:36 -0700
committer  Reynold Xin <rxin@databricks.com>    2015-04-19 20:48:36 -0700
commit     c776ee8a6fdcdc463746a815b7686e4e33a874a9 (patch)
tree       0960f5dcd8e31f205e1b89f292caa5408c0b562c /streaming
parent     d8e1b7b06c499289ff3ce5ec91ff354493a17c48 (diff)
[SPARK-6979][Streaming] Replace JobScheduler.eventActor and JobGenerator.eventActor with EventLoop
Title says it all. cc rxin tdas

Author: zsxwing <zsxwing@gmail.com>

Closes #5554 from zsxwing/SPARK-6979 and squashes the following commits:

5304350 [zsxwing] Fix NotSerializableException
e9d3479 [zsxwing] Add blank lines
633e279 [zsxwing] Fix NotSerializableException
e496ace [zsxwing] Replace JobGenerator.eventActor with EventLoop
ec6ec58 [zsxwing] Fix the import order
ce0fa73 [zsxwing] Replace JobScheduler.eventActor with EventLoop
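For context, the diff below swaps each Akka actor for an org.apache.spark.util.EventLoop subclass. The following is a minimal sketch of the pattern that class implements, not Spark's actual implementation (the real class has additional lifecycle hooks and more careful shutdown); the name SimpleEventLoop is illustrative. A single daemon thread drains a queue and hands each event to onReceive; failures are routed to onError instead of killing the thread.

import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = queue.take()
          // Route handler failures to onError so one bad event
          // does not terminate the loop thread.
          try onReceive(event) catch {
            case NonFatal(e) => onError(e)
          }
        }
      } catch {
        case _: InterruptedException => // interrupted by stop() while blocked on take()
      }
    }
  }

  def start(): Unit = thread.start()

  // Non-blocking for the caller: the event is queued, not handled inline.
  def post(event: E): Unit = queue.put(event)

  def stop(): Unit = {
    stopped.set(true)
    thread.interrupt()
  }

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}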
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala | 38
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala | 40
2 files changed, 41 insertions(+), 37 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 58e56638a2..2467d50839 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -19,12 +19,10 @@ package org.apache.spark.streaming.scheduler
import scala.util.{Failure, Success, Try}
-import akka.actor.{ActorRef, Props, Actor}
-
import org.apache.spark.{SparkEnv, Logging}
import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
import org.apache.spark.streaming.util.RecurringTimer
-import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.util.{Clock, EventLoop, ManualClock}
/** Event classes for JobGenerator */
private[scheduler] sealed trait JobGeneratorEvent
@@ -58,7 +56,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
}
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
- longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")
+ longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
// This is marked lazy so that this is initialized after checkpoint duration has been set
// in the context and the generator has been started.
@@ -70,22 +68,26 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
null
}
- // eventActor is created when generator starts.
+ // eventLoop is created when generator starts.
// This not being null means the scheduler has been started and not stopped
- private var eventActor: ActorRef = null
+ private var eventLoop: EventLoop[JobGeneratorEvent] = null
// last batch whose completion, checkpointing and metadata cleanup has been completed
private var lastProcessedBatch: Time = null
/** Start generation of jobs */
def start(): Unit = synchronized {
- if (eventActor != null) return // generator has already been started
+ if (eventLoop != null) return // generator has already been started
+
+ eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
+ override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
- eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
- override def receive: PartialFunction[Any, Unit] = {
- case event: JobGeneratorEvent => processEvent(event)
+ override protected def onError(e: Throwable): Unit = {
+ jobScheduler.reportError("Error in job generator", e)
}
- }), "JobGenerator")
+ }
+ eventLoop.start()
+
if (ssc.isCheckpointPresent) {
restart()
} else {
@@ -99,7 +101,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
* checkpoints written.
*/
def stop(processReceivedData: Boolean): Unit = synchronized {
- if (eventActor == null) return // generator has already been stopped
+ if (eventLoop == null) return // generator has already been stopped
if (processReceivedData) {
logInfo("Stopping JobGenerator gracefully")
@@ -146,9 +148,9 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
graph.stop()
}
- // Stop the actor and checkpoint writer
+ // Stop the event loop and checkpoint writer
if (shouldCheckpoint) checkpointWriter.stop()
- ssc.env.actorSystem.stop(eventActor)
+ eventLoop.stop()
logInfo("Stopped JobGenerator")
}
@@ -156,7 +158,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
* Callback called when a batch has been completely processed.
*/
def onBatchCompletion(time: Time) {
- eventActor ! ClearMetadata(time)
+ eventLoop.post(ClearMetadata(time))
}
/**
@@ -164,7 +166,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
*/
def onCheckpointCompletion(time: Time, clearCheckpointDataLater: Boolean) {
if (clearCheckpointDataLater) {
- eventActor ! ClearCheckpointData(time)
+ eventLoop.post(ClearCheckpointData(time))
}
}
@@ -247,7 +249,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
- eventActor ! DoCheckpoint(time, clearCheckpointDataLater = false)
+ eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
/** Clear DStream metadata for the given `time`. */
@@ -257,7 +259,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// If checkpointing is enabled, then checkpoint,
// else mark batch to be fully processed
if (shouldCheckpoint) {
- eventActor ! DoCheckpoint(time, clearCheckpointDataLater = true)
+ eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
} else {
// If checkpointing is not enabled, then delete metadata information about
// received blocks (block data not saved in any case). Otherwise, wait for
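The JobGenerator hunks above all follow one pattern: construct an anonymous EventLoop subclass that forwards events to the existing processEvent, start it, and replace every `eventActor ! Msg` with `eventLoop.post(Msg)`. A self-contained usage sketch of that pattern, built on the SimpleEventLoop sketch above (event and object names are illustrative, not Spark's):

sealed trait DemoEvent
case class Tick(n: Int) extends DemoEvent

object EventLoopDemo extends App {
  val loop = new SimpleEventLoop[DemoEvent]("demo-loop") {
    override protected def onReceive(event: DemoEvent): Unit = event match {
      case Tick(n) => println(s"handled Tick($n) on ${Thread.currentThread().getName}")
    }
    override protected def onError(e: Throwable): Unit =
      println(s"demo loop error: ${e.getMessage}")
  }

  loop.start()
  (1 to 3).foreach(n => loop.post(Tick(n)))  // like eventLoop.post(GenerateJobs(...))
  Thread.sleep(100)                          // let the daemon thread drain the queue
  loop.stop()
}

As in the diff, posting is fire-and-forget and all events are processed serially on the loop's single thread, which preserves the ordering guarantee the actor previously provided.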
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 95f1857b4c..508b89278d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -17,13 +17,15 @@
package org.apache.spark.streaming.scheduler
-import scala.util.{Failure, Success, Try}
-import scala.collection.JavaConversions._
import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
-import akka.actor.{ActorRef, Actor, Props}
-import org.apache.spark.{SparkException, Logging, SparkEnv}
+
+import scala.collection.JavaConversions._
+import scala.util.{Failure, Success}
+
+import org.apache.spark.Logging
import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.streaming._
+import org.apache.spark.util.EventLoop
private[scheduler] sealed trait JobSchedulerEvent
@@ -46,20 +48,20 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
val listenerBus = new StreamingListenerBus()
// These two are created only when scheduler starts.
- // eventActor not being null means the scheduler has been started and not stopped
+ // eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null
- private var eventActor: ActorRef = null
-
+ private var eventLoop: EventLoop[JobSchedulerEvent] = null
def start(): Unit = synchronized {
- if (eventActor != null) return // scheduler has already been started
+ if (eventLoop != null) return // scheduler has already been started
logDebug("Starting JobScheduler")
- eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
- override def receive: PartialFunction[Any, Unit] = {
- case event: JobSchedulerEvent => processEvent(event)
- }
- }), "JobScheduler")
+ eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
+ override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
+
+ override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
+ }
+ eventLoop.start()
listenerBus.start(ssc.sparkContext)
receiverTracker = new ReceiverTracker(ssc)
@@ -69,7 +71,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
def stop(processAllReceivedData: Boolean): Unit = synchronized {
- if (eventActor == null) return // scheduler has already been stopped
+ if (eventLoop == null) return // scheduler has already been stopped
logDebug("Stopping JobScheduler")
// First, stop receiving
@@ -96,8 +98,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
// Stop everything else
listenerBus.stop()
- ssc.env.actorSystem.stop(eventActor)
- eventActor = null
+ eventLoop.stop()
+ eventLoop = null
logInfo("Stopped JobScheduler")
}
@@ -117,7 +119,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
def reportError(msg: String, e: Throwable) {
- eventActor ! ErrorReported(msg, e)
+ eventLoop.post(ErrorReported(msg, e))
}
private def processEvent(event: JobSchedulerEvent) {
@@ -172,14 +174,14 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
private class JobHandler(job: Job) extends Runnable {
def run() {
- eventActor ! JobStarted(job)
+ eventLoop.post(JobStarted(job))
// Disable checks for existing output directories in jobs launched by the streaming scheduler,
// since we may need to write output to an existing directory during checkpoint recovery;
// see SPARK-4835 for more details.
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
job.run()
}
- eventActor ! JobCompleted(job)
+ eventLoop.post(JobCompleted(job))
}
}
}
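Design note: the two files handle onError differently. JobScheduler's onError delegates to reportError, which posts ErrorReported back onto the same loop, so error handling is serialized with normal event processing; JobGenerator instead forwards errors to jobScheduler.reportError. Dropping the actors here also removes a use of Akka from the streaming scheduler internals, consistent with the broader effort at the time to decouple Spark's internals from Akka.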