diff options
author | Mingfei <mingfei.shi@intel.com> | 2013-06-21 17:48:41 +0800 |
---|---|---|
committer | Mingfei <mingfei.shi@intel.com> | 2013-06-21 17:48:41 +0800 |
commit | aa7aa587beff22e2db50d2afadd95097856a299a (patch) | |
tree | 70828dbeb743ac78d23cc6b5462416fc69777c36 /core/src | |
parent | 52407951541399e60a5292394b3a443a5e7ff281 (diff) | |
download | spark-aa7aa587beff22e2db50d2afadd95097856a299a.tar.gz spark-aa7aa587beff22e2db50d2afadd95097856a299a.tar.bz2 spark-aa7aa587beff22e2db50d2afadd95097856a299a.zip |
some format modification
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/spark/scheduler/DAGScheduler.scala | 5 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/JobLogger.scala | 10 |
2 files changed, 7 insertions, 8 deletions
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 4336f2f36d..e412baa803 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -324,8 +324,7 @@ class DAGScheduler( for (job <- activeJobs) { val error = new SparkException("Job cancelled because SparkContext was shut down") job.listener.jobFailed(error) - sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, - JobFailed(error)))) + sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error)))) } return true } @@ -671,9 +670,9 @@ class DAGScheduler( val job = resultStageToJob(resultStage) val error = new SparkException("Job failed: " + reason) job.listener.jobFailed(error) + sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error)))) activeJobs -= job resultStageToJob -= resultStage - sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error)))) } if (dependentStages.isEmpty) { logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done") diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala index 760a0252b7..178bfaba3d 100644 --- a/core/src/main/scala/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/spark/scheduler/JobLogger.scala @@ -70,7 +70,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { }
}
- // create a log file for one job, the file name is the jobID
+ // Create a log file for one job, the file name is the jobID
protected def createLogWriter(jobID: Int) {
try{
val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
@@ -80,7 +80,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { }
}
- // close log file for one job, and clean the stage relationship in stageIDToJobID
+ // Close log file, and clean the stage relationship in stageIDToJobID
protected def closeLogWriter(jobID: Int) =
jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
fileWriter.close()
@@ -91,7 +91,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { jobIDToStages -= jobID
}
- // write log information to log file, withTime parameter controls whether to recored
+ // Write log information to log file, withTime parameter controls whether to recored
// time stamp for the information
protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
var writeInfo = info
@@ -145,7 +145,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { }
}
- // generate indents and convert to String
+ // Generate indents and convert to String
protected def indentString(indent: Int) = {
val sb = new StringBuilder()
for (i <- 1 to indent) {
@@ -190,7 +190,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.priority, false)
}
- // record task metrics into job log files
+ // Record task metrics into job log files
protected def recordTaskMetrics(stageID: Int, status: String,
taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID +
|