author    Aaron Davidson <aaron@databricks.com>    2014-11-02 16:26:24 -0800
committer Patrick Wendell <pwendell@gmail.com>     2014-11-02 16:26:24 -0800
commit    2ebd1df3f17993f3cb472ec44c8832213976d99a
tree      27369ea3bbc025e1c43f282fea96129db7d879d9 /core
parent    9081b9f9f79b78f0b20a5fc3bc4e7c1d3e717130
[SPARK-4183] Close transport-related resources between SparkContexts
A leak of event loops may be causing test failures.

Author: Aaron Davidson <aaron@databricks.com>

Closes #3053 from aarondav/leak and squashes the following commits:

e676d18 [Aaron Davidson] Typo!
8f96475 [Aaron Davidson] Keep original ssc semantics
7e49f10 [Aaron Davidson] A leak of event loops may be causing test failures.
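For context, a minimal sketch (not part of the patch; names are illustrative) of why an unclosed Netty event loop group leaks between tests: once a group has run work, its worker threads and any sockets it manages stay alive until the group is shut down explicitly, so a client factory that is never closed keeps accumulating them across SparkContexts.

import io.netty.channel.nio.NioEventLoopGroup

// Illustrative only: once this group has executed work, its worker threads
// survive until shutdownGracefully() is called on it.
val workerGroup = new NioEventLoopGroup()
try {
  // ... bootstrap Netty clients/servers against workerGroup ...
} finally {
  // Releasing the threads explicitly is the kind of cleanup the
  // NettyBlockTransferService.close() change below wires up for the client side.
  workerGroup.shutdownGracefully()
}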
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala                                |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala                    |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala          | 34
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala                   | 21
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala      |  6
6 files changed, 49 insertions, 23 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 7fb2b91377..e2f13accdf 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -274,7 +274,7 @@ object SparkEnv extends Logging {
val shuffleMemoryManager = new ShuffleMemoryManager(conf)
val blockTransferService =
- conf.get("spark.shuffle.blockTransferService", "nio").toLowerCase match {
+ conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
case "netty" =>
new NettyBlockTransferService(conf)
case "nio" =>
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index ec3000e722..1c4327cf13 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -106,5 +106,8 @@ class NettyBlockTransferService(conf: SparkConf) extends BlockTransferService {
result.future
}
- override def close(): Unit = server.close()
+ override def close(): Unit = {
+ server.close()
+ clientFactory.close()
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1f8de28961..5f5dd0dc1c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1178,6 +1178,10 @@ private[spark] class BlockManager(
def stop(): Unit = {
blockTransferService.close()
+ if (shuffleClient ne blockTransferService) {
+ // Closing should be idempotent, but maybe not for the NioBlockTransferService.
+ shuffleClient.close()
+ }
diskBlockManager.stop()
actorSystem.stop(slaveActor)
blockInfo.clear()
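The `ne` check above guards against double-closing a single object: when no external shuffle client is in use, shuffleClient is typically the same instance as blockTransferService, and the comment notes that close() may not be idempotent for the NIO implementation. A minimal sketch of the same pattern with hypothetical Closeable arguments:

import java.io.Closeable

// Close the transfer service, then close the shuffle client only if it is a
// genuinely different object (`ne` is reference inequality on AnyRef).
def closeBoth(transferService: Closeable, shuffleClient: Closeable): Unit = {
  transferService.close()
  if (shuffleClient ne transferService) {
    shuffleClient.close()
  }
}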
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index f0aa914cfe..66cf60d25f 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.storage.BlockManagerId
/**
* Test add and remove behavior of ExecutorAllocationManager.
*/
-class ExecutorAllocationManagerSuite extends FunSuite {
+class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
import ExecutorAllocationManager._
import ExecutorAllocationManagerSuite._
@@ -36,17 +36,21 @@ class ExecutorAllocationManagerSuite extends FunSuite {
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
intercept[SparkException] { new SparkContext(conf) }
+ SparkEnv.get.stop() // cleanup the created environment
// Only min
val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
intercept[SparkException] { new SparkContext(conf1) }
+ SparkEnv.get.stop()
// Only max
val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
intercept[SparkException] { new SparkContext(conf2) }
+ SparkEnv.get.stop()
// Both min and max, but min > max
intercept[SparkException] { createSparkContext(2, 1) }
+ SparkEnv.get.stop()
// Both min and max, and min == max
val sc1 = createSparkContext(1, 1)
@@ -60,18 +64,17 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}
test("starting state") {
- val sc = createSparkContext()
+ sc = createSparkContext()
val manager = sc.executorAllocationManager.get
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
assert(executorIds(manager).isEmpty)
assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
assert(removeTimes(manager).isEmpty)
- sc.stop()
}
test("add executors") {
- val sc = createSparkContext(1, 10)
+ sc = createSparkContext(1, 10)
val manager = sc.executorAllocationManager.get
// Keep adding until the limit is reached
@@ -112,11 +115,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
assert(addExecutors(manager) === 0)
assert(numExecutorsPending(manager) === 6)
assert(numExecutorsToAdd(manager) === 1)
- sc.stop()
}
test("remove executors") {
- val sc = createSparkContext(5, 10)
+ sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get
(1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
@@ -163,11 +165,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
assert(executorsPendingToRemove(manager).isEmpty)
assert(!removeExecutor(manager, "8"))
assert(executorsPendingToRemove(manager).isEmpty)
- sc.stop()
}
test ("interleaving add and remove") {
- val sc = createSparkContext(5, 10)
+ sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get
// Add a few executors
@@ -232,11 +233,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
onExecutorAdded(manager, "15")
onExecutorAdded(manager, "16")
assert(executorIds(manager).size === 10)
- sc.stop()
}
test("starting/canceling add timer") {
- val sc = createSparkContext(2, 10)
+ sc = createSparkContext(2, 10)
val clock = new TestClock(8888L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -268,7 +268,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}
test("starting/canceling remove timers") {
- val sc = createSparkContext(2, 10)
+ sc = createSparkContext(2, 10)
val clock = new TestClock(14444L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -313,7 +313,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}
test("mock polling loop with no events") {
- val sc = createSparkContext(1, 20)
+ sc = createSparkContext(1, 20)
val manager = sc.executorAllocationManager.get
val clock = new TestClock(2020L)
manager.setClock(clock)
@@ -339,7 +339,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}
test("mock polling loop add behavior") {
- val sc = createSparkContext(1, 20)
+ sc = createSparkContext(1, 20)
val clock = new TestClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -388,7 +388,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}
test("mock polling loop remove behavior") {
- val sc = createSparkContext(1, 20)
+ sc = createSparkContext(1, 20)
val clock = new TestClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -449,7 +449,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}
test("listeners trigger add executors correctly") {
- val sc = createSparkContext(2, 10)
+ sc = createSparkContext(2, 10)
val manager = sc.executorAllocationManager.get
assert(addTime(manager) === NOT_SET)
@@ -479,7 +479,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}
test("listeners trigger remove executors correctly") {
- val sc = createSparkContext(2, 10)
+ sc = createSparkContext(2, 10)
val manager = sc.executorAllocationManager.get
assert(removeTimes(manager).isEmpty)
@@ -510,7 +510,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}
test("listeners trigger add and remove executor callbacks correctly") {
- val sc = createSparkContext(2, 10)
+ sc = createSparkContext(2, 10)
val manager = sc.executorAllocationManager.get
assert(executorIds(manager).isEmpty)
assert(removeTimes(manager).isEmpty)
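The suite now relies on the LocalSparkContext mixin instead of calling sc.stop() by hand, so the context is torn down even when an assertion fails mid-test. A rough sketch of what such a mixin looks like (the real trait lives in core's test sources and may differ in detail):

import org.apache.spark.SparkContext
import org.scalatest.{BeforeAndAfterEach, Suite}

// Tests assign to `sc`; afterEach() stops whatever context is still running.
trait SketchLocalSparkContext extends BeforeAndAfterEach { self: Suite =>
  @transient var sc: SparkContext = _

  override def afterEach(): Unit = {
    try {
      if (sc != null) {
        sc.stop()
        sc = null
      }
    } finally {
      super.afterEach()
    }
  }
}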
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index cbc0bd178d..d27880f4bc 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AkkaUtils
-class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
+class MapOutputTrackerSuite extends FunSuite {
private val conf = new SparkConf
test("master start and stop") {
@@ -37,6 +37,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
tracker.trackerActor =
actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
tracker.stop()
+ actorSystem.shutdown()
}
test("master register shuffle and fetch") {
@@ -56,6 +57,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000),
(BlockManagerId("b", "hostB", 1000), size10000)))
tracker.stop()
+ actorSystem.shutdown()
}
test("master register and unregister shuffle") {
@@ -74,6 +76,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
tracker.unregisterShuffle(10)
assert(!tracker.containsShuffle(10))
assert(tracker.getServerStatuses(10, 0).isEmpty)
+
+ tracker.stop()
+ actorSystem.shutdown()
}
test("master register shuffle and unregister map output and fetch") {
@@ -97,6 +102,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
// this should cause it to fail, and the scheduler will ignore the failure due to the
// stage already being aborted.
intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
+
+ tracker.stop()
+ actorSystem.shutdown()
}
test("remote fetch") {
@@ -136,6 +144,11 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
// failure should be cached
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+
+ masterTracker.stop()
+ slaveTracker.stop()
+ actorSystem.shutdown()
+ slaveSystem.shutdown()
}
test("remote fetch below akka frame size") {
@@ -154,6 +167,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
masterTracker.registerMapOutput(10, 0, MapStatus(
BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
masterActor.receive(GetMapOutputStatuses(10))
+
+// masterTracker.stop() // this throws an exception
+ actorSystem.shutdown()
}
test("remote fetch exceeds akka frame size") {
@@ -176,5 +192,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0)))
}
intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
+
+// masterTracker.stop() // this throws an exception
+ actorSystem.shutdown()
}
}
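The cleanup calls added above only run if nothing throws before the end of the test body; a hedged variation (not from the patch) that guarantees shutdown even when an assertion or intercept block fails earlier:

import akka.actor.ActorSystem

// Akka 2.x API, matching the shutdown() calls used in this suite.
val actorSystem = ActorSystem("test")
try {
  // ... create the tracker and exercise it against actorSystem ...
} finally {
  actorSystem.shutdown()
}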
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index df237ba796..0390a2e4f1 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark
-import org.scalatest.{BeforeAndAfterEach, FunSuite, PrivateMethodTester}
+import org.scalatest.{FunSuite, PrivateMethodTester}
import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
@@ -25,12 +25,12 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
import org.apache.spark.scheduler.local.LocalBackend
class SparkContextSchedulerCreationSuite
- extends FunSuite with PrivateMethodTester with Logging with BeforeAndAfterEach {
+ extends FunSuite with LocalSparkContext with PrivateMethodTester with Logging {
def createTaskScheduler(master: String): TaskSchedulerImpl = {
// Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
// real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val createTaskSchedulerMethod =
PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)