diff options
author | Aaron Davidson <aaron@databricks.com> | 2014-11-02 16:26:24 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-11-02 16:26:24 -0800 |
commit | 2ebd1df3f17993f3cb472ec44c8832213976d99a (patch) | |
tree | 27369ea3bbc025e1c43f282fea96129db7d879d9 /network/common/src | |
parent | 9081b9f9f79b78f0b20a5fc3bc4e7c1d3e717130 (diff) | |
download | spark-2ebd1df3f17993f3cb472ec44c8832213976d99a.tar.gz spark-2ebd1df3f17993f3cb472ec44c8832213976d99a.tar.bz2 spark-2ebd1df3f17993f3cb472ec44c8832213976d99a.zip |
[SPARK-4183] Close transport-related resources between SparkContexts
A leak of event loops may be causing test failures.
Author: Aaron Davidson <aaron@databricks.com>
Closes #3053 from aarondav/leak and squashes the following commits:
e676d18 [Aaron Davidson] Typo!
8f96475 [Aaron Davidson] Keep original ssc semantics
7e49f10 [Aaron Davidson] A leak of event loops may be causing test failures.
Diffstat (limited to 'network/common/src')
-rw-r--r-- | network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java | 3 | ||||
-rw-r--r-- | network/common/src/main/java/org/apache/spark/network/server/TransportServer.java | 5 |
2 files changed, 5 insertions, 3 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index e7fa4f6bf3..0b4a1d8286 100644 --- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -58,7 +58,7 @@ public class TransportClientFactory implements Closeable { private final ConcurrentHashMap<SocketAddress, TransportClient> connectionPool; private final Class<? extends Channel> socketChannelClass; - private final EventLoopGroup workerGroup; + private EventLoopGroup workerGroup; public TransportClientFactory(TransportContext context) { this.context = context; @@ -150,6 +150,7 @@ public class TransportClientFactory implements Closeable { if (workerGroup != null) { workerGroup.shutdownGracefully(); + workerGroup = null; } } diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java index d1a1877a98..70da48ca8e 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java @@ -49,6 +49,7 @@ public class TransportServer implements Closeable { private ChannelFuture channelFuture; private int port = -1; + /** Creates a TransportServer that binds to the given port, or to any available if 0. */ public TransportServer(TransportContext context, int portToBind) { this.context = context; this.conf = context.getConf(); @@ -67,7 +68,7 @@ public class TransportServer implements Closeable { IOMode ioMode = IOMode.valueOf(conf.ioMode()); EventLoopGroup bossGroup = - NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server"); + NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server"); EventLoopGroup workerGroup = bossGroup; bootstrap = new ServerBootstrap() @@ -105,7 +106,7 @@ public class TransportServer implements Closeable { @Override public void close() { if (channelFuture != null) { - // close is a local operation and should finish with milliseconds; timeout just to be safe + // close is a local operation and should finish within milliseconds; timeout just to be safe channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS); channelFuture = null; } |