author     jerryshao <sshao@hortonworks.com>        2015-12-01 15:26:10 -0800
committer  Shixiong Zhu <shixiong@databricks.com>   2015-12-01 15:26:10 -0800
commit     f292018f8e57779debc04998456ec875f628133b (patch)
tree       90e04eb8aaa5e708d7f21192588a7af43d36aff8 /python/pyspark/streaming/tests.py
parent     e76431f886ae8061545b3216e8e2fb38c4db1f43 (diff)
[SPARK-12002][STREAMING][PYSPARK] Fix python direct stream checkpoint recovery issue
Fixed a minor race condition in #10017.

Closes #10017

Author: jerryshao <sshao@hortonworks.com>
Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10074 from zsxwing/review-pr10017.
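For context, below is a minimal sketch (not part of the patch) of the recovery pattern the new test exercises: StreamingContext.getOrCreate only calls the setup function when no checkpoint exists in the given directory; on restart it rebuilds the StreamingContext and its DStream graph, including the Kafka direct stream, from the checkpoint, and the fix is about rdd.offsetRanges() remaining usable inside transform() after that recovery. The topic name, broker address, and checkpoint path are placeholders, and a Spark 1.x deployment with the spark-streaming-kafka package and a reachable Kafka 0.8 broker is assumed.

# Sketch only: placeholder topic/broker/checkpoint values; assumes Spark 1.x with
# the spark-streaming-kafka package on the classpath and a running Kafka broker.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="checkpoint-recovery-sketch")
checkpoint_dir = "/tmp/checkpoint-recovery-sketch"   # placeholder path
offset_ranges = []                                   # OffsetRange objects seen so far


def remember_offsets(rdd):
    # For a direct stream the transform callback receives the KafkaRDD itself,
    # so offsetRanges() is available; this is what has to keep working after
    # the context is recovered from a checkpoint.
    offset_ranges.extend(rdd.offsetRanges())
    return rdd


def setup():
    # Only called when no checkpoint exists in checkpoint_dir; otherwise
    # getOrCreate() rebuilds the StreamingContext and DStream graph from
    # the checkpoint data instead of running this function.
    ssc = StreamingContext(sc, batchDuration=2)
    ssc.checkpoint(checkpoint_dir)
    stream = KafkaUtils.createDirectStream(
        ssc, ["some-topic"],
        {"metadata.broker.list": "localhost:9092",
         "auto.offset.reset": "smallest"})
    stream.transform(remember_offsets).count().pprint()
    return ssc


ssc = StreamingContext.getOrCreate(checkpoint_dir, setup)
ssc.start()
ssc.awaitTermination(10)
ssc.stop(stopSparkContext=True, stopGraceFully=True)

Running this once, stopping it, and starting the same script again goes through the recovery branch of getOrCreate; the test added below automates that scenario with two getOrCreate calls against the same checkpoint directory.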
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r--  python/pyspark/streaming/tests.py  |  49
1 file changed, 49 insertions(+), 0 deletions(-)
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index a647e6bf39..d50c6b8d4a 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1150,6 +1150,55 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         self.assertNotEqual(topic_and_partition_a, topic_and_partition_d)
 
     @unittest.skipIf(sys.version >= "3", "long type not support")
+    def test_kafka_direct_stream_transform_with_checkpoint(self):
+        """Test the Python direct Kafka stream transform with checkpoint correctly recovered."""
+        topic = self._randomTopic()
+        sendData = {"a": 1, "b": 2, "c": 3}
+        kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(),
+                       "auto.offset.reset": "smallest"}
+
+        self._kafkaTestUtils.createTopic(topic)
+        self._kafkaTestUtils.sendMessages(topic, sendData)
+
+        offsetRanges = []
+
+        def transformWithOffsetRanges(rdd):
+            for o in rdd.offsetRanges():
+                offsetRanges.append(o)
+            return rdd
+
+        self.ssc.stop(False)
+        self.ssc = None
+        tmpdir = "checkpoint-test-%d" % random.randint(0, 10000)
+
+        def setup():
+            ssc = StreamingContext(self.sc, 0.5)
+            ssc.checkpoint(tmpdir)
+            stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)
+            stream.transform(transformWithOffsetRanges).count().pprint()
+            return ssc
+
+        try:
+            ssc1 = StreamingContext.getOrCreate(tmpdir, setup)
+            ssc1.start()
+            self.wait_for(offsetRanges, 1)
+            self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))])
+
+            # To make sure some checkpoint is written
+            time.sleep(3)
+            ssc1.stop(False)
+            ssc1 = None
+
+            # Restart again to make sure the checkpoint is recovered correctly
+            ssc2 = StreamingContext.getOrCreate(tmpdir, setup)
+            ssc2.start()
+            ssc2.awaitTermination(3)
+            ssc2.stop(stopSparkContext=False, stopGraceFully=True)
+            ssc2 = None
+        finally:
+            shutil.rmtree(tmpdir)
+
+    @unittest.skipIf(sys.version >= "3", "long type not support")
     def test_kafka_rdd_message_handler(self):
         """Test Python direct Kafka RDD MessageHandler."""
         topic = self._randomTopic()