aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorguoxu1231 <guoxu1231@gmail.com>2016-01-04 14:23:07 +0000
committerSean Owen <sowen@cloudera.com>2016-01-04 14:23:07 +0000
commit962aac4db99f3988c07ccb23439327c18ec178f1 (patch)
treea764bd15132a55e4d95a7ba0a390e2883bf88498 /streaming
parent9fd7a2f0247ed6cea0e8dbcdd2b24f41200b3e24 (diff)
downloadspark-962aac4db99f3988c07ccb23439327c18ec178f1.tar.gz
spark-962aac4db99f3988c07ccb23439327c18ec178f1.tar.bz2
spark-962aac4db99f3988c07ccb23439327c18ec178f1.zip
[SPARK-12513][STREAMING] SocketReceiver hang in Netcat example
Explicitly close client side socket connection before restart socket receiver. Author: guoxu1231 <guoxu1231@gmail.com> Author: Shawn Guo <guoxu1231@gmail.com> Closes #10464 from guoxu1231/SPARK-12513.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala38
1 files changed, 24 insertions, 14 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 10644b9201..e70fc87c39 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -18,7 +18,7 @@
package org.apache.spark.streaming.dstream
import java.io._
-import java.net.{Socket, UnknownHostException}
+import java.net.{ConnectException, Socket}
import scala.reflect.ClassTag
import scala.util.control.NonFatal
@@ -51,7 +51,20 @@ class SocketReceiver[T: ClassTag](
storageLevel: StorageLevel
) extends Receiver[T](storageLevel) with Logging {
+ private var socket: Socket = _
+
def onStart() {
+
+ logInfo(s"Connecting to $host:$port")
+ try {
+ socket = new Socket(host, port)
+ } catch {
+ case e: ConnectException =>
+ restart(s"Error connecting to $host:$port", e)
+ return
+ }
+ logInfo(s"Connected to $host:$port")
+
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
setDaemon(true)
@@ -60,20 +73,22 @@ class SocketReceiver[T: ClassTag](
}
def onStop() {
- // There is nothing much to do as the thread calling receive()
- // is designed to stop by itself isStopped() returns false
+ // in case restart thread close it twice
+ synchronized {
+ if (socket != null) {
+ socket.close()
+ socket = null
+ logInfo(s"Closed socket to $host:$port")
+ }
+ }
}
/** Create a socket connection and receive data until receiver is stopped */
def receive() {
- var socket: Socket = null
try {
- logInfo("Connecting to " + host + ":" + port)
- socket = new Socket(host, port)
- logInfo("Connected to " + host + ":" + port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
- store(iterator.next)
+ store(iterator.next())
}
if (!isStopped()) {
restart("Socket data stream had no more data")
@@ -81,16 +96,11 @@ class SocketReceiver[T: ClassTag](
logInfo("Stopped receiving")
}
} catch {
- case e: java.net.ConnectException =>
- restart("Error connecting to " + host + ":" + port, e)
case NonFatal(e) =>
logWarning("Error receiving data", e)
restart("Error receiving data", e)
} finally {
- if (socket != null) {
- socket.close()
- logInfo("Closed socket to " + host + ":" + port)
- }
+ onStop()
}
}
}