aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/tests.py
diff options
context:
space:
mode:
authorjerryshao <saisai.shao@intel.com>2015-07-09 13:54:44 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-07-09 13:54:44 -0700
commit3ccebf36c5abe04702d4cf223552a94034d980fb (patch)
tree40990a75e75399422a2d34926505d5521db8edbe /python/pyspark/streaming/tests.py
parent1f6b0b1234cc03aa2e07aea7fec2de7563885238 (diff)
downloadspark-3ccebf36c5abe04702d4cf223552a94034d980fb.tar.gz
spark-3ccebf36c5abe04702d4cf223552a94034d980fb.tar.bz2
spark-3ccebf36c5abe04702d4cf223552a94034d980fb.zip
[SPARK-8389] [STREAMING] [PYSPARK] Expose KafkaRDDs offsetRange in Python
This PR propose a simple way to expose OffsetRange in Python code, also the usage of offsetRanges is similar to Scala/Java way, here in Python we could get OffsetRange like: ``` dstream.foreachRDD(lambda r: KafkaUtils.offsetRanges(r)) ``` Reason I didn't follow the way what SPARK-8389 suggested is that: Python Kafka API has one more step to decode the message compared to Scala/Java, Which makes Python API return a transformed RDD/DStream, not directly wrapped so-called JavaKafkaRDD, so it is hard to backtrack to the original RDD to get the offsetRange. Author: jerryshao <saisai.shao@intel.com> Closes #7185 from jerryshao/SPARK-8389 and squashes the following commits: 4c6d320 [jerryshao] Another way to fix subclass deserialization issue e6a8011 [jerryshao] Address the comments fd13937 [jerryshao] Fix serialization bug 7debf1c [jerryshao] bug fix cff3893 [jerryshao] refactor the code according to the comments 2aabf9e [jerryshao] Style fix 848c708 [jerryshao] Add HasOffsetRanges for Python
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r--python/pyspark/streaming/tests.py64
1 files changed, 64 insertions, 0 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 188c8ff120..4ecae1e4bf 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -678,6 +678,70 @@ class KafkaStreamTests(PySparkStreamingTestCase):
rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges, leaders)
self._validateRddResult(sendData, rdd)
+ @unittest.skipIf(sys.version >= "3", "long type not support")
+ def test_kafka_rdd_get_offsetRanges(self):
+ """Test Python direct Kafka RDD get OffsetRanges."""
+ topic = self._randomTopic()
+ sendData = {"a": 3, "b": 4, "c": 5}
+ offsetRanges = [OffsetRange(topic, 0, long(0), long(sum(sendData.values())))]
+ kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress()}
+
+ self._kafkaTestUtils.createTopic(topic)
+ self._kafkaTestUtils.sendMessages(topic, sendData)
+ rdd = KafkaUtils.createRDD(self.sc, kafkaParams, offsetRanges)
+ self.assertEqual(offsetRanges, rdd.offsetRanges())
+
+ @unittest.skipIf(sys.version >= "3", "long type not support")
+ def test_kafka_direct_stream_foreach_get_offsetRanges(self):
+ """Test the Python direct Kafka stream foreachRDD get offsetRanges."""
+ topic = self._randomTopic()
+ sendData = {"a": 1, "b": 2, "c": 3}
+ kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(),
+ "auto.offset.reset": "smallest"}
+
+ self._kafkaTestUtils.createTopic(topic)
+ self._kafkaTestUtils.sendMessages(topic, sendData)
+
+ stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
+
+ offsetRanges = []
+
+ def getOffsetRanges(_, rdd):
+ for o in rdd.offsetRanges():
+ offsetRanges.append(o)
+
+ stream.foreachRDD(getOffsetRanges)
+ self.ssc.start()
+ self.wait_for(offsetRanges, 1)
+
+ self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])
+
+ @unittest.skipIf(sys.version >= "3", "long type not support")
+ def test_kafka_direct_stream_transform_get_offsetRanges(self):
+ """Test the Python direct Kafka stream transform get offsetRanges."""
+ topic = self._randomTopic()
+ sendData = {"a": 1, "b": 2, "c": 3}
+ kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(),
+ "auto.offset.reset": "smallest"}
+
+ self._kafkaTestUtils.createTopic(topic)
+ self._kafkaTestUtils.sendMessages(topic, sendData)
+
+ stream = KafkaUtils.createDirectStream(self.ssc, [topic], kafkaParams)
+
+ offsetRanges = []
+
+ def transformWithOffsetRanges(rdd):
+ for o in rdd.offsetRanges():
+ offsetRanges.append(o)
+ return rdd
+
+ stream.transform(transformWithOffsetRanges).foreachRDD(lambda rdd: rdd.count())
+ self.ssc.start()
+ self.wait_for(offsetRanges, 1)
+
+ self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])
+
class FlumeStreamTests(PySparkStreamingTestCase):
timeout = 20 # seconds