aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorDaniel Jalova <djalova@us.ibm.com>2015-11-16 11:29:27 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-11-16 11:29:27 -0800
commitace0db47141ffd457c2091751038fc291f6d5a8b (patch)
tree7da3b00e0ef22660fdbbec53ac43e6a6959fe3e6 /streaming
parentde5e531d337075fd849437e88846873bca8685e6 (diff)
downloadspark-ace0db47141ffd457c2091751038fc291f6d5a8b.tar.gz
spark-ace0db47141ffd457c2091751038fc291f6d5a8b.tar.bz2
spark-ace0db47141ffd457c2091751038fc291f6d5a8b.zip
[SPARK-6328][PYTHON] Python API for StreamingListener
Author: Daniel Jalova <djalova@us.ibm.com> Closes #9186 from djalova/SPARK-6328.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala76
1 files changed, 76 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
index 34429074fe..7bfd6bd5af 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
@@ -18,6 +18,82 @@
package org.apache.spark.streaming.api.java
import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.scheduler.StreamingListener
+
+private[streaming] trait PythonStreamingListener{
+
+ /** Called when a receiver has been started */
+ def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted) { }
+
+ /** Called when a receiver has reported an error */
+ def onReceiverError(receiverError: JavaStreamingListenerReceiverError) { }
+
+ /** Called when a receiver has been stopped */
+ def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped) { }
+
+ /** Called when a batch of jobs has been submitted for processing. */
+ def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted) { }
+
+ /** Called when processing of a batch of jobs has started. */
+ def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted) { }
+
+ /** Called when processing of a batch of jobs has completed. */
+ def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted) { }
+
+ /** Called when processing of a job of a batch has started. */
+ def onOutputOperationStarted(
+ outputOperationStarted: JavaStreamingListenerOutputOperationStarted) { }
+
+ /** Called when processing of a job of a batch has completed. */
+ def onOutputOperationCompleted(
+ outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted) { }
+}
+
+private[streaming] class PythonStreamingListenerWrapper(listener: PythonStreamingListener)
+ extends JavaStreamingListener {
+
+ /** Called when a receiver has been started */
+ override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
+ listener.onReceiverStarted(receiverStarted)
+ }
+
+ /** Called when a receiver has reported an error */
+ override def onReceiverError(receiverError: JavaStreamingListenerReceiverError): Unit = {
+ listener.onReceiverError(receiverError)
+ }
+
+ /** Called when a receiver has been stopped */
+ override def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped): Unit = {
+ listener.onReceiverStopped(receiverStopped)
+ }
+
+ /** Called when a batch of jobs has been submitted for processing. */
+ override def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted): Unit = {
+ listener.onBatchSubmitted(batchSubmitted)
+ }
+
+ /** Called when processing of a batch of jobs has started. */
+ override def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted): Unit = {
+ listener.onBatchStarted(batchStarted)
+ }
+
+ /** Called when processing of a batch of jobs has completed. */
+ override def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): Unit = {
+ listener.onBatchCompleted(batchCompleted)
+ }
+
+ /** Called when processing of a job of a batch has started. */
+ override def onOutputOperationStarted(
+ outputOperationStarted: JavaStreamingListenerOutputOperationStarted): Unit = {
+ listener.onOutputOperationStarted(outputOperationStarted)
+ }
+
+ /** Called when processing of a job of a batch has completed. */
+ override def onOutputOperationCompleted(
+ outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted): Unit = {
+ listener.onOutputOperationCompleted(outputOperationCompleted)
+ }
+}
/**
* A listener interface for receiving information about an ongoing streaming computation.