aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Evans <me@nicolasevans.org>2015-10-27 01:29:06 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-10-27 01:29:06 -0700
commit8f888eea1aef5a28916ec406a99fc19648681ecf (patch)
tree1d145476595a953aa22a8f1204626c25d1aeb715
parentfeb8d6a44fbfc31a880aaaac0cfcaadc91786073 (diff)
downloadspark-8f888eea1aef5a28916ec406a99fc19648681ecf.tar.gz
spark-8f888eea1aef5a28916ec406a99fc19648681ecf.tar.bz2
spark-8f888eea1aef5a28916ec406a99fc19648681ecf.zip
[SPARK-11270][STREAMING] Add improved equality testing for TopicAndPartition from the Kafka Streaming API
jerryshao tdas I know this is kind of minor, and I know you all are busy, but this brings this class in line with the `OffsetRange` class, and makes tests a little more concise. Instead of doing something like: ``` assert topic_and_partition_instance._topic == "foo" assert topic_and_partition_instance._partition == 0 ``` You can do something like: ``` assert topic_and_partition_instance == TopicAndPartition("foo", 0) ``` Before: ``` >>> from pyspark.streaming.kafka import TopicAndPartition >>> TopicAndPartition("foo", 0) == TopicAndPartition("foo", 0) False ``` After: ``` >>> from pyspark.streaming.kafka import TopicAndPartition >>> TopicAndPartition("foo", 0) == TopicAndPartition("foo", 0) True ``` I couldn't find any tests - am I missing something? Author: Nick Evans <me@nicolasevans.org> Closes #9236 from manygrams/topic_and_partition_equality.
-rw-r--r--python/pyspark/streaming/kafka.py10
-rw-r--r--python/pyspark/streaming/tests.py10
2 files changed, 20 insertions, 0 deletions
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index b35bbaf404..06e159172a 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -254,6 +254,16 @@ class TopicAndPartition(object):
def _jTopicAndPartition(self, helper):
return helper.createTopicAndPartition(self._topic, self._partition)
+ def __eq__(self, other):
+ if isinstance(other, self.__class__):
+ return (self._topic == other._topic
+ and self._partition == other._partition)
+ else:
+ return False
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
class Broker(object):
"""
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 2c908daa8b..f7fa481d50 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -898,6 +898,16 @@ class KafkaStreamTests(PySparkStreamingTestCase):
self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])
+ def test_topic_and_partition_equality(self):
+ topic_and_partition_a = TopicAndPartition("foo", 0)
+ topic_and_partition_b = TopicAndPartition("foo", 0)
+ topic_and_partition_c = TopicAndPartition("bar", 0)
+ topic_and_partition_d = TopicAndPartition("foo", 1)
+
+ self.assertEqual(topic_and_partition_a, topic_and_partition_b)
+ self.assertNotEqual(topic_and_partition_a, topic_and_partition_c)
+ self.assertNotEqual(topic_and_partition_a, topic_and_partition_d)
+
class FlumeStreamTests(PySparkStreamingTestCase):
timeout = 20 # seconds