aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala76
1 files changed, 76 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
index 34429074fe..7bfd6bd5af 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
@@ -18,6 +18,82 @@
package org.apache.spark.streaming.api.java
import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.scheduler.StreamingListener
+
+private[streaming] trait PythonStreamingListener{
+
+ /** Called when a receiver has been started */
+ def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted) { }
+
+ /** Called when a receiver has reported an error */
+ def onReceiverError(receiverError: JavaStreamingListenerReceiverError) { }
+
+ /** Called when a receiver has been stopped */
+ def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped) { }
+
+ /** Called when a batch of jobs has been submitted for processing. */
+ def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted) { }
+
+ /** Called when processing of a batch of jobs has started. */
+ def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted) { }
+
+ /** Called when processing of a batch of jobs has completed. */
+ def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted) { }
+
+ /** Called when processing of a job of a batch has started. */
+ def onOutputOperationStarted(
+ outputOperationStarted: JavaStreamingListenerOutputOperationStarted) { }
+
+ /** Called when processing of a job of a batch has completed. */
+ def onOutputOperationCompleted(
+ outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted) { }
+}
+
+private[streaming] class PythonStreamingListenerWrapper(listener: PythonStreamingListener)
+ extends JavaStreamingListener {
+
+ /** Called when a receiver has been started */
+ override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
+ listener.onReceiverStarted(receiverStarted)
+ }
+
+ /** Called when a receiver has reported an error */
+ override def onReceiverError(receiverError: JavaStreamingListenerReceiverError): Unit = {
+ listener.onReceiverError(receiverError)
+ }
+
+ /** Called when a receiver has been stopped */
+ override def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped): Unit = {
+ listener.onReceiverStopped(receiverStopped)
+ }
+
+ /** Called when a batch of jobs has been submitted for processing. */
+ override def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted): Unit = {
+ listener.onBatchSubmitted(batchSubmitted)
+ }
+
+ /** Called when processing of a batch of jobs has started. */
+ override def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted): Unit = {
+ listener.onBatchStarted(batchStarted)
+ }
+
+ /** Called when processing of a batch of jobs has completed. */
+ override def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): Unit = {
+ listener.onBatchCompleted(batchCompleted)
+ }
+
+ /** Called when processing of a job of a batch has started. */
+ override def onOutputOperationStarted(
+ outputOperationStarted: JavaStreamingListenerOutputOperationStarted): Unit = {
+ listener.onOutputOperationStarted(outputOperationStarted)
+ }
+
+ /** Called when processing of a job of a batch has completed. */
+ override def onOutputOperationCompleted(
+ outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted): Unit = {
+ listener.onOutputOperationCompleted(outputOperationCompleted)
+ }
+}
/**
* A listener interface for receiving information about an ongoing streaming computation.