author    Kay Ousterhout <kayo@yahoo-inc.com>  2013-08-10 07:50:27 -0700
committer Kay Ousterhout <kayo@yahoo-inc.com>  2013-08-10 07:50:27 -0700
commit    14d14f451a35e647c4b9d13fd6a9f46d3091bae6 (patch)
tree      ddd880e981b3e9d8d2f56fcb78ba6e0d697411d5 /core
parent    7810a76512a0fe07a81549e7d0cb01463bbefeac (diff)
Shortened names, as per Matei's suggestion
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala     | 21
-rw-r--r--  core/src/main/scala/spark/scheduler/SparkListenerBus.scala |  6
            (renamed from core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala)
2 files changed, 13 insertions(+), 14 deletions(-)
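
The commit is a pure rename: SparkListenerEventProcessor becomes SparkListenerBus and its addEvent method becomes post; registration through DAGScheduler.addSparkListener and the delivery mechanism are unchanged. As a rough, hedged illustration of the listener side (the onStageCompleted callback name and the dagScheduler handle are assumptions, not shown in this diff), a listener could be attached like this:

import spark.scheduler.{SparkListener, StageCompleted}

// Hedged sketch of a listener that would receive events posted by the renamed bus.
// The onStageCompleted callback name is assumed from the SparkListener trait of this
// codebase; the diff below only shows that DAGScheduler.addSparkListener forwards
// the listener to listenerBus.addListener.
class StageLogger extends SparkListener {
  override def onStageCompleted(stageCompleted: StageCompleted) {
    println("Stage completed: " + stageCompleted)
  }
}

// Registration (unchanged by this commit):
//   dagScheduler.addSparkListener(new StageLogger)

The full diff follows.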
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 9e2f3cc81f..fbf3f4c807 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -102,7 +102,7 @@ class DAGScheduler(
private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
- private val listenerEventProcessor = new SparkListenerEventProcessor()
+ private val listenerBus = new SparkListenerBus()
var cacheLocs = new HashMap[Int, Array[List[String]]]
@@ -138,7 +138,7 @@ class DAGScheduler(
}
def addSparkListener(listener: SparkListener) {
- listenerEventProcessor.addListener(listener)
+ listenerBus.addListener(listener)
}
private def getCacheLocs(rdd: RDD[_]): Array[List[String]] = {
@@ -338,7 +338,7 @@ class DAGScheduler(
// Compute very short actions like first() or take() with no parent stages locally.
runLocally(job)
} else {
- listenerEventProcessor.addEvent(SparkListenerJobStart(job, properties))
+ listenerBus.post(SparkListenerJobStart(job, properties))
idToActiveJob(runId) = job
activeJobs += job
resultStageToJob(finalStage) = job
@@ -352,10 +352,10 @@ class DAGScheduler(
handleExecutorLost(execId)
case begin: BeginEvent =>
- listenerEventProcessor.addEvent(SparkListenerTaskStart(begin.task, begin.taskInfo))
+ listenerBus.post(SparkListenerTaskStart(begin.task, begin.taskInfo))
case completion: CompletionEvent =>
- listenerEventProcessor.addEvent(SparkListenerTaskEnd(
+ listenerBus.post(SparkListenerTaskEnd(
completion.task, completion.reason, completion.taskInfo, completion.taskMetrics))
handleTaskCompletion(completion)
@@ -367,7 +367,7 @@ class DAGScheduler(
for (job <- activeJobs) {
val error = new SparkException("Job cancelled because SparkContext was shut down")
job.listener.jobFailed(error)
- listenerEventProcessor.addEvent(SparkListenerJobEnd(job, JobFailed(error, None)))
+ listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, None)))
}
return true
}
@@ -517,8 +517,7 @@ class DAGScheduler(
// must be run listener before possible NotSerializableException
// should be "StageSubmitted" first and then "JobEnded"
val properties = idToActiveJob(stage.priority).properties
- listenerEventProcessor.addEvent(
- SparkListenerStageSubmitted(stage, tasks.size, properties))
+ listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties))
if (tasks.size > 0) {
// Preemptively serialize a task to make sure it can be serialized. We are catching this
@@ -564,7 +563,7 @@ class DAGScheduler(
}
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
stage.completionTime = Some(System.currentTimeMillis)
- listenerEventProcessor.addEvent(StageCompleted(stageToInfos(stage)))
+ listenerBus.post(StageCompleted(stageToInfos(stage)))
running -= stage
}
event.reason match {
@@ -588,7 +587,7 @@ class DAGScheduler(
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
- listenerEventProcessor.addEvent(SparkListenerJobEnd(job, JobSucceeded))
+ listenerBus.post(SparkListenerJobEnd(job, JobSucceeded))
}
job.listener.taskSucceeded(rt.outputId, event.result)
}
@@ -735,7 +734,7 @@ class DAGScheduler(
val job = resultStageToJob(resultStage)
val error = new SparkException("Job failed: " + reason)
job.listener.jobFailed(error)
- listenerEventProcessor.addEvent(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))
+ listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))
idToActiveJob -= resultStage.priority
activeJobs -= job
resultStageToJob -= resultStage
diff --git a/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala b/core/src/main/scala/spark/scheduler/SparkListenerBus.scala
index e5fade26b0..f55ed455ed 100644
--- a/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListenerBus.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import spark.Logging
/** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
-class SparkListenerEventProcessor() extends Logging {
+private[spark] class SparkListenerBus() extends Logging {
private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener]
/* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
@@ -33,7 +33,7 @@ class SparkListenerEventProcessor() extends Logging {
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
- new Thread("SparkListenerEventProcessor") {
+ new Thread("SparkListenerBus") {
setDaemon(true)
override def run() {
while (true) {
@@ -61,7 +61,7 @@ class SparkListenerEventProcessor() extends Logging {
sparkListeners += listener
}
- def addEvent(event: SparkListenerEvents) {
+ def post(event: SparkListenerEvents) {
val eventAdded = eventQueue.offer(event)
if (!eventAdded && !queueFullErrorMessageLogged) {
logError("Dropping SparkListenerEvent because no remaining room in event queue. " +