author    Shixiong Zhu <shixiong@databricks.com>    2017-03-17 11:12:23 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>    2017-03-17 11:12:23 -0700
commit    376d782164437573880f0ad58cecae1cb5f212f2 (patch)
tree      2dc457fd1bc476742430b6c6f357b0f9253a2e13 /python/pyspark
parent    7b5d873aef672aa0aee41e338bab7428101e1ad3 (diff)
[SPARK-19986][TESTS] Make pyspark.streaming.tests.CheckpointTests more stable
## What changes were proposed in this pull request?

Sometimes CheckpointTests will hang on a busy machine because the streaming jobs are too slow and cannot catch up; I observed locally that the scheduling delay kept increasing for dozens of seconds. This PR increases the batch interval from 0.5 seconds to 2 seconds to generate fewer Spark jobs, which should make `pyspark.streaming.tests.CheckpointTests` more stable. It also replaces `sleep` with `awaitTerminationOrTimeout` so that if the streaming job fails, the test fails as well.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17323 from zsxwing/SPARK-19986.
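For readers unfamiliar with the pattern, the sketch below illustrates the polling idiom the patch adopts: instead of `time.sleep()`, the wait loop calls `StreamingContext.awaitTerminationOrTimeout()`, which returns once the context has stopped and re-raises any error from the streaming job, so a failed job aborts the test instead of letting it spin. This is a minimal sketch only; the helper name `wait_for_output` and its parameters are illustrative and not part of the patch.

```python
import os


def wait_for_output(ssc, output_dir, poll_timeout=0.5):
    """Block until output_dir is non-empty, failing fast if ssc stops."""
    while not os.listdir(output_dir):
        # awaitTerminationOrTimeout() returns True once the StreamingContext
        # has stopped and re-raises any error reported by the streaming job,
        # so a failure surfaces immediately instead of hanging the test.
        if ssc.awaitTerminationOrTimeout(poll_timeout):
            raise Exception("ssc stopped")
```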
Diffstat (limited to 'python/pyspark')
-rw-r--r--    python/pyspark/streaming/tests.py | 11
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 2e8ed69827..1bec335095 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -903,11 +903,11 @@ class CheckpointTests(unittest.TestCase):
         def setup():
             conf = SparkConf().set("spark.default.parallelism", 1)
             sc = SparkContext(conf=conf)
-            ssc = StreamingContext(sc, 0.5)
+            ssc = StreamingContext(sc, 2)
             dstream = ssc.textFileStream(inputd).map(lambda x: (x, 1))
             wc = dstream.updateStateByKey(updater)
             wc.map(lambda x: "%s,%d" % x).saveAsTextFiles(outputd + "test")
-            wc.checkpoint(.5)
+            wc.checkpoint(2)
             self.setupCalled = True
             return ssc
 
@@ -921,21 +921,22 @@
 
         def check_output(n):
             while not os.listdir(outputd):
-                time.sleep(0.01)
+                if self.ssc.awaitTerminationOrTimeout(0.5):
+                    raise Exception("ssc stopped")
             time.sleep(1)  # make sure mtime is larger than the previous one
 
             with open(os.path.join(inputd, str(n)), 'w') as f:
                 f.writelines(["%d\n" % i for i in range(10)])
             while True:
+                if self.ssc.awaitTerminationOrTimeout(0.5):
+                    raise Exception("ssc stopped")
                 p = os.path.join(outputd, max(os.listdir(outputd)))
                 if '_SUCCESS' not in os.listdir(p):
                     # not finished
-                    time.sleep(0.01)
                     continue
                 ordd = self.ssc.sparkContext.textFile(p).map(lambda line: line.split(","))
                 d = ordd.values().map(int).collect()
                 if not d:
-                    time.sleep(0.01)
                     continue
                 self.assertEqual(10, len(d))
                 s = set(d)