path: root/python/pyspark/streaming/tests.py
author: jerryshao <sshao@hortonworks.com> 2015-11-17 16:57:52 -0800
committer: Tathagata Das <tathagata.das1565@gmail.com> 2015-11-17 16:57:52 -0800
commit: 75a292291062783129d02607302f91c85655975e (patch)
tree: 7ba09e47b8aa3b3810bc3d115817a862a729aac0 /python/pyspark/streaming/tests.py
parent: b362d50fca30693f97bd859984157bb8a76d48a1 (diff)
[SPARK-9065][STREAMING][PYSPARK] Add MessageHandler for Kafka Python API
Fixed the merge conflicts in #7410.

Closes #7410

Author: Shixiong Zhu <shixiong@databricks.com>
Author: jerryshao <saisai.shao@intel.com>
Author: jerryshao <sshao@hortonworks.com>

Closes #9742 from zsxwing/pr7410.
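As a quick illustration of the API surface this commit adds, the sketch below shows how the new messageHandler parameter could be passed to KafkaUtils.createRDD and KafkaUtils.createDirectStream. It mirrors the getKeyAndDoubleMessage helper in the tests below; the broker address, topic name, and offset range are placeholder assumptions, not values taken from this commit.

# Usage sketch for the new messageHandler parameter (pyspark.streaming.kafka,
# Spark 1.6-era API). Broker, topic, and offsets below are assumed placeholders.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, OffsetRange

sc = SparkContext(appName="kafka-message-handler-sketch")
ssc = StreamingContext(sc, 2)  # 2-second batches

kafkaParams = {"metadata.broker.list": "localhost:9092",   # assumed broker
               "auto.offset.reset": "smallest"}

def keyAndDoubledMessage(m):
    # m carries the Kafka record and its metadata; return any per-record value.
    return m and (m.key, m.message * 2)

# Batch path: read a fixed offset range and apply the handler to each record.
offsetRanges = [OffsetRange("my-topic", 0, 0, 100)]        # assumed topic/offsets
rdd = KafkaUtils.createRDD(sc, kafkaParams, offsetRanges,
                           messageHandler=keyAndDoubledMessage)

# Streaming path: apply the same handler to a direct stream.
stream = KafkaUtils.createDirectStream(ssc, ["my-topic"], kafkaParams,
                                       messageHandler=keyAndDoubledMessage)

When messageHandler is omitted, both calls fall back to returning plain (key, message) pairs, which is why the tests below check for doubled message payloads when the handler is supplied.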
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r--  python/pyspark/streaming/tests.py | 35
1 file changed, 35 insertions(+), 0 deletions(-)
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index ff95639146..0bcd1f1553 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1042,6 +1042,41 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         self.assertNotEqual(topic_and_partition_a, topic_and_partition_c)
         self.assertNotEqual(topic_and_partition_a, topic_and_partition_d)
 
+    @unittest.skipIf(sys.version >= "3", "long type not support")
+    def test_kafka_rdd_message_handler(self):
+        """Test Python direct Kafka RDD MessageHandler."""
+        topic = self._randomTopic()
+        sendData = {"a": 1, "b": 1, "c": 2}
+        offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))]
+        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
+
+        def getKeyAndDoubleMessage(m):
+            return m and (m.key, m.message * 2)
+
+        self._kafkaTestUtils.createTopic(topic)
+        self._kafkaTestUtils.sendMessages(topic, sendData)
+        rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges,
+                                   messageHandler=getKeyAndDoubleMessage)
+        self._validateRddResult({"aa": 1, "bb": 1, "cc": 2}, rdd)
+
+    @unittest.skipIf(sys.version >= "3", "long type not support")
+    def test_kafka_direct_stream_message_handler(self):
+        """Test the Python direct Kafka stream MessageHandler."""
+        topic = self._randomTopic()
+        sendData = {"a": 1, "b": 2, "c": 3}
+        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(),
+                       "auto.offset.reset": "smallest"}
+
+        self._kafkaTestUtils.createTopic(topic)
+        self._kafkaTestUtils.sendMessages(topic, sendData)
+
+        def getKeyAndDoubleMessage(m):
+            return m and (m.key, m.message * 2)
+
+        stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams,
+                                               messageHandler=getKeyAndDoubleMessage)
+        self._validateStreamResult({"aa": 1, "bb": 2, "cc": 3}, stream)
+
 
 class FlumeStreamTests(PySparkStreamingTestCase):
     timeout = 20  # seconds