aboutsummaryrefslogtreecommitdiff
path: root/network
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2014-11-02 16:26:24 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-11-02 16:26:24 -0800
commit2ebd1df3f17993f3cb472ec44c8832213976d99a (patch)
tree27369ea3bbc025e1c43f282fea96129db7d879d9 /network
parent9081b9f9f79b78f0b20a5fc3bc4e7c1d3e717130 (diff)
downloadspark-2ebd1df3f17993f3cb472ec44c8832213976d99a.tar.gz
spark-2ebd1df3f17993f3cb472ec44c8832213976d99a.tar.bz2
spark-2ebd1df3f17993f3cb472ec44c8832213976d99a.zip
[SPARK-4183] Close transport-related resources between SparkContexts
A leak of event loops may be causing test failures. Author: Aaron Davidson <aaron@databricks.com> Closes #3053 from aarondav/leak and squashes the following commits: e676d18 [Aaron Davidson] Typo! 8f96475 [Aaron Davidson] Keep original ssc semantics 7e49f10 [Aaron Davidson] A leak of event loops may be causing test failures.
Diffstat (limited to 'network')
-rw-r--r--network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java3
-rw-r--r--network/common/src/main/java/org/apache/spark/network/server/TransportServer.java5
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java7
-rw-r--r--network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java4
4 files changed, 15 insertions, 4 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index e7fa4f6bf3..0b4a1d8286 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -58,7 +58,7 @@ public class TransportClientFactory implements Closeable {
private final ConcurrentHashMap<SocketAddress, TransportClient> connectionPool;
private final Class<? extends Channel> socketChannelClass;
- private final EventLoopGroup workerGroup;
+ private EventLoopGroup workerGroup;
public TransportClientFactory(TransportContext context) {
this.context = context;
@@ -150,6 +150,7 @@ public class TransportClientFactory implements Closeable {
if (workerGroup != null) {
workerGroup.shutdownGracefully();
+ workerGroup = null;
}
}
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index d1a1877a98..70da48ca8e 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -49,6 +49,7 @@ public class TransportServer implements Closeable {
private ChannelFuture channelFuture;
private int port = -1;
+ /** Creates a TransportServer that binds to the given port, or to any available if 0. */
public TransportServer(TransportContext context, int portToBind) {
this.context = context;
this.conf = context.getConf();
@@ -67,7 +68,7 @@ public class TransportServer implements Closeable {
IOMode ioMode = IOMode.valueOf(conf.ioMode());
EventLoopGroup bossGroup =
- NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
+ NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
EventLoopGroup workerGroup = bossGroup;
bootstrap = new ServerBootstrap()
@@ -105,7 +106,7 @@ public class TransportServer implements Closeable {
@Override
public void close() {
if (channelFuture != null) {
- // close is a local operation and should finish with milliseconds; timeout just to be safe
+ // close is a local operation and should finish within milliseconds; timeout just to be safe
channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
channelFuture = null;
}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index cc2f6261ca..6bbabc44b9 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -17,6 +17,8 @@
package org.apache.spark.network.shuffle;
+import java.io.Closeable;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,4 +87,9 @@ public class ExternalShuffleClient implements ShuffleClient {
JavaUtils.serialize(new RegisterExecutor(appId, execId, executorInfo));
client.sendRpcSync(registerExecutorMessage, 5000 /* timeoutMs */);
}
+
+ @Override
+ public void close() {
+ clientFactory.close();
+ }
}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
index 9fa87c2c6e..d46a562394 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
@@ -17,8 +17,10 @@
package org.apache.spark.network.shuffle;
+import java.io.Closeable;
+
/** Provides an interface for reading shuffle files, either from an Executor or external service. */
-public interface ShuffleClient {
+public interface ShuffleClient extends Closeable {
/**
* Fetch a sequence of blocks from a remote node asynchronously,
*