author     Mingfei <mingfei.shi@intel.com>  2013-06-21 17:48:41 +0800
committer  Mingfei <mingfei.shi@intel.com>  2013-06-21 17:48:41 +0800
commit     aa7aa587beff22e2db50d2afadd95097856a299a (patch)
tree       70828dbeb743ac78d23cc6b5462416fc69777c36 /core/src
parent     52407951541399e60a5292394b3a443a5e7ff281 (diff)
download   spark-aa7aa587beff22e2db50d2afadd95097856a299a.tar.gz
           spark-aa7aa587beff22e2db50d2afadd95097856a299a.tar.bz2
           spark-aa7aa587beff22e2db50d2afadd95097856a299a.zip
some format modification
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala |  5 ++---
-rw-r--r--  core/src/main/scala/spark/scheduler/JobLogger.scala    | 10 +++++-----
2 files changed, 7 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 4336f2f36d..e412baa803 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -324,8 +324,7 @@ class DAGScheduler(
         for (job <- activeJobs) {
           val error = new SparkException("Job cancelled because SparkContext was shut down")
           job.listener.jobFailed(error)
-          sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job,
-            JobFailed(error))))
+          sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error))))
         }
         return true
     }
@@ -671,9 +670,9 @@ class DAGScheduler(
       val job = resultStageToJob(resultStage)
       val error = new SparkException("Job failed: " + reason)
       job.listener.jobFailed(error)
+      sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error))))
       activeJobs -= job
       resultStageToJob -= resultStage
-      sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error))))
     }
     if (dependentStages.isEmpty) {
       logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index 760a0252b7..178bfaba3d 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -70,7 +70,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
-  // create a log file for one job, the file name is the jobID
+  // Create a log file for one job, the file name is the jobID
   protected def createLogWriter(jobID: Int) {
     try{
       val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
@@ -80,7 +80,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
-  // close log file for one job, and clean the stage relationship in stageIDToJobID
+  // Close log file, and clean the stage relationship in stageIDToJobID
   protected def closeLogWriter(jobID: Int) =
     jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
       fileWriter.close()
@@ -91,7 +91,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
       jobIDToStages -= jobID
     }
 
-  // write log information to log file, withTime parameter controls whether to recored
+  // Write log information to log file, withTime parameter controls whether to record
   // time stamp for the information
   protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
     var writeInfo = info
@@ -145,7 +145,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     }
   }
 
-  // generate indents and convert to String
+  // Generate indents and convert to String
   protected def indentString(indent: Int) = {
     val sb = new StringBuilder()
     for (i <- 1 to indent) {
@@ -190,7 +190,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.priority, false)
   }
 
-  // record task metrics into job log files
+  // Record task metrics into job log files
   protected def recordTaskMetrics(stageID: Int, status: String,
                                   taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
     val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID +