path: root/core/src
author     Mingfei <mingfei.shi@intel.com>    2013-06-13 14:36:07 +0800
committer  Mingfei <mingfei.shi@intel.com>    2013-06-13 14:36:07 +0800
commit     967a6a699da7da007f51e59d085a357da5ec14da (patch)
tree       3979c761746d7b60130495f374ebddbcb7c020e9 /core/src
parent     ade822011d44bd43e9ac78c1d29ec924a1f6e8e7 (diff)
download   spark-967a6a699da7da007f51e59d085a357da5ec14da.tar.gz
           spark-967a6a699da7da007f51e59d085a357da5ec14da.tar.bz2
           spark-967a6a699da7da007f51e59d085a357da5ec14da.zip
Modify SparkListener function interface according to comments
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala    15
-rw-r--r--  core/src/main/scala/spark/scheduler/JobLogger.scala       89
-rw-r--r--  core/src/main/scala/spark/scheduler/SparkListener.scala   38
-rw-r--r--  core/src/test/scala/spark/scheduler/JobLoggerSuite.scala  10
4 files changed, 79 insertions(+), 73 deletions(-)
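
A note on the interface change before the per-file diffs: after this patch every SparkListener callback receives a single event object instead of loose arguments, and onJobEnd learns how the job ended from the concrete SparkListenerEvents subclass it is passed. The following is a minimal sketch of a listener written against the new interface; the JobOutcomeListener name is hypothetical, while the trait, the event case classes and sc.addSparkListener all come from the patched sources shown below.

import spark.scheduler._

// Hypothetical listener: counts job outcomes by matching on the
// SparkListenerEvents subclass handed to onJobEnd after this patch.
class JobOutcomeListener extends SparkListener {
  var succeeded, failed, cancelled = 0

  override def onJobEnd(jobEnd: SparkListenerEvents) {
    jobEnd match {
      case SparkListenerJobSuccess(job)             => succeeded += 1
      case SparkListenerJobFailed(job, failedStage) => failed += 1
      case SparkListenerJobCancelled(job, reason)   => cancelled += 1
      case _ => // other events carry no job-end information
    }
  }
}

// Registered the same way as before: sc.addSparkListener(new JobOutcomeListener)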
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 43dd7d6534..e281e5a8db 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -289,7 +289,7 @@ class DAGScheduler(
val finalStage = newStage(finalRDD, None, runId)
val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
- sparkListeners.foreach(_.onJobStart(job, properties))
+ sparkListeners.foreach(_.onJobStart(SparkListenerJobStart(job, properties)))
logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length +
" output partitions (allowLocal=" + allowLocal + ")")
logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")")
@@ -312,7 +312,7 @@ class DAGScheduler(
handleExecutorLost(execId)
case completion: CompletionEvent =>
- sparkListeners.foreach(_.onTaskEnd(completion))
+ sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion)))
handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason) =>
@@ -323,8 +323,8 @@ class DAGScheduler(
for (job <- activeJobs) {
val error = new SparkException("Job cancelled because SparkContext was shut down")
job.listener.jobFailed(error)
- val JobCancelEvent = new SparkListenerJobCancelled("SPARKCONTEXT_SHUTDOWN")
- sparkListeners.foreach(_.onJobEnd(job, JobCancelEvent))
+ sparkListeners.foreach(_.onJobEnd(SparkListenerJobCancelled(job,
+ "SPARKCONTEXT_SHUTDOWN")))
}
return true
}
@@ -472,7 +472,7 @@ class DAGScheduler(
}
}
if (tasks.size > 0) {
- sparkListeners.foreach(_.onStageSubmitted(stage, "TASKS_SIZE=" + tasks.size))
+ sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size)))
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks
logDebug("New pending tasks: " + myPending)
@@ -527,7 +527,7 @@ class DAGScheduler(
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
- sparkListeners.foreach(_.onJobEnd(job, SparkListenerJobSuccess))
+ sparkListeners.foreach(_.onJobEnd(SparkListenerJobSuccess(job)))
}
job.listener.taskSucceeded(rt.outputId, event.result)
}
@@ -671,8 +671,7 @@ class DAGScheduler(
job.listener.jobFailed(new SparkException("Job failed: " + reason))
activeJobs -= job
resultStageToJob -= resultStage
- val jobFailedEvent = new SparkListenerJobFailed(failedStage)
- sparkListeners.foreach(_.onJobEnd(job, jobFailedEvent))
+ sparkListeners.foreach(_.onJobEnd(SparkListenerJobFailed(job, failedStage)))
}
if (dependentStages.isEmpty) {
logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index 46b9fa974b..002c5826cb 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -15,13 +15,6 @@ import spark.scheduler.cluster.TaskInfo
// used to record runtime information for each job, including RDD graph
// tasks' start/stop shuffle information and information from outside
-sealed trait JobLoggerEvent
-case class JobLoggerOnJobStart(job: ActiveJob, properties: Properties) extends JobLoggerEvent
-case class JobLoggerOnStageSubmitted(stage: Stage, info: String) extends JobLoggerEvent
-case class JobLoggerOnStageCompleted(stageCompleted: StageCompleted) extends JobLoggerEvent
-case class JobLoggerOnJobEnd(job: ActiveJob, event: SparkListenerEvents) extends JobLoggerEvent
-case class JobLoggerOnTaskEnd(event: CompletionEvent) extends JobLoggerEvent
-
class JobLogger(val logDirName: String) extends SparkListener with Logging {
private val logDir =
if (System.getenv("SPARK_LOG_DIR") != null)
@@ -32,7 +25,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
private val stageIDToJobID = new HashMap[Int, Int]
private val jobIDToStages = new HashMap[Int, ListBuffer[Stage]]
private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
- private val eventQueue = new LinkedBlockingQueue[JobLoggerEvent]
+ private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]
createLogDir()
def this() = this(String.valueOf(System.currentTimeMillis()))
@@ -50,15 +43,19 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
val event = eventQueue.take
logDebug("Got event of type " + event.getClass.getName)
event match {
- case JobLoggerOnJobStart(job, info) =>
- processJobStartEvent(job, info)
- case JobLoggerOnStageSubmitted(stage, info) =>
- processStageSubmittedEvent(stage, info)
- case JobLoggerOnStageCompleted(stageCompleted) =>
- processStageCompletedEvent(stageCompleted)
- case JobLoggerOnJobEnd(job, event) =>
- processJobEndEvent(job, event)
- case JobLoggerOnTaskEnd(event) =>
+ case SparkListenerJobStart(job, properties) =>
+ processJobStartEvent(job, properties)
+ case SparkListenerStageSubmitted(stage, taskSize) =>
+ processStageSubmittedEvent(stage, taskSize)
+ case StageCompleted(stageInfo) =>
+ processStageCompletedEvent(stageInfo)
+ case SparkListenerJobSuccess(job) =>
+ processJobEndEvent(job)
+ case SparkListenerJobFailed(job, failedStage) =>
+ processJobEndEvent(job, failedStage)
+ case SparkListenerJobCancelled(job, reason) =>
+ processJobEndEvent(job, reason)
+ case SparkListenerTaskEnd(event) =>
processTaskEndEvent(event)
case _ =>
}
@@ -225,26 +222,26 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
}
- override def onStageSubmitted(stage: Stage, info: String = "") {
- eventQueue.put(JobLoggerOnStageSubmitted(stage, info))
+ override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
+ eventQueue.put(stageSubmitted)
}
- protected def processStageSubmittedEvent(stage: Stage, info: String) {
- stageLogInfo(stage.id, "STAGE_ID=" + stage.id + " STATUS=SUBMITTED " + info)
+ protected def processStageSubmittedEvent(stage: Stage, taskSize: Int) {
+ stageLogInfo(stage.id, "STAGE_ID=" + stage.id + " STATUS=SUBMITTED" + " TASK_SIZE=" + taskSize)
}
override def onStageCompleted(stageCompleted: StageCompleted) {
- eventQueue.put(JobLoggerOnStageCompleted(stageCompleted))
+ eventQueue.put(stageCompleted)
}
- protected def processStageCompletedEvent(stageCompleted: StageCompleted) {
- stageLogInfo(stageCompleted.stageInfo.stage.id, "STAGE_ID=" +
- stageCompleted.stageInfo.stage.id + " STATUS=COMPLETED")
+ protected def processStageCompletedEvent(stageInfo: StageInfo) {
+ stageLogInfo(stageInfo.stage.id, "STAGE_ID=" +
+ stageInfo.stage.id + " STATUS=COMPLETED")
}
- override def onTaskEnd(event: CompletionEvent) {
- eventQueue.put(JobLoggerOnTaskEnd(event))
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ eventQueue.put(taskEnd)
}
protected def processTaskEndEvent(event: CompletionEvent) {
@@ -273,24 +270,26 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
}
- override def onJobEnd(job: ActiveJob, event: SparkListenerEvents) {
- eventQueue.put(JobLoggerOnJobEnd(job, event))
+ override def onJobEnd(jobEnd: SparkListenerEvents) {
+ eventQueue.put(jobEnd)
}
- protected def processJobEndEvent(job: ActiveJob, event: SparkListenerEvents) {
- var info = "JOB_ID=" + job.runId + " STATUS="
- var validEvent = true
- event match {
- case SparkListenerJobSuccess => info += "SUCCESS"
- case SparkListenerJobFailed(failedStage) =>
- info += "FAILED REASON=STAGE_FAILED FAILED_STAGE_ID=" + failedStage.id
- case SparkListenerJobCancelled(reason) => info += "CANCELLED REASON=" + reason
- case _ => validEvent = false
- }
- if (validEvent) {
- jobLogInfo(job.runId, info)
- closeLogWriter(job.runId)
- }
+ protected def processJobEndEvent(job: ActiveJob) {
+ val info = "JOB_ID=" + job.runId + " STATUS=SUCCESS"
+ jobLogInfo(job.runId, info)
+ closeLogWriter(job.runId)
+ }
+
+ protected def processJobEndEvent(job: ActiveJob, failedStage: Stage) {
+ val info = "JOB_ID=" + job.runId + " STATUS=FAILED REASON=STAGE_FAILED FAILED_STAGE_ID="
+ + failedStage.id
+ jobLogInfo(job.runId, info)
+ closeLogWriter(job.runId)
+ }
+ protected def processJobEndEvent(job: ActiveJob, reason: String) {
+ var info = "JOB_ID=" + job.runId + " STATUS=CANCELLED REASON=" + reason
+ jobLogInfo(job.runId, info)
+ closeLogWriter(job.runId)
}
protected def recordJobProperties(jobID: Int, properties: Properties) {
@@ -300,8 +299,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}
}
- override def onJobStart(job: ActiveJob, properties: Properties = null) {
- eventQueue.put(JobLoggerOnJobStart(job, properties))
+ override def onJobStart(jobStart: SparkListenerJobStart) {
+ eventQueue.put(jobStart)
}
protected def processJobStartEvent(job: ActiveJob, properties: Properties) {
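
JobLogger keeps its asynchronous design but now queues the raw SparkListenerEvents and pattern-matches on the concrete case classes in its logging thread. A stripped-down sketch of that producer/consumer pattern follows; the QueueingListener name and the println calls are placeholders for JobLogger's file-based logging.

import java.util.concurrent.LinkedBlockingQueue
import spark.scheduler._

// Minimal sketch of the queue-and-drain pattern JobLogger is built around.
class QueueingListener extends SparkListener {
  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]

  // Consumer: a daemon thread blocks on take() and dispatches on the event type.
  new Thread("event-drainer") {
    setDaemon(true)
    override def run() {
      while (true) {
        eventQueue.take() match {
          case SparkListenerJobStart(job, properties) =>
            println("job " + job.runId + " started")
          case StageCompleted(stageInfo) =>
            println("stage " + stageInfo.stage.id + " completed")
          case _ => // remaining events are dropped in this sketch
        }
      }
    }
  }.start()

  // Producers: the listener callbacks only enqueue the event object.
  override def onJobStart(jobStart: SparkListenerJobStart) { eventQueue.put(jobStart) }
  override def onStageCompleted(stageCompleted: StageCompleted) { eventQueue.put(stageCompleted) }
}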
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 9cf7f3ffc0..9265261dc1 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -6,6 +6,24 @@ import spark.util.Distribution
import spark.{Utils, Logging, SparkContext, TaskEndReason}
import spark.executor.TaskMetrics
+
+sealed trait SparkListenerEvents
+
+case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends SparkListenerEvents
+
+case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
+
+case class SparkListenerTaskEnd(event: CompletionEvent) extends SparkListenerEvents
+
+case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
+ extends SparkListenerEvents
+
+case class SparkListenerJobSuccess(job: ActiveJob) extends SparkListenerEvents
+
+case class SparkListenerJobFailed(job: ActiveJob, failedStage: Stage) extends SparkListenerEvents
+
+case class SparkListenerJobCancelled(job: ActiveJob, reason: String) extends SparkListenerEvents
+
trait SparkListener {
/**
* called when a stage is completed, with information on the completed stage
@@ -15,35 +33,25 @@ trait SparkListener {
/**
* called when a stage is submitted
*/
- def onStageSubmitted(stage: Stage, info: String = "") { }
-
+ def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
+
/**
* called when a task ends
*/
- def onTaskEnd(event: CompletionEvent) { }
+ def onTaskEnd(taskEnd: SparkListenerTaskEnd) { }
/**
* called when a job starts
*/
- def onJobStart(job: ActiveJob, properties: Properties = null) { }
+ def onJobStart(jobStart: SparkListenerJobStart) { }
/**
* called when a job ends
*/
- def onJobEnd(job: ActiveJob, event: SparkListenerEvents) { }
+ def onJobEnd(jobEnd: SparkListenerEvents) { }
}
-sealed trait SparkListenerEvents
-
-case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
-
-case object SparkListenerJobSuccess extends SparkListenerEvents
-
-case class SparkListenerJobFailed(failedStage: Stage) extends SparkListenerEvents
-
-case class SparkListenerJobCancelled(reason: String) extends SparkListenerEvents
-
/**
* Simple SparkListener that logs a few summary statistics when each stage completes
*/
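
With the event types declared as a sealed trait ahead of the listener interface, wiring a listener up is unchanged apart from the new callback signatures. A short usage sketch under those assumptions follows; the local master URL, the application name and the RDD are illustrative, mirroring the test suite below.

import spark.SparkContext
import spark.scheduler._

object ListenerExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[4]", "listener-example")
    // Anonymous listener built on the reshaped trait; unoverridden callbacks are no-ops.
    sc.addSparkListener(new SparkListener {
      override def onStageCompleted(stageCompleted: StageCompleted) {
        println("stage " + stageCompleted.stageInfo.stage.id + " completed")
      }
      override def onJobEnd(jobEnd: SparkListenerEvents) {
        jobEnd match {
          case SparkListenerJobSuccess(job) => println("job " + job.runId + " succeeded")
          case other                        => println("job ended without success: " + other)
        }
      }
    })
    sc.parallelize(1 to 100, 4).count()  // runs one job so the callbacks above fire
    sc.stop()
  }
}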
diff --git a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
index 34fd8b995e..a654bf3ffd 100644
--- a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
@@ -40,7 +40,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID)
val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID)
- joblogger.onStageSubmitted(rootStage)
+ joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4))
joblogger.getEventQueue.size should be (1)
joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
parentRdd.setName("MyRDD")
@@ -86,11 +86,11 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
var onJobStartCount = 0
var onStageCompletedCount = 0
var onStageSubmittedCount = 0
- override def onTaskEnd(event: CompletionEvent) = onTaskEndCount += 1
- override def onJobEnd(job: ActiveJob, event: SparkListenerEvents) = onJobEndCount += 1
- override def onJobStart(job: ActiveJob, properties: Properties) = onJobStartCount += 1
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = onTaskEndCount += 1
+ override def onJobEnd(jobEnd: SparkListenerEvents) = onJobEndCount += 1
+ override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1
override def onStageCompleted(stageCompleted: StageCompleted) = onStageCompletedCount += 1
- override def onStageSubmitted(stage: Stage, info: String = "") = onStageSubmittedCount += 1
+ override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1
}
sc.addSparkListener(joblogger)
val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) }