aboutsummaryrefslogtreecommitdiff
path: root/network
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2015-11-25 09:47:20 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-11-25 09:47:20 -0800
commitc1f85fc71e71e07534b89c84677d977bb20994f8 (patch)
tree906ab2d70f3b9c70ae26222742382702c0ffccae /network
parent0a5aef753e70e93d7e56054f354a52e4d4e18932 (diff)
downloadspark-c1f85fc71e71e07534b89c84677d977bb20994f8.tar.gz
spark-c1f85fc71e71e07534b89c84677d977bb20994f8.tar.bz2
spark-c1f85fc71e71e07534b89c84677d977bb20994f8.zip
[SPARK-11956][CORE] Fix a few bugs in network lib-based file transfer.
- NettyRpcEnv::openStream() now correctly propagates errors to the read side of the pipe. - NettyStreamManager now throws if the file being transferred does not exist. - The network library now correctly handles zero-sized streams. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #9941 from vanzin/SPARK-11956.
Diffstat (limited to 'network')
-rw-r--r--network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java28
-rw-r--r--network/common/src/test/java/org/apache/spark/network/StreamSuite.java23
2 files changed, 40 insertions, 11 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
index be181e0660..4c15045363 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java
@@ -185,16 +185,24 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
StreamResponse resp = (StreamResponse) message;
StreamCallback callback = streamCallbacks.poll();
if (callback != null) {
- StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
- callback);
- try {
- TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
- channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
- frameDecoder.setInterceptor(interceptor);
- streamActive = true;
- } catch (Exception e) {
- logger.error("Error installing stream handler.", e);
- deactivateStream();
+ if (resp.byteCount > 0) {
+ StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
+ callback);
+ try {
+ TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
+ channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
+ frameDecoder.setInterceptor(interceptor);
+ streamActive = true;
+ } catch (Exception e) {
+ logger.error("Error installing stream handler.", e);
+ deactivateStream();
+ }
+ } else {
+ try {
+ callback.onComplete(resp.streamId);
+ } catch (Exception e) {
+ logger.warn("Error in stream handler onComplete().", e);
+ }
}
} else {
logger.error("Could not find callback for StreamResponse.");
diff --git a/network/common/src/test/java/org/apache/spark/network/StreamSuite.java b/network/common/src/test/java/org/apache/spark/network/StreamSuite.java
index 00158fd081..538f3efe8d 100644
--- a/network/common/src/test/java/org/apache/spark/network/StreamSuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/StreamSuite.java
@@ -51,13 +51,14 @@ import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;
public class StreamSuite {
- private static final String[] STREAMS = { "largeBuffer", "smallBuffer", "file" };
+ private static final String[] STREAMS = { "largeBuffer", "smallBuffer", "emptyBuffer", "file" };
private static TransportServer server;
private static TransportClientFactory clientFactory;
private static File testFile;
private static File tempDir;
+ private static ByteBuffer emptyBuffer;
private static ByteBuffer smallBuffer;
private static ByteBuffer largeBuffer;
@@ -73,6 +74,7 @@ public class StreamSuite {
@BeforeClass
public static void setUp() throws Exception {
tempDir = Files.createTempDir();
+ emptyBuffer = createBuffer(0);
smallBuffer = createBuffer(100);
largeBuffer = createBuffer(100000);
@@ -103,6 +105,8 @@ public class StreamSuite {
return new NioManagedBuffer(largeBuffer);
case "smallBuffer":
return new NioManagedBuffer(smallBuffer);
+ case "emptyBuffer":
+ return new NioManagedBuffer(emptyBuffer);
case "file":
return new FileSegmentManagedBuffer(conf, testFile, 0, testFile.length());
default:
@@ -139,6 +143,18 @@ public class StreamSuite {
}
@Test
+ public void testZeroLengthStream() throws Throwable {
+ TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
+ try {
+ StreamTask task = new StreamTask(client, "emptyBuffer", TimeUnit.SECONDS.toMillis(5));
+ task.run();
+ task.check();
+ } finally {
+ client.close();
+ }
+ }
+
+ @Test
public void testSingleStream() throws Throwable {
TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
try {
@@ -226,6 +242,11 @@ public class StreamSuite {
outFile = File.createTempFile("data", ".tmp", tempDir);
out = new FileOutputStream(outFile);
break;
+ case "emptyBuffer":
+ baos = new ByteArrayOutputStream();
+ out = baos;
+ srcBuffer = emptyBuffer;
+ break;
default:
throw new IllegalArgumentException(streamId);
}