author    Mingfei <mingfei.shi@intel.com>  2013-06-21 17:38:23 +0800
committer Mingfei <mingfei.shi@intel.com>  2013-06-21 17:38:23 +0800
commit    52407951541399e60a5292394b3a443a5e7ff281 (patch)
tree      a2c01a9ddc72dec6217bd17d94eab250ef800796 /core/src
parent    967a6a699da7da007f51e59d085a357da5ec14da (diff)
edit according to comments
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/spark/RDD.scala                         6
-rw-r--r--  core/src/main/scala/spark/Utils.scala                      10
-rw-r--r--  core/src/main/scala/spark/executor/TaskMetrics.scala        2
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala     14
-rw-r--r--  core/src/main/scala/spark/scheduler/JobLogger.scala        72
-rw-r--r--  core/src/main/scala/spark/scheduler/SparkListener.scala    25
-rw-r--r--  core/src/test/scala/spark/scheduler/JobLoggerSuite.scala    2
7 files changed, 62 insertions, 69 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 8c0b7ca417..b17398953b 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -114,10 +114,10 @@ abstract class RDD[T: ClassManifest](
this
}
- /**User-defined generator of this RDD*/
- var generator = Utils.getCallSiteInfo._4
+ /** User-defined generator of this RDD*/
+ var generator = Utils.getCallSiteInfo.firstUserClass
- /**reset generator*/
+ /** Reset generator*/
def setGenerator(_generator: String) = {
generator = _generator
}
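A minimal usage sketch of the generator field touched above (not part of this commit; assumes a local SparkContext, and "MyEtlStage" is an invented label):

import spark.SparkContext

object GeneratorExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "GeneratorExample")
    val rdd = sc.parallelize(1 to 10)
    // By default the generator now comes from Utils.getCallSiteInfo.firstUserClass;
    // setGenerator lets user code override that label.
    rdd.setGenerator("MyEtlStage")
    println(rdd.generator)   // prints "MyEtlStage"
    sc.stop()
  }
}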
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 1630b2b4b0..1cfaee79b1 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -522,13 +522,14 @@ private object Utils extends Logging {
execute(command, new File("."))
}
-
+ class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
+ val firstUserLine: Int, val firstUserClass: String)
/**
* When called inside a class in the spark package, returns the name of the user code class
* (outside the spark package) that called into Spark, as well as which Spark method they called.
* This is used, for example, to tell users where in their code each RDD got created.
*/
- def getCallSiteInfo = {
+ def getCallSiteInfo: CallSiteInfo = {
val trace = Thread.currentThread.getStackTrace().filter( el =>
(!el.getMethodName.contains("getStackTrace")))
@@ -560,12 +561,13 @@ private object Utils extends Logging {
}
}
}
- (lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
+ new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
}
def formatSparkCallSite = {
val callSiteInfo = getCallSiteInfo
- "%s at %s:%s".format(callSiteInfo._1, callSiteInfo._2, callSiteInfo._3)
+ "%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile,
+ callSiteInfo.firstUserLine)
}
/**
* Try to find a free port to bind to on the local host. This should ideally never be needed,
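The hunks above replace the positional Tuple4 returned by getCallSiteInfo with a small class whose fields have names. A standalone sketch of the same pattern (invented sample values, not Spark code):

class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
    val firstUserLine: Int, val firstUserClass: String)

object CallSiteDemo {
  // Call sites now read named fields instead of ._1/._2/._3.
  def describe(info: CallSiteInfo): String =
    "%s at %s:%s".format(info.lastSparkMethod, info.firstUserFile, info.firstUserLine)

  def main(args: Array[String]) {
    val info = new CallSiteInfo("textFile", "WordCount.scala", 12, "WordCount")
    println(describe(info))   // "textFile at WordCount.scala:12"
  }
}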
diff --git a/core/src/main/scala/spark/executor/TaskMetrics.scala b/core/src/main/scala/spark/executor/TaskMetrics.scala
index 26e8029365..1dc13754f9 100644
--- a/core/src/main/scala/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/spark/executor/TaskMetrics.scala
@@ -2,7 +2,7 @@ package spark.executor
class TaskMetrics extends Serializable {
/**
- * host's name the task runs on
+ * Host's name the task runs on
*/
var hostname: String = _
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index e281e5a8db..4336f2f36d 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -312,7 +312,8 @@ class DAGScheduler(
handleExecutorLost(execId)
case completion: CompletionEvent =>
- sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion)))
+ sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task,
+ completion.reason, completion.taskInfo, completion.taskMetrics)))
handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason) =>
@@ -323,8 +324,8 @@ class DAGScheduler(
for (job <- activeJobs) {
val error = new SparkException("Job cancelled because SparkContext was shut down")
job.listener.jobFailed(error)
- sparkListeners.foreach(_.onJobEnd(SparkListenerJobCancelled(job,
- "SPARKCONTEXT_SHUTDOWN")))
+ sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job,
+ JobFailed(error))))
}
return true
}
@@ -527,7 +528,7 @@ class DAGScheduler(
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
- sparkListeners.foreach(_.onJobEnd(SparkListenerJobSuccess(job)))
+ sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobSucceeded)))
}
job.listener.taskSucceeded(rt.outputId, event.result)
}
@@ -668,10 +669,11 @@ class DAGScheduler(
val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
for (resultStage <- dependentStages) {
val job = resultStageToJob(resultStage)
- job.listener.jobFailed(new SparkException("Job failed: " + reason))
+ val error = new SparkException("Job failed: " + reason)
+ job.listener.jobFailed(error)
activeJobs -= job
resultStageToJob -= resultStage
- sparkListeners.foreach(_.onJobEnd(SparkListenerJobFailed(job, failedStage)))
+ sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error))))
}
if (dependentStages.isEmpty) {
logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index 002c5826cb..760a0252b7 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -12,7 +12,7 @@ import spark._
import spark.executor.TaskMetrics
import spark.scheduler.cluster.TaskInfo
-// used to record runtime information for each job, including RDD graph
+// Used to record runtime information for each job, including RDD graph
// tasks' start/stop shuffle information and information from outside
class JobLogger(val logDirName: String) extends SparkListener with Logging {
@@ -49,21 +49,17 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
processStageSubmittedEvent(stage, taskSize)
case StageCompleted(stageInfo) =>
processStageCompletedEvent(stageInfo)
- case SparkListenerJobSuccess(job) =>
- processJobEndEvent(job)
- case SparkListenerJobFailed(job, failedStage) =>
- processJobEndEvent(job, failedStage)
- case SparkListenerJobCancelled(job, reason) =>
- processJobEndEvent(job, reason)
- case SparkListenerTaskEnd(event) =>
- processTaskEndEvent(event)
+ case SparkListenerJobEnd(job, result) =>
+ processJobEndEvent(job, result)
+ case SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics) =>
+ processTaskEndEvent(task, reason, taskInfo, taskMetrics)
case _ =>
}
}
}
}.start()
- //create a folder for log files, the folder's name is the creation time of the jobLogger
+ // Create a folder for log files, the folder's name is the creation time of the jobLogger
protected def createLogDir() {
val dir = new File(logDir + "/" + logDirName + "/")
if (dir.exists()) {
@@ -244,54 +240,50 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
eventQueue.put(taskEnd)
}
- protected def processTaskEndEvent(event: CompletionEvent) {
+ protected def processTaskEndEvent(task: Task[_], reason: TaskEndReason,
+ taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
var taskStatus = ""
- event.task match {
+ task match {
case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK"
case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK"
}
- event.reason match {
+ reason match {
case Success => taskStatus += " STATUS=SUCCESS"
- recordTaskMetrics(event.task.stageId, taskStatus, event.taskInfo, event.taskMetrics)
+ recordTaskMetrics(task.stageId, taskStatus, taskInfo, taskMetrics)
case Resubmitted =>
- taskStatus += " STATUS=RESUBMITTED TID=" + event.taskInfo.taskId +
- " STAGE_ID=" + event.task.stageId
- stageLogInfo(event.task.stageId, taskStatus)
+ taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId +
+ " STAGE_ID=" + task.stageId
+ stageLogInfo(task.stageId, taskStatus)
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
- taskStatus += " STATUS=FETCHFAILED TID=" + event.taskInfo.taskId + " STAGE_ID=" +
- event.task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
+ taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" +
+ task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
mapId + " REDUCE_ID=" + reduceId
- stageLogInfo(event.task.stageId, taskStatus)
+ stageLogInfo(task.stageId, taskStatus)
case OtherFailure(message) =>
- taskStatus += " STATUS=FAILURE TID=" + event.taskInfo.taskId +
- " STAGE_ID=" + event.task.stageId + " INFO=" + message
- stageLogInfo(event.task.stageId, taskStatus)
+ taskStatus += " STATUS=FAILURE TID=" + taskInfo.taskId +
+ " STAGE_ID=" + task.stageId + " INFO=" + message
+ stageLogInfo(task.stageId, taskStatus)
case _ =>
}
}
- override def onJobEnd(jobEnd: SparkListenerEvents) {
+ override def onJobEnd(jobEnd: SparkListenerJobEnd) {
eventQueue.put(jobEnd)
}
- protected def processJobEndEvent(job: ActiveJob) {
- val info = "JOB_ID=" + job.runId + " STATUS=SUCCESS"
- jobLogInfo(job.runId, info)
- closeLogWriter(job.runId)
- }
-
- protected def processJobEndEvent(job: ActiveJob, failedStage: Stage) {
- val info = "JOB_ID=" + job.runId + " STATUS=FAILED REASON=STAGE_FAILED FAILED_STAGE_ID="
- + failedStage.id
- jobLogInfo(job.runId, info)
- closeLogWriter(job.runId)
- }
- protected def processJobEndEvent(job: ActiveJob, reason: String) {
- var info = "JOB_ID=" + job.runId + " STATUS=CANCELLED REASON=" + reason
- jobLogInfo(job.runId, info)
+ protected def processJobEndEvent(job: ActiveJob, reason: JobResult) {
+ var info = "JOB_ID=" + job.runId
+ reason match {
+ case JobSucceeded => info += " STATUS=SUCCESS"
+ case JobFailed(exception) =>
+ info += " STATUS=FAILED REASON="
+ exception.getMessage.split("\\s+").foreach(info += _ + "_")
+ case _ =>
+ }
+ jobLogInfo(job.runId, info.substring(0, info.length - 1).toUpperCase)
closeLogWriter(job.runId)
}
-
+
protected def recordJobProperties(jobID: Int, properties: Properties) {
if(properties != null) {
val annotation = properties.getProperty("spark.job.annotation", "")
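The new processJobEndEvent above encodes a failure reason by replacing whitespace in the exception message with underscores and upper-casing the log line. A simplified standalone sketch of that encoding (the diff builds the string with a trailing separator and trims the last character; the net effect on the reason text is the same):

object FailureReasonFormat {
  // Whitespace in the failure message becomes "_", then the line is upper-cased.
  def encode(message: String): String =
    message.split("\\s+").mkString("_").toUpperCase

  def main(args: Array[String]) {
    println(encode("Job failed: stage failure"))   // JOB_FAILED:_STAGE_FAILURE
  }
}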
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 9265261dc1..bac984b5c9 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -3,52 +3,49 @@ package spark.scheduler
import java.util.Properties
import spark.scheduler.cluster.TaskInfo
import spark.util.Distribution
-import spark.{Utils, Logging, SparkContext, TaskEndReason}
+import spark.{Logging, SparkContext, TaskEndReason, Utils}
import spark.executor.TaskMetrics
-
sealed trait SparkListenerEvents
case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends SparkListenerEvents
case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
-case class SparkListenerTaskEnd(event: CompletionEvent) extends SparkListenerEvents
+case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
+ taskMetrics: TaskMetrics) extends SparkListenerEvents
case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
extends SparkListenerEvents
-
-case class SparkListenerJobSuccess(job: ActiveJob) extends SparkListenerEvents
-
-case class SparkListenerJobFailed(job: ActiveJob, failedStage: Stage) extends SparkListenerEvents
-case class SparkListenerJobCancelled(job: ActiveJob, reason: String) extends SparkListenerEvents
+case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
+ extends SparkListenerEvents
trait SparkListener {
/**
- * called when a stage is completed, with information on the completed stage
+ * Called when a stage is completed, with information on the completed stage
*/
def onStageCompleted(stageCompleted: StageCompleted) { }
/**
- * called when a stage is submitted
+ * Called when a stage is submitted
*/
def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
/**
- * called when a task ends
+ * Called when a task ends
*/
def onTaskEnd(taskEnd: SparkListenerTaskEnd) { }
/**
- * called when a job starts
+ * Called when a job starts
*/
def onJobStart(jobStart: SparkListenerJobStart) { }
/**
- * called when a job ends
+ * Called when a job ends
*/
- def onJobEnd(jobEnd: SparkListenerEvents) { }
+ def onJobEnd(jobEnd: SparkListenerJobEnd) { }
}
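With the narrowed onJobEnd signature above, a listener receives a SparkListenerJobEnd directly instead of matching on the generic event trait. A hedged sketch of such a listener (placed in spark.scheduler because several of these types are package-private at this point; reading executorRunTime from TaskMetrics is an assumption about the metrics fields of this era):

package spark.scheduler

import spark.Logging

class TaskTimingListener extends SparkListener with Logging {
  private var totalRunTime = 0L

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    // taskMetrics can be null when a task fails before reporting metrics
    if (taskEnd.taskMetrics != null) {
      totalRunTime += taskEnd.taskMetrics.executorRunTime
    }
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
    logInfo("Job " + jobEnd.job.runId + " ended; accumulated executor run time: " +
      totalRunTime + " ms")
  }
}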
diff --git a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
index a654bf3ffd..4000c4d520 100644
--- a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
@@ -87,7 +87,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
var onStageCompletedCount = 0
var onStageSubmittedCount = 0
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = onTaskEndCount += 1
- override def onJobEnd(jobEnd: SparkListenerEvents) = onJobEndCount += 1
+ override def onJobEnd(jobEnd: SparkListenerJobEnd) = onJobEndCount += 1
override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1
override def onStageCompleted(stageCompleted: StageCompleted) = onStageCompletedCount += 1
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1