diff options
author | Nick Evans <me@nicolasevans.org> | 2015-10-27 01:29:06 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-10-27 01:29:06 -0700 |
commit | 8f888eea1aef5a28916ec406a99fc19648681ecf (patch) | |
tree | 1d145476595a953aa22a8f1204626c25d1aeb715 /python | |
parent | feb8d6a44fbfc31a880aaaac0cfcaadc91786073 (diff) | |
download | spark-8f888eea1aef5a28916ec406a99fc19648681ecf.tar.gz spark-8f888eea1aef5a28916ec406a99fc19648681ecf.tar.bz2 spark-8f888eea1aef5a28916ec406a99fc19648681ecf.zip |
[SPARK-11270][STREAMING] Add improved equality testing for TopicAndPartition from the Kafka Streaming API
jerryshao tdas
I know this is kind of minor, and I know you all are busy, but this brings this class in line with the `OffsetRange` class, and makes tests a little more concise.
Instead of doing something like:
```
assert topic_and_partition_instance._topic == "foo"
assert topic_and_partition_instance._partition == 0
```
You can do something like:
```
assert topic_and_partition_instance == TopicAndPartition("foo", 0)
```
Before:
```
>>> from pyspark.streaming.kafka import TopicAndPartition
>>> TopicAndPartition("foo", 0) == TopicAndPartition("foo", 0)
False
```
After:
```
>>> from pyspark.streaming.kafka import TopicAndPartition
>>> TopicAndPartition("foo", 0) == TopicAndPartition("foo", 0)
True
```
I couldn't find any tests - am I missing something?
Author: Nick Evans <me@nicolasevans.org>
Closes #9236 from manygrams/topic_and_partition_equality.
Diffstat (limited to 'python')
-rw-r--r-- | python/pyspark/streaming/kafka.py | 10 | ||||
-rw-r--r-- | python/pyspark/streaming/tests.py | 10 |
2 files changed, 20 insertions, 0 deletions
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py index b35bbaf404..06e159172a 100644 --- a/python/pyspark/streaming/kafka.py +++ b/python/pyspark/streaming/kafka.py @@ -254,6 +254,16 @@ class TopicAndPartition(object): def _jTopicAndPartition(self, helper): return helper.createTopicAndPartition(self._topic, self._partition) + def __eq__(self, other): + if isinstance(other, self.__class__): + return (self._topic == other._topic + and self._partition == other._partition) + else: + return False + + def __ne__(self, other): + return not self.__eq__(other) + class Broker(object): """ diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 2c908daa8b..f7fa481d50 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -898,6 +898,16 @@ class KafkaStreamTests(PySparkStreamingTestCase): self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))]) + def test_topic_and_partition_equality(self): + topic_and_partition_a = TopicAndPartition("foo", 0) + topic_and_partition_b = TopicAndPartition("foo", 0) + topic_and_partition_c = TopicAndPartition("bar", 0) + topic_and_partition_d = TopicAndPartition("foo", 1) + + self.assertEqual(topic_and_partition_a, topic_and_partition_b) + self.assertNotEqual(topic_and_partition_a, topic_and_partition_c) + self.assertNotEqual(topic_and_partition_a, topic_and_partition_d) + class FlumeStreamTests(PySparkStreamingTestCase): timeout = 20 # seconds |