author     Marcelo Vanzin <vanzin@cloudera.com>    2015-04-02 19:48:55 -0700
committer  Andrew Or <andrew@databricks.com>       2015-04-02 19:50:45 -0700
commit     0ef46b2d8de8dfb6da5a2b9ba808bdfe6d16e27d (patch)
tree       b49d33e9c02c58c74648a4ac7a1d7f6331673a8a
parent     2927af102fd6060c745196bbc6ab286760bad39d (diff)
[SPARK-6650] [core] Stop ExecutorAllocationManager when context stops.
This fixes the thread leak. I also changed the unit test to keep track of
allocated contexts and make sure they're closed after tests are run; this is
needed since some tests use this pattern:

    val sc = createContext()
    doSomethingThatMayThrow()
    sc.stop()

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #5311 from vanzin/SPARK-6650 and squashes the following commits:

652c73b [Marcelo Vanzin] Nits.
5711512 [Marcelo Vanzin] More exception safety.
cc5a744 [Marcelo Vanzin] Stop alloc manager before scheduler.
9886f69 [Marcelo Vanzin] [SPARK-6650] [core] Stop ExecutorAllocationManager when context stops.

(cherry picked from commit 45134ec920c3766c22aefd4366b4b60ec99bd810)
Signed-off-by: Andrew Or <andrew@databricks.com>

Conflicts:
    core/src/main/scala/org/apache/spark/SparkContext.scala
-rw-r--r--  core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala      | 38
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                   |  1
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala | 44
3 files changed, 48 insertions, 35 deletions
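
The commit message above describes a test pattern that leaks a SparkContext whenever the step between creation and sc.stop() throws. Below is a minimal sketch of the fix the test suite adopts, assuming a ScalaTest FunSuite; createContext() and doSomethingThatMayThrow() are placeholders taken from the message, not real helpers.

    // Sketch only: why a trailing sc.stop() is not enough when the test body can throw.
    import scala.collection.mutable
    import org.scalatest.{BeforeAndAfter, FunSuite}
    import org.apache.spark.{SparkConf, SparkContext}

    class LeakFreeSuite extends FunSuite with BeforeAndAfter {

      // Every context a test creates is recorded here...
      private val contexts = new mutable.ListBuffer[SparkContext]()

      // ...so it is stopped even when the test throws before reaching sc.stop().
      after {
        contexts.foreach(_.stop())
        contexts.clear()
      }

      private def createContext(): SparkContext = {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
        contexts += sc
        sc
      }

      private def doSomethingThatMayThrow(): Unit = ()  // placeholder from the commit message

      test("pattern from the commit message") {
        val sc = createContext()
        doSomethingThatMayThrow()   // if this throws, `after` still stops sc
        sc.stop()
      }
    }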
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 21c6e6ffa6..9385f557c4 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -17,10 +17,12 @@
package org.apache.spark
+import java.util.concurrent.{Executors, TimeUnit}
+
import scala.collection.mutable
import org.apache.spark.scheduler._
-import org.apache.spark.util.{SystemClock, Clock}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
/**
* An agent that dynamically allocates and removes executors based on the workload.
@@ -129,6 +131,10 @@ private[spark] class ExecutorAllocationManager(
// Listener for Spark events that impact the allocation policy
private val listener = new ExecutorAllocationListener
+ // Executor that handles the scheduling task.
+ private val executor = Executors.newSingleThreadScheduledExecutor(
+ Utils.namedThreadFactory("spark-dynamic-executor-allocation"))
+
/**
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
@@ -173,32 +179,24 @@ private[spark] class ExecutorAllocationManager(
}
/**
- * Register for scheduler callbacks to decide when to add and remove executors.
+ * Register for scheduler callbacks to decide when to add and remove executors, and start
+ * the scheduling task.
*/
def start(): Unit = {
listenerBus.addListener(listener)
- startPolling()
+
+ val scheduleTask = new Runnable() {
+ override def run(): Unit = Utils.logUncaughtExceptions(schedule())
+ }
+ executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
/**
- * Start the main polling thread that keeps track of when to add and remove executors.
+ * Stop the allocation manager.
*/
- private def startPolling(): Unit = {
- val t = new Thread {
- override def run(): Unit = {
- while (true) {
- try {
- schedule()
- } catch {
- case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
- }
- Thread.sleep(intervalMillis)
- }
- }
- }
- t.setName("spark-dynamic-executor-allocation")
- t.setDaemon(true)
- t.start()
+ def stop(): Unit = {
+ executor.shutdown()
+ executor.awaitTermination(10, TimeUnit.SECONDS)
}
/**
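
The hunk above swaps the hand-rolled while(true) polling thread for a single-threaded ScheduledExecutorService that can be shut down from stop(), which is what removes the thread leak. Here is a standalone sketch of that lifecycle, using plain java.util.concurrent instead of Spark's Utils.namedThreadFactory / Utils.logUncaughtExceptions helpers; intervalMillis and schedule() stand in for the manager's own members.

    import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

    // Sketch of the start/stop lifecycle the patch introduces, outside of Spark.
    class PollingScheduler(intervalMillis: Long)(schedule: () => Unit) {

      // One named daemon thread, comparable to
      // Utils.namedThreadFactory("spark-dynamic-executor-allocation").
      private val executor: ScheduledExecutorService =
        Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
          override def newThread(r: Runnable): Thread = {
            val t = new Thread(r, "spark-dynamic-executor-allocation")
            t.setDaemon(true)
            t
          }
        })

      def start(): Unit = {
        val task = new Runnable {
          // Catch everything: if the task threw, scheduleAtFixedRate would cancel
          // all future runs, which is what Utils.logUncaughtExceptions guards against.
          override def run(): Unit =
            try schedule() catch { case e: Throwable => e.printStackTrace() }
        }
        executor.scheduleAtFixedRate(task, 0L, intervalMillis, TimeUnit.MILLISECONDS)
      }

      // Unlike the old daemon loop, the task can be stopped deterministically.
      def stop(): Unit = {
        executor.shutdown()
        executor.awaitTermination(10, TimeUnit.SECONDS)
      }
    }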
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7c2de9d260..4aee423043 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1374,6 +1374,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
env.metricsSystem.report()
metadataCleaner.cancel()
cleaner.foreach(_.stop())
+ executorAllocationManager.foreach(_.stop())
dagScheduler.stop()
dagScheduler = null
listenerBus.stop()
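
The one-line addition above stops the allocation manager (held as an Option) before the DAG scheduler is torn down, per the "Stop alloc manager before scheduler" entry in the squash list. A tiny illustration of that Option-based shutdown ordering with hypothetical component names; this is not the real SparkContext code.

    // Hypothetical stand-ins for cleaner, executorAllocationManager and dagScheduler.
    trait Stoppable { def stop(): Unit }

    class AppContext(
        cleaner: Option[Stoppable],
        allocationManager: Option[Stoppable],
        scheduler: Stoppable) {

      def stop(): Unit = {
        cleaner.foreach(_.stop())
        // Optional components stop first; the allocation manager goes down
        // before the scheduler it schedules against.
        allocationManager.foreach(_.stop())
        scheduler.stop()
      }
    }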
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index abfcee7572..3ded1e4af8 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
import scala.collection.mutable
-import org.scalatest.{FunSuite, PrivateMethodTester}
+import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -28,10 +28,20 @@ import org.apache.spark.util.ManualClock
/**
* Test add and remove behavior of ExecutorAllocationManager.
*/
-class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
+class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
import ExecutorAllocationManager._
import ExecutorAllocationManagerSuite._
+ private val contexts = new mutable.ListBuffer[SparkContext]()
+
+ before {
+ contexts.clear()
+ }
+
+ after {
+ contexts.foreach(_.stop())
+ }
+
test("verify min/max executors") {
val conf = new SparkConf()
.setMaster("local")
@@ -39,18 +49,19 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
val sc0 = new SparkContext(conf)
+ contexts += sc0
assert(sc0.executorAllocationManager.isDefined)
sc0.stop()
// Min < 0
val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1")
- intercept[SparkException] { new SparkContext(conf1) }
+ intercept[SparkException] { contexts += new SparkContext(conf1) }
SparkEnv.get.stop()
SparkContext.clearActiveContext()
// Max < 0
val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1")
- intercept[SparkException] { new SparkContext(conf2) }
+ intercept[SparkException] { contexts += new SparkContext(conf2) }
SparkEnv.get.stop()
SparkContext.clearActiveContext()
@@ -665,16 +676,6 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).contains("executor-2"))
assert(!removeTimes(manager).contains("executor-1"))
}
-}
-
-/**
- * Helper methods for testing ExecutorAllocationManager.
- * This includes methods to access private methods and fields in ExecutorAllocationManager.
- */
-private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
- private val schedulerBacklogTimeout = 1L
- private val sustainedSchedulerBacklogTimeout = 2L
- private val executorIdleTimeout = 3L
private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
val conf = new SparkConf()
@@ -688,9 +689,22 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
sustainedSchedulerBacklogTimeout.toString)
.set("spark.dynamicAllocation.executorIdleTimeout", executorIdleTimeout.toString)
.set("spark.dynamicAllocation.testing", "true")
- new SparkContext(conf)
+ val sc = new SparkContext(conf)
+ contexts += sc
+ sc
}
+}
+
+/**
+ * Helper methods for testing ExecutorAllocationManager.
+ * This includes methods to access private methods and fields in ExecutorAllocationManager.
+ */
+private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
+ private val schedulerBacklogTimeout = 1L
+ private val sustainedSchedulerBacklogTimeout = 2L
+ private val executorIdleTimeout = 3L
+
private def createStageInfo(stageId: Int, numTasks: Int): StageInfo = {
new StageInfo(stageId, 0, "name", numTasks, Seq.empty, "no details")
}