 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala      | 7
 core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 4
 docs/configuration.md                                                  | 9
 3 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 430e45ada5..36bbaaa3f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -121,6 +121,9 @@ class DAGScheduler(
private[scheduler] var eventProcessActor: ActorRef = _
+ /** If enabled, we may run certain actions like take() and first() locally. */
+ private val localExecutionEnabled = sc.getConf.getBoolean("spark.localExecution.enabled", false)
+
private def initializeEventProcessActor() {
// blocking the thread until supervisor is started, which ensures eventProcessActor is
// not null before any job is submitted
@@ -732,7 +735,9 @@ class DAGScheduler(
logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
- if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
+ val shouldRunLocally =
+   localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
+ if (shouldRunLocally) {
// Compute very short actions like first() or take() with no parent stages locally.
listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
runLocally(job)
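
Note: the gate above only fires for jobs that are trivially cheap to evaluate on the driver: the flag must be set, the caller must pass allowLocal, the final stage must have no parent (shuffle) stages, and the job must cover exactly one partition. The following driver-side sketch is a hypothetical standalone program (object name and numbers are illustrative, not part of this patch) showing one action that qualifies and one that does not:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

// Hypothetical example program; names are illustrative only.
object LocalExecutionExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("LocalExecutionExample")
      .set("spark.localExecution.enabled", "true") // opt in explicitly
    val sc = new SparkContext(conf)

    val nums = sc.parallelize(1 to 1000, numSlices = 4)

    // take(1) submits a job over a single partition and parallelize() has no
    // shuffle dependencies, so shouldRunLocally is true and the DAGScheduler
    // may evaluate it on the driver via runLocally().
    println(nums.take(1).mkString(","))

    // A key-grouped RDD has a parent (shuffle map) stage, so even with the
    // flag enabled this first() is scheduled as ordinary cluster tasks.
    println(nums.map(x => (x % 10, x)).groupByKey().first())

    sc.stop()
  }
}
```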
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 8c1b0fed11..bd829752eb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -141,7 +141,9 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
}
before {
- sc = new SparkContext("local", "DAGSchedulerSuite")
+ // Enable local execution for this test
+ val conf = new SparkConf().set("spark.localExecution.enabled", "true")
+ sc = new SparkContext("local", "DAGSchedulerSuite", conf)
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
failure = null
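
Because the default is now false, any suite that exercises the driver-local path has to opt in the same way the before block above does. A minimal sketch of such a check, assuming a plain ScalaTest FunSuite (the suite name and assertion are illustrative, not part of this patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.FunSuite

// Illustrative suite; not part of the actual patch.
class LocalExecutionFlagSuite extends FunSuite {
  test("take() returns the same result when local execution is enabled") {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("LocalExecutionFlagSuite")
      .set("spark.localExecution.enabled", "true")
    val sc = new SparkContext(conf)
    try {
      // With a single partition and no parent stages, this job is eligible
      // to run on the driver; the result must match normal execution.
      assert(sc.parallelize(1 to 100).take(5).toSeq === (1 to 5))
    } finally {
      sc.stop()
    }
  }
}
```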
diff --git a/docs/configuration.md b/docs/configuration.md
index c8336b3913..c408c468dc 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -846,6 +846,15 @@ Apart from these, the following properties are also available, and may be useful
(in milliseconds).
</td>
</tr>
+<tr>
+ <td><code>spark.localExecution.enabled</code></td>
+ <td>false</td>
+ <td>
+ Enables Spark to run certain jobs, such as first() or take(), on the driver without sending
+ tasks to the cluster. This can make those jobs execute very quickly, but may require
+ shipping a whole partition of data to the driver.
+ </td>
+</tr>
</table>
#### Security
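
The documented caveat about shipping a whole partition is why the flag defaults to false. The property can be set like any other Spark configuration entry (for example in spark-defaults.conf or on a SparkConf before the context is created). As a hedged illustration of the trade-off, assuming the application is launched through spark-submit (the file path and pipeline below are made up):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: the path and filter are hypothetical.
object LocalExecutionTradeoff {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("LocalExecutionTradeoff")
      .set("spark.localExecution.enabled", "true")
    val sc = new SparkContext(conf) // master supplied by spark-submit

    // first() only needs partition 0, so with the flag enabled the driver
    // reads and filters that partition itself: fast for small inputs, but
    // the driver bears the CPU and memory cost if the partition is large.
    val firstError = sc.textFile("hdfs:///logs/app.log")
      .filter(_.contains("ERROR"))
      .first()
    println(firstError)

    sc.stop()
  }
}
```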