author     Mingfei <mingfei.shi@intel.com>    2013-06-13 14:36:07 +0800
committer  Mingfei <mingfei.shi@intel.com>    2013-06-13 14:36:07 +0800
commit     967a6a699da7da007f51e59d085a357da5ec14da
tree       3979c761746d7b60130495f374ebddbcb7c020e9 /core/src
parent     ade822011d44bd43e9ac78c1d29ec924a1f6e8e7
modify SparkListener function interface according to comments
Diffstat (limited to 'core/src')
4 files changed, 79 insertions, 73 deletions
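In short, this change replaces each listener callback's loose argument list with a single event object drawn from a sealed case-class hierarchy. A minimal sketch of the resulting shape, using simplified stand-in types (the real Stage and ActiveJob live in spark.scheduler and carry much more state; the authoritative definitions are in the SparkListener.scala diff further down):

    import java.util.Properties

    // Stand-ins for the scheduler types the events carry; illustration only.
    class Stage(val id: Int)
    class ActiveJob(val runId: Int)

    // One sealed hierarchy of events, so a callback's payload can grow
    // without changing the callback's signature.
    sealed trait SparkListenerEvents
    case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends SparkListenerEvents
    case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
      extends SparkListenerEvents
    case class SparkListenerJobSuccess(job: ActiveJob) extends SparkListenerEvents

    // Listeners override only the callbacks they care about.
    trait SparkListener {
      def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
      def onJobStart(jobStart: SparkListenerJobStart) { }
      def onJobEnd(jobEnd: SparkListenerEvents) { }
    }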
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 43dd7d6534..e281e5a8db 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -289,7 +289,7 @@ class DAGScheduler(
val finalStage = newStage(finalRDD, None, runId)
val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
- sparkListeners.foreach(_.onJobStart(job, properties))
+ sparkListeners.foreach(_.onJobStart(SparkListenerJobStart(job, properties)))
logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length +
" output partitions (allowLocal=" + allowLocal + ")")
logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")")
@@ -312,7 +312,7 @@ class DAGScheduler(
handleExecutorLost(execId)

case completion: CompletionEvent =>
- sparkListeners.foreach(_.onTaskEnd(completion))
+ sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion)))
handleTaskCompletion(completion)

case TaskSetFailed(taskSet, reason) =>
@@ -323,8 +323,8 @@ class DAGScheduler(
for (job <- activeJobs) {
val error = new SparkException("Job cancelled because SparkContext was shut down")
job.listener.jobFailed(error)
- val JobCancelEvent = new SparkListenerJobCancelled("SPARKCONTEXT_SHUTDOWN")
- sparkListeners.foreach(_.onJobEnd(job, JobCancelEvent))
+ sparkListeners.foreach(_.onJobEnd(SparkListenerJobCancelled(job,
+ "SPARKCONTEXT_SHUTDOWN")))
}
return true
}
@@ -472,7 +472,7 @@ class DAGScheduler(
}
}
if (tasks.size > 0) {
- sparkListeners.foreach(_.onStageSubmitted(stage, "TASKS_SIZE=" + tasks.size))
+ sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size)))
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks
logDebug("New pending tasks: " + myPending)
@@ -527,7 +527,7 @@ class DAGScheduler(
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
- sparkListeners.foreach(_.onJobEnd(job, SparkListenerJobSuccess))
+ sparkListeners.foreach(_.onJobEnd(SparkListenerJobSuccess(job)))
}
job.listener.taskSucceeded(rt.outputId, event.result)
}
@@ -671,8 +671,7 @@ class DAGScheduler(
job.listener.jobFailed(new SparkException("Job failed: " + reason))
activeJobs -= job
resultStageToJob -= resultStage
- val jobFailedEvent = new SparkListenerJobFailed(failedStage)
- sparkListeners.foreach(_.onJobEnd(job, jobFailedEvent))
+ sparkListeners.foreach(_.onJobEnd(SparkListenerJobFailed(job, failedStage)))
}
if (dependentStages.isEmpty) {
logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index 46b9fa974b..002c5826cb 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -15,13 +15,6 @@ import spark.scheduler.cluster.TaskInfo

// used to record runtime information for each job, including RDD graph
// tasks' start/stop shuffle information and information from outside
-sealed trait JobLoggerEvent
-case class JobLoggerOnJobStart(job: ActiveJob, properties: Properties) extends JobLoggerEvent
-case class JobLoggerOnStageSubmitted(stage: Stage, info: String) extends JobLoggerEvent
-case class JobLoggerOnStageCompleted(stageCompleted: StageCompleted) extends JobLoggerEvent
-case class JobLoggerOnJobEnd(job: ActiveJob, event: SparkListenerEvents) extends JobLoggerEvent
-case class JobLoggerOnTaskEnd(event: CompletionEvent) extends JobLoggerEvent
-
class JobLogger(val logDirName: String) extends SparkListener with Logging {
private val logDir =
if (System.getenv("SPARK_LOG_DIR") != null)
@@ -32,7 +25,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
private val stageIDToJobID = new HashMap[Int, Int]
private val jobIDToStages = new HashMap[Int, ListBuffer[Stage]]
private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
- private val eventQueue = new LinkedBlockingQueue[JobLoggerEvent]
+ private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]
createLogDir()
def this() = this(String.valueOf(System.currentTimeMillis()))
@@ -50,15 +43,19 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
val event = eventQueue.take
logDebug("Got event of type " + event.getClass.getName)
event match {
- case JobLoggerOnJobStart(job, info) =>
- processJobStartEvent(job, info)
- case JobLoggerOnStageSubmitted(stage, info) =>
- processStageSubmittedEvent(stage, info)
- case JobLoggerOnStageCompleted(stageCompleted) =>
- processStageCompletedEvent(stageCompleted)
- case JobLoggerOnJobEnd(job, event) =>
- processJobEndEvent(job, event)
- case JobLoggerOnTaskEnd(event) =>
+ case SparkListenerJobStart(job, properties) =>
+ processJobStartEvent(job, properties)
+ case SparkListenerStageSubmitted(stage, taskSize) =>
+ processStageSubmittedEvent(stage, taskSize)
+ case StageCompleted(stageInfo) =>
+ processStageCompletedEvent(stageInfo)
+ case SparkListenerJobSuccess(job) =>
+ processJobEndEvent(job)
+ case SparkListenerJobFailed(job, failedStage) =>
+ processJobEndEvent(job, failedStage)
+ case SparkListenerJobCancelled(job, reason) =>
+ processJobEndEvent(job, reason)
+ case SparkListenerTaskEnd(event) =>
processTaskEndEvent(event)
case _ =>
}
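The three job-end cases above all funnel into processJobEndEvent, which is overloaded rather than matched a second time: the compiler selects the overload from the static type of the second argument extracted from the event. A small self-contained sketch of that dispatch style (simplified types, not the real scheduler ones):

    object JobEndDispatchSketch {
      // Success: no extra payload.
      def processJobEndEvent(jobId: Int) =
        println("JOB_ID=" + jobId + " STATUS=SUCCESS")
      // Failure: carries the failed stage (just an Int id here).
      def processJobEndEvent(jobId: Int, failedStageId: Int) =
        println("JOB_ID=" + jobId + " STATUS=FAILED FAILED_STAGE_ID=" + failedStageId)
      // Cancellation: carries a reason string.
      def processJobEndEvent(jobId: Int, reason: String) =
        println("JOB_ID=" + jobId + " STATUS=CANCELLED REASON=" + reason)
    }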
@@ -225,26 +222,26 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
}
- override def onStageSubmitted(stage: Stage, info: String = "") {
- eventQueue.put(JobLoggerOnStageSubmitted(stage, info))
+ override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
+ eventQueue.put(stageSubmitted)
}
- protected def processStageSubmittedEvent(stage: Stage, info: String) {
- stageLogInfo(stage.id, "STAGE_ID=" + stage.id + " STATUS=SUBMITTED " + info)
+ protected def processStageSubmittedEvent(stage: Stage, taskSize: Int) {
+ stageLogInfo(stage.id, "STAGE_ID=" + stage.id + " STATUS=SUBMITTED" + " TASK_SIZE=" + taskSize)
}
override def onStageCompleted(stageCompleted: StageCompleted) {
- eventQueue.put(JobLoggerOnStageCompleted(stageCompleted))
+ eventQueue.put(stageCompleted)
}
- protected def processStageCompletedEvent(stageCompleted: StageCompleted) {
- stageLogInfo(stageCompleted.stageInfo.stage.id, "STAGE_ID=" +
- stageCompleted.stageInfo.stage.id + " STATUS=COMPLETED")
+ protected def processStageCompletedEvent(stageInfo: StageInfo) {
+ stageLogInfo(stageInfo.stage.id, "STAGE_ID=" +
+ stageInfo.stage.id + " STATUS=COMPLETED")
}
- override def onTaskEnd(event: CompletionEvent) {
- eventQueue.put(JobLoggerOnTaskEnd(event))
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ eventQueue.put(taskEnd)
}
protected def processTaskEndEvent(event: CompletionEvent) {
@@ -273,24 +270,26 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
}
- override def onJobEnd(job: ActiveJob, event: SparkListenerEvents) {
- eventQueue.put(JobLoggerOnJobEnd(job, event))
+ override def onJobEnd(jobEnd: SparkListenerEvents) {
+ eventQueue.put(jobEnd)
}
- protected def processJobEndEvent(job: ActiveJob, event: SparkListenerEvents) {
- var info = "JOB_ID=" + job.runId + " STATUS="
- var validEvent = true
- event match {
- case SparkListenerJobSuccess => info += "SUCCESS"
- case SparkListenerJobFailed(failedStage) =>
- info += "FAILED REASON=STAGE_FAILED FAILED_STAGE_ID=" + failedStage.id
- case SparkListenerJobCancelled(reason) => info += "CANCELLED REASON=" + reason
- case _ => validEvent = false
- }
- if (validEvent) {
- jobLogInfo(job.runId, info)
- closeLogWriter(job.runId)
- }
+ protected def processJobEndEvent(job: ActiveJob) {
+ val info = "JOB_ID=" + job.runId + " STATUS=SUCCESS"
+ jobLogInfo(job.runId, info)
+ closeLogWriter(job.runId)
+ }
+
+ protected def processJobEndEvent(job: ActiveJob, failedStage: Stage) {
+ val info = "JOB_ID=" + job.runId + " STATUS=FAILED REASON=STAGE_FAILED FAILED_STAGE_ID="
+ + failedStage.id
+ jobLogInfo(job.runId, info)
+ closeLogWriter(job.runId)
+ }
+ protected def processJobEndEvent(job: ActiveJob, reason: String) {
+ var info = "JOB_ID=" + job.runId + " STATUS=CANCELLED REASON=" + reason
+ jobLogInfo(job.runId, info)
+ closeLogWriter(job.runId)
}
protected def recordJobProperties(jobID: Int, properties: Properties) {
@@ -300,8 +299,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
}
- override def onJobStart(job: ActiveJob, properties: Properties = null) {
- eventQueue.put(JobLoggerOnJobStart(job, properties))
+ override def onJobStart(jobStart: SparkListenerJobStart) {
+ eventQueue.put(jobStart)
}
protected def processJobStartEvent(job: ActiveJob, properties: Properties) {
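With this change the JobLogger callbacks reduce to enqueue operations: the scheduler thread puts the event object on a LinkedBlockingQueue and returns immediately, while the logger's own thread drains the queue and pattern-matches as shown in the run-loop hunk above. A self-contained sketch of that producer/consumer pattern, with simplified event types standing in for the real ones:

    import java.util.concurrent.LinkedBlockingQueue

    sealed trait Event
    case class JobStart(jobId: Int) extends Event
    case class JobEnd(jobId: Int, status: String) extends Event

    object QueueLoggerSketch {
      private val eventQueue = new LinkedBlockingQueue[Event]

      // Producer side: called from the scheduler thread; just enqueue and return.
      def post(event: Event) { eventQueue.put(event) }

      // Consumer side: a daemon thread keeps slow logging I/O off the
      // scheduler's critical path.
      private val logThread = new Thread("event-logger-sketch") {
        override def run() {
          while (true) {
            eventQueue.take() match {
              case JobStart(id)       => println("JOB_ID=" + id + " STATUS=STARTED")
              case JobEnd(id, status) => println("JOB_ID=" + id + " STATUS=" + status)
            }
          }
        }
      }
      logThread.setDaemon(true)
      logThread.start()
    }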
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 9cf7f3ffc0..9265261dc1 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -6,6 +6,24 @@ import spark.util.Distribution
import spark.{Utils, Logging, SparkContext, TaskEndReason}
import spark.executor.TaskMetrics
+
+sealed trait SparkListenerEvents
+
+case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends SparkListenerEvents
+
+case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
+
+case class SparkListenerTaskEnd(event: CompletionEvent) extends SparkListenerEvents
+
+case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
+ extends SparkListenerEvents
+
+case class SparkListenerJobSuccess(job: ActiveJob) extends SparkListenerEvents
+
+case class SparkListenerJobFailed(job: ActiveJob, failedStage: Stage) extends SparkListenerEvents
+
+case class SparkListenerJobCancelled(job: ActiveJob, reason: String) extends SparkListenerEvents
+
trait SparkListener {
/**
* called when a stage is completed, with information on the completed stage
@@ -15,35 +33,25 @@ trait SparkListener {
/**
* called when a stage is submitted
*/
- def onStageSubmitted(stage: Stage, info: String = "") { }
-
+ def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
+
/**
* called when a task ends
*/
- def onTaskEnd(event: CompletionEvent) { }
+ def onTaskEnd(taskEnd: SparkListenerTaskEnd) { }

/**
* called when a job starts
*/
- def onJobStart(job: ActiveJob, properties: Properties = null) { }
+ def onJobStart(jobStart: SparkListenerJobStart) { }

/**
* called when a job ends
*/
- def onJobEnd(job: ActiveJob, event: SparkListenerEvents) { }
+ def onJobEnd(jobEnd: SparkListenerEvents) { }
}

-sealed trait SparkListenerEvents
-
-case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
-
-case object SparkListenerJobSuccess extends SparkListenerEvents
-
-case class SparkListenerJobFailed(failedStage: Stage) extends SparkListenerEvents
-
-case class SparkListenerJobCancelled(reason: String) extends SparkListenerEvents
-
/**
* Simple SparkListener that logs a few summary statistics when each stage completes
*/
diff --git a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
index 34fd8b995e..a654bf3ffd 100644
--- a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
@@ -40,7 +40,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID)
val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID)

- joblogger.onStageSubmitted(rootStage)
+ joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4))
joblogger.getEventQueue.size should be (1)
joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
parentRdd.setName("MyRDD")
@@ -86,11 +86,11 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
var onJobStartCount = 0
var onStageCompletedCount = 0
var onStageSubmittedCount = 0
- override def onTaskEnd(event: CompletionEvent) = onTaskEndCount += 1
- override def onJobEnd(job: ActiveJob, event: SparkListenerEvents) = onJobEndCount += 1
- override def onJobStart(job: ActiveJob, properties: Properties) = onJobStartCount += 1
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = onTaskEndCount += 1
+ override def onJobEnd(jobEnd: SparkListenerEvents) = onJobEndCount += 1
+ override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1
override def onStageCompleted(stageCompleted: StageCompleted) = onStageCompletedCount += 1
- override def onStageSubmitted(stage: Stage, info: String = "") = onStageSubmittedCount += 1
+ override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1
}
sc.addSparkListener(joblogger)
val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }
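Seen from user code, a listener written against the new interface overrides only the callbacks it needs and matches on the event payloads. A hypothetical counting listener (not part of this commit), assuming the spark.scheduler types from the diffs above:

    import spark.scheduler._

    // Hypothetical example: count stage submissions and successful jobs
    // using the post-commit SparkListener interface.
    class StageCountListener extends SparkListener {
      var submittedStages = 0
      var succeededJobs = 0

      override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
        submittedStages += 1
      }

      override def onJobEnd(jobEnd: SparkListenerEvents) {
        jobEnd match {
          case SparkListenerJobSuccess(_) => succeededJobs += 1
          case _ => // failed or cancelled jobs are ignored here
        }
      }
    }

    // Registered the same way the suite registers its JobLogger:
    //   sc.addSparkListener(new StageCountListener)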