aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2015-09-14 21:47:40 -0400
committerMatei Zaharia <matei@databricks.com>2015-09-14 21:47:40 -0400
commit1a0955250bb65cd6f5818ad60efb62ea4b45d18e (patch)
tree91afe7fda6170e8e2c85563b5f336418c14ae6cc
parent7b6c856367b9c36348e80e83959150da9656c4dd (diff)
downloadspark-1a0955250bb65cd6f5818ad60efb62ea4b45d18e.tar.gz
spark-1a0955250bb65cd6f5818ad60efb62ea4b45d18e.tar.bz2
spark-1a0955250bb65cd6f5818ad60efb62ea4b45d18e.zip
[SPARK-9851] Support submitting map stages individually in DAGScheduler
This patch adds support for submitting map stages in a DAG individually so that we can make downstream decisions after seeing statistics about their output, as part of SPARK-9850. I also added more comments to many of the key classes in DAGScheduler. By itself, the patch is not super useful except maybe to switch between a shuffle and broadcast join, but with the other subtasks of SPARK-9850 we'll be able to do more interesting decisions. The main entry point is SparkContext.submitMapStage, which lets you run a map stage and see stats about the map output sizes. Other stats could also be collected through accumulators. See AdaptiveSchedulingSuite for a short example. Author: Matei Zaharia <matei@databricks.com> Closes #8180 from mateiz/spark-9851.
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputStatistics.scala27
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputTracker.scala49
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala34
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala251
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Stage.scala26
-rw-r--r--core/src/test/scala/org/apache/spark/FailureSuite.scala21
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala65
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala243
12 files changed, 710 insertions, 63 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala b/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala
new file mode 100644
index 0000000000..f8a6f1d0d8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/MapOutputStatistics.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * Holds statistics about the output sizes in a map stage. May become a DeveloperApi in the future.
+ *
+ * @param shuffleId ID of the shuffle
+ * @param bytesByPartitionId approximate number of output bytes for each map output partition
+ * (may be inexact due to use of compressed map statuses)
+ */
+private[spark] class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index a387592783..94eb8daa85 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -18,6 +18,7 @@
package org.apache.spark
import java.io._
+import java.util.Arrays
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
@@ -132,13 +133,43 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
* describing the shuffle blocks that are stored at that block manager.
*/
def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
- : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
+ : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, reduce $reduceId")
- val startTime = System.currentTimeMillis
+ val statuses = getStatuses(shuffleId)
+ // Synchronize on the returned array because, on the driver, it gets mutated in place
+ statuses.synchronized {
+ return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
+ }
+ }
+ /**
+ * Return statistics about all of the outputs for a given shuffle.
+ */
+ def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
+ val statuses = getStatuses(dep.shuffleId)
+ // Synchronize on the returned array because, on the driver, it gets mutated in place
+ statuses.synchronized {
+ val totalSizes = new Array[Long](dep.partitioner.numPartitions)
+ for (s <- statuses) {
+ for (i <- 0 until totalSizes.length) {
+ totalSizes(i) += s.getSizeForBlock(i)
+ }
+ }
+ new MapOutputStatistics(dep.shuffleId, totalSizes)
+ }
+ }
+
+ /**
+ * Get or fetch the array of MapStatuses for a given shuffle ID. NOTE: clients MUST synchronize
+ * on this array when reading it, because on the driver, we may be changing it in place.
+ *
+ * (It would be nice to remove this restriction in the future.)
+ */
+ private def getStatuses(shuffleId: Int): Array[MapStatus] = {
val statuses = mapStatuses.get(shuffleId).orNull
if (statuses == null) {
logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
+ val startTime = System.currentTimeMillis
var fetchedStatuses: Array[MapStatus] = null
fetching.synchronized {
// Someone else is fetching it; wait for them to be done
@@ -160,7 +191,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}
if (fetchedStatuses == null) {
- // We won the race to fetch the output locs; do so
+ // We won the race to fetch the statuses; do so
logInfo("Doing the fetch; tracker endpoint = " + trackerEndpoint)
// This try-finally prevents hangs due to timeouts:
try {
@@ -175,22 +206,18 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}
}
}
- logDebug(s"Fetching map output location for shuffle $shuffleId, reduce $reduceId took " +
+ logDebug(s"Fetching map output statuses for shuffle $shuffleId took " +
s"${System.currentTimeMillis - startTime} ms")
if (fetchedStatuses != null) {
- fetchedStatuses.synchronized {
- return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
- }
+ return fetchedStatuses
} else {
logError("Missing all output locations for shuffle " + shuffleId)
throw new MetadataFetchFailedException(
- shuffleId, reduceId, "Missing all output locations for shuffle " + shuffleId)
+ shuffleId, -1, "Missing all output locations for shuffle " + shuffleId)
}
} else {
- statuses.synchronized {
- return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
- }
+ return statuses
}
}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e27b3c4962..dee6091ce3 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1985,6 +1985,23 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
/**
+ * Submit a map stage for execution. This is currently an internal API only, but might be
+ * promoted to DeveloperApi in the future.
+ */
+ private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C])
+ : SimpleFutureAction[MapOutputStatistics] = {
+ assertNotStopped()
+ val callSite = getCallSite()
+ var result: MapOutputStatistics = null
+ val waiter = dagScheduler.submitMapStage(
+ dependency,
+ (r: MapOutputStatistics) => { result = r },
+ callSite,
+ localProperties.get)
+ new SimpleFutureAction[MapOutputStatistics](waiter, result)
+ }
+
+ /**
* Cancel active jobs for the specified group. See [[org.apache.spark.SparkContext.setJobGroup]]
* for more information.
*/
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
index 50a6937941..a3d2db3130 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
@@ -23,18 +23,42 @@ import org.apache.spark.TaskContext
import org.apache.spark.util.CallSite
/**
- * Tracks information about an active job in the DAGScheduler.
+ * A running job in the DAGScheduler. Jobs can be of two types: a result job, which computes a
+ * ResultStage to execute an action, or a map-stage job, which computes the map outputs for a
+ * ShuffleMapStage before any downstream stages are submitted. The latter is used for adaptive
+ * query planning, to look at map output statistics before submitting later stages. We distinguish
+ * between these two types of jobs using the finalStage field of this class.
+ *
+ * Jobs are only tracked for "leaf" stages that clients directly submitted, through DAGScheduler's
+ * submitJob or submitMapStage methods. However, either type of job may cause the execution of
+ * other earlier stages (for RDDs in the DAG it depends on), and multiple jobs may share some of
+ * these previous stages. These dependencies are managed inside DAGScheduler.
+ *
+ * @param jobId A unique ID for this job.
+ * @param finalStage The stage that this job computes (either a ResultStage for an action or a
+ * ShuffleMapStage for submitMapStage).
+ * @param callSite Where this job was initiated in the user's program (shown on UI).
+ * @param listener A listener to notify if tasks in this job finish or the job fails.
+ * @param properties Scheduling properties attached to the job, such as fair scheduler pool name.
*/
private[spark] class ActiveJob(
val jobId: Int,
- val finalStage: ResultStage,
- val func: (TaskContext, Iterator[_]) => _,
- val partitions: Array[Int],
+ val finalStage: Stage,
val callSite: CallSite,
val listener: JobListener,
val properties: Properties) {
- val numPartitions = partitions.length
+ /**
+ * Number of partitions we need to compute for this job. Note that result stages may not need
+ * to compute all partitions in their target RDD, for actions like first() and lookup().
+ */
+ val numPartitions = finalStage match {
+ case r: ResultStage => r.partitions.length
+ case m: ShuffleMapStage => m.rdd.partitions.length
+ }
+
+ /** Which partitions of the stage have finished */
val finished = Array.fill[Boolean](numPartitions)(false)
+
var numFinished = 0
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 09e963f5cd..b4f90e8347 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -45,17 +45,65 @@ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
* The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
* stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
* minimal schedule to run the job. It then submits stages as TaskSets to an underlying
- * TaskScheduler implementation that runs them on the cluster.
+ * TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent
+ * tasks that can run right away based on the data that's already on the cluster (e.g. map output
+ * files from previous stages), though it may fail if this data becomes unavailable.
*
- * In addition to coming up with a DAG of stages, this class also determines the preferred
+ * Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with
+ * "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks
+ * in each stage, but operations with shuffle dependencies require multiple stages (one to write a
+ * set of map output files, and another to read those files after a barrier). In the end, every
+ * stage will have only shuffle dependencies on other stages, and may compute multiple operations
+ * inside it. The actual pipelining of these operations happens in the RDD.compute() functions of
+ * various RDDs (MappedRDD, FilteredRDD, etc).
+ *
+ * In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred
* locations to run each task on, based on the current cache status, and passes these to the
* low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
* lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
* not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
* a small number of times before cancelling the whole stage.
*
+ * When looking through this code, there are several key concepts:
+ *
+ * - Jobs (represented by [[ActiveJob]]) are the top-level work items submitted to the scheduler.
+ * For example, when the user calls an action, like count(), a job will be submitted through
+ * submitJob. Each Job may require the execution of multiple stages to build intermediate data.
+ *
+ * - Stages ([[Stage]]) are sets of tasks that compute intermediate results in jobs, where each
+ * task computes the same function on partitions of the same RDD. Stages are separated at shuffle
+ * boundaries, which introduce a barrier (where we must wait for the previous stage to finish to
+ * fetch outputs). There are two types of stages: [[ResultStage]], for the final stage that
+ * executes an action, and [[ShuffleMapStage]], which writes map output files for a shuffle.
+ * Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.
+ *
+ * - Tasks are individual units of work, each sent to one machine.
+ *
+ * - Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them
+ * and likewise remembers which shuffle map stages have already produced output files to avoid
+ * redoing the map side of a shuffle.
+ *
+ * - Preferred locations: the DAGScheduler also computes where to run each task in a stage based
+ * on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
+ *
+ * - Cleanup: all data structures are cleared when the running jobs that depend on them finish,
+ * to prevent memory leaks in a long-running application.
+ *
+ * To recover from failures, the same stage might need to run multiple times, which are called
+ * "attempts". If the TaskScheduler reports that a task failed because a map output file from a
+ * previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a
+ * CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait a small
+ * amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost
+ * stage(s) that compute the missing tasks. As part of this process, we might also have to create
+ * Stage objects for old (finished) stages where we previously cleaned up the Stage object. Since
+ * tasks from the old attempt of a stage could still be running, care must be taken to map any
+ * events received in the correct Stage object.
+ *
* Here's a checklist to use when making or reviewing changes to this class:
*
+ * - All data structures should be cleared when the jobs involving them end to avoid indefinite
+ * accumulation of state in long-running programs.
+ *
* - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to
* include the new structure. This will help to catch memory leaks.
*/
@@ -295,12 +343,12 @@ class DAGScheduler(
*/
private def newResultStage(
rdd: RDD[_],
- numTasks: Int,
+ func: (TaskContext, Iterator[_]) => _,
+ partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
- val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite)
-
+ val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
@@ -500,12 +548,25 @@ class DAGScheduler(
jobIdToStageIds -= job.jobId
jobIdToActiveJob -= job.jobId
activeJobs -= job
- job.finalStage.resultOfJob = None
+ job.finalStage match {
+ case r: ResultStage =>
+ r.resultOfJob = None
+ case m: ShuffleMapStage =>
+ m.mapStageJobs = m.mapStageJobs.filter(_ != job)
+ }
}
/**
- * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
+ * Submit an action job to the scheduler and get a JobWaiter object back. The JobWaiter object
* can be used to block until the the job finishes executing or can be used to cancel the job.
+ *
+ * @param rdd target RDD to run tasks on
+ * @param func a function to run on each partition of the RDD
+ * @param partitions set of partitions to run on; some jobs may not want to compute on all
+ * partitions of the target RDD, e.g. for operations like first()
+ * @param callSite where in the user program this job was called
+ * @param resultHandler callback to pass each result to
+ * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
*/
def submitJob[T, U](
rdd: RDD[T],
@@ -524,6 +585,7 @@ class DAGScheduler(
val jobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
+ // Return immediately if the job is running 0 tasks
return new JobWaiter[U](this, jobId, 0, resultHandler)
}
@@ -536,6 +598,18 @@ class DAGScheduler(
waiter
}
+ /**
+ * Run an action job on the given RDD and pass all the results to the resultHandler function as
+ * they arrive. Throws an exception if the job fials, or returns normally if successful.
+ *
+ * @param rdd target RDD to run tasks on
+ * @param func a function to run on each partition of the RDD
+ * @param partitions set of partitions to run on; some jobs may not want to compute on all
+ * partitions of the target RDD, e.g. for operations like first()
+ * @param callSite where in the user program this job was called
+ * @param resultHandler callback to pass each result to
+ * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
+ */
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
@@ -559,6 +633,17 @@ class DAGScheduler(
}
}
+ /**
+ * Run an approximate job on the given RDD and pass all the results to an ApproximateEvaluator
+ * as they arrive. Returns a partial result object from the evaluator.
+ *
+ * @param rdd target RDD to run tasks on
+ * @param func a function to run on each partition of the RDD
+ * @param evaluator [[ApproximateEvaluator]] to receive the partial results
+ * @param callSite where in the user program this job was called
+ * @param timeout maximum time to wait for the job, in milliseconds
+ * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
+ */
def runApproximateJob[T, U, R](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
@@ -576,6 +661,41 @@ class DAGScheduler(
}
/**
+ * Submit a shuffle map stage to run independently and get a JobWaiter object back. The waiter
+ * can be used to block until the the job finishes executing or can be used to cancel the job.
+ * This method is used for adaptive query planning, to run map stages and look at statistics
+ * about their outputs before submitting downstream stages.
+ *
+ * @param dependency the ShuffleDependency to run a map stage for
+ * @param callback function called with the result of the job, which in this case will be a
+ * single MapOutputStatistics object showing how much data was produced for each partition
+ * @param callSite where in the user program this job was submitted
+ * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
+ */
+ def submitMapStage[K, V, C](
+ dependency: ShuffleDependency[K, V, C],
+ callback: MapOutputStatistics => Unit,
+ callSite: CallSite,
+ properties: Properties): JobWaiter[MapOutputStatistics] = {
+
+ val rdd = dependency.rdd
+ val jobId = nextJobId.getAndIncrement()
+ if (rdd.partitions.length == 0) {
+ throw new SparkException("Can't run submitMapStage on RDD with 0 partitions")
+ }
+
+ // We create a JobWaiter with only one "task", which will be marked as complete when the whole
+ // map stage has completed, and will be passed the MapOutputStatistics for that stage.
+ // This makes it easier to avoid race conditions between the user code and the map output
+ // tracker that might result if we told the user the stage had finished, but then they queries
+ // the map output tracker and some node failures had caused the output statistics to be lost.
+ val waiter = new JobWaiter(this, jobId, 1, (i: Int, r: MapOutputStatistics) => callback(r))
+ eventProcessLoop.post(MapStageSubmitted(
+ jobId, dependency, callSite, waiter, SerializationUtils.clone(properties)))
+ waiter
+ }
+
+ /**
* Cancel a job that is running or waiting in the queue.
*/
def cancelJob(jobId: Int): Unit = {
@@ -583,6 +703,9 @@ class DAGScheduler(
eventProcessLoop.post(JobCancelled(jobId))
}
+ /**
+ * Cancel all jobs in the given job group ID.
+ */
def cancelJobGroup(groupId: String): Unit = {
logInfo("Asked to cancel job group " + groupId)
eventProcessLoop.post(JobGroupCancelled(groupId))
@@ -720,31 +843,77 @@ class DAGScheduler(
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
- finalStage = newResultStage(finalRDD, partitions.length, jobId, callSite)
+ finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
- if (finalStage != null) {
- val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
- clearCacheLocs()
- logInfo("Got job %s (%s) with %d output partitions".format(
- job.jobId, callSite.shortForm, partitions.length))
- logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
- logInfo("Parents of final stage: " + finalStage.parents)
- logInfo("Missing parents: " + getMissingParentStages(finalStage))
- val jobSubmissionTime = clock.getTimeMillis()
- jobIdToActiveJob(jobId) = job
- activeJobs += job
- finalStage.resultOfJob = Some(job)
- val stageIds = jobIdToStageIds(jobId).toArray
- val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
- listenerBus.post(
- SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
- submitStage(finalStage)
+
+ val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
+ clearCacheLocs()
+ logInfo("Got job %s (%s) with %d output partitions".format(
+ job.jobId, callSite.shortForm, partitions.length))
+ logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
+ logInfo("Parents of final stage: " + finalStage.parents)
+ logInfo("Missing parents: " + getMissingParentStages(finalStage))
+
+ val jobSubmissionTime = clock.getTimeMillis()
+ jobIdToActiveJob(jobId) = job
+ activeJobs += job
+ finalStage.resultOfJob = Some(job)
+ val stageIds = jobIdToStageIds(jobId).toArray
+ val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
+ listenerBus.post(
+ SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
+ submitStage(finalStage)
+
+ submitWaitingStages()
+ }
+
+ private[scheduler] def handleMapStageSubmitted(jobId: Int,
+ dependency: ShuffleDependency[_, _, _],
+ callSite: CallSite,
+ listener: JobListener,
+ properties: Properties) {
+ // Submitting this map stage might still require the creation of some parent stages, so make
+ // sure that happens.
+ var finalStage: ShuffleMapStage = null
+ try {
+ // New stage creation may throw an exception if, for example, jobs are run on a
+ // HadoopRDD whose underlying HDFS files have been deleted.
+ finalStage = getShuffleMapStage(dependency, jobId)
+ } catch {
+ case e: Exception =>
+ logWarning("Creating new stage failed due to exception - job: " + jobId, e)
+ listener.jobFailed(e)
+ return
+ }
+
+ val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
+ clearCacheLocs()
+ logInfo("Got map stage job %s (%s) with %d output partitions".format(
+ jobId, callSite.shortForm, dependency.rdd.partitions.size))
+ logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
+ logInfo("Parents of final stage: " + finalStage.parents)
+ logInfo("Missing parents: " + getMissingParentStages(finalStage))
+
+ val jobSubmissionTime = clock.getTimeMillis()
+ jobIdToActiveJob(jobId) = job
+ activeJobs += job
+ finalStage.mapStageJobs = job :: finalStage.mapStageJobs
+ val stageIds = jobIdToStageIds(jobId).toArray
+ val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
+ listenerBus.post(
+ SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
+ submitStage(finalStage)
+
+ // If the whole stage has already finished, tell the listener and remove it
+ if (!finalStage.outputLocs.contains(Nil)) {
+ markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency))
}
+
submitWaitingStages()
}
@@ -814,7 +983,7 @@ class DAGScheduler(
case s: ResultStage =>
val job = s.resultOfJob.get
partitionsToCompute.map { id =>
- val p = job.partitions(id)
+ val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
@@ -844,7 +1013,7 @@ class DAGScheduler(
case stage: ShuffleMapStage =>
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
case stage: ResultStage =>
- closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
+ closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
}
taskBinary = sc.broadcast(taskBinaryBytes)
@@ -875,7 +1044,7 @@ class DAGScheduler(
case stage: ResultStage =>
val job = stage.resultOfJob.get
partitionsToCompute.map { id =>
- val p: Int = job.partitions(id)
+ val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
@@ -1052,13 +1221,21 @@ class DAGScheduler(
logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
") because some of its tasks had failed: " +
shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty)
- .map(_._2).mkString(", "))
+ .map(_._2).mkString(", "))
submitStage(shuffleStage)
+ } else {
+ // Mark any map-stage jobs waiting on this stage as finished
+ if (shuffleStage.mapStageJobs.nonEmpty) {
+ val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
+ for (job <- shuffleStage.mapStageJobs) {
+ markMapStageJobAsFinished(job, stats)
+ }
+ }
}
// Note: newly runnable stages will be submitted below when we submit waiting stages
}
- }
+ }
case Resubmitted =>
logInfo("Resubmitted " + task + ", so marking it as still running")
@@ -1412,6 +1589,17 @@ class DAGScheduler(
Nil
}
+ /** Mark a map stage job as finished with the given output stats, and report to its listener. */
+ def markMapStageJobAsFinished(job: ActiveJob, stats: MapOutputStatistics): Unit = {
+ // In map stage jobs, we only create a single "task", which is to finish all of the stage
+ // (including reusing any previous map outputs, etc); so we just mark task 0 as done
+ job.finished(0) = true
+ job.numFinished += 1
+ job.listener.taskSucceeded(0, stats)
+ cleanupStateForJobAndIndependentStages(job)
+ listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
+ }
+
def stop() {
logInfo("Stopping DAGScheduler")
messageScheduler.shutdownNow()
@@ -1445,6 +1633,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
+ case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
+ dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
+
case StageCancelled(stageId) =>
dagScheduler.handleStageCancellation(stageId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index f72a52e85d..dda3b6cc7f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -35,6 +35,7 @@ import org.apache.spark.util.CallSite
*/
private[scheduler] sealed trait DAGSchedulerEvent
+/** A result-yielding job was submitted on a target RDD */
private[scheduler] case class JobSubmitted(
jobId: Int,
finalRDD: RDD[_],
@@ -45,6 +46,15 @@ private[scheduler] case class JobSubmitted(
properties: Properties = null)
extends DAGSchedulerEvent
+/** A map stage as submitted to run as a separate job */
+private[scheduler] case class MapStageSubmitted(
+ jobId: Int,
+ dependency: ShuffleDependency[_, _, _],
+ callSite: CallSite,
+ listener: JobListener,
+ properties: Properties = null)
+ extends DAGSchedulerEvent
+
private[scheduler] case class StageCancelled(stageId: Int) extends DAGSchedulerEvent
private[scheduler] case class JobCancelled(jobId: Int) extends DAGSchedulerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
index bf81b9aca4..c0451da1f0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
@@ -17,23 +17,30 @@
package org.apache.spark.scheduler
+import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.CallSite
/**
- * The ResultStage represents the final stage in a job.
+ * ResultStages apply a function on some partitions of an RDD to compute the result of an action.
+ * The ResultStage object captures the function to execute, `func`, which will be applied to each
+ * partition, and the set of partition IDs, `partitions`. Some stages may not run on all partitions
+ * of the RDD, for actions like first() and lookup().
*/
private[spark] class ResultStage(
id: Int,
rdd: RDD[_],
- numTasks: Int,
+ val func: (TaskContext, Iterator[_]) => _,
+ val partitions: Array[Int],
parents: List[Stage],
firstJobId: Int,
callSite: CallSite)
- extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {
+ extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {
- // The active job for this result stage. Will be empty if the job has already finished
- // (e.g., because the job was cancelled).
+ /**
+ * The active job for this result stage. Will be empty if the job has already finished
+ * (e.g., because the job was cancelled).
+ */
var resultOfJob: Option[ActiveJob] = None
override def toString: String = "ResultStage " + id
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
index 48d8d8e9c4..7d92960876 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -23,7 +23,15 @@ import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.CallSite
/**
- * The ShuffleMapStage represents the intermediate stages in a job.
+ * ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.
+ * They occur right before each shuffle operation, and might contain multiple pipelined operations
+ * before that (e.g. map and filter). When executed, they save map output files that can later be
+ * fetched by reduce tasks. The `shuffleDep` field describes the shuffle each stage is part of,
+ * and variables like `outputLocs` and `numAvailableOutputs` track how many map outputs are ready.
+ *
+ * ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.
+ * For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that
+ * there can be multiple ActiveJobs trying to compute the same shuffle map stage.
*/
private[spark] class ShuffleMapStage(
id: Int,
@@ -37,6 +45,9 @@ private[spark] class ShuffleMapStage(
override def toString: String = "ShuffleMapStage " + id
+ /** Running map-stage jobs that were submitted to execute this stage independently (if any) */
+ var mapStageJobs: List[ActiveJob] = Nil
+
var numAvailableOutputs: Int = 0
def isAvailable: Boolean = numAvailableOutputs == numPartitions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index c086535782..b37eccbd0f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -24,27 +24,33 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.util.CallSite
/**
- * A stage is a set of independent tasks all computing the same function that need to run as part
+ * A stage is a set of parallel tasks all computing the same function that need to run as part
* of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
* by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
* DAGScheduler runs these stages in topological order.
*
* Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
- * another stage, or a result stage, in which case its tasks directly compute the action that
- * initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
- * that each output partition is on.
+ * other stage(s), or a result stage, in which case its tasks directly compute a Spark action
+ * (e.g. count(), save(), etc) by running a function on an RDD. For shuffle map stages, we also
+ * track the nodes that each output partition is on.
*
* Each Stage also has a firstJobId, identifying the job that first submitted the stage. When FIFO
* scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
* faster on failure.
*
- * The callSite provides a location in user code which relates to the stage. For a shuffle map
- * stage, the callSite gives the user code that created the RDD being shuffled. For a result
- * stage, the callSite gives the user code that executes the associated action (e.g. count()).
- *
- * A single stage can consist of multiple attempts. In that case, the latestInfo field will
- * be updated for each attempt.
+ * Finally, a single stage can be re-executed in multiple attempts due to fault recovery. In that
+ * case, the Stage object will track multiple StageInfo objects to pass to listeners or the web UI.
+ * The latest one will be accessible through latestInfo.
*
+ * @param id Unique stage ID
+ * @param rdd RDD that this stage runs on: for a shuffle map stage, it's the RDD we run map tasks
+ * on, while for a result stage, it's the target RDD that we ran an action on
+ * @param numTasks Total number of tasks in stage; result stages in particular may not need to
+ * compute all partitions, e.g. for first(), lookup(), and take().
+ * @param parents List of stages that this stage depends on (through shuffle dependencies).
+ * @param firstJobId ID of the first job this stage was part of, for FIFO scheduling.
+ * @param callSite Location in the user program associated with this stage: either where the target
+ * RDD was created, for a shuffle map stage, or where the action for a result stage was called.
*/
private[scheduler] abstract class Stage(
val id: Int,
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index aa50a49c50..f58756e6f6 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -217,6 +217,27 @@ class FailureSuite extends SparkFunSuite with LocalSparkContext {
FailureSuiteState.clear()
}
+ // Run a 3-task map stage where one task fails once.
+ test("failure in tasks in a submitMapStage") {
+ sc = new SparkContext("local[1,2]", "test")
+ val rdd = sc.makeRDD(1 to 3, 3).map { x =>
+ FailureSuiteState.synchronized {
+ FailureSuiteState.tasksRun += 1
+ if (x == 1 && FailureSuiteState.tasksFailed == 0) {
+ FailureSuiteState.tasksFailed += 1
+ throw new Exception("Intentional task failure")
+ }
+ }
+ (x, x)
+ }
+ val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(2))
+ sc.submitMapStage(dep).get()
+ FailureSuiteState.synchronized {
+ assert(FailureSuiteState.tasksRun === 4)
+ }
+ FailureSuiteState.clear()
+ }
+
// TODO: Need to add tests with shuffle fetch failures.
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala
new file mode 100644
index 0000000000..3fe28027c3
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/AdaptiveSchedulingSuite.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.rdd.{ShuffledRDDPartition, RDD, ShuffledRDD}
+import org.apache.spark._
+
+object AdaptiveSchedulingSuiteState {
+ var tasksRun = 0
+
+ def clear(): Unit = {
+ tasksRun = 0
+ }
+}
+
+/** A special ShuffledRDD where we can pass a ShuffleDependency object to use */
+class CustomShuffledRDD[K, V, C](@transient dep: ShuffleDependency[K, V, C])
+ extends RDD[(K, C)](dep.rdd.context, Seq(dep)) {
+
+ override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
+ val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
+ SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
+ .read()
+ .asInstanceOf[Iterator[(K, C)]]
+ }
+
+ override def getPartitions: Array[Partition] = {
+ Array.tabulate[Partition](dep.partitioner.numPartitions)(i => new ShuffledRDDPartition(i))
+ }
+}
+
+class AdaptiveSchedulingSuite extends SparkFunSuite with LocalSparkContext {
+ test("simple use of submitMapStage") {
+ try {
+ sc = new SparkContext("local[1,2]", "test")
+ val rdd = sc.parallelize(1 to 3, 3).map { x =>
+ AdaptiveSchedulingSuiteState.tasksRun += 1
+ (x, x)
+ }
+ val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(2))
+ val shuffled = new CustomShuffledRDD[Int, Int, Int](dep)
+ sc.submitMapStage(dep).get()
+ assert(AdaptiveSchedulingSuiteState.tasksRun == 3)
+ assert(shuffled.collect().toSet == Set((1, 1), (2, 2), (3, 3)))
+ assert(AdaptiveSchedulingSuiteState.tasksRun == 3)
+ } finally {
+ AdaptiveSchedulingSuiteState.clear()
+ }
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 1b9ff740ff..1c55f90ad9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -152,6 +152,14 @@ class DAGSchedulerSuite
override def jobFailed(exception: Exception) = { failure = exception }
}
+ /** A simple helper class for creating custom JobListeners */
+ class SimpleListener extends JobListener {
+ val results = new HashMap[Int, Any]
+ var failure: Exception = null
+ override def taskSucceeded(index: Int, result: Any): Unit = results.put(index, result)
+ override def jobFailed(exception: Exception): Unit = { failure = exception }
+ }
+
before {
sc = new SparkContext("local", "DAGSchedulerSuite")
sparkListener.submittedStageInfos.clear()
@@ -229,7 +237,7 @@ class DAGSchedulerSuite
}
}
- /** Sends the rdd to the scheduler for scheduling and returns the job id. */
+ /** Submits a job to the scheduler and returns the job id. */
private def submit(
rdd: RDD[_],
partitions: Array[Int],
@@ -240,6 +248,15 @@ class DAGSchedulerSuite
jobId
}
+ /** Submits a map stage to the scheduler and returns the job id. */
+ private def submitMapStage(
+ shuffleDep: ShuffleDependency[_, _, _],
+ listener: JobListener = jobListener): Int = {
+ val jobId = scheduler.nextJobId.getAndIncrement()
+ runEvent(MapStageSubmitted(jobId, shuffleDep, CallSite("", ""), listener))
+ jobId
+ }
+
/** Sends TaskSetFailed to the scheduler. */
private def failed(taskSet: TaskSet, message: String) {
runEvent(TaskSetFailed(taskSet, message, None))
@@ -1313,6 +1330,230 @@ class DAGSchedulerSuite
assert(stackTraceString.contains("org.scalatest.FunSuite"))
}
+ test("simple map stage submission") {
+ val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+
+ // Submit a map stage by itself
+ submitMapStage(shuffleDep)
+ assert(results.size === 0) // No results yet
+ completeShuffleMapStageSuccessfully(0, 0, 1)
+ assert(results.size === 1)
+ results.clear()
+ assertDataStructuresEmpty()
+
+ // Submit a reduce job that depends on this map stage; it should directly do the reduce
+ submit(reduceRdd, Array(0))
+ completeNextResultStageWithSuccess(2, 0)
+ assert(results === Map(0 -> 42))
+ results.clear()
+ assertDataStructuresEmpty()
+
+ // Check that if we submit the map stage again, no tasks run
+ submitMapStage(shuffleDep)
+ assert(results.size === 1)
+ assertDataStructuresEmpty()
+ }
+
+ test("map stage submission with reduce stage also depending on the data") {
+ val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(1))
+ val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+
+ // Submit the map stage by itself
+ submitMapStage(shuffleDep)
+
+ // Submit a reduce job that depends on this map stage
+ submit(reduceRdd, Array(0))
+
+ // Complete tasks for the map stage
+ completeShuffleMapStageSuccessfully(0, 0, 1)
+ assert(results.size === 1)
+ results.clear()
+
+ // Complete tasks for the reduce stage
+ completeNextResultStageWithSuccess(1, 0)
+ assert(results === Map(0 -> 42))
+ results.clear()
+ assertDataStructuresEmpty()
+
+ // Check that if we submit the map stage again, no tasks run
+ submitMapStage(shuffleDep)
+ assert(results.size === 1)
+ assertDataStructuresEmpty()
+ }
+
+ test("map stage submission with fetch failure") {
+ val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
+ val shuffleId = shuffleDep.shuffleId
+ val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+
+ // Submit a map stage by itself
+ submitMapStage(shuffleDep)
+ complete(taskSets(0), Seq(
+ (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
+ (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
+ assert(results.size === 1)
+ results.clear()
+ assertDataStructuresEmpty()
+
+ // Submit a reduce job that depends on this map stage, but where one reduce will fail a fetch
+ submit(reduceRdd, Array(0, 1))
+ complete(taskSets(1), Seq(
+ (Success, 42),
+ (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"), null)))
+ // Ask the scheduler to try it again; TaskSet 2 will rerun the map task that we couldn't fetch
+ // from, then TaskSet 3 will run the reduce stage
+ scheduler.resubmitFailedStages()
+ complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
+ complete(taskSets(3), Seq((Success, 43)))
+ assert(results === Map(0 -> 42, 1 -> 43))
+ results.clear()
+ assertDataStructuresEmpty()
+
+ // Run another reduce job without a failure; this should just work
+ submit(reduceRdd, Array(0, 1))
+ complete(taskSets(4), Seq(
+ (Success, 44),
+ (Success, 45)))
+ assert(results === Map(0 -> 44, 1 -> 45))
+ results.clear()
+ assertDataStructuresEmpty()
+
+ // Resubmit the map stage; this should also just work
+ submitMapStage(shuffleDep)
+ assert(results.size === 1)
+ results.clear()
+ assertDataStructuresEmpty()
+ }
+
+ /**
+ * In this test, we have three RDDs with shuffle dependencies, and we submit map stage jobs
+ * that are waiting on each one, as well as a reduce job on the last one. We test that all of
+ * these jobs complete even if there are some fetch failures in both shuffles.
+ */
+ test("map stage submission with multiple shared stages and failures") {
+ val rdd1 = new MyRDD(sc, 2, Nil)
+ val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
+ val rdd2 = new MyRDD(sc, 2, List(dep1))
+ val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))
+ val rdd3 = new MyRDD(sc, 2, List(dep2))
+
+ val listener1 = new SimpleListener
+ val listener2 = new SimpleListener
+ val listener3 = new SimpleListener
+
+ submitMapStage(dep1, listener1)
+ submitMapStage(dep2, listener2)
+ submit(rdd3, Array(0, 1), listener = listener3)
+
+ // Complete the first stage
+ assert(taskSets(0).stageId === 0)
+ complete(taskSets(0), Seq(
+ (Success, makeMapStatus("hostA", rdd1.partitions.size)),
+ (Success, makeMapStatus("hostB", rdd1.partitions.size))))
+ assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
+ HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
+ assert(listener1.results.size === 1)
+
+ // When attempting the second stage, show a fetch failure
+ assert(taskSets(1).stageId === 1)
+ complete(taskSets(1), Seq(
+ (Success, makeMapStatus("hostA", rdd2.partitions.size)),
+ (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
+ scheduler.resubmitFailedStages()
+ assert(listener2.results.size === 0) // Second stage listener should not have a result yet
+
+ // Stage 0 should now be running as task set 2; make its task succeed
+ assert(taskSets(2).stageId === 0)
+ complete(taskSets(2), Seq(
+ (Success, makeMapStatus("hostC", rdd2.partitions.size))))
+ assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
+ HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
+ assert(listener2.results.size === 0) // Second stage listener should still not have a result
+
+ // Stage 1 should now be running as task set 3; make its first task succeed
+ assert(taskSets(3).stageId === 1)
+ complete(taskSets(3), Seq(
+ (Success, makeMapStatus("hostB", rdd2.partitions.size)),
+ (Success, makeMapStatus("hostD", rdd2.partitions.size))))
+ assert(mapOutputTracker.getMapSizesByExecutorId(dep2.shuffleId, 0).map(_._1).toSet ===
+ HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostD")))
+ assert(listener2.results.size === 1)
+
+ // Finally, the reduce job should be running as task set 4; make it see a fetch failure,
+ // then make it run again and succeed
+ assert(taskSets(4).stageId === 2)
+ complete(taskSets(4), Seq(
+ (Success, 52),
+ (FetchFailed(makeBlockManagerId("hostD"), dep2.shuffleId, 0, 0, "ignored"), null)))
+ scheduler.resubmitFailedStages()
+
+ // TaskSet 5 will rerun stage 1's lost task, then TaskSet 6 will rerun stage 2
+ assert(taskSets(5).stageId === 1)
+ complete(taskSets(5), Seq(
+ (Success, makeMapStatus("hostE", rdd2.partitions.size))))
+ complete(taskSets(6), Seq(
+ (Success, 53)))
+ assert(listener3.results === Map(0 -> 52, 1 -> 53))
+ assertDataStructuresEmpty()
+ }
+
+ /**
+ * In this test, we run a map stage where one of the executors fails but we still receive a
+ * "zombie" complete message from that executor. We want to make sure the stage is not reported
+ * as done until all tasks have completed.
+ */
+ test("map stage submission with executor failure late map task completions") {
+ val shuffleMapRdd = new MyRDD(sc, 3, Nil)
+ val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
+
+ submitMapStage(shuffleDep)
+
+ val oldTaskSet = taskSets(0)
+ runEvent(CompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2),
+ null, createFakeTaskInfo(), null))
+ assert(results.size === 0) // Map stage job should not be complete yet
+
+ // Pretend host A was lost
+ val oldEpoch = mapOutputTracker.getEpoch
+ runEvent(ExecutorLost("exec-hostA"))
+ val newEpoch = mapOutputTracker.getEpoch
+ assert(newEpoch > oldEpoch)
+
+ // Suppose we also get a completed event from task 1 on the same host; this should be ignored
+ runEvent(CompletionEvent(oldTaskSet.tasks(1), Success, makeMapStatus("hostA", 2),
+ null, createFakeTaskInfo(), null))
+ assert(results.size === 0) // Map stage job should not be complete yet
+
+ // A completion from another task should work because it's a non-failed host
+ runEvent(CompletionEvent(oldTaskSet.tasks(2), Success, makeMapStatus("hostB", 2),
+ null, createFakeTaskInfo(), null))
+ assert(results.size === 0) // Map stage job should not be complete yet
+
+ // Now complete tasks in the second task set
+ val newTaskSet = taskSets(1)
+ assert(newTaskSet.tasks.size === 2) // Both tasks 0 and 1 were on on hostA
+ runEvent(CompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2),
+ null, createFakeTaskInfo(), null))
+ assert(results.size === 0) // Map stage job should not be complete yet
+ runEvent(CompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2),
+ null, createFakeTaskInfo(), null))
+ assert(results.size === 1) // Map stage job should now finally be complete
+ assertDataStructuresEmpty()
+
+ // Also test that a reduce stage using this shuffled data can immediately run
+ val reduceRDD = new MyRDD(sc, 2, List(shuffleDep))
+ results.clear()
+ submit(reduceRDD, Array(0, 1))
+ complete(taskSets(2), Seq((Success, 42), (Success, 43)))
+ assert(results === Map(0 -> 42, 1 -> 43))
+ results.clear()
+ assertDataStructuresEmpty()
+ }
+
/**
* Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
* Note that this checks only the host and not the executor ID.