From ace0db47141ffd457c2091751038fc291f6d5a8b Mon Sep 17 00:00:00 2001
From: Daniel Jalova
Date: Mon, 16 Nov 2015 11:29:27 -0800
Subject: [SPARK-6328][PYTHON] Python API for StreamingListener

Author: Daniel Jalova

Closes #9186 from djalova/SPARK-6328.
---
 python/pyspark/streaming/__init__.py |   3 +-
 python/pyspark/streaming/context.py  |   8 +++
 python/pyspark/streaming/listener.py |  75 +++++++++++++++++++++
 python/pyspark/streaming/tests.py    | 126 ++++++++++++++++++++++++++++++++++-
 4 files changed, 210 insertions(+), 2 deletions(-)
 create mode 100644 python/pyspark/streaming/listener.py

diff --git a/python/pyspark/streaming/__init__.py b/python/pyspark/streaming/__init__.py
index d2644a1d4f..66e8f8ef00 100644
--- a/python/pyspark/streaming/__init__.py
+++ b/python/pyspark/streaming/__init__.py
@@ -17,5 +17,6 @@
 
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.dstream import DStream
+from pyspark.streaming.listener import StreamingListener
 
-__all__ = ['StreamingContext', 'DStream']
+__all__ = ['StreamingContext', 'DStream', 'StreamingListener']
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 8be56c9915..1388b6d044 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -363,3 +363,11 @@ class StreamingContext(object):
         first = dstreams[0]
         jrest = [d._jdstream for d in dstreams[1:]]
         return DStream(self._jssc.union(first._jdstream, jrest), self, first._jrdd_deserializer)
+
+    def addStreamingListener(self, streamingListener):
+        """
+        Add a StreamingListener object for receiving system events related
+        to streaming.
+        """
+        self._jssc.addStreamingListener(self._jvm.JavaStreamingListenerWrapper(
+            self._jvm.PythonStreamingListenerWrapper(streamingListener)))
diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py
new file mode 100644
index 0000000000..b830797f5c
--- /dev/null
+++ b/python/pyspark/streaming/listener.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = ["StreamingListener"]
+
+
+class StreamingListener(object):
+
+    def __init__(self):
+        pass
+
+    def onReceiverStarted(self, receiverStarted):
+        """
+        Called when a receiver has been started.
+        """
+        pass
+
+    def onReceiverError(self, receiverError):
+        """
+        Called when a receiver has reported an error.
+        """
+        pass
+
+    def onReceiverStopped(self, receiverStopped):
+        """
+        Called when a receiver has been stopped.
+        """
+        pass
+
+    def onBatchSubmitted(self, batchSubmitted):
+        """
+        Called when a batch of jobs has been submitted for processing.
+        """
+        pass
+
+    def onBatchStarted(self, batchStarted):
+        """
+        Called when processing of a batch of jobs has started.
+        """
+        pass
+
+    def onBatchCompleted(self, batchCompleted):
+        """
+        Called when processing of a batch of jobs has completed.
+        """
+        pass
+
+    def onOutputOperationStarted(self, outputOperationStarted):
+        """
+        Called when processing of a job of a batch has started.
+        """
+        pass
+
+    def onOutputOperationCompleted(self, outputOperationCompleted):
+        """
+        Called when processing of a job of a batch has completed.
+        """
+        pass
+
+    class Java:
+        implements = ["org.apache.spark.streaming.api.java.PythonStreamingListener"]
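For reference, here is a minimal sketch of how an application would use the new listener API, assuming a PySpark build that includes this patch. The BatchLogger class and its print output are illustrative, not part of the change:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext, StreamingListener

    class BatchLogger(StreamingListener):
        """Hypothetical listener that reports how long each batch took."""

        def onBatchCompleted(self, batchCompleted):
            # batchInfo() is a Py4J proxy for the Java BatchInfo object,
            # so its fields are read through method calls.
            info = batchCompleted.batchInfo()
            print("batch at %d ms took %d ms to process"
                  % (info.batchTime().milliseconds(), info.processingDelay()))

    sc = SparkContext("local[2]", "listener-example")
    ssc = StreamingContext(sc, 1)
    ssc.addStreamingListener(BatchLogger())

Registering the listener before ssc.start() ensures no batch events are missed.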
+ """ + pass + + def onBatchCompleted(self, batchCompleted): + """ + Called when processing of a batch of jobs has completed. + """ + pass + + def onOutputOperationStarted(self, outputOperationStarted): + """ + Called when processing of a job of a batch has started. + """ + pass + + def onOutputOperationCompleted(self, outputOperationCompleted): + """ + Called when processing of a job of a batch has completed + """ + pass + + class Java: + implements = ["org.apache.spark.streaming.api.java.PythonStreamingListener"] diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 6ee864d8d3..2983028413 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -48,6 +48,7 @@ from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPar from pyspark.streaming.flume import FlumeUtils from pyspark.streaming.mqtt import MQTTUtils from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream +from pyspark.streaming.listener import StreamingListener class PySparkStreamingTestCase(unittest.TestCase): @@ -403,6 +404,128 @@ class BasicOperationTests(PySparkStreamingTestCase): self._test_func(input, func, expected) +class StreamingListenerTests(PySparkStreamingTestCase): + + duration = .5 + + class BatchInfoCollector(StreamingListener): + + def __init__(self): + super(StreamingListener, self).__init__() + self.batchInfosCompleted = [] + self.batchInfosStarted = [] + self.batchInfosSubmitted = [] + + def onBatchSubmitted(self, batchSubmitted): + self.batchInfosSubmitted.append(batchSubmitted.batchInfo()) + + def onBatchStarted(self, batchStarted): + self.batchInfosStarted.append(batchStarted.batchInfo()) + + def onBatchCompleted(self, batchCompleted): + self.batchInfosCompleted.append(batchCompleted.batchInfo()) + + def test_batch_info_reports(self): + batch_collector = self.BatchInfoCollector() + self.ssc.addStreamingListener(batch_collector) + input = [[1], [2], [3], [4]] + + def func(dstream): + return dstream.map(int) + expected = [[1], [2], [3], [4]] + self._test_func(input, func, expected) + + batchInfosSubmitted = batch_collector.batchInfosSubmitted + batchInfosStarted = batch_collector.batchInfosStarted + batchInfosCompleted = batch_collector.batchInfosCompleted + + self.wait_for(batchInfosCompleted, 4) + + self.assertGreaterEqual(len(batchInfosSubmitted), 4) + for info in batchInfosSubmitted: + self.assertGreaterEqual(info.batchTime().milliseconds(), 0) + self.assertGreaterEqual(info.submissionTime(), 0) + + for streamId in info.streamIdToInputInfo(): + streamInputInfo = info.streamIdToInputInfo()[streamId] + self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0) + self.assertGreaterEqual(streamInputInfo.numRecords, 0) + for key in streamInputInfo.metadata(): + self.assertIsNotNone(streamInputInfo.metadata()[key]) + self.assertIsNotNone(streamInputInfo.metadataDescription()) + + for outputOpId in info.outputOperationInfos(): + outputInfo = info.outputOperationInfos()[outputOpId] + self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 0) + self.assertGreaterEqual(outputInfo.id(), 0) + self.assertIsNotNone(outputInfo.name()) + self.assertIsNotNone(outputInfo.description()) + self.assertGreaterEqual(outputInfo.startTime(), -1) + self.assertGreaterEqual(outputInfo.endTime(), -1) + self.assertIsNone(outputInfo.failureReason()) + + self.assertEqual(info.schedulingDelay(), -1) + self.assertEqual(info.processingDelay(), -1) + self.assertEqual(info.totalDelay(), -1) + 
@@ -1308,7 +1431,8 @@ if __name__ == "__main__":
         os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
 
     testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
-                 KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests]
+                 KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests,
+                 StreamingListenerTests]
 
     if kinesis_jar_present is True:
         testcases.append(KinesisStreamTests)
-- 
cgit v1.2.3
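Putting the pieces together, a self-contained driver along the following lines should exercise the listener end to end, again assuming a build with this patch applied. The CompletedBatchCounter name and the queueStream-based input are illustrative, not part of the change:

    import time

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext, StreamingListener

    class CompletedBatchCounter(StreamingListener):
        """Hypothetical listener that collects BatchInfo for finished batches."""

        def __init__(self):
            self.completed = []

        def onBatchCompleted(self, batchCompleted):
            self.completed.append(batchCompleted.batchInfo())

    sc = SparkContext("local[2]", "listener-smoke-test")
    ssc = StreamingContext(sc, 0.5)

    listener = CompletedBatchCounter()
    ssc.addStreamingListener(listener)

    # queueStream feeds a fixed sequence of RDDs, one per batch interval.
    rdds = [sc.parallelize([i]) for i in range(4)]
    ssc.queueStream(rdds).count().pprint()

    ssc.start()
    time.sleep(5)
    ssc.stop(True, True)  # also stops the SparkContext

    print("batches completed: %d" % len(listener.completed))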