 core/src/main/scala/org/apache/spark/SparkContext.scala | 86
 core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 2
 core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 5
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 2
 core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 2
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4
 core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala | 3
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 117
 core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala | 1
 core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 2
 core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 83
 core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala | 4
 core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 2
 docs/configuration.md | 9
 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala | 3
 mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala | 2
 python/pyspark/context.py | 3
 python/pyspark/rdd.py | 4
 sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 3
 19 files changed, 108 insertions(+), 229 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4976e5eb49..6a6b94a271 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1758,16 +1758,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/**
* Run a function on a given set of partitions in an RDD and pass the results to the given
- * handler function. This is the main entry point for all actions in Spark. The allowLocal
- * flag specifies whether the scheduler can run the computation on the driver rather than
- * shipping it out to the cluster, for short actions like first().
+ * handler function. This is the main entry point for all actions in Spark.
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
- allowLocal: Boolean,
- resultHandler: (Int, U) => Unit) {
+ resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
@@ -1777,54 +1774,104 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
- dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
- resultHandler, localProperties.get)
+ dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
/**
- * Run a function on a given set of partitions in an RDD and return the results as an array. The
- * allowLocal flag specifies whether the scheduler can run the computation on the driver rather
- * than shipping it out to the cluster, for short actions like first().
+ * Run a function on a given set of partitions in an RDD and return the results as an array.
+ */
+ def runJob[T, U: ClassTag](
+ rdd: RDD[T],
+ func: (TaskContext, Iterator[T]) => U,
+ partitions: Seq[Int]): Array[U] = {
+ val results = new Array[U](partitions.size)
+ runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
+ results
+ }
+
+ /**
+ * Run a job on a given set of partitions of an RDD, but take a function of type
+ * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
+ */
+ def runJob[T, U: ClassTag](
+ rdd: RDD[T],
+ func: Iterator[T] => U,
+ partitions: Seq[Int]): Array[U] = {
+ val cleanedFunc = clean(func)
+ runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
+ }
+
+
+ /**
+ * Run a function on a given set of partitions in an RDD and pass the results to the given
+ * handler function. This is the main entry point for all actions in Spark.
+ *
+ * The allowLocal flag is deprecated as of Spark 1.5.0+.
+ */
+ @deprecated("use the version of runJob without the allowLocal parameter", "1.5.0")
+ def runJob[T, U: ClassTag](
+ rdd: RDD[T],
+ func: (TaskContext, Iterator[T]) => U,
+ partitions: Seq[Int],
+ allowLocal: Boolean,
+ resultHandler: (Int, U) => Unit): Unit = {
+ if (allowLocal) {
+ logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+")
+ }
+ runJob(rdd, func, partitions, resultHandler)
+ }
+
+ /**
+ * Run a function on a given set of partitions in an RDD and return the results as an array.
+ *
+ * The allowLocal flag is deprecated as of Spark 1.5.0+.
*/
+ @deprecated("use the version of runJob without the allowLocal parameter", "1.5.0")
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean
): Array[U] = {
- val results = new Array[U](partitions.size)
- runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
- results
+ if (allowLocal) {
+ logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+")
+ }
+ runJob(rdd, func, partitions)
}
/**
* Run a job on a given set of partitions of an RDD, but take a function of type
* `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
+ *
+ * The allowLocal argument is deprecated as of Spark 1.5.0+.
*/
+ @deprecated("use the version of runJob without the allowLocal parameter", "1.5.0")
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int],
allowLocal: Boolean
): Array[U] = {
- val cleanedFunc = clean(func)
- runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions, allowLocal)
+ if (allowLocal) {
+ logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+")
+ }
+ runJob(rdd, func, partitions)
}
/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
- runJob(rdd, func, 0 until rdd.partitions.size, false)
+ runJob(rdd, func, 0 until rdd.partitions.length)
}
/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
- runJob(rdd, func, 0 until rdd.partitions.size, false)
+ runJob(rdd, func, 0 until rdd.partitions.length)
}
/**
@@ -1835,7 +1882,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
processPartition: (TaskContext, Iterator[T]) => U,
resultHandler: (Int, U) => Unit)
{
- runJob[T, U](rdd, processPartition, 0 until rdd.partitions.size, false, resultHandler)
+ runJob[T, U](rdd, processPartition, 0 until rdd.partitions.length, resultHandler)
}
/**
@@ -1847,7 +1894,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
resultHandler: (Int, U) => Unit)
{
val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)
- runJob[T, U](rdd, processFunc, 0 until rdd.partitions.size, false, resultHandler)
+ runJob[T, U](rdd, processFunc, 0 until rdd.partitions.length, resultHandler)
}
/**
@@ -1892,7 +1939,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
(context: TaskContext, iter: Iterator[T]) => cleanF(iter),
partitions,
callSite,
- allowLocal = false,
resultHandler,
localProperties.get)
new SimpleFutureAction(waiter, resultFunc)
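For callers migrating off the removed flag, here is a minimal before/after sketch of the new SparkContext.runJob overloads introduced above (it assumes Spark 1.5.x and a local[2] master; RunJobMigration and the sample RDD are illustrative and not part of this patch):

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object RunJobMigration {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("runJob-migration"))
    try {
      val rdd = sc.parallelize(1 to 100, 4)

      // Before this patch, callers passed allowLocal explicitly:
      //   sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(0, 1), allowLocal = false)
      // After this patch, the overload without the flag is the supported one; the
      // allowLocal variants remain only as deprecated shims that log a warning.
      val sums: Array[Int] = sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(0, 1))
      println(sums.mkString(", "))

      // The handler-based entry point loses the flag as well; the handler's index
      // is the position within the partitions sequence (0 and 1 here).
      val sizes = new Array[Int](2)
      sc.runJob(
        rdd,
        (ctx: TaskContext, it: Iterator[Int]) => it.size,
        Seq(2, 3),
        (index: Int, res: Int) => sizes(index) = res)
      println(sizes.mkString(", "))
    } finally {
      sc.stop()
    }
  }
}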
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index c95615a5a9..829fae1d1d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -364,7 +364,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
// This is useful for implementing `take` from other language frontends
// like Python where the data is serialized.
import scala.collection.JavaConversions._
- val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds, true)
+ val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds)
res.map(x => new java.util.ArrayList(x.toSeq)).toArray
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index dc9f62f39e..598953ac3b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -358,12 +358,11 @@ private[spark] object PythonRDD extends Logging {
def runJob(
sc: SparkContext,
rdd: JavaRDD[Array[Byte]],
- partitions: JArrayList[Int],
- allowLocal: Boolean): Int = {
+ partitions: JArrayList[Int]): Int = {
type ByteArray = Array[Byte]
type UnrolledPartition = Array[ByteArray]
val allPartitions: Array[UnrolledPartition] =
- sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal)
+ sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions)
val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
serveIterator(flattenedPartition.iterator,
s"serve RDD ${rdd.id} with partitions ${partitions.mkString(",")}")
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 66624ffbe4..581b40003c 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -215,8 +215,6 @@ private[spark] class Executor(
attemptNumber = attemptNumber,
metricsSystem = env.metricsSystem)
} finally {
- // Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread;
- // when changing this, make sure to update both copies.
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
if (freedMemory > 0) {
val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 91a6a2d039..326fafb230 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -881,7 +881,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
buf
} : Seq[V]
- val res = self.context.runJob(self, process, Array(index), false)
+ val res = self.context.runJob(self, process, Array(index))
res(0)
case None =>
self.filter(_._1 == key).map(_._2).collect()
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 9f7ebae3e9..394c6686cb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -897,7 +897,7 @@ abstract class RDD[T: ClassTag](
*/
def toLocalIterator: Iterator[T] = withScope {
def collectPartition(p: Int): Array[T] = {
- sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
+ sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p)).head
}
(0 until partitions.length).iterator.flatMap(i => collectPartition(i))
}
@@ -1273,7 +1273,7 @@ abstract class RDD[T: ClassTag](
val left = num - buf.size
val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
- val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
+ val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
res.foreach(buf ++= _.take(num - buf.size))
partsScanned += numPartsToTry
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
index 523aaf2b86..e277ae28d5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -50,8 +50,7 @@ class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, L
prev.context.runJob(
prev,
Utils.getIteratorSize _,
- 0 until n - 1, // do not need to count the last partition
- allowLocal = false
+ 0 until n - 1 // do not need to count the last partition
).scanLeft(0L)(_ + _)
}
}
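The start-index computation above can be read in isolation: count the elements of every partition except the last, then scanLeft the counts into per-partition starting offsets. A tiny self-contained sketch with plain collections standing in for runJob (the partition sizes are made up):

object ZipWithIndexOffsets {
  def main(args: Array[String]): Unit = {
    // Hypothetical element counts for partitions 0..2 of a 4-partition RDD;
    // the last partition's count is never needed to compute start offsets.
    val partitionSizes = Array(3L, 5L, 2L)
    // scanLeft yields one more value than its input: the start index of every
    // partition, including the last one (0, 3, 8, 10 here).
    val startIndices = partitionSizes.scanLeft(0L)(_ + _)
    println(startIndices.mkString(", "))
  }
}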
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b829d06923..552dabcfa5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,7 +38,6 @@ import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator
import org.apache.spark.rdd.RDD
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.storage._
-import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util._
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -128,10 +127,6 @@ class DAGScheduler(
// This is only safe because DAGScheduler runs in a single thread.
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
-
- /** If enabled, we may run certain actions like take() and first() locally. */
- private val localExecutionEnabled = sc.getConf.getBoolean("spark.localExecution.enabled", false)
-
/** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
@@ -515,7 +510,6 @@ class DAGScheduler(
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
- allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// Check to make sure we are not launching a task on a partition that does not exist.
@@ -535,7 +529,7 @@ class DAGScheduler(
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(
- jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter,
+ jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
@@ -545,11 +539,10 @@ class DAGScheduler(
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
- allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
- val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
+ val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
waiter.awaitResult() match {
case JobSucceeded =>
logInfo("Job %d finished: %s, took %f s".format
@@ -576,8 +569,7 @@ class DAGScheduler(
val partitions = (0 until rdd.partitions.size).toArray
val jobId = nextJobId.getAndIncrement()
eventProcessLoop.post(JobSubmitted(
- jobId, rdd, func2, partitions, allowLocal = false, callSite, listener,
- SerializationUtils.clone(properties)))
+ jobId, rdd, func2, partitions, callSite, listener, SerializationUtils.clone(properties)))
listener.awaitResult() // Will throw an exception if the job fails
}
@@ -654,74 +646,6 @@ class DAGScheduler(
}
}
- /**
- * Run a job on an RDD locally, assuming it has only a single partition and no dependencies.
- * We run the operation in a separate thread just in case it takes a bunch of time, so that we
- * don't block the DAGScheduler event loop or other concurrent jobs.
- */
- protected def runLocally(job: ActiveJob) {
- logInfo("Computing the requested partition locally")
- new Thread("Local computation of job " + job.jobId) {
- override def run() {
- runLocallyWithinThread(job)
- }
- }.start()
- }
-
- // Broken out for easier testing in DAGSchedulerSuite.
- protected def runLocallyWithinThread(job: ActiveJob) {
- var jobResult: JobResult = JobSucceeded
- try {
- val rdd = job.finalStage.rdd
- val split = rdd.partitions(job.partitions(0))
- val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager)
- val taskContext =
- new TaskContextImpl(
- job.finalStage.id,
- job.partitions(0),
- taskAttemptId = 0,
- attemptNumber = 0,
- taskMemoryManager = taskMemoryManager,
- metricsSystem = env.metricsSystem,
- runningLocally = true)
- TaskContext.setTaskContext(taskContext)
- try {
- val result = job.func(taskContext, rdd.iterator(split, taskContext))
- job.listener.taskSucceeded(0, result)
- } finally {
- taskContext.markTaskCompleted()
- TaskContext.unset()
- // Note: this memory freeing logic is duplicated in Executor.run(); when changing this,
- // make sure to update both copies.
- val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
- if (freedMemory > 0) {
- if (sc.getConf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
- throw new SparkException(s"Managed memory leak detected; size = $freedMemory bytes")
- } else {
- logError(s"Managed memory leak detected; size = $freedMemory bytes")
- }
- }
- }
- } catch {
- case e: Exception =>
- val exception = new SparkDriverExecutionException(e)
- jobResult = JobFailed(exception)
- job.listener.jobFailed(exception)
- case oom: OutOfMemoryError =>
- val exception = new SparkException("Local job aborted due to out of memory error", oom)
- jobResult = JobFailed(exception)
- job.listener.jobFailed(exception)
- } finally {
- val s = job.finalStage
- // clean up data structures that were populated for a local job,
- // but that won't get cleaned up via the normal paths through
- // completion events or stage abort
- stageIdToStage -= s.id
- jobIdToStageIds -= job.jobId
- listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), jobResult))
- }
- }
-
/** Finds the earliest-created active job that needs the stage */
// TODO: Probably should actually find among the active jobs that need this
// stage the one with the highest priority (highest-priority pool, earliest created).
@@ -784,7 +708,6 @@ class DAGScheduler(
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
- allowLocal: Boolean,
callSite: CallSite,
listener: JobListener,
properties: Properties) {
@@ -802,29 +725,20 @@ class DAGScheduler(
if (finalStage != null) {
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
- logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
- job.jobId, callSite.shortForm, partitions.length, allowLocal))
+ logInfo("Got job %s (%s) with %d output partitions".format(
+ job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
- val shouldRunLocally =
- localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
val jobSubmissionTime = clock.getTimeMillis()
- if (shouldRunLocally) {
- // Compute very short actions like first() or take() with no parent stages locally.
- listenerBus.post(
- SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))
- runLocally(job)
- } else {
- jobIdToActiveJob(jobId) = job
- activeJobs += job
- finalStage.resultOfJob = Some(job)
- val stageIds = jobIdToStageIds(jobId).toArray
- val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
- listenerBus.post(
- SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
- submitStage(finalStage)
- }
+ jobIdToActiveJob(jobId) = job
+ activeJobs += job
+ finalStage.resultOfJob = Some(job)
+ val stageIds = jobIdToStageIds(jobId).toArray
+ val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
+ listenerBus.post(
+ SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
+ submitStage(finalStage)
}
submitWaitingStages()
}
@@ -1486,9 +1400,8 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
}
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
- case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
- dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
- listener, properties)
+ case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
+ dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
case StageCancelled(stageId) =>
dagScheduler.handleStageCancellation(stageId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index a927eae2b0..a213d419cf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -40,7 +40,6 @@ private[scheduler] case class JobSubmitted(
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
- allowLocal: Boolean,
callSite: CallSite,
listener: JobListener,
properties: Properties = null)
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index f6da9f98ad..5f718ea9f7 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -679,7 +679,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
test("runJob on an invalid partition") {
intercept[IllegalArgumentException] {
- sc.runJob(sc.parallelize(1 to 10, 2), {iter: Iterator[Int] => iter.size}, Seq(0, 1, 2), false)
+ sc.runJob(sc.parallelize(1 to 10, 2), {iter: Iterator[Int] => iter.size}, Seq(0, 1, 2))
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 3462a82c9c..86dff8fb57 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -153,9 +153,7 @@ class DAGSchedulerSuite
}
before {
- // Enable local execution for this test
- val conf = new SparkConf().set("spark.localExecution.enabled", "true")
- sc = new SparkContext("local", "DAGSchedulerSuite", conf)
+ sc = new SparkContext("local", "DAGSchedulerSuite")
sparkListener.submittedStageInfos.clear()
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
@@ -172,12 +170,7 @@ class DAGSchedulerSuite
sc.listenerBus,
mapOutputTracker,
blockManagerMaster,
- sc.env) {
- override def runLocally(job: ActiveJob) {
- // don't bother with the thread while unit testing
- runLocallyWithinThread(job)
- }
- }
+ sc.env)
dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(scheduler)
}
@@ -241,10 +234,9 @@ class DAGSchedulerSuite
rdd: RDD[_],
partitions: Array[Int],
func: (TaskContext, Iterator[_]) => _ = jobComputeFunc,
- allowLocal: Boolean = false,
listener: JobListener = jobListener): Int = {
val jobId = scheduler.nextJobId.getAndIncrement()
- runEvent(JobSubmitted(jobId, rdd, func, partitions, allowLocal, CallSite("", ""), listener))
+ runEvent(JobSubmitted(jobId, rdd, func, partitions, CallSite("", ""), listener))
jobId
}
@@ -284,37 +276,6 @@ class DAGSchedulerSuite
assertDataStructuresEmpty()
}
- test("local job") {
- val rdd = new PairOfIntsRDD(sc, Nil) {
- override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
- Array(42 -> 0).iterator
- override def getPartitions: Array[Partition] =
- Array( new Partition { override def index: Int = 0 } )
- override def getPreferredLocations(split: Partition): List[String] = Nil
- override def toString: String = "DAGSchedulerSuite Local RDD"
- }
- val jobId = scheduler.nextJobId.getAndIncrement()
- runEvent(
- JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
- assert(results === Map(0 -> 42))
- assertDataStructuresEmpty()
- }
-
- test("local job oom") {
- val rdd = new PairOfIntsRDD(sc, Nil) {
- override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
- throw new java.lang.OutOfMemoryError("test local job oom")
- override def getPartitions = Array( new Partition { override def index = 0 } )
- override def getPreferredLocations(split: Partition) = Nil
- override def toString = "DAGSchedulerSuite Local RDD"
- }
- val jobId = scheduler.nextJobId.getAndIncrement()
- runEvent(
- JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
- assert(results.size == 0)
- assertDataStructuresEmpty()
- }
-
test("run trivial job w/ dependency") {
val baseRdd = new MyRDD(sc, 1, Nil)
val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
@@ -452,12 +413,7 @@ class DAGSchedulerSuite
sc.listenerBus,
mapOutputTracker,
blockManagerMaster,
- sc.env) {
- override def runLocally(job: ActiveJob) {
- // don't bother with the thread while unit testing
- runLocallyWithinThread(job)
- }
- }
+ sc.env)
dagEventProcessLoopTester = new DAGSchedulerEventProcessLoopTester(noKillScheduler)
val jobId = submit(new MyRDD(sc, 1, Nil), Array(0))
cancel(jobId)
@@ -889,40 +845,23 @@ class DAGSchedulerSuite
// Run this on executors
sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
- // Run this within a local thread
- sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
-
- // Make sure we can still run local commands as well as cluster commands.
+ // Make sure we can still run commands
assert(sc.parallelize(1 to 10, 2).count() === 10)
- assert(sc.parallelize(1 to 10, 2).first() === 1)
}
test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") {
- val e1 = intercept[SparkDriverExecutionException] {
- val rdd = sc.parallelize(1 to 10, 2)
- sc.runJob[Int, Int](
- rdd,
- (context: TaskContext, iter: Iterator[Int]) => iter.size,
- Seq(0),
- allowLocal = true,
- (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
- }
- assert(e1.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
-
- val e2 = intercept[SparkDriverExecutionException] {
+ val e = intercept[SparkDriverExecutionException] {
val rdd = sc.parallelize(1 to 10, 2)
sc.runJob[Int, Int](
rdd,
(context: TaskContext, iter: Iterator[Int]) => iter.size,
Seq(0, 1),
- allowLocal = false,
(part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
}
- assert(e2.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
+ assert(e.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
- // Make sure we can still run local commands as well as cluster commands.
+ // Make sure we can still run commands
assert(sc.parallelize(1 to 10, 2).count() === 10)
- assert(sc.parallelize(1 to 10, 2).first() === 1)
}
test("getPartitions exceptions should not crash DAGScheduler and SparkContext (SPARK-8606)") {
@@ -935,9 +874,8 @@ class DAGSchedulerSuite
rdd.reduceByKey(_ + _, 1).count()
}
- // Make sure we can still run local commands as well as cluster commands.
+ // Make sure we can still run commands
assert(sc.parallelize(1 to 10, 2).count() === 10)
- assert(sc.parallelize(1 to 10, 2).first() === 1)
}
test("getPreferredLocations errors should not crash DAGScheduler and SparkContext (SPARK-8606)") {
@@ -951,9 +889,8 @@ class DAGSchedulerSuite
}
assert(e1.getMessage.contains(classOf[DAGSchedulerSuiteDummyException].getName))
- // Make sure we can still run local commands as well as cluster commands.
+ // Make sure we can still run commands
assert(sc.parallelize(1 to 10, 2).count() === 10)
- assert(sc.parallelize(1 to 10, 2).first() === 1)
}
test("accumulator not calculated for resubmitted result stage") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index a9036da9cc..e5ecd4b7c2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -134,14 +134,14 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
test("Only one of two duplicate commit tasks should commit") {
val rdd = sc.parallelize(Seq(1), 1)
sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).commitSuccessfully _,
- 0 until rdd.partitions.size, allowLocal = false)
+ 0 until rdd.partitions.size)
assert(tempDir.list().size === 1)
}
test("If commit fails, if task is retried it should not be locked, and will succeed.") {
val rdd = sc.parallelize(Seq(1), 1)
sc.runJob(rdd, OutputCommitFunctions(tempDir.getAbsolutePath).failFirstCommitAttempt _,
- 0 until rdd.partitions.size, allowLocal = false)
+ 0 until rdd.partitions.size)
assert(tempDir.list().size === 1)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 651295b734..730535ece7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -188,7 +188,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
sc.addSparkListener(listener)
val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = rdd1.map(_.toString)
- sc.runJob(rdd2, (items: Iterator[String]) => items.size, Seq(0, 1), true)
+ sc.runJob(rdd2, (items: Iterator[String]) => items.size, Seq(0, 1))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
diff --git a/docs/configuration.md b/docs/configuration.md
index fea259204a..200f3cd212 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1049,15 +1049,6 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td><code>spark.localExecution.enabled</code></td>
- <td>false</td>
- <td>
- Enables Spark to run certain jobs, such as first() or take() on the driver, without sending
- tasks to the cluster. This can make certain jobs execute very quickly, but may require
- shipping a whole partition of data to the driver.
- </td>
-</tr>
-<tr>
<td><code>spark.locality.wait</code></td>
<td>3s</td>
<td>
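With spark.localExecution.enabled removed, short actions such as first() and take() always go through the DAGScheduler as ordinary jobs, and setting the old key has no effect. A minimal sketch of the post-patch behavior (local[2] master assumed; the object and app name are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object TakeWithoutLocalExecution {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("take-without-local-execution")
      .set("spark.localExecution.enabled", "true") // ignored after this patch
    val sc = new SparkContext(conf)
    try {
      val rdd = sc.parallelize(1 to 10, 2)
      println(rdd.first())                // submits a regular job over partition 0
      println(rdd.take(3).mkString(", ")) // may submit follow-up jobs over later partitions
    } finally {
      sc.stop()
    }
  }
}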
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index c5cd215477..1a9d78c0d4 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -98,8 +98,7 @@ class KafkaRDD[
val res = context.runJob(
this,
(tc: TaskContext, it: Iterator[R]) => it.take(parts(tc.partitionId)).toArray,
- parts.keys.toArray,
- allowLocal = true)
+ parts.keys.toArray)
res.foreach(buf ++= _)
buf.toArray
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
index 35e81fcb3d..1facf83d80 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/SlidingRDD.scala
@@ -72,7 +72,7 @@ class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int
val w1 = windowSize - 1
// Get the first w1 items of each partition, starting from the second partition.
val nextHeads =
- parent.context.runJob(parent, (iter: Iterator[T]) => iter.take(w1).toArray, 1 until n, true)
+ parent.context.runJob(parent, (iter: Iterator[T]) => iter.take(w1).toArray, 1 until n)
val partitions = mutable.ArrayBuffer[SlidingRDDPartition[T]]()
var i = 0
var partitionIndex = 0
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 43bde5ae41..eb5b0bbbda 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -913,8 +913,7 @@ class SparkContext(object):
# by runJob() in order to avoid having to pass a Python lambda into
# SparkContext#runJob.
mappedRDD = rdd.mapPartitions(partitionFunc)
- port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions,
- allowLocal)
+ port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
def show_profiles(self):
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 7e788148d9..fa8e0a0574 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1293,7 +1293,7 @@ class RDD(object):
taken += 1
p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
- res = self.context.runJob(self, takeUpToNumLeft, p, True)
+ res = self.context.runJob(self, takeUpToNumLeft, p)
items += res
partsScanned += numPartsToTry
@@ -2193,7 +2193,7 @@ class RDD(object):
values = self.filter(lambda kv: kv[0] == key).values()
if self.partitioner is not None:
- return self.ctx.runJob(values, lambda x: x, [self.partitioner(key)], False)
+ return self.ctx.runJob(values, lambda x: x, [self.partitioner(key)])
return values.collect()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index b0d56b7bf0..50c27def8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -165,8 +165,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
val sc = sqlContext.sparkContext
val res =
- sc.runJob(childRDD, (it: Iterator[InternalRow]) => it.take(left).toArray, p,
- allowLocal = false)
+ sc.runJob(childRDD, (it: Iterator[InternalRow]) => it.take(left).toArray, p)
res.foreach(buf ++= _.take(n - buf.size))
partsScanned += numPartsToTry