aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2014-08-14 01:37:38 -0700
committerReynold Xin <rxin@apache.org>2014-08-14 01:37:38 -0700
commitd069c5d9d2f6ce06389ca2ddf0b3ae4db72c5797 (patch)
treeb6bd62749f63cd924073ed4d6e2b59324307b093 /core
parent69a57a18ee35af1cc5a00b67a80837ea317cd330 (diff)
downloadspark-d069c5d9d2f6ce06389ca2ddf0b3ae4db72c5797.tar.gz
spark-d069c5d9d2f6ce06389ca2ddf0b3ae4db72c5797.tar.bz2
spark-d069c5d9d2f6ce06389ca2ddf0b3ae4db72c5797.zip
[SPARK-3029] Disable local execution of Spark jobs by default
Currently, local execution of Spark jobs is only used by take(), and it can be problematic as it can load a significant amount of data onto the driver. The worst case scenarios occur if the RDD is cached (guaranteed to load whole partition), has very large elements, or the partition is just large and we apply a filter with high selectivity or computational overhead. Additionally, jobs that run locally in this manner do not show up in the web UI, and are thus harder to track or understand what is occurring. This PR adds a flag to disable local execution, which is turned OFF by default, with the intention of perhaps eventually removing this functionality altogether. Removing it now is a tougher proposition since it is part of the public runJob API. An alternative solution would be to limit the flag to take()/first() to avoid impacting any external users of this API, but such usage (or, at least, reliance upon the feature) is hopefully minimal. Author: Aaron Davidson <aaron@databricks.com> Closes #1321 from aarondav/allowlocal and squashes the following commits: 136b253 [Aaron Davidson] Fix DAGSchedulerSuite 5599d55 [Aaron Davidson] [RFC] Disable local execution of Spark jobs by default
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala4
2 files changed, 9 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 430e45ada5..36bbaaa3f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -121,6 +121,9 @@ class DAGScheduler(
private[scheduler] var eventProcessActor: ActorRef = _
+ /** If enabled, we may run certain actions like take() and first() locally. */
+ private val localExecutionEnabled = sc.getConf.getBoolean("spark.localExecution.enabled", false)
+
private def initializeEventProcessActor() {
// blocking the thread until supervisor is started, which ensures eventProcessActor is
// not null before any job is submitted
@@ -732,7 +735,9 @@ class DAGScheduler(
logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
- if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
+ val shouldRunLocally =
+ localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
+ if (shouldRunLocally) {
// Compute very short actions like first() or take() with no parent stages locally.
listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
runLocally(job)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 8c1b0fed11..bd829752eb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -141,7 +141,9 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
}
before {
- sc = new SparkContext("local", "DAGSchedulerSuite")
+ // Enable local execution for this test
+ val conf = new SparkConf().set("spark.localExecution.enabled", "true")
+ sc = new SparkContext("local", "DAGSchedulerSuite", conf)
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
failure = null