author      Daniel Jalova <djalova@us.ibm.com>           2015-11-16 11:29:27 -0800
committer   Tathagata Das <tathagata.das1565@gmail.com>  2015-11-16 11:29:27 -0800
commit      ace0db47141ffd457c2091751038fc291f6d5a8b (patch)
tree        7da3b00e0ef22660fdbbec53ac43e6a6959fe3e6
parent      de5e531d337075fd849437e88846873bca8685e6 (diff)
[SPARK-6328][PYTHON] Python API for StreamingListener
Author: Daniel Jalova <djalova@us.ibm.com>

Closes #9186 from djalova/SPARK-6328.
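In practice this gives a PySpark application a way to observe scheduler events without writing any JVM code. A minimal sketch of the resulting API (the subclass name, app name, and print format are illustrative, not part of the patch; the accessors mirror those exercised by the new tests below):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext, StreamingListener

    class BatchDelayListener(StreamingListener):
        # The base-class callbacks are all no-ops, so only the events
        # of interest need to be overridden.
        def onBatchCompleted(self, batchCompleted):
            # Event objects are Py4J proxies; Scala fields surface as
            # zero-argument method calls.
            info = batchCompleted.batchInfo()
            print("batch at %d ms: total delay %d ms"
                  % (info.batchTime().milliseconds(), info.totalDelay()))

    sc = SparkContext(appName="ListenerDemo")
    ssc = StreamingContext(sc, batchDuration=1)
    ssc.addStreamingListener(BatchDelayListener())  # register before ssc.start()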
Diffstat
-rw-r--r--  python/pyspark/streaming/__init__.py                                          3
-rw-r--r--  python/pyspark/streaming/context.py                                           8
-rw-r--r--  python/pyspark/streaming/listener.py                                         75
-rw-r--r--  python/pyspark/streaming/tests.py                                            126
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala  76
5 files changed, 286 insertions(+), 2 deletions(-)
diff --git a/python/pyspark/streaming/__init__.py b/python/pyspark/streaming/__init__.py
index d2644a1d4f..66e8f8ef00 100644
--- a/python/pyspark/streaming/__init__.py
+++ b/python/pyspark/streaming/__init__.py
@@ -17,5 +17,6 @@
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.dstream import DStream
+from pyspark.streaming.listener import StreamingListener
-__all__ = ['StreamingContext', 'DStream']
+__all__ = ['StreamingContext', 'DStream', 'StreamingListener']
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 8be56c9915..1388b6d044 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -363,3 +363,11 @@ class StreamingContext(object):
first = dstreams[0]
jrest = [d._jdstream for d in dstreams[1:]]
return DStream(self._jssc.union(first._jdstream, jrest), self, first._jrdd_deserializer)
+
+ def addStreamingListener(self, streamingListener):
+ """
+ Add a StreamingListener object for receiving system events related
+ to streaming; see org.apache.spark.streaming.scheduler.StreamingListener.
+ """
+ self._jssc.addStreamingListener(self._jvm.JavaStreamingListenerWrapper(
+ self._jvm.PythonStreamingListenerWrapper(streamingListener)))
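The double wrapping here is the bridge between the two runtimes: PythonStreamingListenerWrapper (added to JavaStreamingListener.scala at the end of this diff) adapts the Py4J callback object into a JavaStreamingListener, and JavaStreamingListenerWrapper, which already exists in the streaming module, turns that into the plain Scala StreamingListener the scheduler expects. A sketch of the equivalent explicit calls, assuming a live StreamingContext named ssc and a StreamingListener instance named listener:

    # Roughly what ssc.addStreamingListener(listener) does internally;
    # ssc and listener are assumed to exist already.
    py_side = ssc._jvm.PythonStreamingListenerWrapper(listener)
    jvm_side = ssc._jvm.JavaStreamingListenerWrapper(py_side)
    ssc._jssc.addStreamingListener(jvm_side)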
diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py
new file mode 100644
index 0000000000..b830797f5c
--- /dev/null
+++ b/python/pyspark/streaming/listener.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = ["StreamingListener"]
+
+
+class StreamingListener(object):
+
+ def __init__(self):
+ pass
+
+ def onReceiverStarted(self, receiverStarted):
+ """
+ Called when a receiver has been started.
+ """
+ pass
+
+ def onReceiverError(self, receiverError):
+ """
+ Called when a receiver has reported an error.
+ """
+ pass
+
+ def onReceiverStopped(self, receiverStopped):
+ """
+ Called when a receiver has been stopped.
+ """
+ pass
+
+ def onBatchSubmitted(self, batchSubmitted):
+ """
+ Called when a batch of jobs has been submitted for processing.
+ """
+ pass
+
+ def onBatchStarted(self, batchStarted):
+ """
+ Called when processing of a batch of jobs has started.
+ """
+ pass
+
+ def onBatchCompleted(self, batchCompleted):
+ """
+ Called when processing of a batch of jobs has completed.
+ """
+ pass
+
+ def onOutputOperationStarted(self, outputOperationStarted):
+ """
+ Called when processing of a job of a batch has started.
+ """
+ pass
+
+ def onOutputOperationCompleted(self, outputOperationCompleted):
+ """
+ Called when processing of a job of a batch has completed.
+ """
+ pass
+
+ class Java:
+ implements = ["org.apache.spark.streaming.api.java.PythonStreamingListener"]
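The nested Java class is Py4J's convention for declaring that a Python object implements a Java interface: the implements list names the interface, and Py4J's callback server routes JVM calls on that interface to the matching Python methods. Subclasses inherit the declaration, so a concrete listener needs no extra plumbing. A hypothetical sketch:

    from pyspark.streaming.listener import StreamingListener

    class CountingListener(StreamingListener):
        # Inherits the Py4J "class Java" declaration from
        # StreamingListener; only the callbacks of interest are
        # overridden here.
        def __init__(self):
            super(CountingListener, self).__init__()
            self.submitted = 0
            self.completed = 0

        def onBatchSubmitted(self, batchSubmitted):
            self.submitted += 1

        def onBatchCompleted(self, batchCompleted):
            self.completed += 1

The JVM half of this contract is the PythonStreamingListener trait added to JavaStreamingListener.scala later in this diff.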
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 6ee864d8d3..2983028413 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -48,6 +48,7 @@ from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
from pyspark.streaming.flume import FlumeUtils
from pyspark.streaming.mqtt import MQTTUtils
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
+from pyspark.streaming.listener import StreamingListener
class PySparkStreamingTestCase(unittest.TestCase):
@@ -403,6 +404,128 @@ class BasicOperationTests(PySparkStreamingTestCase):
self._test_func(input, func, expected)
+class StreamingListenerTests(PySparkStreamingTestCase):
+
+ duration = .5
+
+ class BatchInfoCollector(StreamingListener):
+
+ def __init__(self):
+ super(StreamingListenerTests.BatchInfoCollector, self).__init__()
+ self.batchInfosCompleted = []
+ self.batchInfosStarted = []
+ self.batchInfosSubmitted = []
+
+ def onBatchSubmitted(self, batchSubmitted):
+ self.batchInfosSubmitted.append(batchSubmitted.batchInfo())
+
+ def onBatchStarted(self, batchStarted):
+ self.batchInfosStarted.append(batchStarted.batchInfo())
+
+ def onBatchCompleted(self, batchCompleted):
+ self.batchInfosCompleted.append(batchCompleted.batchInfo())
+
+ def test_batch_info_reports(self):
+ batch_collector = self.BatchInfoCollector()
+ self.ssc.addStreamingListener(batch_collector)
+ input = [[1], [2], [3], [4]]
+
+ def func(dstream):
+ return dstream.map(int)
+ expected = [[1], [2], [3], [4]]
+ self._test_func(input, func, expected)
+
+ batchInfosSubmitted = batch_collector.batchInfosSubmitted
+ batchInfosStarted = batch_collector.batchInfosStarted
+ batchInfosCompleted = batch_collector.batchInfosCompleted
+
+ self.wait_for(batchInfosCompleted, 4)
+
+ self.assertGreaterEqual(len(batchInfosSubmitted), 4)
+ for info in batchInfosSubmitted:
+ self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
+ self.assertGreaterEqual(info.submissionTime(), 0)
+
+ for streamId in info.streamIdToInputInfo():
+ streamInputInfo = info.streamIdToInputInfo()[streamId]
+ self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
+ self.assertGreaterEqual(streamInputInfo.numRecords(), 0)
+ for key in streamInputInfo.metadata():
+ self.assertIsNotNone(streamInputInfo.metadata()[key])
+ self.assertIsNotNone(streamInputInfo.metadataDescription())
+
+ for outputOpId in info.outputOperationInfos():
+ outputInfo = info.outputOperationInfos()[outputOpId]
+ self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 0)
+ self.assertGreaterEqual(outputInfo.id(), 0)
+ self.assertIsNotNone(outputInfo.name())
+ self.assertIsNotNone(outputInfo.description())
+ self.assertGreaterEqual(outputInfo.startTime(), -1)
+ self.assertGreaterEqual(outputInfo.endTime(), -1)
+ self.assertIsNone(outputInfo.failureReason())
+
+ self.assertEqual(info.schedulingDelay(), -1)
+ self.assertEqual(info.processingDelay(), -1)
+ self.assertEqual(info.totalDelay(), -1)
+ self.assertEqual(info.numRecords(), 0)
+
+ self.assertGreaterEqual(len(batchInfosStarted), 4)
+ for info in batchInfosStarted:
+ self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
+ self.assertGreaterEqual(info.submissionTime(), 0)
+
+ for streamId in info.streamIdToInputInfo():
+ streamInputInfo = info.streamIdToInputInfo()[streamId]
+ self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
+ self.assertGreaterEqual(streamInputInfo.numRecords(), 0)
+ for key in streamInputInfo.metadata():
+ self.assertIsNotNone(streamInputInfo.metadata()[key])
+ self.assertIsNotNone(streamInputInfo.metadataDescription())
+
+ for outputOpId in info.outputOperationInfos():
+ outputInfo = info.outputOperationInfos()[outputOpId]
+ self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 0)
+ self.assertGreaterEqual(outputInfo.id(), 0)
+ self.assertIsNotNone(outputInfo.name())
+ self.assertIsNotNone(outputInfo.description())
+ self.assertGreaterEqual(outputInfo.startTime(), -1)
+ self.assertGreaterEqual(outputInfo.endTime(), -1)
+ self.assertIsNone(outputInfo.failureReason())
+
+ self.assertGreaterEqual(info.schedulingDelay(), 0)
+ self.assertEqual(info.processingDelay(), -1)
+ self.assertEqual(info.totalDelay(), -1)
+ self.assertEqual(info.numRecords(), 0)
+
+ self.assertGreaterEqual(len(batchInfosCompleted), 4)
+ for info in batchInfosCompleted:
+ self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
+ self.assertGreaterEqual(info.submissionTime(), 0)
+
+ for streamId in info.streamIdToInputInfo():
+ streamInputInfo = info.streamIdToInputInfo()[streamId]
+ self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
+ self.assertGreaterEqual(streamInputInfo.numRecords(), 0)
+ for key in streamInputInfo.metadata():
+ self.assertIsNotNone(streamInputInfo.metadata()[key])
+ self.assertIsNotNone(streamInputInfo.metadataDescription())
+
+ for outputOpId in info.outputOperationInfos():
+ outputInfo = info.outputOperationInfos()[outputOpId]
+ self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 0)
+ self.assertGreaterEqual(outputInfo.id(), 0)
+ self.assertIsNotNone(outputInfo.name())
+ self.assertIsNotNone(outputInfo.description())
+ self.assertGreaterEqual(outputInfo.startTime(), 0)
+ self.assertGreaterEqual(outputInfo.endTime(), 0)
+ self.assertIsNone(outputInfo.failureReason())
+
+ self.assertGreaterEqual(info.schedulingDelay(), 0)
+ self.assertGreaterEqual(info.processingDelay(), 0)
+ self.assertGreaterEqual(info.totalDelay(), 0)
+ self.assertEqual(info.numRecords(), 0)
+
+
class WindowFunctionTests(PySparkStreamingTestCase):
timeout = 15
@@ -1308,7 +1431,8 @@ if __name__ == "__main__":
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
- KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests]
+ KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests,
+ StreamingListenerTests]
if kinesis_jar_present is True:
testcases.append(KinesisStreamTests)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
index 34429074fe..7bfd6bd5af 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
@@ -18,6 +18,82 @@
package org.apache.spark.streaming.api.java
import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.scheduler.StreamingListener
+
+private[streaming] trait PythonStreamingListener {
+
+ /** Called when a receiver has been started. */
+ def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted) { }
+
+ /** Called when a receiver has reported an error. */
+ def onReceiverError(receiverError: JavaStreamingListenerReceiverError) { }
+
+ /** Called when a receiver has been stopped. */
+ def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped) { }
+
+ /** Called when a batch of jobs has been submitted for processing. */
+ def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted) { }
+
+ /** Called when processing of a batch of jobs has started. */
+ def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted) { }
+
+ /** Called when processing of a batch of jobs has completed. */
+ def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted) { }
+
+ /** Called when processing of a job of a batch has started. */
+ def onOutputOperationStarted(
+ outputOperationStarted: JavaStreamingListenerOutputOperationStarted) { }
+
+ /** Called when processing of a job of a batch has completed. */
+ def onOutputOperationCompleted(
+ outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted) { }
+}
+
+private[streaming] class PythonStreamingListenerWrapper(listener: PythonStreamingListener)
+ extends JavaStreamingListener {
+
+ /** Called when a receiver has been started. */
+ override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
+ listener.onReceiverStarted(receiverStarted)
+ }
+
+ /** Called when a receiver has reported an error. */
+ override def onReceiverError(receiverError: JavaStreamingListenerReceiverError): Unit = {
+ listener.onReceiverError(receiverError)
+ }
+
+ /** Called when a receiver has been stopped. */
+ override def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped): Unit = {
+ listener.onReceiverStopped(receiverStopped)
+ }
+
+ /** Called when a batch of jobs has been submitted for processing. */
+ override def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted): Unit = {
+ listener.onBatchSubmitted(batchSubmitted)
+ }
+
+ /** Called when processing of a batch of jobs has started. */
+ override def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted): Unit = {
+ listener.onBatchStarted(batchStarted)
+ }
+
+ /** Called when processing of a batch of jobs has completed. */
+ override def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): Unit = {
+ listener.onBatchCompleted(batchCompleted)
+ }
+
+ /** Called when processing of a job of a batch has started. */
+ override def onOutputOperationStarted(
+ outputOperationStarted: JavaStreamingListenerOutputOperationStarted): Unit = {
+ listener.onOutputOperationStarted(outputOperationStarted)
+ }
+
+ /** Called when processing of a job of a batch has completed. */
+ override def onOutputOperationCompleted(
+ outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted): Unit = {
+ listener.onOutputOperationCompleted(outputOperationCompleted)
+ }
+}
/**
* A listener interface for receiving information about an ongoing streaming computation.