aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2015-04-21 17:49:55 -0700
committerReynold Xin <rxin@databricks.com>2015-04-21 17:49:55 -0700
commit3134c3fe495862b7687b5aa00d3344d09cd5e08e (patch)
treeed556b21bbaad651c7893b6b2dcb53f304100785 /python
parente72c16e30d85cdc394d318b5551698885cfda9b8 (diff)
downloadspark-3134c3fe495862b7687b5aa00d3344d09cd5e08e.tar.gz
spark-3134c3fe495862b7687b5aa00d3344d09cd5e08e.tar.bz2
spark-3134c3fe495862b7687b5aa00d3344d09cd5e08e.zip
[SPARK-6953] [PySpark] speed up python tests
This PR try to speed up some python tests: ``` tests.py 144s -> 103s -41s mllib/classification.py 24s -> 17s -7s mllib/regression.py 27s -> 15s -12s mllib/tree.py 27s -> 13s -14s mllib/tests.py 64s -> 31s -33s streaming/tests.py 185s -> 84s -101s ``` Considering python3, the total saving will be 558s (almost 10 minutes) (core, and streaming run three times, mllib runs twice). During testing, it will show used time for each test file: ``` Run core tests ... Running test: pyspark/rdd.py ... ok (22s) Running test: pyspark/context.py ... ok (16s) Running test: pyspark/conf.py ... ok (4s) Running test: pyspark/broadcast.py ... ok (4s) Running test: pyspark/accumulators.py ... ok (4s) Running test: pyspark/serializers.py ... ok (6s) Running test: pyspark/profiler.py ... ok (5s) Running test: pyspark/shuffle.py ... ok (1s) Running test: pyspark/tests.py ... ok (103s) 144s ``` Author: Reynold Xin <rxin@databricks.com> Author: Xiangrui Meng <meng@databricks.com> Closes #5605 from rxin/python-tests-speed and squashes the following commits: d08542d [Reynold Xin] Merge pull request #14 from mengxr/SPARK-6953 89321ee [Xiangrui Meng] fix seed in tests 3ad2387 [Reynold Xin] Merge pull request #5427 from davies/python_tests
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/mllib/classification.py17
-rw-r--r--python/pyspark/mllib/regression.py25
-rw-r--r--python/pyspark/mllib/tests.py69
-rw-r--r--python/pyspark/mllib/tree.py15
-rw-r--r--python/pyspark/shuffle.py7
-rw-r--r--python/pyspark/sql/tests.py4
-rw-r--r--python/pyspark/streaming/tests.py63
-rw-r--r--python/pyspark/tests.py96
-rwxr-xr-xpython/run-tests13
9 files changed, 182 insertions, 127 deletions
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index eda0b60f8b..a70c664a71 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -86,7 +86,7 @@ class LogisticRegressionModel(LinearClassificationModel):
... LabeledPoint(0.0, [0.0, 1.0]),
... LabeledPoint(1.0, [1.0, 0.0]),
... ]
- >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
+ >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data), iterations=10)
>>> lrm.predict([1.0, 0.0])
1
>>> lrm.predict([0.0, 1.0])
@@ -95,7 +95,7 @@ class LogisticRegressionModel(LinearClassificationModel):
[1, 0]
>>> lrm.clearThreshold()
>>> lrm.predict([0.0, 1.0])
- 0.123...
+ 0.279...
>>> sparse_data = [
... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
@@ -103,7 +103,7 @@ class LogisticRegressionModel(LinearClassificationModel):
... LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
... ]
- >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))
+ >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data), iterations=10)
>>> lrm.predict(array([0.0, 1.0]))
1
>>> lrm.predict(array([1.0, 0.0]))
@@ -129,7 +129,8 @@ class LogisticRegressionModel(LinearClassificationModel):
... LabeledPoint(1.0, [1.0, 0.0, 0.0]),
... LabeledPoint(2.0, [0.0, 0.0, 1.0])
... ]
- >>> mcm = LogisticRegressionWithLBFGS.train(data=sc.parallelize(multi_class_data), numClasses=3)
+ >>> data = sc.parallelize(multi_class_data)
+ >>> mcm = LogisticRegressionWithLBFGS.train(data, iterations=10, numClasses=3)
>>> mcm.predict([0.0, 0.5, 0.0])
0
>>> mcm.predict([0.8, 0.0, 0.0])
@@ -298,7 +299,7 @@ class LogisticRegressionWithLBFGS(object):
... LabeledPoint(0.0, [0.0, 1.0]),
... LabeledPoint(1.0, [1.0, 0.0]),
... ]
- >>> lrm = LogisticRegressionWithLBFGS.train(sc.parallelize(data))
+ >>> lrm = LogisticRegressionWithLBFGS.train(sc.parallelize(data), iterations=10)
>>> lrm.predict([1.0, 0.0])
1
>>> lrm.predict([0.0, 1.0])
@@ -330,14 +331,14 @@ class SVMModel(LinearClassificationModel):
... LabeledPoint(1.0, [2.0]),
... LabeledPoint(1.0, [3.0])
... ]
- >>> svm = SVMWithSGD.train(sc.parallelize(data))
+ >>> svm = SVMWithSGD.train(sc.parallelize(data), iterations=10)
>>> svm.predict([1.0])
1
>>> svm.predict(sc.parallelize([[1.0]])).collect()
[1]
>>> svm.clearThreshold()
>>> svm.predict(array([1.0]))
- 1.25...
+ 1.44...
>>> sparse_data = [
... LabeledPoint(0.0, SparseVector(2, {0: -1.0})),
@@ -345,7 +346,7 @@ class SVMModel(LinearClassificationModel):
... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
... ]
- >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
+ >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data), iterations=10)
>>> svm.predict(SparseVector(2, {1: 1.0}))
1
>>> svm.predict(SparseVector(2, {0: -1.0}))
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index a0117c5713..4bc6351bdf 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -108,7 +108,8 @@ class LinearRegressionModel(LinearRegressionModelBase):
... LabeledPoint(3.0, [2.0]),
... LabeledPoint(2.0, [3.0])
... ]
- >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=np.array([1.0]))
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
+ ... initialWeights=np.array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
@@ -135,12 +136,13 @@ class LinearRegressionModel(LinearRegressionModelBase):
... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
- >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
+ ... initialWeights=array([1.0]))
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
- >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=100, step=1.0,
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
... miniBatchFraction=1.0, initialWeights=array([1.0]), regParam=0.1, regType="l2",
... intercept=True, validateData=True)
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
@@ -238,7 +240,7 @@ class LassoModel(LinearRegressionModelBase):
... LabeledPoint(3.0, [2.0]),
... LabeledPoint(2.0, [3.0])
... ]
- >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=10, initialWeights=array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
@@ -265,12 +267,13 @@ class LassoModel(LinearRegressionModelBase):
... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
- >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
+ ... initialWeights=array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
- >>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=100, step=1.0,
+ >>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
... regParam=0.01, miniBatchFraction=1.0, initialWeights=array([1.0]), intercept=True,
... validateData=True)
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
@@ -321,7 +324,8 @@ class RidgeRegressionModel(LinearRegressionModelBase):
... LabeledPoint(3.0, [2.0]),
... LabeledPoint(2.0, [3.0])
... ]
- >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=10,
+ ... initialWeights=array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
@@ -348,12 +352,13 @@ class RidgeRegressionModel(LinearRegressionModelBase):
... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
- >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
+ ... initialWeights=array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
- >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=100, step=1.0,
+ >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
... regParam=0.01, miniBatchFraction=1.0, initialWeights=array([1.0]), intercept=True,
... validateData=True)
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
@@ -396,7 +401,7 @@ def _test():
from pyspark import SparkContext
import pyspark.mllib.regression
globs = pyspark.mllib.regression.__dict__.copy()
- globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 8f89e2cee0..1b008b93bc 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -36,6 +36,7 @@ if sys.version_info[:2] <= (2, 6):
else:
import unittest
+from pyspark import SparkContext
from pyspark.mllib.common import _to_java_object_rdd
from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
DenseMatrix, SparseMatrix, Vectors, Matrices
@@ -47,7 +48,6 @@ from pyspark.mllib.feature import IDF
from pyspark.mllib.feature import StandardScaler
from pyspark.serializers import PickleSerializer
from pyspark.sql import SQLContext
-from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase
_have_scipy = False
try:
@@ -58,6 +58,12 @@ except:
pass
ser = PickleSerializer()
+sc = SparkContext('local[4]', "MLlib tests")
+
+
+class MLlibTestCase(unittest.TestCase):
+ def setUp(self):
+ self.sc = sc
def _squared_distance(a, b):
@@ -67,7 +73,7 @@ def _squared_distance(a, b):
return b.squared_distance(a)
-class VectorTests(PySparkTestCase):
+class VectorTests(MLlibTestCase):
def _test_serialize(self, v):
self.assertEqual(v, ser.loads(ser.dumps(v)))
@@ -212,7 +218,7 @@ class VectorTests(PySparkTestCase):
self.assertTrue(array_equal(sm.values, [1, 3, 4, 6, 9]))
-class ListTests(PySparkTestCase):
+class ListTests(MLlibTestCase):
"""
Test MLlib algorithms on plain lists, to make sure they're passed through
@@ -255,7 +261,7 @@ class ListTests(PySparkTestCase):
[-6, -7],
])
clusters = GaussianMixture.train(data, 2, convergenceTol=0.001,
- maxIterations=100, seed=56)
+ maxIterations=10, seed=56)
labels = clusters.predict(data).collect()
self.assertEquals(labels[0], labels[1])
self.assertEquals(labels[2], labels[3])
@@ -266,9 +272,9 @@ class ListTests(PySparkTestCase):
y = range(0, 100, 10)
data = self.sc.parallelize([[a, b] for a, b in zip(x, y)])
clusters1 = GaussianMixture.train(data, 5, convergenceTol=0.001,
- maxIterations=100, seed=63)
+ maxIterations=10, seed=63)
clusters2 = GaussianMixture.train(data, 5, convergenceTol=0.001,
- maxIterations=100, seed=63)
+ maxIterations=10, seed=63)
for c1, c2 in zip(clusters1.weights, clusters2.weights):
self.assertEquals(round(c1, 7), round(c2, 7))
@@ -287,13 +293,13 @@ class ListTests(PySparkTestCase):
temp_dir = tempfile.mkdtemp()
- lr_model = LogisticRegressionWithSGD.train(rdd)
+ lr_model = LogisticRegressionWithSGD.train(rdd, iterations=10)
self.assertTrue(lr_model.predict(features[0]) <= 0)
self.assertTrue(lr_model.predict(features[1]) > 0)
self.assertTrue(lr_model.predict(features[2]) <= 0)
self.assertTrue(lr_model.predict(features[3]) > 0)
- svm_model = SVMWithSGD.train(rdd)
+ svm_model = SVMWithSGD.train(rdd, iterations=10)
self.assertTrue(svm_model.predict(features[0]) <= 0)
self.assertTrue(svm_model.predict(features[1]) > 0)
self.assertTrue(svm_model.predict(features[2]) <= 0)
@@ -307,7 +313,7 @@ class ListTests(PySparkTestCase):
categoricalFeaturesInfo = {0: 3} # feature 0 has 3 categories
dt_model = DecisionTree.trainClassifier(
- rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo)
+ rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, maxBins=4)
self.assertTrue(dt_model.predict(features[0]) <= 0)
self.assertTrue(dt_model.predict(features[1]) > 0)
self.assertTrue(dt_model.predict(features[2]) <= 0)
@@ -319,7 +325,8 @@ class ListTests(PySparkTestCase):
self.assertEqual(same_dt_model.toDebugString(), dt_model.toDebugString())
rf_model = RandomForest.trainClassifier(
- rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=100)
+ rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=10,
+ maxBins=4, seed=1)
self.assertTrue(rf_model.predict(features[0]) <= 0)
self.assertTrue(rf_model.predict(features[1]) > 0)
self.assertTrue(rf_model.predict(features[2]) <= 0)
@@ -331,7 +338,7 @@ class ListTests(PySparkTestCase):
self.assertEqual(same_rf_model.toDebugString(), rf_model.toDebugString())
gbt_model = GradientBoostedTrees.trainClassifier(
- rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
+ rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4)
self.assertTrue(gbt_model.predict(features[0]) <= 0)
self.assertTrue(gbt_model.predict(features[1]) > 0)
self.assertTrue(gbt_model.predict(features[2]) <= 0)
@@ -360,19 +367,19 @@ class ListTests(PySparkTestCase):
rdd = self.sc.parallelize(data)
features = [p.features.tolist() for p in data]
- lr_model = LinearRegressionWithSGD.train(rdd)
+ lr_model = LinearRegressionWithSGD.train(rdd, iterations=10)
self.assertTrue(lr_model.predict(features[0]) <= 0)
self.assertTrue(lr_model.predict(features[1]) > 0)
self.assertTrue(lr_model.predict(features[2]) <= 0)
self.assertTrue(lr_model.predict(features[3]) > 0)
- lasso_model = LassoWithSGD.train(rdd)
+ lasso_model = LassoWithSGD.train(rdd, iterations=10)
self.assertTrue(lasso_model.predict(features[0]) <= 0)
self.assertTrue(lasso_model.predict(features[1]) > 0)
self.assertTrue(lasso_model.predict(features[2]) <= 0)
self.assertTrue(lasso_model.predict(features[3]) > 0)
- rr_model = RidgeRegressionWithSGD.train(rdd)
+ rr_model = RidgeRegressionWithSGD.train(rdd, iterations=10)
self.assertTrue(rr_model.predict(features[0]) <= 0)
self.assertTrue(rr_model.predict(features[1]) > 0)
self.assertTrue(rr_model.predict(features[2]) <= 0)
@@ -380,35 +387,35 @@ class ListTests(PySparkTestCase):
categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories
dt_model = DecisionTree.trainRegressor(
- rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
+ rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, maxBins=4)
self.assertTrue(dt_model.predict(features[0]) <= 0)
self.assertTrue(dt_model.predict(features[1]) > 0)
self.assertTrue(dt_model.predict(features[2]) <= 0)
self.assertTrue(dt_model.predict(features[3]) > 0)
rf_model = RandomForest.trainRegressor(
- rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=100, seed=1)
+ rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=10, maxBins=4, seed=1)
self.assertTrue(rf_model.predict(features[0]) <= 0)
self.assertTrue(rf_model.predict(features[1]) > 0)
self.assertTrue(rf_model.predict(features[2]) <= 0)
self.assertTrue(rf_model.predict(features[3]) > 0)
gbt_model = GradientBoostedTrees.trainRegressor(
- rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
+ rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4)
self.assertTrue(gbt_model.predict(features[0]) <= 0)
self.assertTrue(gbt_model.predict(features[1]) > 0)
self.assertTrue(gbt_model.predict(features[2]) <= 0)
self.assertTrue(gbt_model.predict(features[3]) > 0)
try:
- LinearRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]))
- LassoWithSGD.train(rdd, initialWeights=array([1.0, 1.0]))
- RidgeRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]))
+ LinearRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10)
+ LassoWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10)
+ RidgeRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10)
except ValueError:
self.fail()
-class StatTests(PySparkTestCase):
+class StatTests(MLlibTestCase):
# SPARK-4023
def test_col_with_different_rdds(self):
# numpy
@@ -438,7 +445,7 @@ class StatTests(PySparkTestCase):
self.assertTrue(math.fabs(summary2.normL2()[0] - expectedNormL2) < 1e-14)
-class VectorUDTTests(PySparkTestCase):
+class VectorUDTTests(MLlibTestCase):
dv0 = DenseVector([])
dv1 = DenseVector([1.0, 2.0])
@@ -472,7 +479,7 @@ class VectorUDTTests(PySparkTestCase):
@unittest.skipIf(not _have_scipy, "SciPy not installed")
-class SciPyTests(PySparkTestCase):
+class SciPyTests(MLlibTestCase):
"""
Test both vector operations and MLlib algorithms with SciPy sparse matrices,
@@ -613,7 +620,7 @@ class SciPyTests(PySparkTestCase):
self.assertTrue(dt_model.predict(features[3]) > 0)
-class ChiSqTestTests(PySparkTestCase):
+class ChiSqTestTests(MLlibTestCase):
def test_goodness_of_fit(self):
from numpy import inf
@@ -711,13 +718,13 @@ class ChiSqTestTests(PySparkTestCase):
self.assertIsNotNone(chi[1000])
-class SerDeTest(PySparkTestCase):
+class SerDeTest(MLlibTestCase):
def test_to_java_object_rdd(self): # SPARK-6660
data = RandomRDDs.uniformRDD(self.sc, 10, 5, seed=0)
self.assertEqual(_to_java_object_rdd(data).count(), 10)
-class FeatureTest(PySparkTestCase):
+class FeatureTest(MLlibTestCase):
def test_idf_model(self):
data = [
Vectors.dense([1, 2, 6, 0, 2, 3, 1, 1, 0, 0, 3]),
@@ -730,13 +737,8 @@ class FeatureTest(PySparkTestCase):
self.assertEqual(len(idf), 11)
-class Word2VecTests(PySparkTestCase):
+class Word2VecTests(MLlibTestCase):
def test_word2vec_setters(self):
- data = [
- ["I", "have", "a", "pen"],
- ["I", "like", "soccer", "very", "much"],
- ["I", "live", "in", "Tokyo"]
- ]
model = Word2Vec() \
.setVectorSize(2) \
.setLearningRate(0.01) \
@@ -765,7 +767,7 @@ class Word2VecTests(PySparkTestCase):
self.assertEquals(len(model.getVectors()), 3)
-class StandardScalerTests(PySparkTestCase):
+class StandardScalerTests(MLlibTestCase):
def test_model_setters(self):
data = [
[1.0, 2.0, 3.0],
@@ -793,3 +795,4 @@ if __name__ == "__main__":
unittest.main()
if not _have_scipy:
print("NOTE: SciPy tests were skipped as it does not seem to be installed")
+ sc.stop()
diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py
index 0fe6e4fabe..cfcbea573f 100644
--- a/python/pyspark/mllib/tree.py
+++ b/python/pyspark/mllib/tree.py
@@ -482,13 +482,13 @@ class GradientBoostedTrees(object):
... LabeledPoint(1.0, [3.0])
... ]
>>>
- >>> model = GradientBoostedTrees.trainClassifier(sc.parallelize(data), {})
+ >>> model = GradientBoostedTrees.trainClassifier(sc.parallelize(data), {}, numIterations=10)
>>> model.numTrees()
- 100
+ 10
>>> model.totalNumNodes()
- 300
+ 30
>>> print(model) # it already has newline
- TreeEnsembleModel classifier with 100 trees
+ TreeEnsembleModel classifier with 10 trees
<BLANKLINE>
>>> model.predict([2.0])
1.0
@@ -541,11 +541,12 @@ class GradientBoostedTrees(object):
... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
... ]
>>>
- >>> model = GradientBoostedTrees.trainRegressor(sc.parallelize(sparse_data), {})
+ >>> data = sc.parallelize(sparse_data)
+ >>> model = GradientBoostedTrees.trainRegressor(data, {}, numIterations=10)
>>> model.numTrees()
- 100
+ 10
>>> model.totalNumNodes()
- 102
+ 12
>>> model.predict(SparseVector(2, {1: 1.0}))
1.0
>>> model.predict(SparseVector(2, {0: 1.0}))
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index b54baa57ec..1d0b16cade 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -486,7 +486,7 @@ class ExternalSorter(object):
goes above the limit.
"""
global MemoryBytesSpilled, DiskBytesSpilled
- batch, limit = 100, self._next_limit()
+ batch, limit = 100, self.memory_limit
chunks, current_chunk = [], []
iterator = iter(iterator)
while True:
@@ -497,7 +497,7 @@ class ExternalSorter(object):
break
used_memory = get_used_memory()
- if used_memory > self.memory_limit:
+ if used_memory > limit:
# sort them inplace will save memory
current_chunk.sort(key=key, reverse=reverse)
path = self._get_path(len(chunks))
@@ -513,13 +513,14 @@ class ExternalSorter(object):
chunks.append(load(open(path, 'rb')))
current_chunk = []
gc.collect()
+ batch //= 2
limit = self._next_limit()
MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
DiskBytesSpilled += os.path.getsize(path)
os.unlink(path) # data will be deleted after close
elif not chunks:
- batch = min(batch * 2, 10000)
+ batch = min(int(batch * 1.5), 10000)
current_chunk.sort(key=key, reverse=reverse)
if not chunks:
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 23e8428367..fe43c374f1 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -109,7 +109,7 @@ class SQLTests(ReusedPySparkTestCase):
os.unlink(cls.tempdir.name)
cls.sqlCtx = SQLContext(cls.sc)
cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
- rdd = cls.sc.parallelize(cls.testData)
+ rdd = cls.sc.parallelize(cls.testData, 2)
cls.df = rdd.toDF()
@classmethod
@@ -303,7 +303,7 @@ class SQLTests(ReusedPySparkTestCase):
abstract = "byte1 short1 float1 time1 map1{} struct1(b) list1[]"
schema = _parse_schema_abstract(abstract)
typedSchema = _infer_schema_type(rdd.first(), schema)
- df = self.sqlCtx.applySchema(rdd, typedSchema)
+ df = self.sqlCtx.createDataFrame(rdd, typedSchema)
r = (127, -32768, 1.0, datetime(2010, 1, 1, 1, 1, 1), {"a": 1}, Row(b=2), [1, 2, 3])
self.assertEqual(r, tuple(df.first()))
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 33f958a601..5fa1e5ef08 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -16,14 +16,23 @@
#
import os
+import sys
from itertools import chain
import time
import operator
-import unittest
import tempfile
import struct
from functools import reduce
+if sys.version_info[:2] <= (2, 6):
+ try:
+ import unittest2 as unittest
+ except ImportError:
+ sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier')
+ sys.exit(1)
+else:
+ import unittest
+
from pyspark.context import SparkConf, SparkContext, RDD
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
@@ -31,19 +40,25 @@ from pyspark.streaming.kafka import KafkaUtils
class PySparkStreamingTestCase(unittest.TestCase):
- timeout = 20 # seconds
- duration = 1
+ timeout = 4 # seconds
+ duration = .2
- def setUp(self):
- class_name = self.__class__.__name__
+ @classmethod
+ def setUpClass(cls):
+ class_name = cls.__name__
conf = SparkConf().set("spark.default.parallelism", 1)
- self.sc = SparkContext(appName=class_name, conf=conf)
- self.sc.setCheckpointDir("/tmp")
- # TODO: decrease duration to speed up tests
+ cls.sc = SparkContext(appName=class_name, conf=conf)
+ cls.sc.setCheckpointDir("/tmp")
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.sc.stop()
+
+ def setUp(self):
self.ssc = StreamingContext(self.sc, self.duration)
def tearDown(self):
- self.ssc.stop()
+ self.ssc.stop(False)
def wait_for(self, result, n):
start_time = time.time()
@@ -363,13 +378,13 @@ class BasicOperationTests(PySparkStreamingTestCase):
class WindowFunctionTests(PySparkStreamingTestCase):
- timeout = 20
+ timeout = 5
def test_window(self):
input = [range(1), range(2), range(3), range(4), range(5)]
def func(dstream):
- return dstream.window(3, 1).count()
+ return dstream.window(.6, .2).count()
expected = [[1], [3], [6], [9], [12], [9], [5]]
self._test_func(input, func, expected)
@@ -378,7 +393,7 @@ class WindowFunctionTests(PySparkStreamingTestCase):
input = [range(1), range(2), range(3), range(4), range(5)]
def func(dstream):
- return dstream.countByWindow(3, 1)
+ return dstream.countByWindow(.6, .2)
expected = [[1], [3], [6], [9], [12], [9], [5]]
self._test_func(input, func, expected)
@@ -387,7 +402,7 @@ class WindowFunctionTests(PySparkStreamingTestCase):
input = [range(1), range(2), range(3), range(4), range(5), range(6)]
def func(dstream):
- return dstream.countByWindow(5, 1)
+ return dstream.countByWindow(1, .2)
expected = [[1], [3], [6], [10], [15], [20], [18], [15], [11], [6]]
self._test_func(input, func, expected)
@@ -396,7 +411,7 @@ class WindowFunctionTests(PySparkStreamingTestCase):
input = [range(1), range(2), range(3), range(4), range(5), range(6)]
def func(dstream):
- return dstream.countByValueAndWindow(5, 1)
+ return dstream.countByValueAndWindow(1, .2)
expected = [[1], [2], [3], [4], [5], [6], [6], [6], [6], [6]]
self._test_func(input, func, expected)
@@ -405,7 +420,7 @@ class WindowFunctionTests(PySparkStreamingTestCase):
input = [[('a', i)] for i in range(5)]
def func(dstream):
- return dstream.groupByKeyAndWindow(3, 1).mapValues(list)
+ return dstream.groupByKeyAndWindow(.6, .2).mapValues(list)
expected = [[('a', [0])], [('a', [0, 1])], [('a', [0, 1, 2])], [('a', [1, 2, 3])],
[('a', [2, 3, 4])], [('a', [3, 4])], [('a', [4])]]
@@ -436,8 +451,8 @@ class StreamingContextTests(PySparkStreamingTestCase):
def test_stop_multiple_times(self):
self._add_input_stream()
self.ssc.start()
- self.ssc.stop()
- self.ssc.stop()
+ self.ssc.stop(False)
+ self.ssc.stop(False)
def test_queue_stream(self):
input = [list(range(i + 1)) for i in range(3)]
@@ -495,10 +510,7 @@ class StreamingContextTests(PySparkStreamingTestCase):
self.assertEqual([2, 3, 1], self._take(dstream, 3))
-class CheckpointTests(PySparkStreamingTestCase):
-
- def setUp(self):
- pass
+class CheckpointTests(unittest.TestCase):
def test_get_or_create(self):
inputd = tempfile.mkdtemp()
@@ -518,12 +530,12 @@ class CheckpointTests(PySparkStreamingTestCase):
return ssc
cpd = tempfile.mkdtemp("test_streaming_cps")
- self.ssc = ssc = StreamingContext.getOrCreate(cpd, setup)
+ ssc = StreamingContext.getOrCreate(cpd, setup)
ssc.start()
def check_output(n):
while not os.listdir(outputd):
- time.sleep(0.1)
+ time.sleep(0.01)
time.sleep(1) # make sure mtime is larger than the previous one
with open(os.path.join(inputd, str(n)), 'w') as f:
f.writelines(["%d\n" % i for i in range(10)])
@@ -553,12 +565,15 @@ class CheckpointTests(PySparkStreamingTestCase):
ssc.stop(True, True)
time.sleep(1)
- self.ssc = ssc = StreamingContext.getOrCreate(cpd, setup)
+ ssc = StreamingContext.getOrCreate(cpd, setup)
ssc.start()
check_output(3)
+ ssc.stop(True, True)
class KafkaStreamTests(PySparkStreamingTestCase):
+ timeout = 20 # seconds
+ duration = 1
def setUp(self):
super(KafkaStreamTests, self).setUp()
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 75f39d9e75..ea63a396da 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -31,7 +31,6 @@ import tempfile
import time
import zipfile
import random
-import itertools
import threading
import hashlib
@@ -49,6 +48,11 @@ else:
xrange = range
basestring = str
+if sys.version >= "3":
+ from io import StringIO
+else:
+ from StringIO import StringIO
+
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
@@ -196,7 +200,7 @@ class SorterTests(unittest.TestCase):
sc = SparkContext(conf=conf)
l = list(range(10240))
random.shuffle(l)
- rdd = sc.parallelize(l, 2)
+ rdd = sc.parallelize(l, 4)
self.assertEqual(sorted(l), rdd.sortBy(lambda x: x).collect())
sc.stop()
@@ -300,6 +304,18 @@ class SerializationTestCase(unittest.TestCase):
hash(FlattenedValuesSerializer(PickleSerializer()))
+class QuietTest(object):
+ def __init__(self, sc):
+ self.log4j = sc._jvm.org.apache.log4j
+
+ def __enter__(self):
+ self.old_level = self.log4j.LogManager.getRootLogger().getLevel()
+ self.log4j.LogManager.getRootLogger().setLevel(self.log4j.Level.FATAL)
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.log4j.LogManager.getRootLogger().setLevel(self.old_level)
+
+
class PySparkTestCase(unittest.TestCase):
def setUp(self):
@@ -371,15 +387,11 @@ class AddFileTests(PySparkTestCase):
# To ensure that we're actually testing addPyFile's effects, check that
# this job fails due to `userlibrary` not being on the Python path:
# disable logging in log4j temporarily
- log4j = self.sc._jvm.org.apache.log4j
- old_level = log4j.LogManager.getRootLogger().getLevel()
- log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL)
-
def func(x):
from userlibrary import UserClass
return UserClass().hello()
- self.assertRaises(Exception, self.sc.parallelize(range(2)).map(func).first)
- log4j.LogManager.getRootLogger().setLevel(old_level)
+ with QuietTest(self.sc):
+ self.assertRaises(Exception, self.sc.parallelize(range(2)).map(func).first)
# Add the file, so the job should now succeed:
path = os.path.join(SPARK_HOME, "python/test_support/userlibrary.py")
@@ -496,7 +508,8 @@ class RDDTests(ReusedPySparkTestCase):
filtered_data = data.filter(lambda x: True)
self.assertEqual(1, filtered_data.count())
os.unlink(tempFile.name)
- self.assertRaises(Exception, lambda: filtered_data.count())
+ with QuietTest(self.sc):
+ self.assertRaises(Exception, lambda: filtered_data.count())
def test_sampling_default_seed(self):
# Test for SPARK-3995 (default seed setting)
@@ -536,9 +549,9 @@ class RDDTests(ReusedPySparkTestCase):
self.assertEqual([jon, jane], theDoes.collect())
def test_large_broadcast(self):
- N = 100000
+ N = 10000
data = [[float(i) for i in range(300)] for i in range(N)]
- bdata = self.sc.broadcast(data) # 270MB
+ bdata = self.sc.broadcast(data) # 27MB
m = self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum()
self.assertEqual(N, m)
@@ -569,7 +582,7 @@ class RDDTests(ReusedPySparkTestCase):
self.assertEqual(checksum, csum)
def test_large_closure(self):
- N = 1000000
+ N = 200000
data = [float(i) for i in xrange(N)]
rdd = self.sc.parallelize(range(1), 1).map(lambda x: len(data))
self.assertEqual(N, rdd.first())
@@ -604,17 +617,18 @@ class RDDTests(ReusedPySparkTestCase):
# different number of partitions
b = self.sc.parallelize(range(100, 106), 3)
self.assertRaises(ValueError, lambda: a.zip(b))
- # different number of batched items in JVM
- b = self.sc.parallelize(range(100, 104), 2)
- self.assertRaises(Exception, lambda: a.zip(b).count())
- # different number of items in one pair
- b = self.sc.parallelize(range(100, 106), 2)
- self.assertRaises(Exception, lambda: a.zip(b).count())
- # same total number of items, but different distributions
- a = self.sc.parallelize([2, 3], 2).flatMap(range)
- b = self.sc.parallelize([3, 2], 2).flatMap(range)
- self.assertEqual(a.count(), b.count())
- self.assertRaises(Exception, lambda: a.zip(b).count())
+ with QuietTest(self.sc):
+ # different number of batched items in JVM
+ b = self.sc.parallelize(range(100, 104), 2)
+ self.assertRaises(Exception, lambda: a.zip(b).count())
+ # different number of items in one pair
+ b = self.sc.parallelize(range(100, 106), 2)
+ self.assertRaises(Exception, lambda: a.zip(b).count())
+ # same total number of items, but different distributions
+ a = self.sc.parallelize([2, 3], 2).flatMap(range)
+ b = self.sc.parallelize([3, 2], 2).flatMap(range)
+ self.assertEqual(a.count(), b.count())
+ self.assertRaises(Exception, lambda: a.zip(b).count())
def test_count_approx_distinct(self):
rdd = self.sc.parallelize(range(1000))
@@ -877,7 +891,12 @@ class ProfilerTests(PySparkTestCase):
func_names = [func_name for fname, n, func_name in stat_list]
self.assertTrue("heavy_foo" in func_names)
+ old_stdout = sys.stdout
+ sys.stdout = io = StringIO()
self.sc.show_profiles()
+ self.assertTrue("heavy_foo" in io.getvalue())
+ sys.stdout = old_stdout
+
d = tempfile.gettempdir()
self.sc.dump_profiles(d)
self.assertTrue("rdd_%d.pstats" % id in os.listdir(d))
@@ -901,7 +920,7 @@ class ProfilerTests(PySparkTestCase):
def do_computation(self):
def heavy_foo(x):
- for i in range(1 << 20):
+ for i in range(1 << 18):
x = 1
rdd = self.sc.parallelize(range(100))
@@ -1417,7 +1436,7 @@ class DaemonTests(unittest.TestCase):
self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
-class WorkerTests(PySparkTestCase):
+class WorkerTests(ReusedPySparkTestCase):
def test_cancel_task(self):
temp = tempfile.NamedTemporaryFile(delete=True)
temp.close()
@@ -1432,7 +1451,10 @@ class WorkerTests(PySparkTestCase):
# start job in background thread
def run():
- self.sc.parallelize(range(1), 1).foreach(sleep)
+ try:
+ self.sc.parallelize(range(1), 1).foreach(sleep)
+ except Exception:
+ pass
import threading
t = threading.Thread(target=run)
t.daemon = True
@@ -1473,7 +1495,8 @@ class WorkerTests(PySparkTestCase):
def raise_exception(_):
raise Exception()
rdd = self.sc.parallelize(range(100), 1)
- self.assertRaises(Exception, lambda: rdd.foreach(raise_exception))
+ with QuietTest(self.sc):
+ self.assertRaises(Exception, lambda: rdd.foreach(raise_exception))
self.assertEqual(100, rdd.map(str).count())
def test_after_jvm_exception(self):
@@ -1484,7 +1507,8 @@ class WorkerTests(PySparkTestCase):
filtered_data = data.filter(lambda x: True)
self.assertEqual(1, filtered_data.count())
os.unlink(tempFile.name)
- self.assertRaises(Exception, lambda: filtered_data.count())
+ with QuietTest(self.sc):
+ self.assertRaises(Exception, lambda: filtered_data.count())
rdd = self.sc.parallelize(range(100), 1)
self.assertEqual(100, rdd.map(str).count())
@@ -1522,14 +1546,11 @@ class WorkerTests(PySparkTestCase):
rdd.count()
version = sys.version_info
sys.version_info = (2, 0, 0)
- log4j = self.sc._jvm.org.apache.log4j
- old_level = log4j.LogManager.getRootLogger().getLevel()
- log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL)
try:
- self.assertRaises(Py4JJavaError, lambda: rdd.count())
+ with QuietTest(self.sc):
+ self.assertRaises(Py4JJavaError, lambda: rdd.count())
finally:
sys.version_info = version
- log4j.LogManager.getRootLogger().setLevel(old_level)
class SparkSubmitTests(unittest.TestCase):
@@ -1751,9 +1772,14 @@ class ContextTests(unittest.TestCase):
def test_progress_api(self):
with SparkContext() as sc:
sc.setJobGroup('test_progress_api', '', True)
-
rdd = sc.parallelize(range(10)).map(lambda x: time.sleep(100))
- t = threading.Thread(target=rdd.collect)
+
+ def run():
+ try:
+ rdd.count()
+ except Exception:
+ pass
+ t = threading.Thread(target=run)
t.daemon = True
t.start()
# wait for scheduler to start
diff --git a/python/run-tests b/python/run-tests
index ed3e819ef3..88b63b84fd 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -28,6 +28,7 @@ cd "$FWDIR/python"
FAILED=0
LOG_FILE=unit-tests.log
+START=$(date +"%s")
rm -f $LOG_FILE
@@ -35,8 +36,8 @@ rm -f $LOG_FILE
rm -rf metastore warehouse
function run_test() {
- echo "Running test: $1" | tee -a $LOG_FILE
-
+ echo -en "Running test: $1 ... " | tee -a $LOG_FILE
+ start=$(date +"%s")
SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 > $LOG_FILE 2>&1
FAILED=$((PIPESTATUS[0]||$FAILED))
@@ -48,6 +49,9 @@ function run_test() {
echo "Had test failures; see logs."
echo -en "\033[0m" # No color
exit -1
+ else
+ now=$(date +"%s")
+ echo "ok ($(($now - $start))s)"
fi
}
@@ -161,9 +165,8 @@ if [ $(which pypy) ]; then
fi
if [[ $FAILED == 0 ]]; then
- echo -en "\033[32m" # Green
- echo "Tests passed."
- echo -en "\033[0m" # No color
+ now=$(date +"%s")
+ echo -e "\033[32mTests passed \033[0min $(($now - $START)) seconds"
fi
# TODO: in the long-run, it would be nice to use a test runner like `nose`.