author | Mingfei <mingfei.shi@intel.com> | 2013-06-21 17:38:23 +0800
---|---|---
committer | Mingfei <mingfei.shi@intel.com> | 2013-06-21 17:38:23 +0800
commit | 52407951541399e60a5292394b3a443a5e7ff281 (patch) |
tree | a2c01a9ddc72dec6217bd17d94eab250ef800796 /core/src |
parent | 967a6a699da7da007f51e59d085a357da5ec14da (diff) |
download | spark-52407951541399e60a5292394b3a443a5e7ff281.tar.gz, spark-52407951541399e60a5292394b3a443a5e7ff281.tar.bz2, spark-52407951541399e60a5292394b3a443a5e7ff281.zip |
edit according to comments
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 6
-rw-r--r-- | core/src/main/scala/spark/Utils.scala | 10
-rw-r--r-- | core/src/main/scala/spark/executor/TaskMetrics.scala | 2
-rw-r--r-- | core/src/main/scala/spark/scheduler/DAGScheduler.scala | 14
-rw-r--r-- | core/src/main/scala/spark/scheduler/JobLogger.scala | 72
-rw-r--r-- | core/src/main/scala/spark/scheduler/SparkListener.scala | 25
-rw-r--r-- | core/src/test/scala/spark/scheduler/JobLoggerSuite.scala | 2

7 files changed, 62 insertions(+), 69 deletions(-)
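Note: the Utils.scala hunks below replace the positional 4-tuple returned by getCallSiteInfo with a named CallSiteInfo class. A minimal sketch of the before/after access pattern, assuming a class shaped like the one the diff adds; the sample call-site values are hypothetical:

// Mirrors the CallSiteInfo class the Utils.scala hunk below introduces.
class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
                   val firstUserLine: Int, val firstUserClass: String)

object CallSiteExample {
  def main(args: Array[String]) {
    // Hypothetical call-site values, for illustration only.
    val info = new CallSiteInfo("map", "MyApp.scala", 42, "MyApp")

    // Before this commit, callers unpacked a tuple positionally:
    //   "%s at %s:%s".format(callSiteInfo._1, callSiteInfo._2, callSiteInfo._3)
    // With named fields, the same format call documents itself:
    println("%s at %s:%s".format(info.lastSparkMethod, info.firstUserFile,
      info.firstUserLine))  // prints: map at MyApp.scala:42
  }
}

Named fields also mean a caller such as formatSparkCallSite cannot silently read the wrong element if the fields are ever reordered.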
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 8c0b7ca417..b17398953b 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -114,10 +114,10 @@ abstract class RDD[T: ClassManifest](
    this
  }
- /**User-defined generator of this RDD*/
- var generator = Utils.getCallSiteInfo._4
+ /** User-defined generator of this RDD*/
+ var generator = Utils.getCallSiteInfo.firstUserClass
- /**reset generator*/
+ /** Reset generator*/
  def setGenerator(_generator: String) = {
    generator = _generator
  }
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 1630b2b4b0..1cfaee79b1 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -522,13 +522,14 @@ private object Utils extends Logging {
    execute(command, new File("."))
  }
-
+ class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
+   val firstUserLine: Int, val firstUserClass: String)
  /**
   * When called inside a class in the spark package, returns the name of the user code class
   * (outside the spark package) that called into Spark, as well as which Spark method they called.
   * This is used, for example, to tell users where in their code each RDD got created.
   */
- def getCallSiteInfo = {
+ def getCallSiteInfo: CallSiteInfo = {
    val trace = Thread.currentThread.getStackTrace().filter( el =>
      (!el.getMethodName.contains("getStackTrace")))
@@ -560,12 +561,13 @@ private object Utils extends Logging {
        }
      }
    }
-   (lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
+   new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
  }
  def formatSparkCallSite = {
    val callSiteInfo = getCallSiteInfo
-   "%s at %s:%s".format(callSiteInfo._1, callSiteInfo._2, callSiteInfo._3)
+   "%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile,
+     callSiteInfo.firstUserLine)
  }
  /**
   * Try to find a free port to bind to on the local host. This should ideally never be needed,
diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala
index 26e8029365..1dc13754f9 100644
--- a/core/src/main/scala/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/spark/executor/TaskMetrics.scala
@@ -2,7 +2,7 @@ package spark.executor
class TaskMetrics extends Serializable {
  /**
-  * host's name the task runs on
+  * Host's name the task runs on
   */
  var hostname: String = _
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index e281e5a8db..4336f2f36d 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -312,7 +312,8 @@ class DAGScheduler(
        handleExecutorLost(execId)
      case completion: CompletionEvent =>
-       sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion)))
+       sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task,
+         completion.reason, completion.taskInfo, completion.taskMetrics)))
        handleTaskCompletion(completion)
      case TaskSetFailed(taskSet, reason) =>
@@ -323,8 +324,8 @@ class DAGScheduler(
      for (job <- activeJobs) {
        val error = new SparkException("Job cancelled because SparkContext was shut down")
        job.listener.jobFailed(error)
-       sparkListeners.foreach(_.onJobEnd(SparkListenerJobCancelled(job,
-         "SPARKCONTEXT_SHUTDOWN")))
+       sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job,
+         JobFailed(error))))
      }
      return true
    }
@@ -527,7 +528,7 @@ class DAGScheduler(
              activeJobs -= job
              resultStageToJob -= stage
              markStageAsFinished(stage)
-             sparkListeners.foreach(_.onJobEnd(SparkListenerJobSuccess(job)))
+             sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobSucceeded)))
            }
            job.listener.taskSucceeded(rt.outputId, event.result)
          }
@@ -668,10 +669,11 @@ class DAGScheduler(
    val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
    for (resultStage <- dependentStages) {
      val job = resultStageToJob(resultStage)
-     job.listener.jobFailed(new SparkException("Job failed: " + reason))
+     val error = new SparkException("Job failed: " + reason)
+     job.listener.jobFailed(error)
      activeJobs -= job
      resultStageToJob -= resultStage
-     sparkListeners.foreach(_.onJobEnd(SparkListenerJobFailed(job, failedStage)))
+     sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error))))
    }
    if (dependentStages.isEmpty) {
      logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index 002c5826cb..760a0252b7 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -12,7 +12,7 @@ import spark._
import spark.executor.TaskMetrics
import spark.scheduler.cluster.TaskInfo
-// used to record runtime information for each job, including RDD graph
+// Used to record runtime information for each job, including RDD graph
// tasks' start/stop shuffle information and information from outside
class JobLogger(val logDirName: String) extends SparkListener with Logging {
@@ -49,21 +49,17 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
        processStageSubmittedEvent(stage, taskSize)
case StageCompleted(stageInfo) =>
processStageCompletedEvent(stageInfo)
- case SparkListenerJobSuccess(job) =>
- processJobEndEvent(job)
- case SparkListenerJobFailed(job, failedStage) =>
- processJobEndEvent(job, failedStage)
- case SparkListenerJobCancelled(job, reason) =>
- processJobEndEvent(job, reason)
- case SparkListenerTaskEnd(event) =>
- processTaskEndEvent(event)
+ case SparkListenerJobEnd(job, result) =>
+ processJobEndEvent(job, result)
+ case SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics) =>
+ processTaskEndEvent(task, reason, taskInfo, taskMetrics)
case _ =>
}
}
}
}.start()
- //create a folder for log files, the folder's name is the creation time of the jobLogger
+ // Create a folder for log files, the folder's name is the creation time of the jobLogger
protected def createLogDir() {
val dir = new File(logDir + "/" + logDirName + "/")
if (dir.exists()) {
@@ -244,54 +240,50 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
    eventQueue.put(taskEnd)
}
- protected def processTaskEndEvent(event: CompletionEvent) {
+ protected def processTaskEndEvent(task: Task[_], reason: TaskEndReason,
+ taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
var taskStatus = ""
- event.task match {
+ task match {
case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK"
case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK"
}
- event.reason match {
+ reason match {
case Success => taskStatus += " STATUS=SUCCESS"
- recordTaskMetrics(event.task.stageId, taskStatus, event.taskInfo, event.taskMetrics)
+ recordTaskMetrics(task.stageId, taskStatus, taskInfo, taskMetrics)
case Resubmitted =>
- taskStatus += " STATUS=RESUBMITTED TID=" + event.taskInfo.taskId +
- " STAGE_ID=" + event.task.stageId
- stageLogInfo(event.task.stageId, taskStatus)
+ taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId +
+ " STAGE_ID=" + task.stageId
+ stageLogInfo(task.stageId, taskStatus)
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
- taskStatus += " STATUS=FETCHFAILED TID=" + event.taskInfo.taskId + " STAGE_ID=" +
- event.task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
+ taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" +
+ task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
mapId + " REDUCE_ID=" + reduceId
- stageLogInfo(event.task.stageId, taskStatus)
+ stageLogInfo(task.stageId, taskStatus)
case OtherFailure(message) =>
- taskStatus += " STATUS=FAILURE TID=" + event.taskInfo.taskId +
- " STAGE_ID=" + event.task.stageId + " INFO=" + message
- stageLogInfo(event.task.stageId, taskStatus)
+ taskStatus += " STATUS=FAILURE TID=" + taskInfo.taskId +
+ " STAGE_ID=" + task.stageId + " INFO=" + message
+ stageLogInfo(task.stageId, taskStatus)
case _ =>
}
}
- override def onJobEnd(jobEnd: SparkListenerEvents) {
+ override def onJobEnd(jobEnd: SparkListenerJobEnd) {
eventQueue.put(jobEnd)
}
- protected def processJobEndEvent(job: ActiveJob) {
- val info = "JOB_ID=" + job.runId + " STATUS=SUCCESS"
- jobLogInfo(job.runId, info)
- closeLogWriter(job.runId)
- }
-
- protected def processJobEndEvent(job: ActiveJob, failedStage: Stage) {
- val info = "JOB_ID=" + job.runId + " STATUS=FAILED REASON=STAGE_FAILED FAILED_STAGE_ID="
- + failedStage.id
- jobLogInfo(job.runId, info)
- closeLogWriter(job.runId)
- }
- protected def processJobEndEvent(job: ActiveJob, reason: String) {
- var info = "JOB_ID=" + job.runId + " STATUS=CANCELLED REASON=" + reason
- jobLogInfo(job.runId, info)
+ protected def processJobEndEvent(job: ActiveJob, reason: JobResult) {
+ var info = "JOB_ID=" + job.runId
+ reason match {
+ case JobSucceeded => info += " STATUS=SUCCESS"
+ case JobFailed(exception) =>
+ info += " STATUS=FAILED REASON="
+ exception.getMessage.split("\\s+").foreach(info += _ + "_")
+ case _ =>
+ }
+ jobLogInfo(job.runId, info.substring(0, info.length - 1).toUpperCase)
closeLogWriter(job.runId)
}
-
+
protected def recordJobProperties(jobID: Int, properties: Properties) {
if(properties != null) {
val annotation = properties.getProperty("spark.job.annotation", "")
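Note: the JobLogger hunks above collapse three separate job-end handlers into one processJobEndEvent that matches on JobResult. A self-contained sketch of that matching idiom with stand-in types shaped like the scheduler's; unlike the hunk above, which trims the status line's final character unconditionally, this sketch trims only the trailing underscore added in the failure branch:

// Stand-ins shaped like spark.scheduler.JobResult and its two cases.
sealed trait JobResult
case object JobSucceeded extends JobResult
case class JobFailed(exception: Exception) extends JobResult

object JobEndLogFormat {
  // Builds the job-end status line; one code path covers both outcomes.
  def statusLine(runId: Int, result: JobResult): String = {
    var info = "JOB_ID=" + runId
    result match {
      case JobSucceeded => info += " STATUS=SUCCESS"
      case JobFailed(exception) =>
        info += " STATUS=FAILED REASON="
        exception.getMessage.split("\\s+").foreach(info += _ + "_")
        info = info.substring(0, info.length - 1)  // drop the trailing "_"
    }
    info.toUpperCase
  }

  def main(args: Array[String]) {
    println(statusLine(1, JobSucceeded))
    // JOB_ID=1 STATUS=SUCCESS
    println(statusLine(2, JobFailed(new Exception("stage 3 aborted"))))
    // JOB_ID=2 STATUS=FAILED REASON=STAGE_3_ABORTED
  }
}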
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 9265261dc1..bac984b5c9 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -3,52 +3,49 @@ package spark.scheduler
import java.util.Properties
import spark.scheduler.cluster.TaskInfo
import spark.util.Distribution
-import spark.{Utils, Logging, SparkContext, TaskEndReason}
+import spark.{Logging, SparkContext, TaskEndReason, Utils}
import spark.executor.TaskMetrics
-
sealed trait SparkListenerEvents

case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends SparkListenerEvents

case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents

-case class SparkListenerTaskEnd(event: CompletionEvent) extends SparkListenerEvents
+case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
+  taskMetrics: TaskMetrics) extends SparkListenerEvents

case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
  extends SparkListenerEvents

-case class SparkListenerJobSuccess(job: ActiveJob) extends SparkListenerEvents
-
-case class SparkListenerJobFailed(job: ActiveJob, failedStage: Stage) extends SparkListenerEvents
-
-case class SparkListenerJobCancelled(job: ActiveJob, reason: String) extends SparkListenerEvents
+case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
+  extends SparkListenerEvents

trait SparkListener {
  /**
-  * called when a stage is completed, with information on the completed stage
+  * Called when a stage is completed, with information on the completed stage
   */
  def onStageCompleted(stageCompleted: StageCompleted) { }

  /**
-  * called when a stage is submitted
+  * Called when a stage is submitted
   */
  def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }

  /**
-  * called when a task ends
+  * Called when a task ends
   */
  def onTaskEnd(taskEnd: SparkListenerTaskEnd) { }

  /**
-  * called when a job starts
+  * Called when a job starts
   */
  def onJobStart(jobStart: SparkListenerJobStart) { }

  /**
-  * called when a job ends
+  * Called when a job ends
   */
- def onJobEnd(jobEnd: SparkListenerEvents) { }
+ def onJobEnd(jobEnd: SparkListenerJobEnd) { }
}
diff --git a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
index a654bf3ffd..4000c4d520 100644
--- a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
@@ -87,7 +87,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
      var onStageCompletedCount = 0
      var onStageSubmittedCount = 0
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = onTaskEndCount += 1
-     override def onJobEnd(jobEnd: SparkListenerEvents) = onJobEndCount += 1
+     override def onJobEnd(jobEnd: SparkListenerJobEnd) = onJobEndCount += 1
      override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1
      override def onStageCompleted(stageCompleted: StageCompleted) = onStageCompletedCount += 1
      override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1
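Note: with the typed onJobEnd introduced above, a listener matches on the event's JobResult instead of overriding separate success/failed/cancelled hooks. A hedged sketch of a minimal listener against the new shape; SparkListenerJobEnd is simplified here to carry a bare job id, whereas the real event carries an ActiveJob:

// Simplified stand-ins for the post-commit listener API; the real
// SparkListenerJobEnd carries an ActiveJob rather than a job id.
sealed trait JobResult
case object JobSucceeded extends JobResult
case class JobFailed(exception: Exception) extends JobResult

case class SparkListenerJobEnd(jobId: Int, jobResult: JobResult)

trait SparkListener {
  // One typed hook replaces SparkListenerJobSuccess/JobFailed/JobCancelled.
  def onJobEnd(jobEnd: SparkListenerJobEnd) { }
}

class CountingListener extends SparkListener {
  var succeeded, failed = 0
  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
    jobEnd.jobResult match {
      case JobSucceeded => succeeded += 1
      case JobFailed(_) => failed += 1
    }
  }
}

object ListenerDemo {
  def main(args: Array[String]) {
    val listener = new CountingListener
    listener.onJobEnd(SparkListenerJobEnd(0, JobSucceeded))
    listener.onJobEnd(SparkListenerJobEnd(1, JobFailed(new Exception("boom"))))
    println("succeeded=" + listener.succeeded + " failed=" + listener.failed)
    // prints: succeeded=1 failed=1
  }
}

Matching on the sealed JobResult lets the compiler flag any job-end outcome a listener forgets to handle, which the previous three-event design could not guarantee.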