author     jerryshao <saisai.shao@intel.com>  2015-04-27 23:48:02 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-04-27 23:48:02 -0700
commit     9e4e82b7bca1129bcd5e0274b9ae1b1be3fb93da (patch)
tree       5df0f823975fd6f0ef7132a7346ca993bc30d63b /python/pyspark/streaming/tests.py
parent     29576e786072bd4218e10036ddfc8d367b1c1446 (diff)
[SPARK-5946] [STREAMING] Add Python API for direct Kafka stream
Currently this only adds the `createDirectStream` API; I'm not sure if `createRDD` is also needed, since some Java objects need to be wrapped in Python. Please help to review, thanks a lot.

Author: jerryshao <saisai.shao@intel.com>
Author: Saisai Shao <saisai.shao@intel.com>

Closes #4723 from jerryshao/direct-kafka-python-api and squashes the following commits:

a1fe97c [jerryshao] Fix rebase issue
eebf333 [jerryshao] Address the comments
da40f4e [jerryshao] Fix Python 2.6 syntax error issue
5c0ee85 [jerryshao] Style fix
4aeac18 [jerryshao] Fix bug in example code
7146d86 [jerryshao] Add unit test
bf3bdd6 [jerryshao] Add more APIs and address the comments
f5b3801 [jerryshao] Small style fix
8641835 [Saisai Shao] Rebase and update the code
589c05b [Saisai Shao] Fix the style
d6fcb6a [Saisai Shao] Address the comments
dfda902 [Saisai Shao] Style fix
0f7d168 [Saisai Shao] Add the doc and fix some style issues
67e6880 [Saisai Shao] Fix test bug
917b0db [Saisai Shao] Add Python createRDD API for Kafka direct stream
c3fc11d [jerryshao] Modify the docs
2c00936 [Saisai Shao] Address the comments
3360f44 [jerryshao] Fix code style
e0e0f0d [jerryshao] Code clean and bug fix
338c41f [Saisai Shao] Add Python API and example for direct Kafka stream
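For context, a minimal sketch of how the new Python API is meant to be called. The broker address, topic name, and offset count below are placeholders (assumptions for illustration, not values from this patch), and it assumes the Kafka assembly jar is on the classpath:

# Sketch of the createDirectStream/createRDD API added by this commit.
# localhost:9092 and "my-topic" are assumed placeholders.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, OffsetRange

sc = SparkContext(appName="direct-kafka-sketch")
ssc = StreamingContext(sc, 2)  # 2-second batches

kafkaParams = {"metadata.broker.list": "localhost:9092",
               "auto.offset.reset": "smallest"}

# Receiver-less direct stream over the given topics.
stream = KafkaUtils.createDirectStream(ssc, ["my-topic"], kafkaParams)
stream.map(lambda kv: kv[1]).pprint()

# Batch read of an explicit offset range (Python 2 only here, since
# offsets are passed as long(), mirroring the tests in this patch).
offsetRanges = [OffsetRange("my-topic", 0, long(0), long(10))]
rdd = KafkaUtils.createRDD(sc, kafkaParams, offsetRanges)
print(rdd.count())

ssc.start()
ssc.awaitTermination()

Because the direct approach tracks Kafka offsets itself rather than running a receiver, createRDD can deterministically replay an exact OffsetRange, which is what the new tests below exercise.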
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r--  python/pyspark/streaming/tests.py  84
1 file changed, 77 insertions(+), 7 deletions(-)
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 5fa1e5ef08..7c06c20345 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -21,6 +21,7 @@ from itertools import chain
import time
import operator
import tempfile
+import random
import struct
from functools import reduce
@@ -35,7 +36,7 @@ else:
from pyspark.context import SparkConf, SparkContext, RDD
from pyspark.streaming.context import StreamingContext
-from pyspark.streaming.kafka import KafkaUtils
+from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPartition
class PySparkStreamingTestCase(unittest.TestCase):
@@ -590,9 +591,27 @@ class KafkaStreamTests(PySparkStreamingTestCase):
super(KafkaStreamTests, self).tearDown()
+ def _randomTopic(self):
+ return "topic-%d" % random.randint(0, 10000)
+
+ def _validateStreamResult(self, sendData, stream):
+ result = {}
+ for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
+ sum(sendData.values()))):
+ result[i] = result.get(i, 0) + 1
+
+ self.assertEqual(sendData, result)
+
+ def _validateRddResult(self, sendData, rdd):
+ result = {}
+ for i in rdd.map(lambda x: x[1]).collect():
+ result[i] = result.get(i, 0) + 1
+
+ self.assertEqual(sendData, result)
+
def test_kafka_stream(self):
"""Test the Python Kafka stream API."""
- topic = "topic1"
+ topic = self._randomTopic()
sendData = {"a": 3, "b": 5, "c": 10}
self._kafkaTestUtils.createTopic(topic)
@@ -601,13 +620,64 @@ class KafkaStreamTests(PySparkStreamingTestCase):
stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
"test-streaming-consumer", {topic: 1},
{"auto.offset.reset": "smallest"})
+ self._validateStreamResult(sendData, stream)
- result = {}
- for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
- sum(sendData.values()))):
- result[i] = result.get(i, 0) + 1
+ def test_kafka_direct_stream(self):
+ """Test the Python direct Kafka stream API."""
+ topic = self._randomTopic()
+ sendData = {"a": 1, "b": 2, "c": 3}
+ kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(),
+ "auto.offset.reset": "smallest"}
- self.assertEqual(sendData, result)
+ self._kafkaTestUtils.createTopic(topic)
+ self._kafkaTestUtils.sendMessages(topic, sendData)
+
+ stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
+ self._validateStreamResult(sendData, stream)
+
+ @unittest.skipIf(sys.version >= "3", "long type not supported")
+ def test_kafka_direct_stream_from_offset(self):
+ """Test the Python direct Kafka stream API with start offset specified."""
+ topic = self._randomTopic()
+ sendData = {"a": 1, "b": 2, "c": 3}
+ fromOffsets = {TopicAndPartition(topic, 0): long(0)}
+ kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
+
+ self._kafkaTestUtils.createTopic(topic)
+ self._kafkaTestUtils.sendMessages(topic, sendData)
+
+ stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams, fromOffsets)
+ self._validateStreamResult(sendData, stream)
+
+ @unittest.skipIf(sys.version >= "3", "long type not supported")
+ def test_kafka_rdd(self):
+ """Test the Python direct Kafka RDD API."""
+ topic = self._randomTopic()
+ sendData = {"a": 1, "b": 2}
+ offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))]
+ kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
+
+ self._kafkaTestUtils.createTopic(topic)
+ self._kafkaTestUtils.sendMessages(topic, sendData)
+
+ rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
+ self._validateRddResult(sendData, rdd)
+
+ @unittest.skipIf(sys.version >= "3", "long type not supported")
+ def test_kafka_rdd_with_leaders(self):
+ """Test the Python direct Kafka RDD API with leaders."""
+ topic = self._randomTopic()
+ sendData = {"a": 1, "b": 2, "c": 3}
+ offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))]
+ kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
+ address = self._kafkaTestUtils.brokerAddress().split(":")
+ leaders = {TopicAndPartition(topic, 0): Broker(address[0], int(address[1]))}
+
+ self._kafkaTestUtils.createTopic(topic)
+ self._kafkaTestUtils.sendMessages(topic, sendData)
+
+ rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
+ self._validateRddResult(sendData, rdd)
if __name__ == "__main__":
unittest.main()