From 3ccebf36c5abe04702d4cf223552a94034d980fb Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 9 Jul 2015 13:54:44 -0700 Subject: [SPARK-8389] [STREAMING] [PYSPARK] Expose KafkaRDDs offsetRange in Python This PR propose a simple way to expose OffsetRange in Python code, also the usage of offsetRanges is similar to Scala/Java way, here in Python we could get OffsetRange like: ``` dstream.foreachRDD(lambda r: KafkaUtils.offsetRanges(r)) ``` Reason I didn't follow the way what SPARK-8389 suggested is that: Python Kafka API has one more step to decode the message compared to Scala/Java, Which makes Python API return a transformed RDD/DStream, not directly wrapped so-called JavaKafkaRDD, so it is hard to backtrack to the original RDD to get the offsetRange. Author: jerryshao Closes #7185 from jerryshao/SPARK-8389 and squashes the following commits: 4c6d320 [jerryshao] Another way to fix subclass deserialization issue e6a8011 [jerryshao] Address the comments fd13937 [jerryshao] Fix serialization bug 7debf1c [jerryshao] bug fix cff3893 [jerryshao] refactor the code according to the comments 2aabf9e [jerryshao] Style fix 848c708 [jerryshao] Add HasOffsetRanges for Python --- python/pyspark/streaming/tests.py | 64 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) (limited to 'python/pyspark/streaming/tests.py') diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 188c8ff120..4ecae1e4bf 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -678,6 +678,70 @@ class KafkaStreamTests(PySparkStreamingTestCase): rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders) self._validateRddResult(sendData, rdd) + @unittest.skipIf(sys.version >= "3", "long type not support") + def test_kafka_rdd_get_offsetRanges(self): + """Test Python direct Kafka RDD get OffsetRanges.""" + topic = self._randomTopic() + sendData = {"a": 3, "b": 4, "c": 5} + offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))] + kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()} + + self._kafkaTestUtils.createTopic(topic) + self._kafkaTestUtils.sendMessages(topic, sendData) + rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges) + self.assertEqual(offsetRanges, rdd.offsetRanges()) + + @unittest.skipIf(sys.version >= "3", "long type not support") + def test_kafka_direct_stream_foreach_get_offsetRanges(self): + """Test the Python direct Kafka stream foreachRDD get offsetRanges.""" + topic = self._randomTopic() + sendData = {"a": 1, "b": 2, "c": 3} + kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(), + "auto.offset.reset": "smallest"} + + self._kafkaTestUtils.createTopic(topic) + self._kafkaTestUtils.sendMessages(topic, sendData) + + stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams) + + offsetRanges = [] + + def getOffsetRanges(_, rdd): + for o in rdd.offsetRanges(): + offsetRanges.append(o) + + stream.foreachRDD(getOffsetRanges) + self.ssc.start() + self.wait_for(offsetRanges, 1) + + self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))]) + + @unittest.skipIf(sys.version >= "3", "long type not support") + def test_kafka_direct_stream_transform_get_offsetRanges(self): + """Test the Python direct Kafka stream transform get offsetRanges.""" + topic = self._randomTopic() + sendData = {"a": 1, "b": 2, "c": 3} + kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(), + "auto.offset.reset": "smallest"} + + self._kafkaTestUtils.createTopic(topic) + self._kafkaTestUtils.sendMessages(topic, sendData) + + stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams) + + offsetRanges = [] + + def transformWithOffsetRanges(rdd): + for o in rdd.offsetRanges(): + offsetRanges.append(o) + return rdd + + stream.transform(transformWithOffsetRanges).foreachRDD(lambda rdd: rdd.count()) + self.ssc.start() + self.wait_for(offsetRanges, 1) + + self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))]) + class FlumeStreamTests(PySparkStreamingTestCase): timeout = 20 # seconds -- cgit v1.2.3