diff options
author | jerluc <jeremyalucas@gmail.com> | 2015-05-18 18:13:29 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-18 18:13:29 -0700 |
commit | 0a7a94eab5fba3d2f2ef14a70c2c1bf4ee21b626 (patch) | |
tree | 10a13e9ff68d6dfa1f4d49ee428ccf6b28717383 /external/kafka/src/main/scala | |
parent | 4fb52f9545ae338fae2d3aeea4bfc35d5df44853 (diff) | |
download | spark-0a7a94eab5fba3d2f2ef14a70c2c1bf4ee21b626.tar.gz spark-0a7a94eab5fba3d2f2ef14a70c2c1bf4ee21b626.tar.bz2 spark-0a7a94eab5fba3d2f2ef14a70c2c1bf4ee21b626.zip |
[SPARK-7621] [STREAMING] Report Kafka errors to StreamingListeners
PR per [SPARK-7621](https://issues.apache.org/jira/browse/SPARK-7621), which makes both `KafkaReceiver` and `ReliableKafkaReceiver` report its errors to the `ReceiverTracker`, which in turn will add the events to the bus to fire off any registered `StreamingListener`s.
Author: jerluc <jeremyalucas@gmail.com>
Closes #6204 from jerluc/master and squashes the following commits:
82439a5 [jerluc] [SPARK-7621] [STREAMING] Report Kafka errors to StreamingListeners
Diffstat (limited to 'external/kafka/src/main/scala')
-rw-r--r-- | external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala | 2 | ||||
-rw-r--r-- | external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala | 2 |
2 files changed, 2 insertions, 2 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala index cca0fac023..04b2dc10d3 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -135,7 +135,7 @@ class KafkaReceiver[ store((msgAndMetadata.key, msgAndMetadata.message)) } } catch { - case e: Throwable => logError("Error handling message; exiting", e) + case e: Throwable => reportError("Error handling message; exiting", e) } } } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala index ea87e96037..75f0dfc22b 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala @@ -267,7 +267,7 @@ class ReliableKafkaReceiver[ } } catch { case e: Exception => - logError("Error handling message", e) + reportError("Error handling message", e) } } } |