aboutsummaryrefslogtreecommitdiff
path: root/network/common
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2014-11-02 16:26:24 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-11-02 16:26:24 -0800
commit2ebd1df3f17993f3cb472ec44c8832213976d99a (patch)
tree27369ea3bbc025e1c43f282fea96129db7d879d9 /network/common
parent9081b9f9f79b78f0b20a5fc3bc4e7c1d3e717130 (diff)
downloadspark-2ebd1df3f17993f3cb472ec44c8832213976d99a.tar.gz
spark-2ebd1df3f17993f3cb472ec44c8832213976d99a.tar.bz2
spark-2ebd1df3f17993f3cb472ec44c8832213976d99a.zip
[SPARK-4183] Close transport-related resources between SparkContexts
A leak of event loops may be causing test failures. Author: Aaron Davidson <aaron@databricks.com> Closes #3053 from aarondav/leak and squashes the following commits: e676d18 [Aaron Davidson] Typo! 8f96475 [Aaron Davidson] Keep original ssc semantics 7e49f10 [Aaron Davidson] A leak of event loops may be causing test failures.
Diffstat (limited to 'network/common')
-rw-r--r--network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java3
-rw-r--r--network/common/src/main/java/org/apache/spark/network/server/TransportServer.java5
2 files changed, 5 insertions, 3 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index e7fa4f6bf3..0b4a1d8286 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -58,7 +58,7 @@ public class TransportClientFactory implements Closeable {
private final ConcurrentHashMap<SocketAddress, TransportClient> connectionPool;
private final Class<? extends Channel> socketChannelClass;
- private final EventLoopGroup workerGroup;
+ private EventLoopGroup workerGroup;
public TransportClientFactory(TransportContext context) {
this.context = context;
@@ -150,6 +150,7 @@ public class TransportClientFactory implements Closeable {
if (workerGroup != null) {
workerGroup.shutdownGracefully();
+ workerGroup = null;
}
}
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index d1a1877a98..70da48ca8e 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -49,6 +49,7 @@ public class TransportServer implements Closeable {
private ChannelFuture channelFuture;
private int port = -1;
+ /** Creates a TransportServer that binds to the given port, or to any available if 0. */
public TransportServer(TransportContext context, int portToBind) {
this.context = context;
this.conf = context.getConf();
@@ -67,7 +68,7 @@ public class TransportServer implements Closeable {
IOMode ioMode = IOMode.valueOf(conf.ioMode());
EventLoopGroup bossGroup =
- NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
+ NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
EventLoopGroup workerGroup = bossGroup;
bootstrap = new ServerBootstrap()
@@ -105,7 +106,7 @@ public class TransportServer implements Closeable {
@Override
public void close() {
if (channelFuture != null) {
- // close is a local operation and should finish with milliseconds; timeout just to be safe
+ // close is a local operation and should finish within milliseconds; timeout just to be safe
channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
channelFuture = null;
}