diff options
author | Kay Ousterhout <kayo@yahoo-inc.com> | 2013-08-10 07:50:27 -0700 |
---|---|---|
committer | Kay Ousterhout <kayo@yahoo-inc.com> | 2013-08-10 07:50:27 -0700 |
commit | 14d14f451a35e647c4b9d13fd6a9f46d3091bae6 (patch) | |
tree | ddd880e981b3e9d8d2f56fcb78ba6e0d697411d5 /core | |
parent | 7810a76512a0fe07a81549e7d0cb01463bbefeac (diff) | |
download | spark-14d14f451a35e647c4b9d13fd6a9f46d3091bae6.tar.gz spark-14d14f451a35e647c4b9d13fd6a9f46d3091bae6.tar.bz2 spark-14d14f451a35e647c4b9d13fd6a9f46d3091bae6.zip |
Shortened names, as per Matei's suggestion
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/scheduler/DAGScheduler.scala | 21 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/SparkListenerBus.scala (renamed from core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala) | 6 |
2 files changed, 13 insertions, 14 deletions
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 9e2f3cc81f..fbf3f4c807 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -102,7 +102,7 @@ class DAGScheduler( private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo] - private val listenerEventProcessor = new SparkListenerEventProcessor() + private val listenerBus = new SparkListenerBus() var cacheLocs = new HashMap[Int, Array[List[String]]] @@ -138,7 +138,7 @@ class DAGScheduler( } def addSparkListener(listener: SparkListener) { - listenerEventProcessor.addListener(listener) + listenerBus.addListener(listener) } private def getCacheLocs(rdd: RDD[_]): Array[List[String]] = { @@ -338,7 +338,7 @@ class DAGScheduler( // Compute very short actions like first() or take() with no parent stages locally. runLocally(job) } else { - listenerEventProcessor.addEvent(SparkListenerJobStart(job, properties)) + listenerBus.post(SparkListenerJobStart(job, properties)) idToActiveJob(runId) = job activeJobs += job resultStageToJob(finalStage) = job @@ -352,10 +352,10 @@ class DAGScheduler( handleExecutorLost(execId) case begin: BeginEvent => - listenerEventProcessor.addEvent(SparkListenerTaskStart(begin.task, begin.taskInfo)) + listenerBus.post(SparkListenerTaskStart(begin.task, begin.taskInfo)) case completion: CompletionEvent => - listenerEventProcessor.addEvent(SparkListenerTaskEnd( + listenerBus.post(SparkListenerTaskEnd( completion.task, completion.reason, completion.taskInfo, completion.taskMetrics)) handleTaskCompletion(completion) @@ -367,7 +367,7 @@ class DAGScheduler( for (job <- activeJobs) { val error = new SparkException("Job cancelled because SparkContext was shut down") job.listener.jobFailed(error) - listenerEventProcessor.addEvent(SparkListenerJobEnd(job, JobFailed(error, None))) + listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, None))) } return true } @@ -517,8 +517,7 @@ class DAGScheduler( // must be run listener before possible NotSerializableException // should be "StageSubmitted" first and then "JobEnded" val properties = idToActiveJob(stage.priority).properties - listenerEventProcessor.addEvent( - SparkListenerStageSubmitted(stage, tasks.size, properties)) + listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties)) if (tasks.size > 0) { // Preemptively serialize a task to make sure it can be serialized. We are catching this @@ -564,7 +563,7 @@ class DAGScheduler( } logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) stage.completionTime = Some(System.currentTimeMillis) - listenerEventProcessor.addEvent(StageCompleted(stageToInfos(stage))) + listenerBus.post(StageCompleted(stageToInfos(stage))) running -= stage } event.reason match { @@ -588,7 +587,7 @@ class DAGScheduler( activeJobs -= job resultStageToJob -= stage markStageAsFinished(stage) - listenerEventProcessor.addEvent(SparkListenerJobEnd(job, JobSucceeded)) + listenerBus.post(SparkListenerJobEnd(job, JobSucceeded)) } job.listener.taskSucceeded(rt.outputId, event.result) } @@ -735,7 +734,7 @@ class DAGScheduler( val job = resultStageToJob(resultStage) val error = new SparkException("Job failed: " + reason) job.listener.jobFailed(error) - listenerEventProcessor.addEvent(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage)))) + listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage)))) idToActiveJob -= resultStage.priority activeJobs -= job resultStageToJob -= resultStage diff --git a/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala b/core/src/main/scala/spark/scheduler/SparkListenerBus.scala index e5fade26b0..f55ed455ed 100644 --- a/core/src/main/scala/spark/scheduler/SparkListenerEventProcessor.scala +++ b/core/src/main/scala/spark/scheduler/SparkListenerBus.scala @@ -24,7 +24,7 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import spark.Logging
/** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
-class SparkListenerEventProcessor() extends Logging {
+private[spark] class SparkListenerBus() extends Logging {
private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener]
/* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
@@ -33,7 +33,7 @@ class SparkListenerEventProcessor() extends Logging { private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
- new Thread("SparkListenerEventProcessor") {
+ new Thread("SparkListenerBus") {
setDaemon(true)
override def run() {
while (true) {
@@ -61,7 +61,7 @@ class SparkListenerEventProcessor() extends Logging { sparkListeners += listener
}
- def addEvent(event: SparkListenerEvents) {
+ def post(event: SparkListenerEvents) {
val eventAdded = eventQueue.offer(event)
if (!eventAdded && !queueFullErrorMessageLogged) {
logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
|