aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-05-28 22:39:21 -0700
committerPatrick Wendell <patrick@databricks.com>2015-05-28 22:39:25 -0700
commite714ecf277a7412ea8263662977fe3ad1f794975 (patch)
tree678b070147805582add216c7ee4171a3d3ac2f8a /streaming
parentdb9513789756da4f16bb1fe8cf1d19500f231f54 (diff)
downloadspark-e714ecf277a7412ea8263662977fe3ad1f794975.tar.gz
spark-e714ecf277a7412ea8263662977fe3ad1f794975.tar.bz2
spark-e714ecf277a7412ea8263662977fe3ad1f794975.zip
[SPARK-7931] [STREAMING] Do not restart receiver when stopped
Attempts to restart the socket receiver when it is supposed to be stopped causes undesirable error messages. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #6483 from tdas/SPARK-7931 and squashes the following commits: 09aeee1 [Tathagata Das] Do not restart receiver when stopped
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala11
1 files changed, 8 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 8b72bcf206..96e0a9c1a8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -17,6 +17,8 @@
package org.apache.spark.streaming.dstream
+import scala.util.control.NonFatal
+
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.NextIterator
@@ -74,13 +76,16 @@ class SocketReceiver[T: ClassTag](
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}
+ if (!isStopped()) {
+ restart("Socket data stream had no more data")
+ }
logInfo("Stopped receiving")
- restart("Retrying connecting to " + host + ":" + port)
} catch {
case e: java.net.ConnectException =>
restart("Error connecting to " + host + ":" + port, e)
- case t: Throwable =>
- restart("Error receiving data", t)
+ case NonFatal(e) =>
+ logWarning("Error receiving data", e)
+ restart("Error receiving data", e)
} finally {
if (socket != null) {
socket.close()