author     Joseph K. Bradley <joseph@databricks.com>    2015-08-15 18:48:20 -0700
committer  Joseph K. Bradley <joseph@databricks.com>    2015-08-15 18:48:20 -0700
commit     1db7179fae672fcec7b8de12c374dd384ce51c67 (patch)
tree       f105ee3ee4923b57cfcf4e89af4c58a40fddbe6a /python/pyspark/mllib
parent     570567258b5839c1e0e28b5182f4c29b119ed4c4 (diff)
[SPARK-9805] [MLLIB] [PYTHON] [STREAMING] Added _eventually for ml streaming pyspark tests
Recently, PySpark ML streaming tests have been flaky, most likely because of the batches not being processed in time.

Proposal: Replace the use of _ssc_wait (which waits for a fixed amount of time) with a method which waits for a fixed amount of time but can terminate early based on a termination condition method. With this, we can extend the waiting period (to make tests less flaky) but also stop early when possible (making tests faster on average, which I verified locally).

CC: mengxr tdas freeman-lab

Author: Joseph K. Bradley <joseph@databricks.com>

Closes #8087 from jkbradley/streaming-ml-tests.
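Because the diff rendering below flattens Python indentation, here is the helper this patch adds to MLLibStreamingTestCase, re-indented and shown as a standalone function for readability (in tests.py it is a @staticmethod on the streaming test base class, and the time imports already exist at the top of that file):

    from time import time, sleep


    def _eventually(condition, timeout=30.0, catch_assertions=False):
        """
        Wait up to `timeout` seconds for condition() to return True, polling
        every 10 ms; otherwise fail with an AssertionError.  If catch_assertions
        is True, AssertionErrors raised by condition() are caught and retried,
        and the last one is re-raised on timeout.
        """
        start_time = time()
        lastValue = None
        while time() - start_time < timeout:
            if catch_assertions:
                try:
                    lastValue = condition()
                except AssertionError as e:
                    lastValue = e
            else:
                lastValue = condition()
            if lastValue is True:
                return
            sleep(0.01)
        if isinstance(lastValue, AssertionError):
            raise lastValue
        raise AssertionError(
            "Test failed due to timeout after %g sec, with last condition returning: %s"
            % (timeout, lastValue))

Each streaming test then wraps its assertions in a small condition() that returns True once they pass and calls the helper with catch_assertions=True, so the wait ends as soon as the expected batches have been processed rather than after a fixed sleep.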
Diffstat (limited to 'python/pyspark/mllib')
-rw-r--r--  python/pyspark/mllib/tests.py  177
1 file changed, 129 insertions(+), 48 deletions(-)
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 3f5a02af12..5097c5e8ba 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -32,6 +32,9 @@ from numpy import sum as array_sum
from py4j.protocol import Py4JJavaError
+if sys.version > '3':
+ basestring = str
+
if sys.version_info[:2] <= (2, 6):
try:
import unittest2 as unittest
@@ -86,9 +89,42 @@ class MLLibStreamingTestCase(unittest.TestCase):
self.ssc.stop(False)
@staticmethod
- def _ssc_wait(start_time, end_time, sleep_time):
- while time() - start_time < end_time:
+ def _eventually(condition, timeout=30.0, catch_assertions=False):
+ """
+ Wait a given amount of time for a condition to pass, else fail with an error.
+ This is a helper utility for streaming ML tests.
+ :param condition: Function that checks for termination conditions.
+ condition() can return:
+ - True: Conditions met. Return without error.
+ - other value: Conditions not met yet. Continue. Upon timeout,
+ include last such value in error message.
+ Note that this method may be called at any time during
+ streaming execution (e.g., even before any results
+ have been created).
+ :param timeout: Number of seconds to wait. Default 30 seconds.
+ :param catch_assertions: If False (default), do not catch AssertionErrors.
+ If True, catch AssertionErrors; continue, but save
+ error to throw upon timeout.
+ """
+ start_time = time()
+ lastValue = None
+ while time() - start_time < timeout:
+ if catch_assertions:
+ try:
+ lastValue = condition()
+ except AssertionError as e:
+ lastValue = e
+ else:
+ lastValue = condition()
+ if lastValue is True:
+ return
sleep(0.01)
+ if isinstance(lastValue, AssertionError):
+ raise lastValue
+ else:
+ raise AssertionError(
+ "Test failed due to timeout after %g sec, with last condition returning: %s"
+ % (timeout, lastValue))
def _squared_distance(a, b):
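Before the test hunks, here is a compact, self-contained illustration of the two condition() contracts described in the docstring above, written against the standalone _eventually sketch shown before the diff (the state dict and the numbers below are illustrative, not part of the patch):

    # The counter stands in for streaming results arriving over time.
    state = {"batches": 0}

    def condition():
        state["batches"] += 1          # pretend another batch was processed
        if state["batches"] >= 5:
            return True                # contract 1: True ends the wait
        # contract 2: any other value is remembered and, on timeout,
        # included in the AssertionError message
        return "only %d batches so far" % state["batches"]

    _eventually(condition, timeout=5.0)  # returns as soon as condition() is True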
@@ -999,10 +1035,13 @@ class StreamingKMeansTest(MLLibStreamingTestCase):
[self.sc.parallelize(batch, 1) for batch in batches])
stkm.trainOn(input_stream)
- t = time()
self.ssc.start()
- self._ssc_wait(t, 10.0, 0.01)
- self.assertEquals(stkm.latestModel().clusterWeights, [25.0])
+
+ def condition():
+ self.assertEquals(stkm.latestModel().clusterWeights, [25.0])
+ return True
+ self._eventually(condition, catch_assertions=True)
+
realCenters = array_sum(array(centers), axis=0)
for i in range(5):
modelCenters = stkm.latestModel().centers[0][i]
@@ -1027,7 +1066,7 @@ class StreamingKMeansTest(MLLibStreamingTestCase):
stkm.setInitialCenters(
centers=initCenters, weights=[1.0, 1.0, 1.0, 1.0])
- # Create a toy dataset by setting a tiny offest for each point.
+ # Create a toy dataset by setting a tiny offset for each point.
offsets = [[0, 0.1], [0, -0.1], [0.1, 0], [-0.1, 0]]
batches = []
for offset in offsets:
@@ -1037,14 +1076,15 @@ class StreamingKMeansTest(MLLibStreamingTestCase):
batches = [self.sc.parallelize(batch, 1) for batch in batches]
input_stream = self.ssc.queueStream(batches)
stkm.trainOn(input_stream)
- t = time()
self.ssc.start()
# Give enough time to train the model.
- self._ssc_wait(t, 6.0, 0.01)
- finalModel = stkm.latestModel()
- self.assertTrue(all(finalModel.centers == array(initCenters)))
- self.assertEquals(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0])
+ def condition():
+ finalModel = stkm.latestModel()
+ self.assertTrue(all(finalModel.centers == array(initCenters)))
+ self.assertEquals(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0])
+ return True
+ self._eventually(condition, catch_assertions=True)
def test_predictOn_model(self):
"""Test that the model predicts correctly on toy data."""
@@ -1066,10 +1106,13 @@ class StreamingKMeansTest(MLLibStreamingTestCase):
result.append(rdd_collect)
predict_val.foreachRDD(update)
- t = time()
self.ssc.start()
- self._ssc_wait(t, 6.0, 0.01)
- self.assertEquals(result, [[0], [1], [2], [3]])
+
+ def condition():
+ self.assertEquals(result, [[0], [1], [2], [3]])
+ return True
+
+ self._eventually(condition, catch_assertions=True)
def test_trainOn_predictOn(self):
"""Test that prediction happens on the updated model."""
@@ -1095,10 +1138,13 @@ class StreamingKMeansTest(MLLibStreamingTestCase):
predict_stream = stkm.predictOn(input_stream)
predict_stream.foreachRDD(collect)
- t = time()
self.ssc.start()
- self._ssc_wait(t, 6.0, 0.01)
- self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])
+
+ def condition():
+ self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])
+ return True
+
+ self._eventually(condition, catch_assertions=True)
class LinearDataGeneratorTests(MLlibTestCase):
@@ -1156,11 +1202,14 @@ class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):
slr.setInitialWeights([0.0])
slr.trainOn(input_stream)
- t = time()
self.ssc.start()
- self._ssc_wait(t, 20.0, 0.01)
- rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5
- self.assertAlmostEqual(rel, 0.1, 1)
+
+ def condition():
+ rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5
+ self.assertAlmostEqual(rel, 0.1, 1)
+ return True
+
+ self._eventually(condition, catch_assertions=True)
def test_convergence(self):
"""
@@ -1179,13 +1228,18 @@ class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):
input_stream.foreachRDD(
lambda x: models.append(slr.latestModel().weights[0]))
- t = time()
self.ssc.start()
- self._ssc_wait(t, 15.0, 0.01)
+
+ def condition():
+ self.assertEquals(len(models), len(input_batches))
+ return True
+
+ # We want all batches to finish for this test.
+ self._eventually(condition, 60.0, catch_assertions=True)
+
t_models = array(models)
diff = t_models[1:] - t_models[:-1]
-
- # Test that weights improve with a small tolerance,
+ # Test that weights improve with a small tolerance
self.assertTrue(all(diff >= -0.1))
self.assertTrue(array_sum(diff > 0) > 1)
@@ -1208,9 +1262,13 @@ class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):
predict_stream = slr.predictOnValues(input_stream)
true_predicted = []
predict_stream.foreachRDD(lambda x: true_predicted.append(x.collect()))
- t = time()
self.ssc.start()
- self._ssc_wait(t, 5.0, 0.01)
+
+ def condition():
+ self.assertEquals(len(true_predicted), len(input_batches))
+ return True
+
+ self._eventually(condition, catch_assertions=True)
# Test that the accuracy error is no more than 0.4 on each batch.
for batch in true_predicted:
@@ -1242,12 +1300,17 @@ class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):
ps = slr.predictOnValues(predict_stream)
ps.foreachRDD(lambda x: collect_errors(x))
- t = time()
self.ssc.start()
- self._ssc_wait(t, 20.0, 0.01)
- # Test that the improvement in error is atleast 0.3
- self.assertTrue(errors[1] - errors[-1] > 0.3)
+ def condition():
+ # Test that the improvement in error is > 0.3
+ if len(errors) == len(predict_batches):
+ self.assertGreater(errors[1] - errors[-1], 0.3)
+ if len(errors) >= 3 and errors[1] - errors[-1] > 0.3:
+ return True
+ return "Latest errors: " + ", ".join(map(lambda x: str(x), errors))
+
+ self._eventually(condition)
class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):
@@ -1274,13 +1337,16 @@ class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):
batches.append(sc.parallelize(batch))
input_stream = self.ssc.queueStream(batches)
- t = time()
slr.trainOn(input_stream)
self.ssc.start()
- self._ssc_wait(t, 10, 0.01)
- self.assertArrayAlmostEqual(
- slr.latestModel().weights.array, [10., 10.], 1)
- self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1)
+
+ def condition():
+ self.assertArrayAlmostEqual(
+ slr.latestModel().weights.array, [10., 10.], 1)
+ self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1)
+ return True
+
+ self._eventually(condition, catch_assertions=True)
def test_parameter_convergence(self):
"""Test that the model parameters improve with streaming data."""
@@ -1298,13 +1364,18 @@ class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):
input_stream = self.ssc.queueStream(batches)
input_stream.foreachRDD(
lambda x: model_weights.append(slr.latestModel().weights[0]))
- t = time()
slr.trainOn(input_stream)
self.ssc.start()
- self._ssc_wait(t, 10, 0.01)
- model_weights = array(model_weights)
- diff = model_weights[1:] - model_weights[:-1]
+ def condition():
+ self.assertEquals(len(model_weights), len(batches))
+ return True
+
+ # We want all batches to finish for this test.
+ self._eventually(condition, catch_assertions=True)
+
+ w = array(model_weights)
+ diff = w[1:] - w[:-1]
self.assertTrue(all(diff >= -0.1))
def test_prediction(self):
@@ -1323,13 +1394,18 @@ class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):
sc.parallelize(batch).map(lambda lp: (lp.label, lp.features)))
input_stream = self.ssc.queueStream(batches)
- t = time()
output_stream = slr.predictOnValues(input_stream)
samples = []
output_stream.foreachRDD(lambda x: samples.append(x.collect()))
self.ssc.start()
- self._ssc_wait(t, 5, 0.01)
+
+ def condition():
+ self.assertEquals(len(samples), len(batches))
+ return True
+
+ # We want all batches to finish for this test.
+ self._eventually(condition, catch_assertions=True)
# Test that mean absolute error on each batch is less than 0.1
for batch in samples:
@@ -1350,22 +1426,27 @@ class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):
predict_batches = [
b.map(lambda lp: (lp.label, lp.features)) for b in batches]
- mean_absolute_errors = []
+ errors = []
def func(rdd):
true, predicted = zip(*rdd.collect())
- mean_absolute_errors.append(mean(abs(true) - abs(predicted)))
+ errors.append(mean(abs(true) - abs(predicted)))
- model_weights = []
input_stream = self.ssc.queueStream(batches)
output_stream = self.ssc.queueStream(predict_batches)
- t = time()
slr.trainOn(input_stream)
output_stream = slr.predictOnValues(output_stream)
output_stream.foreachRDD(func)
self.ssc.start()
- self._ssc_wait(t, 10, 0.01)
- self.assertTrue(mean_absolute_errors[1] - mean_absolute_errors[-1] > 2)
+
+ def condition():
+ if len(errors) == len(predict_batches):
+ self.assertGreater(errors[1] - errors[-1], 2)
+ if len(errors) >= 3 and errors[1] - errors[-1] > 2:
+ return True
+ return "Latest errors: " + ", ".join(map(lambda x: str(x), errors))
+
+ self._eventually(condition)
class MLUtilsTests(MLlibTestCase):