author    | Aaron Davidson <aaron@databricks.com>   | 2014-11-02 16:26:24 -0800
committer | Patrick Wendell <pwendell@gmail.com>    | 2014-11-02 16:26:24 -0800
commit    | 2ebd1df3f17993f3cb472ec44c8832213976d99a
tree      | 27369ea3bbc025e1c43f282fea96129db7d879d9 /core
parent    | 9081b9f9f79b78f0b20a5fc3bc4e7c1d3e717130
[SPARK-4183] Close transport-related resources between SparkContexts
A leak of event loops may be causing test failures.
Author: Aaron Davidson <aaron@databricks.com>
Closes #3053 from aarondav/leak and squashes the following commits:
e676d18 [Aaron Davidson] Typo!
8f96475 [Aaron Davidson] Keep original ssc semantics
7e49f10 [Aaron Davidson] A leak of event loops may be causing test failures.
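
The fix itself is small: when the Netty-based transfer service shuts down, it has to close its client factory as well as its server, otherwise the factory's event loop threads outlive the SparkContext that created them and accumulate across the contexts that tests create back to back. The sketch below only illustrates that cleanup pattern; the trait and class names in it are illustrative stand-ins, not the exact Spark transport APIs touched by this diff.

```scala
// Illustrative sketch of the cleanup pattern this patch applies. The names
// below (TransportResource, TransportServer, TransportClientFactory,
// BlockTransferServiceSketch) are stand-ins, not the exact Spark classes.
trait TransportResource { def close(): Unit }

class TransportServer extends TransportResource {
  // Stop accepting connections and release the server-side event loop threads.
  override def close(): Unit = ()
}

class TransportClientFactory extends TransportResource {
  // Close pooled client connections and shut down the client event loop group.
  override def close(): Unit = ()
}

class BlockTransferServiceSketch(
    server: TransportServer,
    clientFactory: TransportClientFactory) extends TransportResource {

  // Before this patch only the server was closed here, so the client
  // factory's event loops leaked every time a SparkContext was torn down
  // and a new one was created in the same JVM, as tests do repeatedly.
  override def close(): Unit = {
    server.close()
    clientFactory.close()
  }
}
```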
Diffstat (limited to 'core')
6 files changed, 49 insertions, 23 deletions
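
BlockManager.stop() gets a matching guard: the shuffle client is closed too, but only when it is a different object from the block transfer service, since close() may not be idempotent for every backend. A simplified sketch of that check follows; the `Client` trait and the standalone `stop` signature are hypothetical and stand in for the BlockTransferService and ShuffleClient fields that BlockManager actually holds.

```scala
// Hypothetical, simplified version of the guard added to BlockManager.stop().
trait Client { def close(): Unit }

def stop(blockTransferService: Client, shuffleClient: Client): Unit = {
  blockTransferService.close()
  // `ne` is reference inequality: skip the second close() when the shuffle
  // client IS the transfer service, because close() may not be idempotent.
  if (shuffleClient ne blockTransferService) {
    shuffleClient.close()
  }
}
```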
```diff
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 7fb2b91377..e2f13accdf 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -274,7 +274,7 @@ object SparkEnv extends Logging {
     val shuffleMemoryManager = new ShuffleMemoryManager(conf)
 
     val blockTransferService =
-      conf.get("spark.shuffle.blockTransferService", "nio").toLowerCase match {
+      conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
         case "netty" =>
           new NettyBlockTransferService(conf)
         case "nio" =>
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index ec3000e722..1c4327cf13 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -106,5 +106,8 @@ class NettyBlockTransferService(conf: SparkConf) extends BlockTransferService {
     result.future
   }
 
-  override def close(): Unit = server.close()
+  override def close(): Unit = {
+    server.close()
+    clientFactory.close()
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1f8de28961..5f5dd0dc1c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1178,6 +1178,10 @@ private[spark] class BlockManager(
 
   def stop(): Unit = {
     blockTransferService.close()
+    if (shuffleClient ne blockTransferService) {
+      // Closing should be idempotent, but maybe not for the NioBlockTransferService.
+      shuffleClient.close()
+    }
     diskBlockManager.stop()
     actorSystem.stop(slaveActor)
     blockInfo.clear()
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index f0aa914cfe..66cf60d25f 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.storage.BlockManagerId
 
 /**
  * Test add and remove behavior of ExecutorAllocationManager.
  */
-class ExecutorAllocationManagerSuite extends FunSuite {
+class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
   import ExecutorAllocationManager._
   import ExecutorAllocationManagerSuite._
@@ -36,17 +36,21 @@ class ExecutorAllocationManagerSuite extends FunSuite {
       .setAppName("test-executor-allocation-manager")
       .set("spark.dynamicAllocation.enabled", "true")
     intercept[SparkException] { new SparkContext(conf) }
+    SparkEnv.get.stop() // cleanup the created environment
 
     // Only min
     val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
     intercept[SparkException] { new SparkContext(conf1) }
+    SparkEnv.get.stop()
 
     // Only max
     val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
     intercept[SparkException] { new SparkContext(conf2) }
+    SparkEnv.get.stop()
 
     // Both min and max, but min > max
     intercept[SparkException] { createSparkContext(2, 1) }
+    SparkEnv.get.stop()
 
     // Both min and max, and min == max
     val sc1 = createSparkContext(1, 1)
@@ -60,18 +64,17 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("starting state") {
-    val sc = createSparkContext()
+    sc = createSparkContext()
     val manager = sc.executorAllocationManager.get
     assert(numExecutorsPending(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     assert(executorIds(manager).isEmpty)
     assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
     assert(removeTimes(manager).isEmpty)
-    sc.stop()
   }
 
   test("add executors") {
-    val sc = createSparkContext(1, 10)
+    sc = createSparkContext(1, 10)
     val manager = sc.executorAllocationManager.get
 
     // Keep adding until the limit is reached
@@ -112,11 +115,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
     assert(addExecutors(manager) === 0)
     assert(numExecutorsPending(manager) === 6)
     assert(numExecutorsToAdd(manager) === 1)
-    sc.stop()
   }
 
   test("remove executors") {
-    val sc = createSparkContext(5, 10)
+    sc = createSparkContext(5, 10)
     val manager = sc.executorAllocationManager.get
     (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
 
@@ -163,11 +165,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
     assert(executorsPendingToRemove(manager).isEmpty)
     assert(!removeExecutor(manager, "8"))
     assert(executorsPendingToRemove(manager).isEmpty)
-    sc.stop()
   }
 
   test ("interleaving add and remove") {
-    val sc = createSparkContext(5, 10)
+    sc = createSparkContext(5, 10)
     val manager = sc.executorAllocationManager.get
 
     // Add a few executors
@@ -232,11 +233,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
     onExecutorAdded(manager, "15")
     onExecutorAdded(manager, "16")
     assert(executorIds(manager).size === 10)
-    sc.stop()
   }
 
   test("starting/canceling add timer") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val clock = new TestClock(8888L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -268,7 +268,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("starting/canceling remove timers") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val clock = new TestClock(14444L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -313,7 +313,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("mock polling loop with no events") {
-    val sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20)
     val manager = sc.executorAllocationManager.get
     val clock = new TestClock(2020L)
     manager.setClock(clock)
@@ -339,7 +339,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("mock polling loop add behavior") {
-    val sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20)
     val clock = new TestClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -388,7 +388,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("mock polling loop remove behavior") {
-    val sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20)
     val clock = new TestClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -449,7 +449,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("listeners trigger add executors correctly") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val manager = sc.executorAllocationManager.get
     assert(addTime(manager) === NOT_SET)
 
@@ -479,7 +479,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("listeners trigger remove executors correctly") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val manager = sc.executorAllocationManager.get
     assert(removeTimes(manager).isEmpty)
 
@@ -510,7 +510,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("listeners trigger add and remove executor callbacks correctly") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val manager = sc.executorAllocationManager.get
     assert(executorIds(manager).isEmpty)
     assert(removeTimes(manager).isEmpty)
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index cbc0bd178d..d27880f4bc 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.AkkaUtils
 
-class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
+class MapOutputTrackerSuite extends FunSuite {
   private val conf = new SparkConf
 
   test("master start and stop") {
@@ -37,6 +37,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     tracker.trackerActor =
       actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
     tracker.stop()
+    actorSystem.shutdown()
   }
 
   test("master register shuffle and fetch") {
@@ -56,6 +57,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000),
       (BlockManagerId("b", "hostB", 1000), size10000)))
     tracker.stop()
+    actorSystem.shutdown()
   }
 
   test("master register and unregister shuffle") {
@@ -74,6 +76,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     tracker.unregisterShuffle(10)
     assert(!tracker.containsShuffle(10))
     assert(tracker.getServerStatuses(10, 0).isEmpty)
+
+    tracker.stop()
+    actorSystem.shutdown()
   }
 
   test("master register shuffle and unregister map output and fetch") {
@@ -97,6 +102,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     // this should cause it to fail, and the scheduler will ignore the failure due to the
     // stage already being aborted.
     intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
+
+    tracker.stop()
+    actorSystem.shutdown()
   }
 
   test("remote fetch") {
@@ -136,6 +144,11 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
 
     // failure should be cached
     intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+
+    masterTracker.stop()
+    slaveTracker.stop()
+    actorSystem.shutdown()
+    slaveSystem.shutdown()
   }
 
   test("remote fetch below akka frame size") {
@@ -154,6 +167,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     masterTracker.registerMapOutput(10, 0, MapStatus(
       BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
     masterActor.receive(GetMapOutputStatuses(10))
+
+//    masterTracker.stop() // this throws an exception
+    actorSystem.shutdown()
   }
 
   test("remote fetch exceeds akka frame size") {
@@ -176,5 +192,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
       BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0)))
     }
     intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
+
+//    masterTracker.stop() // this throws an exception
+    actorSystem.shutdown()
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index df237ba796..0390a2e4f1 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark
 
-import org.scalatest.{BeforeAndAfterEach, FunSuite, PrivateMethodTester}
+import org.scalatest.{FunSuite, PrivateMethodTester}
 
 import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
@@ -25,12 +25,12 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend
 
 class SparkContextSchedulerCreationSuite
-  extends FunSuite with PrivateMethodTester with Logging with BeforeAndAfterEach {
+  extends FunSuite with LocalSparkContext with PrivateMethodTester with Logging {
 
   def createTaskScheduler(master: String): TaskSchedulerImpl = {
     // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
     // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val createTaskSchedulerMethod =
       PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
     val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
```
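
The test-suite hunks above replace per-test `val sc = ...` / `sc.stop()` pairs with an inherited `sc` field from the LocalSparkContext mixin, so the context is stopped even when a test body throws. That trait lives in Spark's test sources and is not part of this diff; the sketch below is only an approximation of the pattern, assuming ScalaTest's BeforeAndAfterEach.

```scala
import org.apache.spark.SparkContext
import org.scalatest.{BeforeAndAfterEach, Suite}

// Approximate sketch of a LocalSparkContext-style mixin; the real trait in
// Spark's test sources may differ in names and details. Suites assign their
// context to `sc` instead of a local val so cleanup always runs in afterEach.
trait LocalSparkContextSketch extends BeforeAndAfterEach { self: Suite =>

  @transient var sc: SparkContext = _

  override def afterEach(): Unit = {
    try {
      if (sc != null) {
        sc.stop()
        sc = null
      }
      // Clear the driver port so the next test can bind a fresh context.
      System.clearProperty("spark.driver.port")
    } finally {
      super.afterEach()
    }
  }
}
```

With such a mixin in place, suites like ExecutorAllocationManagerSuite and SparkContextSchedulerCreationSuite can drop their explicit sc.stop() calls, which is exactly what the hunks above do.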