diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2017-03-17 11:12:23 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2017-03-17 11:12:23 -0700 |
commit | 376d782164437573880f0ad58cecae1cb5f212f2 (patch) | |
tree | 2dc457fd1bc476742430b6c6f357b0f9253a2e13 | |
parent | 7b5d873aef672aa0aee41e338bab7428101e1ad3 (diff) | |
download | spark-376d782164437573880f0ad58cecae1cb5f212f2.tar.gz spark-376d782164437573880f0ad58cecae1cb5f212f2.tar.bz2 spark-376d782164437573880f0ad58cecae1cb5f212f2.zip |
[SPARK-19986][TESTS] Make pyspark.streaming.tests.CheckpointTests more stable
## What changes were proposed in this pull request?
Sometimes, CheckpointTests will hang on a busy machine because the streaming jobs are too slow and cannot catch up. I observed that the scheduled delay kept increasing for dozens of seconds locally.
This PR increases the batch interval from 0.5 seconds to 2 seconds to generate fewer Spark jobs. It should make `pyspark.streaming.tests.CheckpointTests` more stable. I also replaced `sleep` with `awaitTerminationOrTimeout` so that if the streaming job fails, it will also fail the test.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #17323 from zsxwing/SPARK-19986.
-rw-r--r-- | python/pyspark/streaming/tests.py | 11 |
1 files changed, 6 insertions, 5 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 2e8ed69827..1bec335095 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -903,11 +903,11 @@ class CheckpointTests(unittest.TestCase): def setup(): conf = SparkConf().set("spark.default.parallelism", 1) sc = SparkContext(conf=conf) - ssc = StreamingContext(sc, 0.5) + ssc = StreamingContext(sc, 2) dstream = ssc.textFileStream(inputd).map(lambda x: (x, 1)) wc = dstream.updateStateByKey(updater) wc.map(lambda x: "%s,%d" % x).saveAsTextFiles(outputd + "test") - wc.checkpoint(.5) + wc.checkpoint(2) self.setupCalled = True return ssc @@ -921,21 +921,22 @@ class CheckpointTests(unittest.TestCase): def check_output(n): while not os.listdir(outputd): - time.sleep(0.01) + if self.ssc.awaitTerminationOrTimeout(0.5): + raise Exception("ssc stopped") time.sleep(1) # make sure mtime is larger than the previous one with open(os.path.join(inputd, str(n)), 'w') as f: f.writelines(["%d\n" % i for i in range(10)]) while True: + if self.ssc.awaitTerminationOrTimeout(0.5): + raise Exception("ssc stopped") p = os.path.join(outputd, max(os.listdir(outputd))) if '_SUCCESS' not in os.listdir(p): # not finished - time.sleep(0.01) continue ordd = self.ssc.sparkContext.textFile(p).map(lambda line: line.split(",")) d = ordd.values().map(int).collect() if not d: - time.sleep(0.01) continue self.assertEqual(10, len(d)) s = set(d) |