author     Aaron Davidson <aaron@databricks.com>    2014-08-14 01:37:38 -0700
committer  Reynold Xin <rxin@apache.org>    2014-08-14 01:37:45 -0700
commit     0cb2b82e0ef903dd99c589928bc17650037f25c5 (patch)
tree       450c915a5f697f9979b836c7ac58a8c61133fd93
parent     1baf06f4e6a2c4767ad6107559396c7680085235 (diff)
[SPARK-3029] Disable local execution of Spark jobs by default
Currently, local execution of Spark jobs is only used by take(), and it can be problematic as it can load a significant amount of data onto the driver. The worst case scenarios occur if the RDD is cached (guaranteed to load whole partition), has very large elements, or the partition is just large and we apply a filter with high selectivity or computational overhead.

Additionally, jobs that run locally in this manner do not show up in the web UI, and are thus harder to track or understand what is occurring.

This PR adds a flag to disable local execution, which is turned OFF by default, with the intention of perhaps eventually removing this functionality altogether. Removing it now is a tougher proposition since it is part of the public runJob API. An alternative solution would be to limit the flag to take()/first() to avoid impacting any external users of this API, but such usage (or, at least, reliance upon the feature) is hopefully minimal.

Author: Aaron Davidson <aaron@databricks.com>

Closes #1321 from aarondav/allowlocal and squashes the following commits:

136b253 [Aaron Davidson] Fix DAGSchedulerSuite
5599d55 [Aaron Davidson] [RFC] Disable local execution of Spark jobs by default

(cherry picked from commit d069c5d9d2f6ce06389ca2ddf0b3ae4db72c5797)
Signed-off-by: Reynold Xin <rxin@apache.org>
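As a usage sketch (not part of this patch), an application that still wants the old take()/first() fast path after this change would opt back in through SparkConf; the master, app name, and data below are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    // Opt back in to local execution; the default is now "false".
    val conf = new SparkConf()
      .setMaster("local[2]")            // illustrative master
      .setAppName("LocalExecutionDemo") // illustrative app name
      .set("spark.localExecution.enabled", "true")
    val sc = new SparkContext(conf)

    // With the flag enabled, a single-partition action with no parent
    // stages, such as first() or take(n), may run directly on the driver.
    val first = sc.parallelize(1 to 100, numSlices = 1).first()
    sc.stop()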
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala       | 7
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala  | 4
-rw-r--r--  docs/configuration.md                                                    | 9
3 files changed, 18 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 430e45ada5..36bbaaa3f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -121,6 +121,9 @@ class DAGScheduler(
private[scheduler] var eventProcessActor: ActorRef = _
+ /** If enabled, we may run certain actions like take() and first() locally. */
+ private val localExecutionEnabled = sc.getConf.getBoolean("spark.localExecution.enabled", false)
+
private def initializeEventProcessActor() {
// blocking the thread until supervisor is started, which ensures eventProcessActor is
// not null before any job is submitted
@@ -732,7 +735,9 @@ class DAGScheduler(
logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
- if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
+ val shouldRunLocally =
+ localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
+ if (shouldRunLocally) {
// Compute very short actions like first() or take() with no parent stages locally.
listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
runLocally(job)
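For readers skimming the hunk above, the combined gate can be restated as follows; this is a paraphrase of the patched code, not additional Spark source:

    // Local execution on the driver now requires all four conditions:
    val shouldRunLocally =
      localExecutionEnabled &&       // spark.localExecution.enabled (new, default false)
      allowLocal &&                  // the runJob caller opted in
      finalStage.parents.isEmpty &&  // no parent stages, i.e. no shuffle dependencies
      partitions.length == 1         // exactly one partition requested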
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 8c1b0fed11..bd829752eb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -141,7 +141,9 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
}
before {
- sc = new SparkContext("local", "DAGSchedulerSuite")
+ // Enable local execution for this test
+ val conf = new SparkConf().set("spark.localExecution.enabled", "true")
+ sc = new SparkContext("local", "DAGSchedulerSuite", conf)
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
failure = null
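Outside the suite, the same behavior can be exercised with a minimal standalone check; this sketch assumes a local master and uses made-up data:

    import org.apache.spark.{SparkConf, SparkContext}

    // Enable the flag, as the test setup above does, then run a
    // single-partition take() that is eligible for local execution.
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("LocalExecutionCheck")
      .set("spark.localExecution.enabled", "true")
    val sc = new SparkContext(conf)
    assert(sc.parallelize(Seq(1, 2, 3), 1).take(2).toSeq == Seq(1, 2))
    sc.stop()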
diff --git a/docs/configuration.md b/docs/configuration.md
index c8336b3913..c408c468dc 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -846,6 +846,15 @@ Apart from these, the following properties are also available, and may be useful
(in milliseconds).
</td>
</tr>
+<tr>
+ <td><code>spark.localExecution.enabled</code></td>
+ <td>false</td>
+ <td>
+ Enables Spark to run certain jobs, such as first() or take() on the driver, without sending
+ tasks to the cluster. This can make certain jobs execute very quickly, but may require
+ shipping a whole partition of data to the driver.
+ </td>
+</tr>
</table>
#### Security
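As a deployment note (a sketch, using the property name from this patch and the standard Spark defaults file rather than anything introduced here), the flag can also be set cluster-wide instead of in application code:

    # conf/spark-defaults.conf (illustrative entry)
    spark.localExecution.enabled   true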