author     Aaron Davidson <aaron@databricks.com>   2014-11-02 16:26:24 -0800
committer  Patrick Wendell <pwendell@gmail.com>    2014-11-02 16:26:24 -0800
commit     2ebd1df3f17993f3cb472ec44c8832213976d99a (patch)
tree       27369ea3bbc025e1c43f282fea96129db7d879d9
parent     9081b9f9f79b78f0b20a5fc3bc4e7c1d3e717130 (diff)
[SPARK-4183] Close transport-related resources between SparkContexts
A leak of event loops may be causing test failures.

Author: Aaron Davidson <aaron@databricks.com>

Closes #3053 from aarondav/leak and squashes the following commits:

e676d18 [Aaron Davidson] Typo!
8f96475 [Aaron Davidson] Keep original ssc semantics
7e49f10 [Aaron Davidson] A leak of event loops may be causing test failures.
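The common thread in the diff below is that every transport-related resource a SparkContext creates must be closed when the context stops, so Netty event-loop threads do not linger from one test SparkContext into the next. What follows is a minimal, self-contained Scala sketch of that pattern; it is not code from this commit, and DemoServer, DemoClientFactory, and DemoTransferService are hypothetical stand-ins for the server, client factory, and block transfer service the patch touches.

// Illustrative only: close() releases every resource, not just the server.
import java.io.Closeable

class DemoServer extends Closeable {
  override def close(): Unit = println("server closed")
}

class DemoClientFactory extends Closeable {
  override def close(): Unit = println("client factory closed")
}

class DemoTransferService extends Closeable {
  private val server = new DemoServer
  private val clientFactory = new DemoClientFactory

  // Before a fix like this, only the server was closed and the client
  // factory's worker threads stayed alive; closing both makes stop() leak-free.
  override def close(): Unit = {
    server.close()
    clientFactory.close()
  }
}

object DemoStop extends App {
  val service = new DemoTransferService
  try {
    // ... exercise the service as a test would ...
  } finally {
    service.close() // mirrors BlockManager.stop() also closing the shuffle client
  }
}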
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala  5
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala  4
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala  34
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala  21
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala  6
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala  15
-rw-r--r--  network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java  3
-rw-r--r--  network/common/src/main/java/org/apache/spark/network/server/TransportServer.java  5
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java  7
-rw-r--r--  network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java  4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala  4
12 files changed, 78 insertions(+), 32 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 7fb2b91377..e2f13accdf 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -274,7 +274,7 @@ object SparkEnv extends Logging {
val shuffleMemoryManager = new ShuffleMemoryManager(conf)
val blockTransferService =
- conf.get("spark.shuffle.blockTransferService", "nio").toLowerCase match {
+ conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
case "netty" =>
new NettyBlockTransferService(conf)
case "nio" =>
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index ec3000e722..1c4327cf13 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -106,5 +106,8 @@ class NettyBlockTransferService(conf: SparkConf) extends BlockTransferService {
result.future
}
- override def close(): Unit = server.close()
+ override def close(): Unit = {
+ server.close()
+ clientFactory.close()
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1f8de28961..5f5dd0dc1c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1178,6 +1178,10 @@ private[spark] class BlockManager(
def stop(): Unit = {
blockTransferService.close()
+ if (shuffleClient ne blockTransferService) {
+ // Closing should be idempotent, but maybe not for the NioBlockTransferService.
+ shuffleClient.close()
+ }
diskBlockManager.stop()
actorSystem.stop(slaveActor)
blockInfo.clear()
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index f0aa914cfe..66cf60d25f 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.storage.BlockManagerId
/**
* Test add and remove behavior of ExecutorAllocationManager.
*/
-class ExecutorAllocationManagerSuite extends FunSuite {
+class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
import ExecutorAllocationManager._
import ExecutorAllocationManagerSuite._
@@ -36,17 +36,21 @@ class ExecutorAllocationManagerSuite extends FunSuite {
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
intercept[SparkException] { new SparkContext(conf) }
+ SparkEnv.get.stop() // cleanup the created environment
// Only min
val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
intercept[SparkException] { new SparkContext(conf1) }
+ SparkEnv.get.stop()
// Only max
val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
intercept[SparkException] { new SparkContext(conf2) }
+ SparkEnv.get.stop()
// Both min and max, but min > max
intercept[SparkException] { createSparkContext(2, 1) }
+ SparkEnv.get.stop()
// Both min and max, and min == max
val sc1 = createSparkContext(1, 1)
@@ -60,18 +64,17 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}
test("starting state") {
- val sc = createSparkContext()
+ sc = createSparkContext()
val manager = sc.executorAllocationManager.get
assert(numExecutorsPending(manager) === 0)
assert(executorsPendingToRemove(manager).isEmpty)
assert(executorIds(manager).isEmpty)
assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
assert(removeTimes(manager).isEmpty)
- sc.stop()
}
test("add executors") {
- val sc = createSparkContext(1, 10)
+ sc = createSparkContext(1, 10)
val manager = sc.executorAllocationManager.get
// Keep adding until the limit is reached
@@ -112,11 +115,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
assert(addExecutors(manager) === 0)
assert(numExecutorsPending(manager) === 6)
assert(numExecutorsToAdd(manager) === 1)
- sc.stop()
}
test("remove executors") {
- val sc = createSparkContext(5, 10)
+ sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get
(1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
@@ -163,11 +165,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
assert(executorsPendingToRemove(manager).isEmpty)
assert(!removeExecutor(manager, "8"))
assert(executorsPendingToRemove(manager).isEmpty)
- sc.stop()
}
test ("interleaving add and remove") {
- val sc = createSparkContext(5, 10)
+ sc = createSparkContext(5, 10)
val manager = sc.executorAllocationManager.get
// Add a few executors
@@ -232,11 +233,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
onExecutorAdded(manager, "15")
onExecutorAdded(manager, "16")
assert(executorIds(manager).size === 10)
- sc.stop()
}
test("starting/canceling add timer") {
- val sc = createSparkContext(2, 10)
+ sc = createSparkContext(2, 10)
val clock = new TestClock(8888L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -268,7 +268,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}
test("starting/canceling remove timers") {
- val sc = createSparkContext(2, 10)
+ sc = createSparkContext(2, 10)
val clock = new TestClock(14444L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -313,7 +313,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}
test("mock polling loop with no events") {
- val sc = createSparkContext(1, 20)
+ sc = createSparkContext(1, 20)
val manager = sc.executorAllocationManager.get
val clock = new TestClock(2020L)
manager.setClock(clock)
@@ -339,7 +339,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}
test("mock polling loop add behavior") {
- val sc = createSparkContext(1, 20)
+ sc = createSparkContext(1, 20)
val clock = new TestClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -388,7 +388,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}
test("mock polling loop remove behavior") {
- val sc = createSparkContext(1, 20)
+ sc = createSparkContext(1, 20)
val clock = new TestClock(2020L)
val manager = sc.executorAllocationManager.get
manager.setClock(clock)
@@ -449,7 +449,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}
test("listeners trigger add executors correctly") {
- val sc = createSparkContext(2, 10)
+ sc = createSparkContext(2, 10)
val manager = sc.executorAllocationManager.get
assert(addTime(manager) === NOT_SET)
@@ -479,7 +479,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}
test("listeners trigger remove executors correctly") {
- val sc = createSparkContext(2, 10)
+ sc = createSparkContext(2, 10)
val manager = sc.executorAllocationManager.get
assert(removeTimes(manager).isEmpty)
@@ -510,7 +510,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
}
test("listeners trigger add and remove executor callbacks correctly") {
- val sc = createSparkContext(2, 10)
+ sc = createSparkContext(2, 10)
val manager = sc.executorAllocationManager.get
assert(executorIds(manager).isEmpty)
assert(removeTimes(manager).isEmpty)
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index cbc0bd178d..d27880f4bc 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AkkaUtils
-class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
+class MapOutputTrackerSuite extends FunSuite {
private val conf = new SparkConf
test("master start and stop") {
@@ -37,6 +37,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
tracker.trackerActor =
actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
tracker.stop()
+ actorSystem.shutdown()
}
test("master register shuffle and fetch") {
@@ -56,6 +57,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000),
(BlockManagerId("b", "hostB", 1000), size10000)))
tracker.stop()
+ actorSystem.shutdown()
}
test("master register and unregister shuffle") {
@@ -74,6 +76,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
tracker.unregisterShuffle(10)
assert(!tracker.containsShuffle(10))
assert(tracker.getServerStatuses(10, 0).isEmpty)
+
+ tracker.stop()
+ actorSystem.shutdown()
}
test("master register shuffle and unregister map output and fetch") {
@@ -97,6 +102,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
// this should cause it to fail, and the scheduler will ignore the failure due to the
// stage already being aborted.
intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
+
+ tracker.stop()
+ actorSystem.shutdown()
}
test("remote fetch") {
@@ -136,6 +144,11 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
// failure should be cached
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+
+ masterTracker.stop()
+ slaveTracker.stop()
+ actorSystem.shutdown()
+ slaveSystem.shutdown()
}
test("remote fetch below akka frame size") {
@@ -154,6 +167,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
masterTracker.registerMapOutput(10, 0, MapStatus(
BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
masterActor.receive(GetMapOutputStatuses(10))
+
+// masterTracker.stop() // this throws an exception
+ actorSystem.shutdown()
}
test("remote fetch exceeds akka frame size") {
@@ -176,5 +192,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0)))
}
intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
+
+// masterTracker.stop() // this throws an exception
+ actorSystem.shutdown()
}
}
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index df237ba796..0390a2e4f1 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark
-import org.scalatest.{BeforeAndAfterEach, FunSuite, PrivateMethodTester}
+import org.scalatest.{FunSuite, PrivateMethodTester}
import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
@@ -25,12 +25,12 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
import org.apache.spark.scheduler.local.LocalBackend
class SparkContextSchedulerCreationSuite
- extends FunSuite with PrivateMethodTester with Logging with BeforeAndAfterEach {
+ extends FunSuite with LocalSparkContext with PrivateMethodTester with Logging {
def createTaskScheduler(master: String): TaskSchedulerImpl = {
// Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
// real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
- val sc = new SparkContext("local", "test")
+ sc = new SparkContext("local", "test")
val createTaskSchedulerMethod =
PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 32a19787a2..475026e8eb 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -145,11 +145,16 @@ class FlumePollingStreamSuite extends TestSuiteBase {
outputStream.register()
ssc.start()
- writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
- assertChannelIsEmpty(channel)
- assertChannelIsEmpty(channel2)
- sink.stop()
- channel.stop()
+ try {
+ writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
+ assertChannelIsEmpty(channel)
+ assertChannelIsEmpty(channel2)
+ } finally {
+ sink.stop()
+ sink2.stop()
+ channel.stop()
+ channel2.stop()
+ }
}
def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index e7fa4f6bf3..0b4a1d8286 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -58,7 +58,7 @@ public class TransportClientFactory implements Closeable {
private final ConcurrentHashMap<SocketAddress, TransportClient> connectionPool;
private final Class<? extends Channel> socketChannelClass;
- private final EventLoopGroup workerGroup;
+ private EventLoopGroup workerGroup;
public TransportClientFactory(TransportContext context) {
this.context = context;
@@ -150,6 +150,7 @@ public class TransportClientFactory implements Closeable {
if (workerGroup != null) {
workerGroup.shutdownGracefully();
+ workerGroup = null;
}
}
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index d1a1877a98..70da48ca8e 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -49,6 +49,7 @@ public class TransportServer implements Closeable {
private ChannelFuture channelFuture;
private int port = -1;
+ /** Creates a TransportServer that binds to the given port, or to any available if 0. */
public TransportServer(TransportContext context, int portToBind) {
this.context = context;
this.conf = context.getConf();
@@ -67,7 +68,7 @@ public class TransportServer implements Closeable {
IOMode ioMode = IOMode.valueOf(conf.ioMode());
EventLoopGroup bossGroup =
- NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
+ NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
EventLoopGroup workerGroup = bossGroup;
bootstrap = new ServerBootstrap()
@@ -105,7 +106,7 @@ public class TransportServer implements Closeable {
@Override
public void close() {
if (channelFuture != null) {
- // close is a local operation and should finish with milliseconds; timeout just to be safe
+ // close is a local operation and should finish within milliseconds; timeout just to be safe
channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
channelFuture = null;
}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index cc2f6261ca..6bbabc44b9 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -17,6 +17,8 @@
package org.apache.spark.network.shuffle;
+import java.io.Closeable;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,4 +87,9 @@ public class ExternalShuffleClient implements ShuffleClient {
JavaUtils.serialize(new RegisterExecutor(appId, execId, executorInfo));
client.sendRpcSync(registerExecutorMessage, 5000 /* timeoutMs */);
}
+
+ @Override
+ public void close() {
+ clientFactory.close();
+ }
}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
index 9fa87c2c6e..d46a562394 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
@@ -17,8 +17,10 @@
package org.apache.spark.network.shuffle;
+import java.io.Closeable;
+
/** Provides an interface for reading shuffle files, either from an Executor or external service. */
-public interface ShuffleClient {
+public interface ShuffleClient extends Closeable {
/**
* Fetch a sequence of blocks from a remote node asynchronously,
*
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 655cec1573..f47772947d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -46,6 +46,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
after {
if (ssc != null) {
ssc.stop()
+ if (ssc.sc != null) {
+ // Calling ssc.stop() does not always stop the associated SparkContext.
+ ssc.sc.stop()
+ }
ssc = null
}
if (sc != null) {