author    Daniel Jalova <djalova@us.ibm.com>  2015-11-16 11:29:27 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2015-11-16 11:29:27 -0800
commit    ace0db47141ffd457c2091751038fc291f6d5a8b (patch)
tree      7da3b00e0ef22660fdbbec53ac43e6a6959fe3e6 /python/pyspark/streaming/tests.py
parent    de5e531d337075fd849437e88846873bca8685e6 (diff)
[SPARK-6328][PYTHON] Python API for StreamingListener
Author: Daniel Jalova <djalova@us.ibm.com>

Closes #9186 from djalova/SPARK-6328.
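For context, this patch exposes the JVM StreamingListener callbacks to Python. A rough usage sketch (BatchCountListener below is an illustrative name, not part of this patch): subclass StreamingListener, override the callbacks of interest, and register the instance on the StreamingContext with addStreamingListener() before starting the context.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.listener import StreamingListener

class BatchCountListener(StreamingListener):
    # Hypothetical example listener: counts batches as the driver completes them.
    def __init__(self):
        super(BatchCountListener, self).__init__()
        self.completed = 0

    def onBatchCompleted(self, batchCompleted):
        # batchCompleted.batchInfo() exposes batch time, delays and record counts
        self.completed += 1

sc = SparkContext(appName="listener-example")
ssc = StreamingContext(sc, 1)  # 1-second batch interval
ssc.addStreamingListener(BatchCountListener())
# ... define DStream operations, then ssc.start() and ssc.awaitTermination()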
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r--  python/pyspark/streaming/tests.py | 126
1 file changed, 125 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 6ee864d8d3..2983028413 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -48,6 +48,7 @@ from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPar
from pyspark.streaming.flume import FlumeUtils
from pyspark.streaming.mqtt import MQTTUtils
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
+from pyspark.streaming.listener import StreamingListener
class PySparkStreamingTestCase(unittest.TestCase):
@@ -403,6 +404,128 @@ class BasicOperationTests(PySparkStreamingTestCase):
        self._test_func(input, func, expected)
+class StreamingListenerTests(PySparkStreamingTestCase):
+
+    duration = .5
+
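+    # Collects the BatchInfo objects reported through the listener callbacks.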
+    class BatchInfoCollector(StreamingListener):
+
+        def __init__(self):
+            super(StreamingListenerTests.BatchInfoCollector, self).__init__()
+            self.batchInfosCompleted = []
+            self.batchInfosStarted = []
+            self.batchInfosSubmitted = []
+
+        def onBatchSubmitted(self, batchSubmitted):
+            self.batchInfosSubmitted.append(batchSubmitted.batchInfo())
+
+        def onBatchStarted(self, batchStarted):
+            self.batchInfosStarted.append(batchStarted.batchInfo())
+
+        def onBatchCompleted(self, batchCompleted):
+            self.batchInfosCompleted.append(batchCompleted.batchInfo())
+
+    def test_batch_info_reports(self):
+        batch_collector = self.BatchInfoCollector()
+        self.ssc.addStreamingListener(batch_collector)
+        input = [[1], [2], [3], [4]]
+
+        def func(dstream):
+            return dstream.map(int)
+        expected = [[1], [2], [3], [4]]
+        self._test_func(input, func, expected)
+
+        batchInfosSubmitted = batch_collector.batchInfosSubmitted
+        batchInfosStarted = batch_collector.batchInfosStarted
+        batchInfosCompleted = batch_collector.batchInfosCompleted
+
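+        # Block until at least four batches have been reported as completed.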
+        self.wait_for(batchInfosCompleted, 4)
+
+        self.assertGreaterEqual(len(batchInfosSubmitted), 4)
+        for info in batchInfosSubmitted:
+            self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
+            self.assertGreaterEqual(info.submissionTime(), 0)
+
+            for streamId in info.streamIdToInputInfo():
+                streamInputInfo = info.streamIdToInputInfo()[streamId]
+                self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
+                self.assertGreaterEqual(streamInputInfo.numRecords(), 0)
+                for key in streamInputInfo.metadata():
+                    self.assertIsNotNone(streamInputInfo.metadata()[key])
+                self.assertIsNotNone(streamInputInfo.metadataDescription())
+
+            for outputOpId in info.outputOperationInfos():
+                outputInfo = info.outputOperationInfos()[outputOpId]
+                self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 0)
+                self.assertGreaterEqual(outputInfo.id(), 0)
+                self.assertIsNotNone(outputInfo.name())
+                self.assertIsNotNone(outputInfo.description())
+                self.assertGreaterEqual(outputInfo.startTime(), -1)
+                self.assertGreaterEqual(outputInfo.endTime(), -1)
+                self.assertIsNone(outputInfo.failureReason())
+
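+            # A batch that has only been submitted is not yet scheduled, so
+            # every delay is still -1.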
+            self.assertEqual(info.schedulingDelay(), -1)
+            self.assertEqual(info.processingDelay(), -1)
+            self.assertEqual(info.totalDelay(), -1)
+            self.assertEqual(info.numRecords(), 0)
+
+        self.assertGreaterEqual(len(batchInfosStarted), 4)
+        for info in batchInfosStarted:
+            self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
+            self.assertGreaterEqual(info.submissionTime(), 0)
+
+            for streamId in info.streamIdToInputInfo():
+                streamInputInfo = info.streamIdToInputInfo()[streamId]
+                self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
+                self.assertGreaterEqual(streamInputInfo.numRecords(), 0)
+                for key in streamInputInfo.metadata():
+                    self.assertIsNotNone(streamInputInfo.metadata()[key])
+                self.assertIsNotNone(streamInputInfo.metadataDescription())
+
+            for outputOpId in info.outputOperationInfos():
+                outputInfo = info.outputOperationInfos()[outputOpId]
+                self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 0)
+                self.assertGreaterEqual(outputInfo.id(), 0)
+                self.assertIsNotNone(outputInfo.name())
+                self.assertIsNotNone(outputInfo.description())
+                self.assertGreaterEqual(outputInfo.startTime(), -1)
+                self.assertGreaterEqual(outputInfo.endTime(), -1)
+                self.assertIsNone(outputInfo.failureReason())
+
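+            # A started batch has been scheduled, but processing has not
+            # finished, so only the scheduling delay is defined.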
+            self.assertGreaterEqual(info.schedulingDelay(), 0)
+            self.assertEqual(info.processingDelay(), -1)
+            self.assertEqual(info.totalDelay(), -1)
+            self.assertEqual(info.numRecords(), 0)
+
+        self.assertGreaterEqual(len(batchInfosCompleted), 4)
+        for info in batchInfosCompleted:
+            self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
+            self.assertGreaterEqual(info.submissionTime(), 0)
+
+            for streamId in info.streamIdToInputInfo():
+                streamInputInfo = info.streamIdToInputInfo()[streamId]
+                self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
+                self.assertGreaterEqual(streamInputInfo.numRecords(), 0)
+                for key in streamInputInfo.metadata():
+                    self.assertIsNotNone(streamInputInfo.metadata()[key])
+                self.assertIsNotNone(streamInputInfo.metadataDescription())
+
+            for outputOpId in info.outputOperationInfos():
+                outputInfo = info.outputOperationInfos()[outputOpId]
+                self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 0)
+                self.assertGreaterEqual(outputInfo.id(), 0)
+                self.assertIsNotNone(outputInfo.name())
+                self.assertIsNotNone(outputInfo.description())
+                self.assertGreaterEqual(outputInfo.startTime(), 0)
+                self.assertGreaterEqual(outputInfo.endTime(), 0)
+                self.assertIsNone(outputInfo.failureReason())
+
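+            # A completed batch reports all three delays as non-negative values.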
+            self.assertGreaterEqual(info.schedulingDelay(), 0)
+            self.assertGreaterEqual(info.processingDelay(), 0)
+            self.assertGreaterEqual(info.totalDelay(), 0)
+            self.assertEqual(info.numRecords(), 0)
+
+
class WindowFunctionTests(PySparkStreamingTestCase):
    timeout = 15
@@ -1308,7 +1431,8 @@ if __name__ == "__main__":
    os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
    testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
-                 KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests]
+                 KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, MQTTStreamTests,
+                 StreamingListenerTests]
    if kinesis_jar_present is True:
        testcases.append(KinesisStreamTests)