aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java20
1 files changed, 13 insertions, 7 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
index ec2bffd6a5..7a8ef9d147 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaReceiverAPISuite.java
@@ -23,6 +23,7 @@ import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import static org.junit.Assert.*;
+import com.google.common.io.Closeables;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -121,14 +122,19 @@ public class JavaReceiverAPISuite implements Serializable {
private void receive() {
try {
- Socket socket = new Socket(host, port);
- BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
- String userInput;
- while ((userInput = in.readLine()) != null) {
- store(userInput);
+ Socket socket = null;
+ BufferedReader in = null;
+ try {
+ socket = new Socket(host, port);
+ in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ String userInput;
+ while ((userInput = in.readLine()) != null) {
+ store(userInput);
+ }
+ } finally {
+ Closeables.close(in, /* swallowIOException = */ true);
+ Closeables.close(socket, /* swallowIOException = */ true);
}
- in.close();
- socket.close();
} catch(ConnectException ce) {
ce.printStackTrace();
restart("Could not connect", ce);