From 2ebd1df3f17993f3cb472ec44c8832213976d99a Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Sun, 2 Nov 2014 16:26:24 -0800 Subject: [SPARK-4183] Close transport-related resources between SparkContexts A leak of event loops may be causing test failures. Author: Aaron Davidson Closes #3053 from aarondav/leak and squashes the following commits: e676d18 [Aaron Davidson] Typo! 8f96475 [Aaron Davidson] Keep original ssc semantics 7e49f10 [Aaron Davidson] A leak of event loops may be causing test failures. --- .../java/org/apache/spark/network/client/TransportClientFactory.java | 3 ++- .../main/java/org/apache/spark/network/server/TransportServer.java | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) (limited to 'network/common') diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index e7fa4f6bf3..0b4a1d8286 100644 --- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -58,7 +58,7 @@ public class TransportClientFactory implements Closeable { private final ConcurrentHashMap connectionPool; private final Class socketChannelClass; - private final EventLoopGroup workerGroup; + private EventLoopGroup workerGroup; public TransportClientFactory(TransportContext context) { this.context = context; @@ -150,6 +150,7 @@ public class TransportClientFactory implements Closeable { if (workerGroup != null) { workerGroup.shutdownGracefully(); + workerGroup = null; } } diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java index d1a1877a98..70da48ca8e 100644 --- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java +++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java @@ -49,6 +49,7 @@ public class TransportServer implements Closeable { private ChannelFuture channelFuture; private int port = -1; + /** Creates a TransportServer that binds to the given port, or to any available if 0. */ public TransportServer(TransportContext context, int portToBind) { this.context = context; this.conf = context.getConf(); @@ -67,7 +68,7 @@ public class TransportServer implements Closeable { IOMode ioMode = IOMode.valueOf(conf.ioMode()); EventLoopGroup bossGroup = - NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server"); + NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server"); EventLoopGroup workerGroup = bossGroup; bootstrap = new ServerBootstrap() @@ -105,7 +106,7 @@ public class TransportServer implements Closeable { @Override public void close() { if (channelFuture != null) { - // close is a local operation and should finish with milliseconds; timeout just to be safe + // close is a local operation and should finish within milliseconds; timeout just to be safe channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS); channelFuture = null; } -- cgit v1.2.3