aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala14
1 files changed, 14 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
index db0bae9958..28cb86c9f3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
@@ -21,6 +21,9 @@ import org.apache.spark.streaming.Time
private[streaming] trait PythonStreamingListener{
+ /** Called when the streaming has been started */
+ def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted) { }
+
/** Called when a receiver has been started */
def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted) { }
@@ -51,6 +54,11 @@ private[streaming] trait PythonStreamingListener{
private[streaming] class PythonStreamingListenerWrapper(listener: PythonStreamingListener)
extends JavaStreamingListener {
+ /** Called when the streaming has been started */
+ override def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = {
+ listener.onStreamingStarted(streamingStarted)
+ }
+
/** Called when a receiver has been started */
override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
listener.onReceiverStarted(receiverStarted)
@@ -99,6 +107,9 @@ private[streaming] class PythonStreamingListenerWrapper(listener: PythonStreamin
*/
private[streaming] class JavaStreamingListener {
+ /** Called when the streaming has been started */
+ def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = { }
+
/** Called when a receiver has been started */
def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = { }
@@ -131,6 +142,9 @@ private[streaming] class JavaStreamingListener {
*/
private[streaming] sealed trait JavaStreamingListenerEvent
+private[streaming] class JavaStreamingListenerStreamingStarted(val time: Long)
+ extends JavaStreamingListenerEvent
+
private[streaming] class JavaStreamingListenerBatchSubmitted(val batchInfo: JavaBatchInfo)
extends JavaStreamingListenerEvent