about | summary | refs | log | tree | commit | diff
path: root/python/pyspark/streaming/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r-- python/pyspark/streaming/tests.py | 84
1 files changed, 77 insertions, 7 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 5fa1e5ef08..7c06c20345 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -21,6 +21,7 @@ from itertools import chain
import time
import operator
import tempfile
+import random
import struct
from functools import reduce
@@ -35,7 +36,7 @@ else:
from pyspark.context import SparkConf, SparkContext, RDD
from pyspark.streaming.context import StreamingContext
-from pyspark.streaming.kafka import KafkaUtils
+from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
class PySparkStreamingTestCase(unittest.TestCase):
@@ -590,9 +591,27 @@ class KafkaStreamTests(PySparkStreamingTestCase):
super(KafkaStreamTests, self).tearDown()
+    def _randomTopic(self):
+        """Return a topic name "topic-<n>", with n a random integer in [0, 10000]."""
+        suffix = random.randint(0, 10000)
+        return "topic-{0}".format(suffix)
+
+    def _validateStreamResult(self, sendData, stream):
+        """Assert that the values seen on `stream` occur as often as `sendData` states.
+
+        Every stream record is reduced to its element at index 1. The mapped stream and
+        the sum of `sendData`'s values are handed to `self._collect`, and each item of
+        each collected group is tallied per value. The tally must equal `sendData`.
+        """
+        values = stream.map(lambda record: record[1])
+        expectedTotal = sum(sendData.values())
+        counts = {}
+        for batch in self._collect(values, expectedTotal):
+            for value in batch:
+                counts[value] = counts.get(value, 0) + 1
+
+        self.assertEqual(sendData, counts)
+
+    def _validateRddResult(self, sendData, rdd):
+        """Assert that the values collected from `rdd` occur as often as `sendData` states.
+
+        Every RDD record is reduced to its element at index 1, the results are collected,
+        and occurrences are tallied per value. The tally must equal `sendData`.
+        """
+        values = rdd.map(lambda record: record[1]).collect()
+        counts = {}
+        for value in values:
+            counts[value] = counts.get(value, 0) + 1
+
+        self.assertEqual(sendData, counts)
+
def test_kafka_stream(self):
"""Test the Python Kafka stream API."""
- topic = "topic1"
+ topic = self._randomTopic()
sendData = {"a": 3, "b": 5, "c": 10}
self._kafkaTestUtils.createTopic(topic)
@@ -601,13 +620,64 @@ class KafkaStreamTests(PySparkStreamingTestCase):
stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
"test-streaming-consumer", {topic: 1},
{"auto.offset.reset": "smallest"})
+ self._validateStreamResult(sendData, stream)
- result = {}
- for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
- sum(sendData.values()))):
- result[i] = result.get(i, 0) + 1
+    def test_kafka_direct_stream(self):
+        """Exercise KafkaUtils.createDirectStream against a freshly created topic."""
+        topicName = self._randomTopic()
+        messages = {"a": 1, "b": 2, "c": 3}
+        params = {}
+        params["metadata.broker.list"] = self._kafkaTestUtils.brokerAddress()
+        params["auto.offset.reset"] = "smallest"
-        self.assertEqual(sendData, result)
+        testUtils = self._kafkaTestUtils
+        testUtils.createTopic(topicName)
+        testUtils.sendMessages(topicName, messages)
+
+        directStream = KafkaUtils.createDirectStream(self.ssc, [topicName], params)
+        self._validateStreamResult(messages, directStream)
+
+    # Skipped on Python 3: the test body calls the Python 2-only `long` builtin.
+    # The check uses the sys.version_info tuple because comparing the sys.version
+    # string is lexicographic and would misorder a two-digit major version.
+    @unittest.skipIf(sys.version_info >= (3,), "long type not support")
+    def test_kafka_direct_stream_from_offset(self):
+        """Test the Python direct Kafka stream API with start offset specified."""
+        topic = self._randomTopic()
+        sendData = {"a": 1, "b": 2, "c": 3}
+        # Map TopicAndPartition(topic, 0) to an explicit starting offset of 0.
+        fromOffsets = {TopicAndPartition(topic, 0): long(0)}
+        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
+
+        # The topic is created and populated before the stream is created.
+        self._kafkaTestUtils.createTopic(topic)
+        self._kafkaTestUtils.sendMessages(topic, sendData)
+
+        stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
+        self._validateStreamResult(sendData, stream)
+
+    # Skipped on Python 3: the test body calls the Python 2-only `long` builtin.
+    # The check uses the sys.version_info tuple because comparing the sys.version
+    # string is lexicographic and would misorder a two-digit major version.
+    @unittest.skipIf(sys.version_info >= (3,), "long type not support")
+    def test_kafka_rdd(self):
+        """Test the Python direct Kafka RDD API."""
+        topic = self._randomTopic()
+        sendData = {"a": 1, "b": 2}
+        # One range over the topic, from 0 up to the total number of messages sent.
+        messageCount = sum(sendData.values())
+        offsetRanges = [OffsetRange(topic, 0, long(0), long(messageCount))]
+        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
+
+        # The topic is created and populated before the RDD is created.
+        self._kafkaTestUtils.createTopic(topic)
+        self._kafkaTestUtils.sendMessages(topic, sendData)
+
+        rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
+        self._validateRddResult(sendData, rdd)
+
+    # Skipped on Python 3: the test body calls the Python 2-only `long` builtin.
+    # The check uses the sys.version_info tuple because comparing the sys.version
+    # string is lexicographic and would misorder a two-digit major version.
+    @unittest.skipIf(sys.version_info >= (3,), "long type not support")
+    def test_kafka_rdd_with_leaders(self):
+        """Test the Python direct Kafka RDD API with leaders."""
+        topic = self._randomTopic()
+        sendData = {"a": 1, "b": 2, "c": 3}
+        # One range over the topic, from 0 up to the total number of messages sent.
+        messageCount = sum(sendData.values())
+        offsetRanges = [OffsetRange(topic, 0, long(0), long(messageCount))]
+        # Fetch the broker address once; it feeds both the params and the leader entry.
+        brokerAddress = self._kafkaTestUtils.brokerAddress()
+        kafkaParams = {"metadata.broker.list": brokerAddress}
+        # Split on ":" so that element 0 and int(element 1) can be passed to Broker.
+        address = brokerAddress.split(":")
+        leaders = {TopicAndPartition(topic, 0): Broker(address[0], int(address[1]))}
+
+        # The topic is created and populated before the RDD is created.
+        self._kafkaTestUtils.createTopic(topic)
+        self._kafkaTestUtils.sendMessages(topic, sendData)
+
+        rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
+        self._validateRddResult(sendData, rdd)
if __name__ == "__main__":
unittest.main()