author     Josh Rosen <joshrosen@databricks.com>    2015-07-22 21:04:04 -0700
committer  Reynold Xin <rxin@databricks.com>        2015-07-22 21:04:04 -0700
commit     b217230f2a96c6d5a0554c593bdf1d1374878688
tree       88033457313049d3b079cc45a32bed8adc59737e
parent     d71a13f475df2d05a7db9e25738d1353cbc8cfc7
[SPARK-9144] Remove DAGScheduler.runLocallyWithinThread and spark.localExecution.enabled
Spark has an option called spark.localExecution.enabled; according to the docs:

> Enables Spark to run certain jobs, such as first() or take() on the driver, without sending tasks to the cluster. This can make certain jobs execute very quickly, but may require shipping a whole partition of data to the driver.

This feature ends up adding quite a bit of complexity to DAGScheduler, especially in the runLocallyWithinThread method, but as far as I know nobody uses this feature (I searched the mailing list and haven't seen any recent mentions of the configuration nor stacktraces including the runLocally method). As a step towards scheduler complexity reduction, I propose that we remove this feature and all code related to it for Spark 1.5.

This pull request simply brings #7484 up to date.

Author: Josh Rosen <joshrosen@databricks.com>
Author: Reynold Xin <rxin@databricks.com>

Closes #7585 from rxin/remove-local-exec and squashes the following commits:

84bd10e [Reynold Xin] Python fix.
1d9739a [Reynold Xin] Merge pull request #7484 from JoshRosen/remove-localexecution
eec39fa [Josh Rosen] Remove allowLocal(); deprecate user-facing uses of it.
b0835dc [Josh Rosen] Remove local execution code in DAGScheduler
8975d96 [Josh Rosen] Remove local execution tests.
ffa8c9b [Josh Rosen] Remove documentation for configuration
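For callers of the public API, the practical effect is that the allowLocal parameter on SparkContext.runJob is now a deprecated no-op: passing true only logs a warning, and the job is always scheduled through the DAGScheduler. A minimal migration sketch in Scala (illustrative only; the object name, app name, and local[2] master are assumptions, not part of this patch):

import org.apache.spark.{SparkConf, SparkContext}

object RunJobMigrationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("runJob-migration").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 10, 2)

    // Before this patch, callers could request driver-side execution for short actions:
    //   sc.runJob(rdd, (it: Iterator[Int]) => it.size, Seq(0), allowLocal = true)
    // That overload still compiles but is @deprecated as of 1.5.0 and only logs a warning.

    // Supported form after this patch: the job always goes through the DAGScheduler.
    val sizes: Array[Int] = sc.runJob(rdd, (it: Iterator[Int]) => it.size, Seq(0))
    println(sizes.mkString(","))

    sc.stop()
  }
}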
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                           |  86
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala                   |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala                   |   5
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala                      |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala                   |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                                 |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala                 |   3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala                 | 117
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala            |   1
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala                           |   2
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala            |  83
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala |   4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala           |   2
13 files changed, 102 insertions(+), 211 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4976e5eb49..6a6b94a271 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1758,16 +1758,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/**
* Run a function on a given set of partitions in an RDD and pass the results to the given
- * handler function. This is the main entry point for all actions in Spark. The allowLocal
- * flag specifies whether the scheduler can run the computation on the driver rather than
- * shipping it out to the cluster, for short actions like first().
+ * handler function. This is the main entry point for all actions in Spark.
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
- allowLocal: Boolean,
- resultHandler: (Int, U) => Unit) {
+ resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
@@ -1777,54 +1774,104 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
- dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
- resultHandler, localProperties.get)
+ dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
/**
- * Run a function on a given set of partitions in an RDD and return the results as an array. The
- * allowLocal flag specifies whether the scheduler can run the computation on the driver rather
- * than shipping it out to the cluster, for short actions like first().
+ * Run a function on a given set of partitions in an RDD and return the results as an array.
+ */
+ def runJob[T, U: ClassTag](
+ rdd: RDD[T],
+ func: (TaskContext, Iterator[T]) => U,
+ partitions: Seq[Int]): Array[U] = {
+ val results = new Array[U](partitions.size)
+ runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
+ results
+ }
+
+ /**
+ * Run a job on a given set of partitions of an RDD, but take a function of type
+ * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
+ */
+ def runJob[T, U: ClassTag](
+ rdd: RDD[T],
+ func: Iterator[T] => U,
+ partitions: Seq[Int]): Array[U] = {
+ val cleanedFunc = clean(func)
+ runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
+ }
+
+
+ /**
+ * Run a function on a given set of partitions in an RDD and pass the results to the given
+ * handler function. This is the main entry point for all actions in Spark.
+ *
+ * The allowLocal flag is deprecated as of Spark 1.5.0+.
+ */
+ @deprecated("use the version of runJob without the allowLocal parameter", "1.5.0")
+ def runJob[T, U: ClassTag](
+ rdd: RDD[T],
+ func: (TaskContext, Iterator[T]) => U,
+ partitions: Seq[Int],
+ allowLocal: Boolean,
+ resultHandler: (Int, U) => Unit): Unit = {
+ if (allowLocal) {
+ logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+")
+ }
+ runJob(rdd, func, partitions, resultHandler)
+ }
+
+ /**
+ * Run a function on a given set of partitions in an RDD and return the results as an array.
+ *
+ * The allowLocal flag is deprecated as of Spark 1.5.0+.
*/
+ @deprecated("use the version of runJob without the allowLocal parameter", "1.5.0")
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean
): Array[U] = {
- val results = new Array[U](partitions.size)
- runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
- results
+ if (allowLocal) {
+ logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+")
+ }
+ runJob(rdd, func, partitions)
}
/**
* Run a job on a given set of partitions of an RDD, but take a function of type
* `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
+ *
+ * The allowLocal argument is deprecated as of Spark 1.5.0+.
*/
+ @deprecated("use the version of runJob without the allowLocal parameter", "1.5.0")
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int],
allowLocal: Boolean
): Array[U] = {
- val cleanedFunc = clean(func)
- runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions, allowLocal)
+ if (allowLocal) {
+ logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+")
+ }
+ runJob(rdd, func, partitions)
}
/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
- runJob(rdd, func, 0 until rdd.partitions.size, false)
+ runJob(rdd, func, 0 until rdd.partitions.length)
}
/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
- runJob(rdd, func, 0 until rdd.partitions.size, false)
+ runJob(rdd, func, 0 until rdd.partitions.length)
}
/**
@@ -1835,7 +1882,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
processPartition: (TaskContext, Iterator[T]) => U,
resultHandler: (Int, U) => Unit)
{
- runJob[T, U](rdd, processPartition, 0 until rdd.partitions.size, false, resultHandler)
+ runJob[T, U](rdd, processPartition, 0 until rdd.partitions.length, resultHandler)
}
/**
@@ -1847,7 +1894,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
resultHandler: (Int, U) => Unit)
{
val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)
- runJob[T, U](rdd, processFunc, 0 until rdd.partitions.size, false, resultHandler)
+ runJob[T, U](rdd, processFunc, 0 until rdd.partitions.length, resultHandler)
}
/**
@@ -1892,7 +1939,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
(context: TaskContext, iter: Iterator[T]) => cleanF(iter),
partitions,
callSite,
- allowLocal = false,
resultHandler,
localProperties.get)
new SimpleFutureAction(waiter, resultFunc)
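For reference, the runJob overloads retained above can be exercised as follows. This is an illustrative sketch, not code from this patch; the object name, app name, and local[2] master are assumptions:

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object RunJobOverloadsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("runJob-overloads").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 100, 4)

    // Handler-based entry point: results arrive on the driver tagged with their partition index.
    val counts = new Array[Int](rdd.partitions.length)
    sc.runJob[Int, Int](
      rdd,
      (context: TaskContext, iter: Iterator[Int]) => iter.size,
      0 until rdd.partitions.length,
      (index: Int, result: Int) => counts(index) = result)

    // Array-returning convenience overloads, over a subset of partitions or all of them.
    val firstTwo: Array[Array[Int]] = sc.runJob(rdd, (it: Iterator[Int]) => it.toArray, Seq(0, 1))
    val sums: Array[Int] = sc.runJob(rdd, (it: Iterator[Int]) => it.sum)

    println(s"per-partition counts: ${counts.mkString(",")}; per-partition sums: ${sums.mkString(",")}")
    println(s"elements in first two partitions: ${firstTwo.map(_.length).sum}")
    sc.stop()
  }
}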
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index c95615a5a9..829fae1d1d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -364,7 +364,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
// This is useful for implementing `take` from other language frontends
// like Python where the data is serialized.
import scala.collection.JavaConversions._
- val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds, true)
+ val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds)
res.map(x => new java.util.ArrayList(x.toSeq)).toArray
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index dc9f62f39e..598953ac3b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -358,12 +358,11 @@ private[spark] object PythonRDD extends Logging {
def runJob(
sc: SparkContext,
rdd: JavaRDD[Array[Byte]],
- partitions: JArrayList[Int],
- allowLocal: Boolean): Int = {
+ partitions: JArrayList[Int]): Int = {
type ByteArray = Array[Byte]
type UnrolledPartition = Array[ByteArray]
val allPartitions: Array[UnrolledPartition] =
- sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal)
+ sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions)
val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
serveIterator(flattenedPartition.iterator,
s"serve RDD ${rdd.id} with partitions ${partitions.mkString(",")}")
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 66624ffbe4..581b40003c 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -215,8 +215,6 @@ private[spark] class Executor(
attemptNumber = attemptNumber,
metricsSystem = env.metricsSystem)
} finally {
- // Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread;
- // when changing this, make sure to update both copies.
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
if (freedMemory > 0) {
val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 91a6a2d039..326fafb230 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -881,7 +881,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
buf
} : Seq[V]
- val res = self.context.runJob(self, process, Array(index), false)
+ val res = self.context.runJob(self, process, Array(index))
res(0)
case None =>
self.filter(_._1 == key).map(_._2).collect()
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 9f7ebae3e9..394c6686cb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -897,7 +897,7 @@ abstract class RDD[T: ClassTag](
*/
def toLocalIterator: Iterator[T] = withScope {
def collectPartition(p: Int): Array[T] = {
- sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
+ sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p)).head
}
(0 until partitions.length).iterator.flatMap(i => collectPartition(i))
}
@@ -1273,7 +1273,7 @@ abstract class RDD[T: ClassTag](
val left = num - buf.size
val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
- val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
+ val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
res.foreach(buf ++= _.take(num - buf.size))
partsScanned += numPartsToTry
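The take() hunk above is where allowLocal used to matter most: with local execution removed, take() and first() now submit ordinary jobs for the partitions they scan. A simplified sketch of that scanning loop, assuming one extra partition is tried per round (the real implementation grows this number adaptively and is not reproduced here):

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Simplified take(): scan partitions in rounds, each round submitted as a normal runJob.
def takeSketch[T: ClassTag](sc: SparkContext, rdd: RDD[T], num: Int): Array[T] = {
  val buf = new ArrayBuffer[T]
  val totalParts = rdd.partitions.length
  var partsScanned = 0
  while (buf.size < num && partsScanned < totalParts) {
    val left = num - buf.size
    // Try one more partition this round; Spark's real take() widens this range adaptively.
    val p = partsScanned until math.min(partsScanned + 1, totalParts)
    val res = sc.runJob(rdd, (it: Iterator[T]) => it.take(left).toArray, p)
    res.foreach(buf ++= _.take(num - buf.size))
    partsScanned += p.size
  }
  buf.toArray
}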
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index 523aaf2b86..e277ae28d5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -50,8 +50,7 @@ class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, L
prev.context.runJob(
prev,
Utils.getIteratorSize _,
- 0 until n - 1, // do not need to count the last partition
- allowLocal = false
+ 0 until n - 1 // do not need to count the last partition
).scanLeft(0L)(_ + _)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b829d06923..552dabcfa5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,7 +38,6 @@ import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator
import org.apache.spark.rdd.RDD
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.storage._
-import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util._
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -128,10 +127,6 @@ class DAGScheduler(
// This is only safe because DAGScheduler runs in a single thread.
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
-
- /** If enabled, we may run certain actions like take() and first() locally. */
- private val localExecutionEnabled = sc.getConf.getBoolean("spark.localExecution.enabled", false)
-
/** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
@@ -515,7 +510,6 @@ class DAGScheduler(
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
- allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// Check to make sure we are not launching a task on a partition that does not exist.
@@ -535,7 +529,7 @@ class DAGScheduler(
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(
- jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter,
+ jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
@@ -545,11 +539,10 @@ class DAGScheduler(
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
- allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
- val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
+ val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
waiter.awaitResult() match {
case JobSucceeded =>
logInfo("Job %d finished: %s, took %f s".format
@@ -576,8 +569,7 @@ class DAGScheduler(
val partitions = (0 until rdd.partitions.size).toArray
val jobId = nextJobId.getAndIncrement()
eventProcessLoop.post(JobSubmitted(
- jobId, rdd, func2, partitions, allowLocal = false, callSite, listener,
- SerializationUtils.clone(properties)))
+ jobId, rdd, func2, partitions, callSite, listener, SerializationUtils.clone(properties)))
listener.awaitResult() // Will throw an exception if the job fails
}
@@ -654,74 +646,6 @@ class DAGScheduler(
}
}
- /**
- * Run a job on an RDD locally, assuming it has only a single partition and no dependencies.
- * We run the operation in a separate thread just in case it takes a bunch of time, so that we
- * don't block the DAGScheduler event loop or other concurrent jobs.
- */
- protected def runLocally(job: ActiveJob) {
- logInfo("Computing the requested partition locally")
- new Thread("Local computation of job " + job.jobId) {
- override def run() {
- runLocallyWithinThread(job)
- }
- }.start()
- }
-
- // Broken out for easier testing in DAGSchedulerSuite.
- protected def runLocallyWithinThread(job: ActiveJob) {
- var jobResult: JobResult = JobSucceeded
- try {
- val rdd = job.finalStage.rdd
- val split = rdd.partitions(job.partitions(0))
- val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager)
- val taskContext =
- new TaskContextImpl(
- job.finalStage.id,
- job.partitions(0),
- taskAttemptId = 0,
- attemptNumber = 0,
- taskMemoryManager = taskMemoryManager,
- metricsSystem = env.metricsSystem,
- runningLocally = true)
- TaskContext.setTaskContext(taskContext)
- try {
- val result = job.func(taskContext, rdd.iterator(split, taskContext))
- job.listener.taskSucceeded(0, result)
- } finally {
- taskContext.markTaskCompleted()
- TaskContext.unset()
- // Note: this memory freeing logic is duplicated in Executor.run(); when changing this,
- // make sure to update both copies.
- val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
- if (freedMemory > 0) {
- if (sc.getConf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
- throw new SparkException(s"Managed memory leak detected; size = $freedMemory bytes")
- } else {
- logError(s"Managed memory leak detected; size = $freedMemory bytes")
- }
- }
- }
- } catch {
- case e: Exception =>
- val exception = new SparkDriverExecutionException(e)
- jobResult = JobFailed(exception)
- job.listener.jobFailed(exception)
- case oom: OutOfMemoryError =>
- val exception = new SparkException("Local job aborted due to out of memory error", oom)
- jobResult = JobFailed(exception)
- job.listener.jobFailed(exception)
- } finally {
- val s = job.finalStage
- // clean up data structures that were populated for a local job,
- // but that won't get cleaned up via the normal paths through
- // completion events or stage abort
- stageIdToStage -= s.id
- jobIdToStageIds -= job.jobId
- listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), jobResult))
- }
- }
-
/** Finds the earliest-created active job that needs the stage */
// TODO: Probably should actually find among the active jobs that need this
// stage the one with the highest priority (highest-priority pool, earliest created).
@@ -784,7 +708,6 @@ class DAGScheduler(
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
- allowLocal: Boolean,
callSite: CallSite,
listener: JobListener,
properties: Properties) {
@@ -802,29 +725,20 @@ class DAGScheduler(
if (finalStage != null) {
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
- logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
- job.jobId, callSite.shortForm, partitions.length, allowLocal))
+ logInfo("Got job %s (%s) with %d output partitions".format(
+ job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
- val shouldRunLocally =
- localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
val jobSubmissionTime = clock.getTimeMillis()
- if (shouldRunLocally) {
- // Compute very short actions like first() or take() with no parent stages locally.
- listenerBus.post(
- SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
- runLocally(job)
- } else {
- jobIdToActiveJob(jobId) = job
- activeJobs += job
- finalStage.resultOfJob = Some(job)
- val stageIds = jobIdToStageIds(jobId).toArray
- val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
- listenerBus.post(
- SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
- submitStage(finalStage)
- }
+ jobIdToActiveJob(jobId) = job
+ activeJobs += job
+ finalStage.resultOfJob = Some(job)
+ val stageIds = jobIdToStageIds(jobId).toArray
+ val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
+ listenerBus.post(
+ SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
+ submitStage(finalStage)
}
submitWaitingStages()
}
@@ -1486,9 +1400,8 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
}
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
- case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
- dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
- listener, properties)
+ case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
+ dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
case StageCancelled(stageId) =>
dagScheduler.handleStageCancellation(stageId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index a927eae2b0..a213d419cf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -40,7 +40,6 @@ private[scheduler] case class JobSubmitted(
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
- allowLocal: Boolean,
callSite: CallSite,
listener: JobListener,
properties: Properties = null)
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index f6da9f98ad..5f718ea9f7 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -679,7 +679,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
test("runJob on an invalid partition") {
intercept[IllegalArgumentException] {
- sc.runJob(sc.parallelize(1 to 10, 2), {iter: Iterator[Int] => iter.size}, Seq(0, 1, 2), false)
+ sc.runJob(sc.parallelize(1 to 10, 2), {iter: Iterator[Int] => iter.size}, Seq(0, 1, 2))
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 3462a82c9c..86dff8fb57 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -153,9 +153,7 @@ class DAGSchedulerSuite
}
before {
- // Enable local execution for this test
- val conf = new SparkConf().set("spark.localExecution.enabled", "true")
- sc = new SparkContext("local", "DAGSchedulerSuite", conf)
+ sc = new SparkContext("local", "DAGSchedulerSuite")
sparkListener.submittedStageInfos.clear()
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
@@ -172,12 +170,7 @@ class DAGSchedulerSuite
sc.listenerBus,
mapOutputTracker,
blockManagerMaster,
- sc.env) {
- override def runLocally(job: ActiveJob) {
- // don't bother with the thread while unit testing
- runLocallyWithinThread(job)
- }
- }
+ sc.env)
dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler)
}
@@ -241,10 +234,9 @@ class DAGSchedulerSuite
rdd: RDD[_],
partitions: Array[Int],
func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
- allowLocal: Boolean = false,
listener: JobListener = jobListener): Int = {
val jobId = scheduler.nextJobId.getAndIncrement()
- runEvent(JobSubmitted(jobId, rdd, func, partitions, allowLocal, CallSite("", ""), listener))
+ runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener))
jobId
}
@@ -284,37 +276,6 @@ class DAGSchedulerSuite
assertDataStructuresEmpty()
}
- test("local job") {
- val rdd = new PairOfIntsRDD(sc, Nil) {
- override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
- Array(42 -> 0).iterator
- override def getPartitions: Array[Partition] =
- Array( new Partition { override def index: Int = 0 } )
- override def getPreferredLocations(split: Partition): List[String] = Nil
- override def toString: String = "DAGSchedulerSuite Local RDD"
- }
- val jobId = scheduler.nextJobId.getAndIncrement()
- runEvent(
- JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
- assert(results === Map(0 -> 42))
- assertDataStructuresEmpty()
- }
-
- test("local job oom") {
- val rdd = new PairOfIntsRDD(sc, Nil) {
- override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
- throw new java.lang.OutOfMemoryError("test local job oom")
- override def getPartitions = Array( new Partition { override def index = 0 } )
- override def getPreferredLocations(split: Partition) = Nil
- override def toString = "DAGSchedulerSuite Local RDD"
- }
- val jobId = scheduler.nextJobId.getAndIncrement()
- runEvent(
- JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
- assert(results.size == 0)
- assertDataStructuresEmpty()
- }
-
test("run trivial job w/ dependency") {
val baseRdd = new MyRDD(sc, 1, Nil)
val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
@@ -452,12 +413,7 @@ class DAGSchedulerSuite
sc.listenerBus,
mapOutputTracker,
blockManagerMaster,
- sc.env) {
- override def runLocally(job: ActiveJob) {
- // don't bother with the thread while unit testing
- runLocallyWithinThread(job)
- }
- }
+ sc.env)
dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(noKillScheduler)
val jobId = submit(new MyRDD(sc, 1, Nil), Array(0))
cancel(jobId)
@@ -889,40 +845,23 @@ class DAGSchedulerSuite
// Run this on executors
sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
- // Run this within a local thread
- sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
-
- // Make sure we can still run local commands as well as cluster commands.
+ // Make sure we can still run commands
assert(sc.parallelize(1 to 10, 2).count() === 10)
- assert(sc.parallelize(1 to 10, 2).first() === 1)
}
test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") {
- val e1 = intercept[SparkDriverExecutionException] {
- val rdd = sc.parallelize(1 to 10, 2)
- sc.runJob[Int, Int](
- rdd,
- (context: TaskContext, iter: Iterator[Int]) => iter.size,
- Seq(0),
- allowLocal = true,
- (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
- }
- assert(e1.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
-
- val e2 = intercept[SparkDriverExecutionException] {
+ val e = intercept[SparkDriverExecutionException] {
val rdd = sc.parallelize(1 to 10, 2)
sc.runJob[Int, Int](
rdd,
(context: TaskContext, iter: Iterator[Int]) => iter.size,
Seq(0, 1),
- allowLocal = false,
(part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
}
- assert(e2.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
+ assert(e.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
- // Make sure we can still run local commands as well as cluster commands.
+ // Make sure we can still run commands
assert(sc.parallelize(1 to 10, 2).count() === 10)
- assert(sc.parallelize(1 to 10, 2).first() === 1)
}
test("getPartitions exceptions should not crash DAGScheduler and SparkContext (SPARK-8606)") {
@@ -935,9 +874,8 @@ class DAGSchedulerSuite
rdd.reduceByKey(_ + _, 1).count()
}
- // Make sure we can still run local commands as well as cluster commands.
+ // Make sure we can still run commands
assert(sc.parallelize(1 to 10, 2).count() === 10)
- assert(sc.parallelize(1 to 10, 2).first() === 1)
}
test("getPreferredLocations errors should not crash DAGScheduler and SparkContext (SPARK-8606)") {
@@ -951,9 +889,8 @@ class DAGSchedulerSuite
}
assert(e1.getMessage.contains(classOf[DAGSchedulerSuiteDummyException].getName))
- // Make sure we can still run local commands as well as cluster commands.
+ // Make sure we can still run commands
assert(sc.parallelize(1 to 10, 2).count() === 10)
- assert(sc.parallelize(1 to 10, 2).first() === 1)
}
test("accumulator not calculated for resubmitted result stage") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index a9036da9cc..e5ecd4b7c2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -134,14 +134,14 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
test("Only one of two duplicate commit tasks should commit") {
val rdd = sc.parallelize(Seq(1), 1)
sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).commitSuccessfully _,
- 0 until rdd.partitions.size, allowLocal = false)
+ 0 until rdd.partitions.size)
assert(tempDir.list().size === 1)
}
test("If commit fails, if task is retried it should not be locked, and will succeed.") {
val rdd = sc.parallelize(Seq(1), 1)
sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).failFirstCommitAttempt _,
- 0 until rdd.partitions.size, allowLocal = false)
+ 0 until rdd.partitions.size)
assert(tempDir.list().size === 1)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 651295b734..730535ece7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -188,7 +188,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
sc.addSparkListener(listener)
val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = rdd1.map(_.toString)
- sc.runJob(rdd2, (items: Iterator[String]) => items.size, Seq(0, 1), true)
+ sc.runJob(rdd2, (items: Iterator[String]) => items.size, Seq(0, 1))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)