aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala11
1 files changed, 8 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 8b72bcf206..96e0a9c1a8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -17,6 +17,8 @@
package org.apache.spark.streaming.dstream
+import scala.util.control.NonFatal
+
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.NextIterator
@@ -74,13 +76,16 @@ class SocketReceiver[T: ClassTag](
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}
+ if (!isStopped()) {
+ restart("Socket data stream had no more data")
+ }
logInfo("Stopped receiving")
- restart("Retrying connecting to " + host + ":" + port)
} catch {
case e: java.net.ConnectException =>
restart("Error connecting to " + host + ":" + port, e)
- case t: Throwable =>
- restart("Error receiving data", t)
+ case NonFatal(e) =>
+ logWarning("Error receiving data", e)
+ restart("Error receiving data", e)
} finally {
if (socket != null) {
socket.close()