diff options
author | jerryshao <sshao@hortonworks.com> | 2015-12-01 15:26:10 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2015-12-01 15:26:10 -0800 |
commit | f292018f8e57779debc04998456ec875f628133b (patch) | |
tree | 90e04eb8aaa5e708d7f21192588a7af43d36aff8 /python/pyspark/streaming/tests.py | |
parent | e76431f886ae8061545b3216e8e2fb38c4db1f43 (diff) | |
download | spark-f292018f8e57779debc04998456ec875f628133b.tar.gz spark-f292018f8e57779debc04998456ec875f628133b.tar.bz2 spark-f292018f8e57779debc04998456ec875f628133b.zip |
[SPARK-12002][STREAMING][PYSPARK] Fix python direct stream checkpoint recovery issue
Fixed a minor race condition in #10017
Closes #10017
Author: jerryshao <sshao@hortonworks.com>
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #10074 from zsxwing/review-pr10017.
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r-- | python/pyspark/streaming/tests.py | 49 |
1 files changed, 49 insertions, 0 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index a647e6bf39..d50c6b8d4a 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -1150,6 +1150,55 @@ class KafkaStreamTests(PySparkStreamingTestCase): self.assertNotEqual(topic_and_partition_a, topic_and_partition_d) @unittest.skipIf(sys.version >= "3", "long type not support") + def test_kafka_direct_stream_transform_with_checkpoint(self): + """Test the Python direct Kafka stream transform with checkpoint correctly recovered.""" + topic = self._randomTopic() + sendData = {"a": 1, "b": 2, "c": 3} + kafkaParams = {"metadata.broker.list": self._kafkaTestUtils.brokerAddress(), + "auto.offset.reset": "smallest"} + + self._kafkaTestUtils.createTopic(topic) + self._kafkaTestUtils.sendMessages(topic, sendData) + + offsetRanges = [] + + def transformWithOffsetRanges(rdd): + for o in rdd.offsetRanges(): + offsetRanges.append(o) + return rdd + + self.ssc.stop(False) + self.ssc = None + tmpdir = "checkpoint-test-%d" % random.randint(0, 10000) + + def setup(): + ssc = StreamingContext(self.sc, 0.5) + ssc.checkpoint(tmpdir) + stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams) + stream.transform(transformWithOffsetRanges).count().pprint() + return ssc + + try: + ssc1 = StreamingContext.getOrCreate(tmpdir, setup) + ssc1.start() + self.wait_for(offsetRanges, 1) + self.assertEqual(offsetRanges, [OffsetRange(topic, 0, long(0), long(6))]) + + # To make sure some checkpoint is written + time.sleep(3) + ssc1.stop(False) + ssc1 = None + + # Restart again to make sure the checkpoint is recovered correctly + ssc2 = StreamingContext.getOrCreate(tmpdir, setup) + ssc2.start() + ssc2.awaitTermination(3) + ssc2.stop(stopSparkContext=False, stopGraceFully=True) + ssc2 = None + finally: + shutil.rmtree(tmpdir) + + @unittest.skipIf(sys.version >= "3", "long type not support") def test_kafka_rdd_message_handler(self): """Test Python direct Kafka RDD MessageHandler.""" topic = self._randomTopic() |