From 0a7a94eab5fba3d2f2ef14a70c2c1bf4ee21b626 Mon Sep 17 00:00:00 2001 From: jerluc Date: Mon, 18 May 2015 18:13:29 -0700 Subject: [SPARK-7621] [STREAMING] Report Kafka errors to StreamingListeners PR per [SPARK-7621](https://issues.apache.org/jira/browse/SPARK-7621), which makes both `KafkaReceiver` and `ReliableKafkaReceiver` report its errors to the `ReceiverTracker`, which in turn will add the events to the bus to fire off any registered `StreamingListener`s. Author: jerluc Closes #6204 from jerluc/master and squashes the following commits: 82439a5 [jerluc] [SPARK-7621] [STREAMING] Report Kafka errors to StreamingListeners --- .../main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala | 2 +- .../scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'external') diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala index cca0fac023..04b2dc10d3 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -135,7 +135,7 @@ class KafkaReceiver[ store((msgAndMetadata.key, msgAndMetadata.message)) } } catch { - case e: Throwable => logError("Error handling message; exiting", e) + case e: Throwable => reportError("Error handling message; exiting", e) } } } diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala index ea87e96037..75f0dfc22b 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala @@ -267,7 +267,7 @@ class ReliableKafkaReceiver[ } } catch { case e: Exception => - logError("Error handling message", e) + reportError("Error handling message", e) } } } -- cgit v1.2.3