diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-28 22:39:21 -0700 |
---|---|---|
committer | Patrick Wendell <patrick@databricks.com> | 2015-05-28 22:48:23 -0700 |
commit | 7a52fdf25f8d635ba05796abb0c491454d7869cf (patch) | |
tree | 7af3765837da9a39a508c09b4a7a2923e146911c /streaming | |
parent | 68559423ac2ffc2c9dfcbe95a8efa4868757c4bf (diff) | |
download | spark-7a52fdf25f8d635ba05796abb0c491454d7869cf.tar.gz spark-7a52fdf25f8d635ba05796abb0c491454d7869cf.tar.bz2 spark-7a52fdf25f8d635ba05796abb0c491454d7869cf.zip |
[SPARK-7931] [STREAMING] Do not restart receiver when stopped
Attempts to restart the socket receiver when it is supposed to be stopped causes undesirable error messages.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #6483 from tdas/SPARK-7931 and squashes the following commits:
09aeee1 [Tathagata Das] Do not restart receiver when stopped
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala | 11 |
1 files changed, 8 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala index 8b72bcf206..96e0a9c1a8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.dstream +import scala.util.control.NonFatal + import org.apache.spark.streaming.StreamingContext import org.apache.spark.storage.StorageLevel import org.apache.spark.util.NextIterator @@ -74,13 +76,16 @@ class SocketReceiver[T: ClassTag]( while(!isStopped && iterator.hasNext) { store(iterator.next) } + if (!isStopped()) { + restart("Socket data stream had no more data") + } logInfo("Stopped receiving") - restart("Retrying connecting to " + host + ":" + port) } catch { case e: java.net.ConnectException => restart("Error connecting to " + host + ":" + port, e) - case t: Throwable => - restart("Error receiving data", t) + case NonFatal(e) => + logWarning("Error receiving data", e) + restart("Error receiving data", e) } finally { if (socket != null) { socket.close() |