From 0cb2b82e0ef903dd99c589928bc17650037f25c5 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Thu, 14 Aug 2014 01:37:38 -0700 Subject: [SPARK-3029] Disable local execution of Spark jobs by default Currently, local execution of Spark jobs is only used by take(), and it can be problematic as it can load a significant amount of data onto the driver. The worst case scenarios occur if the RDD is cached (guaranteed to load whole partition), has very large elements, or the partition is just large and we apply a filter with high selectivity or computational overhead. Additionally, jobs that run locally in this manner do not show up in the web UI, and are thus harder to track or understand what is occurring. This PR adds a flag to disable local execution, which is turned OFF by default, with the intention of perhaps eventually removing this functionality altogether. Removing it now is a tougher proposition since it is part of the public runJob API. An alternative solution would be to limit the flag to take()/first() to avoid impacting any external users of this API, but such usage (or, at least, reliance upon the feature) is hopefully minimal. Author: Aaron Davidson Closes #1321 from aarondav/allowlocal and squashes the following commits: 136b253 [Aaron Davidson] Fix DAGSchedulerSuite 5599d55 [Aaron Davidson] [RFC] Disable local execution of Spark jobs by default (cherry picked from commit d069c5d9d2f6ce06389ca2ddf0b3ae4db72c5797) Signed-off-by: Reynold Xin --- .../src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 7 ++++++- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 4 +++- docs/configuration.md | 9 +++++++++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 430e45ada5..36bbaaa3f1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -121,6 +121,9 @@ class DAGScheduler( private[scheduler] var eventProcessActor: ActorRef = _ + /** If enabled, we may run certain actions like take() and first() locally. */ + private val localExecutionEnabled = sc.getConf.getBoolean("spark.localExecution.enabled", false) + private def initializeEventProcessActor() { // blocking the thread until supervisor is started, which ensures eventProcessActor is // not null before any job is submitted @@ -732,7 +735,9 @@ class DAGScheduler( logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) - if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { + val shouldRunLocally = + localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1 + if (shouldRunLocally) { // Compute very short actions like first() or take() with no parent stages locally. listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties)) runLocally(job) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 8c1b0fed11..bd829752eb 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -141,7 +141,9 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F } before { - sc = new SparkContext("local", "DAGSchedulerSuite") + // Enable local execution for this test + val conf = new SparkConf().set("spark.localExecution.enabled", "true") + sc = new SparkContext("local", "DAGSchedulerSuite", conf) sparkListener.successfulStages.clear() sparkListener.failedStages.clear() failure = null diff --git a/docs/configuration.md b/docs/configuration.md index c8336b3913..c408c468dc 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -846,6 +846,15 @@ Apart from these, the following properties are also available, and may be useful (in milliseconds). + + spark.localExecution.enabled + false + + Enables Spark to run certain jobs, such as first() or take() on the driver, without sending + tasks to the cluster. This can make certain jobs execute very quickly, but may require + shipping a whole partition of data to the driver. + + #### Security -- cgit v1.2.3