diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-31 00:43:38 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-31 00:43:38 -0800 |
commit | 977bcc36d4440ff562d5dbcc12449bf383d0d9e2 (patch) | |
tree | d585fe4a953a150e0c64b1daddb381bf6eb08d6b | |
parent | 87b915f22105ced8b9cad2a1262a0fd26542ee4f (diff) | |
parent | 50e3b8ec4c8150f1cfc6b92f8871f520adf2cfda (diff) | |
download | spark-977bcc36d4440ff562d5dbcc12449bf383d0d9e2.tar.gz spark-977bcc36d4440ff562d5dbcc12449bf383d0d9e2.tar.bz2 spark-977bcc36d4440ff562d5dbcc12449bf383d0d9e2.zip |
Merge branch 'apache-master' into project-refactor
14 files changed, 52 insertions, 67 deletions
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index c1e5e04b31..faf6dcd618 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -53,5 +53,3 @@ private[spark] case class ExceptionFailure( private[spark] case object TaskResultLost extends TaskEndReason private[spark] case object TaskKilled extends TaskEndReason - -private[spark] case class OtherFailure(message: String) extends TaskEndReason diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index 40d6bdb3fd..19aa800a95 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -140,12 +140,12 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I <body> {linkToMaster} <div> - <div style="float:left;width:40%">{backButton}</div> + <div style="float:left; margin-right:10px">{backButton}</div> <div style="float:left;">{range}</div> - <div style="float:right;">{nextButton}</div> + <div style="float:right; margin-left:10px">{nextButton}</div> </div> <br /> - <div style="height:500px;overflow:auto;padding:5px;"> + <div style="height:500px; overflow:auto; padding:5px;"> <pre>{logText}</pre> </div> </body> diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 963d15b76d..7603eb292f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -152,7 +152,8 @@ class DAGScheduler( val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done val running = new HashSet[Stage] // Stages we are running right now val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures - val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]] // Missing tasks from each stage + // Missing tasks from each stage + val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]] var lastFetchFailureTime: Long = 0 // Used to wait a bit to avoid repeated resubmits val activeJobs = new HashSet[ActiveJob] @@ -239,7 +240,8 @@ class DAGScheduler( shuffleToMapStage.get(shuffleDep.shuffleId) match { case Some(stage) => stage case None => - val stage = newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId) + val stage = + newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId) shuffleToMapStage(shuffleDep.shuffleId) = stage stage } @@ -248,7 +250,8 @@ class DAGScheduler( /** * Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation * of a shuffle map stage in newOrUsedStage. The stage will be associated with the provided - * jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage directly. + * jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage + * directly. */ private def newStage( rdd: RDD[_], @@ -358,7 +361,8 @@ class DAGScheduler( stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id val parents = getParentStages(s.rdd, jobId) - val parentsWithoutThisJobId = parents.filter(p => !stageIdToJobIds.get(p.id).exists(_.contains(jobId))) + val parentsWithoutThisJobId = parents.filter(p => + !stageIdToJobIds.get(p.id).exists(_.contains(jobId))) updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail) } } @@ -366,8 +370,9 @@ class DAGScheduler( } /** - * Removes job and any stages that are not needed by any other job. Returns the set of ids for stages that - * were removed. The associated tasks for those stages need to be cancelled if we got here via job cancellation. + * Removes job and any stages that are not needed by any other job. Returns the set of ids for + * stages that were removed. The associated tasks for those stages need to be cancelled if we + * got here via job cancellation. */ private def removeJobAndIndependentStages(jobId: Int): Set[Int] = { val registeredStages = jobIdToStageIds(jobId) @@ -378,7 +383,8 @@ class DAGScheduler( stageIdToJobIds.filterKeys(stageId => registeredStages.contains(stageId)).foreach { case (stageId, jobSet) => if (!jobSet.contains(jobId)) { - logError("Job %d not registered for stage %d even though that stage was registered for the job" + logError( + "Job %d not registered for stage %d even though that stage was registered for the job" .format(jobId, stageId)) } else { def removeStage(stageId: Int) { @@ -389,7 +395,8 @@ class DAGScheduler( running -= s } stageToInfos -= s - shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleToMapStage.remove) + shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId => + shuffleToMapStage.remove(shuffleId)) if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) { logDebug("Removing pending status for stage %d".format(stageId)) } @@ -407,7 +414,8 @@ class DAGScheduler( stageIdToStage -= stageId stageIdToJobIds -= stageId - logDebug("After removal of stage %d, remaining stages = %d".format(stageId, stageIdToStage.size)) + logDebug("After removal of stage %d, remaining stages = %d" + .format(stageId, stageIdToStage.size)) } jobSet -= jobId @@ -459,7 +467,8 @@ class DAGScheduler( assert(partitions.size > 0) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) - eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties) + eventProcessActor ! JobSubmitted( + jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties) waiter } @@ -494,7 +503,8 @@ class DAGScheduler( val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val partitions = (0 until rdd.partitions.size).toArray val jobId = nextJobId.getAndIncrement() - eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties) + eventProcessActor ! JobSubmitted( + jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties) listener.awaitResult() // Will throw an exception if the job fails } @@ -529,8 +539,8 @@ class DAGScheduler( case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) => var finalStage: Stage = null try { - // New stage creation at times and if its not protected, the scheduler thread is killed. - // e.g. it can fail when jobs are run on HadoopRDD whose underlying hdfs files have been deleted + // New stage creation may throw an exception if, for example, jobs are run on a HadoopRDD + // whose underlying HDFS files have been deleted. finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite)) } catch { case e: Exception => @@ -563,7 +573,8 @@ class DAGScheduler( case JobGroupCancelled(groupId) => // Cancel all jobs belonging to this job group. // First finds all active jobs with this group id, and then kill stages for them. - val activeInGroup = activeJobs.filter(groupId == _.properties.get(SparkContext.SPARK_JOB_GROUP_ID)) + val activeInGroup = activeJobs.filter(activeJob => + groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID)) val jobIds = activeInGroup.map(_.jobId) jobIds.foreach { handleJobCancellation } @@ -585,7 +596,8 @@ class DAGScheduler( stage <- stageIdToStage.get(task.stageId); stageInfo <- stageToInfos.get(stage) ) { - if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 && !stageInfo.emittedTaskSizeWarning) { + if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 && + !stageInfo.emittedTaskSizeWarning) { stageInfo.emittedTaskSizeWarning = true logWarning(("Stage %d (%s) contains a task of very large " + "size (%d KB). The maximum recommended task size is %d KB.").format( @@ -815,7 +827,7 @@ class DAGScheduler( } logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime)) stageToInfos(stage).completionTime = Some(System.currentTimeMillis()) - listenerBus.post(StageCompleted(stageToInfos(stage))) + listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage))) running -= stage } event.reason match { diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index 60927831a1..f8fa5a9f7a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -297,7 +297,7 @@ class JobLogger(val user: String, val logDirName: String) * When stage is completed, record stage completion status * @param stageCompleted Stage completed event */ - override def onStageCompleted(stageCompleted: StageCompleted) { + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format( stageCompleted.stage.stageId)) } @@ -328,10 +328,6 @@ class JobLogger(val user: String, val logDirName: String) task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" + mapId + " REDUCE_ID=" + reduceId stageLogInfo(task.stageId, taskStatus) - case OtherFailure(message) => - taskStatus += " STATUS=FAILURE TID=" + taskInfo.taskId + - " STAGE_ID=" + task.stageId + " INFO=" + message - stageLogInfo(task.stageId, taskStatus) case _ => } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index 596f9adde9..1791242215 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -117,8 +117,4 @@ private[spark] class Pool( parent.decreaseRunningTasks(taskNum) } } - - override def hasPendingTasks(): Boolean = { - schedulableQueue.exists(_.hasPendingTasks()) - } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala index 1c7ea2dccc..d573e125a3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala @@ -42,5 +42,4 @@ private[spark] trait Schedulable { def executorLost(executorId: String, host: String): Unit def checkSpeculatableTasks(): Boolean def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] - def hasPendingTasks(): Boolean } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index ee63b3c4a1..627995c826 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -27,7 +27,7 @@ sealed trait SparkListenerEvents case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties) extends SparkListenerEvents -case class StageCompleted(val stage: StageInfo) extends SparkListenerEvents +case class SparkListenerStageCompleted(val stage: StageInfo) extends SparkListenerEvents case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents @@ -47,7 +47,7 @@ trait SparkListener { /** * Called when a stage is completed, with information on the completed stage */ - def onStageCompleted(stageCompleted: StageCompleted) { } + def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { } /** * Called when a stage is submitted @@ -86,7 +86,7 @@ trait SparkListener { * Simple SparkListener that logs a few summary statistics when each stage completes */ class StatsReportListener extends SparkListener with Logging { - override def onStageCompleted(stageCompleted: StageCompleted) { + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { import org.apache.spark.scheduler.StatsReportListener._ implicit val sc = stageCompleted this.logInfo("Finished stage: " + stageCompleted.stage) @@ -119,13 +119,17 @@ object StatsReportListener extends Logging { val probabilities = percentiles.map{_ / 100.0} val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%" - def extractDoubleDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Double]): Option[Distribution] = { + def extractDoubleDistribution(stage: SparkListenerStageCompleted, + getMetric: (TaskInfo,TaskMetrics) => Option[Double]) + : Option[Distribution] = { Distribution(stage.stage.taskInfos.flatMap { case ((info,metric)) => getMetric(info, metric)}) } //is there some way to setup the types that I can get rid of this completely? - def extractLongDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Long]): Option[Distribution] = { + def extractLongDistribution(stage: SparkListenerStageCompleted, + getMetric: (TaskInfo,TaskMetrics) => Option[Long]) + : Option[Distribution] = { extractDoubleDistribution(stage, (info, metric) => getMetric(info,metric).map{_.toDouble}) } @@ -147,12 +151,12 @@ object StatsReportListener extends Logging { } def showDistribution(heading:String, format: String, getMetric: (TaskInfo,TaskMetrics) => Option[Double]) - (implicit stage: StageCompleted) { + (implicit stage: SparkListenerStageCompleted) { showDistribution(heading, extractDoubleDistribution(stage, getMetric), format) } def showBytesDistribution(heading:String, getMetric: (TaskInfo,TaskMetrics) => Option[Long]) - (implicit stage: StageCompleted) { + (implicit stage: SparkListenerStageCompleted) { showBytesDistribution(heading, extractLongDistribution(stage, getMetric)) } @@ -169,7 +173,7 @@ object StatsReportListener extends Logging { } def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long]) - (implicit stage: StageCompleted) { + (implicit stage: SparkListenerStageCompleted) { showMillisDistribution(heading, extractLongDistribution(stage, getMetric)) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index 85687ea330..e7defd768b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -41,7 +41,7 @@ private[spark] class SparkListenerBus() extends Logging { event match {
case stageSubmitted: SparkListenerStageSubmitted =>
sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
- case stageCompleted: StageCompleted =>
+ case stageCompleted: SparkListenerStageCompleted =>
sparkListeners.foreach(_.onStageCompleted(stageCompleted))
case jobStart: SparkListenerJobStart =>
sparkListeners.foreach(_.onJobStart(jobStart))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index dbac6b96ac..1b0f82fa24 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -365,13 +365,6 @@ private[spark] class TaskSchedulerImpl( } } - // Check for pending tasks in all our active jobs. - def hasPendingTasks: Boolean = { - synchronized { - rootPool.hasPendingTasks() - } - } - def executorLost(executorId: String, reason: ExecutorLossReason) { var failedExecutor: Option[String] = None diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index c676e73e03..7929051791 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -112,10 +112,6 @@ private[spark] class TaskSetManager( // Task index, start and finish time for each task attempt (indexed by task ID) val taskInfos = new HashMap[Long, TaskInfo] - // Did the TaskSet fail? - var failed = false - var causeOfFailure = "" - // How frequently to reprint duplicate exceptions in full, in milliseconds val EXCEPTION_PRINT_INTERVAL = System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong @@ -556,8 +552,6 @@ private[spark] class TaskSetManager( } def abort(message: String) { - failed = true - causeOfFailure = message // TODO: Kill running tasks if we were not terminated due to a Mesos error sched.dagScheduler.taskSetFailed(taskSet, message) removeAllRunningTasks() @@ -681,10 +675,6 @@ private[spark] class TaskSetManager( return foundTasks } - override def hasPendingTasks(): Boolean = { - numTasks > 0 && tasksSuccessful < numTasks - } - private def getLocalityWait(level: TaskLocality.TaskLocality): Long = { val defaultWait = System.getProperty("spark.locality.wait", "3000") level match { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 07a42f0503..058bc2a2e5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -61,7 +61,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList override def onJobStart(jobStart: SparkListenerJobStart) {} - override def onStageCompleted(stageCompleted: StageCompleted) = synchronized { + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized { val stage = stageCompleted.stage poolToActiveStages(stageIdToPool(stage.stageId)) -= stage activeStages -= stage @@ -146,12 +146,9 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList // update duration y.taskTime += taskEnd.taskInfo.duration - taskEnd.taskMetrics.shuffleReadMetrics.foreach { shuffleRead => - y.shuffleRead += shuffleRead.remoteBytesRead - } - - taskEnd.taskMetrics.shuffleWriteMetrics.foreach { shuffleWrite => - y.shuffleWrite += shuffleWrite.shuffleBytesWritten + Option(taskEnd.taskMetrics).foreach { taskMetrics => + taskMetrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead } + taskMetrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten } } } case _ => {} diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala index 002368ff55..d0bd20fc83 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala @@ -117,7 +117,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = onTaskEndCount += 1 override def onJobEnd(jobEnd: SparkListenerJobEnd) = onJobEndCount += 1 override def onJobStart(jobStart: SparkListenerJobStart) = onJobStartCount += 1 - override def onStageCompleted(stageCompleted: StageCompleted) = onStageCompletedCount += 1 + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = onStageCompletedCount += 1 override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = onStageSubmittedCount += 1 } sc.addSparkListener(joblogger) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index d4320e5e14..1a16e438c4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -174,7 +174,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc class SaveStageInfo extends SparkListener { val stageInfos = Buffer[StageInfo]() - override def onStageCompleted(stage: StageCompleted) { + override def onStageCompleted(stage: SparkListenerStageCompleted) { stageInfos += stage.stage } } diff --git a/spark-class2.cmd b/spark-class2.cmd index a60c17d050..dc9dadf356 100644 --- a/spark-class2.cmd +++ b/spark-class2.cmd @@ -17,7 +17,7 @@ rem See the License for the specific language governing permissions and rem limitations under the License. rem -set SCALA_VERSION=2.9.3 +set SCALA_VERSION=2.10 rem Figure out where the Spark framework is installed set FWDIR=%~dp0 |