-rw-r--r--  python/pyspark/mllib/classification.py   17
-rw-r--r--  python/pyspark/mllib/regression.py        25
-rw-r--r--  python/pyspark/mllib/tests.py             69
-rw-r--r--  python/pyspark/mllib/tree.py              15
-rw-r--r--  python/pyspark/shuffle.py                  7
-rw-r--r--  python/pyspark/sql/tests.py                4
-rw-r--r--  python/pyspark/streaming/tests.py         63
-rw-r--r--  python/pyspark/tests.py                   96
-rwxr-xr-x  python/run-tests                          13
9 files changed, 182 insertions(+), 127 deletions(-)
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index eda0b60f8b..a70c664a71 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -86,7 +86,7 @@ class LogisticRegressionModel(LinearClassificationModel):
... LabeledPoint(0.0, [0.0, 1.0]),
... LabeledPoint(1.0, [1.0, 0.0]),
... ]
- >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
+ >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data), iterations=10)
>>> lrm.predict([1.0, 0.0])
1
>>> lrm.predict([0.0, 1.0])
@@ -95,7 +95,7 @@ class LogisticRegressionModel(LinearClassificationModel):
[1, 0]
>>> lrm.clearThreshold()
>>> lrm.predict([0.0, 1.0])
- 0.123...
+ 0.279...
>>> sparse_data = [
... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
@@ -103,7 +103,7 @@ class LogisticRegressionModel(LinearClassificationModel):
... LabeledPoint(0.0, SparseVector(2, {0: 1.0})),
... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
... ]
- >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))
+ >>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data), iterations=10)
>>> lrm.predict(array([0.0, 1.0]))
1
>>> lrm.predict(array([1.0, 0.0]))
@@ -129,7 +129,8 @@ class LogisticRegressionModel(LinearClassificationModel):
... LabeledPoint(1.0, [1.0, 0.0, 0.0]),
... LabeledPoint(2.0, [0.0, 0.0, 1.0])
... ]
- >>> mcm = LogisticRegressionWithLBFGS.train(data=sc.parallelize(multi_class_data), numClasses=3)
+ >>> data = sc.parallelize(multi_class_data)
+ >>> mcm = LogisticRegressionWithLBFGS.train(data, iterations=10, numClasses=3)
>>> mcm.predict([0.0, 0.5, 0.0])
0
>>> mcm.predict([0.8, 0.0, 0.0])
@@ -298,7 +299,7 @@ class LogisticRegressionWithLBFGS(object):
... LabeledPoint(0.0, [0.0, 1.0]),
... LabeledPoint(1.0, [1.0, 0.0]),
... ]
- >>> lrm = LogisticRegressionWithLBFGS.train(sc.parallelize(data))
+ >>> lrm = LogisticRegressionWithLBFGS.train(sc.parallelize(data), iterations=10)
>>> lrm.predict([1.0, 0.0])
1
>>> lrm.predict([0.0, 1.0])
@@ -330,14 +331,14 @@ class SVMModel(LinearClassificationModel):
... LabeledPoint(1.0, [2.0]),
... LabeledPoint(1.0, [3.0])
... ]
- >>> svm = SVMWithSGD.train(sc.parallelize(data))
+ >>> svm = SVMWithSGD.train(sc.parallelize(data), iterations=10)
>>> svm.predict([1.0])
1
>>> svm.predict(sc.parallelize([[1.0]])).collect()
[1]
>>> svm.clearThreshold()
>>> svm.predict(array([1.0]))
- 1.25...
+ 1.44...
>>> sparse_data = [
... LabeledPoint(0.0, SparseVector(2, {0: -1.0})),
@@ -345,7 +346,7 @@ class SVMModel(LinearClassificationModel):
... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
... ]
- >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
+ >>> svm = SVMWithSGD.train(sc.parallelize(sparse_data), iterations=10)
>>> svm.predict(SparseVector(2, {1: 1.0}))
1
>>> svm.predict(SparseVector(2, {0: -1.0}))
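The expected values in these doctests (0.279..., 1.44...) had to change along with the iteration counts because doctest's ELLIPSIS flag matches "..." as a wildcard against the rest of the printed value. A minimal, self-contained sketch of that mechanism; the return value is an illustrative stand-in, not the model's actual output:

    import doctest

    def prediction():
        """
        >>> prediction()
        0.279...
        """
        # stands in for lrm.predict([0.0, 1.0]) after clearThreshold();
        # "0.279..." matches because "..." absorbs the remaining digits
        return 0.2793827

    if __name__ == "__main__":
        doctest.testmod(optionflags=doctest.ELLIPSIS)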
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index a0117c5713..4bc6351bdf 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -108,7 +108,8 @@ class LinearRegressionModel(LinearRegressionModelBase):
... LabeledPoint(3.0, [2.0]),
... LabeledPoint(2.0, [3.0])
... ]
- >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=np.array([1.0]))
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
+ ... initialWeights=np.array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
@@ -135,12 +136,13 @@ class LinearRegressionModel(LinearRegressionModelBase):
... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
- >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
+ ... initialWeights=array([1.0]))
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
- >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=100, step=1.0,
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
... miniBatchFraction=1.0, initialWeights=array([1.0]), regParam=0.1, regType="l2",
... intercept=True, validateData=True)
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
@@ -238,7 +240,7 @@ class LassoModel(LinearRegressionModelBase):
... LabeledPoint(3.0, [2.0]),
... LabeledPoint(2.0, [3.0])
... ]
- >>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=10, initialWeights=array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
@@ -265,12 +267,13 @@ class LassoModel(LinearRegressionModelBase):
... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
- >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
+ ... initialWeights=array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
- >>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=100, step=1.0,
+ >>> lrm = LassoWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
... regParam=0.01, miniBatchFraction=1.0, initialWeights=array([1.0]), intercept=True,
... validateData=True)
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
@@ -321,7 +324,8 @@ class RidgeRegressionModel(LinearRegressionModelBase):
... LabeledPoint(3.0, [2.0]),
... LabeledPoint(2.0, [3.0])
... ]
- >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=10,
+ ... initialWeights=array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(np.array([1.0])) - 1) < 0.5
@@ -348,12 +352,13 @@ class RidgeRegressionModel(LinearRegressionModelBase):
... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
- >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
+ >>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), iterations=10,
+ ... initialWeights=array([1.0]))
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
- >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=100, step=1.0,
+ >>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), iterations=10, step=1.0,
... regParam=0.01, miniBatchFraction=1.0, initialWeights=array([1.0]), intercept=True,
... validateData=True)
>>> abs(lrm.predict(np.array([0.0])) - 0) < 0.5
@@ -396,7 +401,7 @@ def _test():
from pyspark import SparkContext
import pyspark.mllib.regression
globs = pyspark.mllib.regression.__dict__.copy()
- globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
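The _test() change trims the doctest harness itself: 'local[2]' starts fewer local task threads than 'local[4]', which cuts per-run startup overhead without losing parallelism in the examples. The pattern as a standalone sketch (the helper name is illustrative):

    import doctest
    from pyspark import SparkContext

    def run_module_doctests(module):
        globs = module.__dict__.copy()
        # a small local context is enough for doctests; 'local[2]' keeps
        # startup cheap while still exercising more than one task slot
        globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
        failures, _ = doctest.testmod(module, globs=globs,
                                      optionflags=doctest.ELLIPSIS)
        globs['sc'].stop()
        return failures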
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 8f89e2cee0..1b008b93bc 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -36,6 +36,7 @@ if sys.version_info[:2] <= (2, 6):
else:
import unittest
+from pyspark import SparkContext
from pyspark.mllib.common import _to_java_object_rdd
from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
DenseMatrix, SparseMatrix, Vectors, Matrices
@@ -47,7 +48,6 @@ from pyspark.mllib.feature import IDF
from pyspark.mllib.feature import StandardScaler
from pyspark.serializers import PickleSerializer
from pyspark.sql import SQLContext
-from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase
_have_scipy = False
try:
@@ -58,6 +58,12 @@ except:
pass
ser = PickleSerializer()
+sc = SparkContext('local[4]', "MLlib tests")
+
+
+class MLlibTestCase(unittest.TestCase):
+ def setUp(self):
+ self.sc = sc
def _squared_distance(a, b):
@@ -67,7 +73,7 @@ def _squared_distance(a, b):
return b.squared_distance(a)
-class VectorTests(PySparkTestCase):
+class VectorTests(MLlibTestCase):
def _test_serialize(self, v):
self.assertEqual(v, ser.loads(ser.dumps(v)))
@@ -212,7 +218,7 @@ class VectorTests(PySparkTestCase):
self.assertTrue(array_equal(sm.values, [1, 3, 4, 6, 9]))
-class ListTests(PySparkTestCase):
+class ListTests(MLlibTestCase):
"""
Test MLlib algorithms on plain lists, to make sure they're passed through
@@ -255,7 +261,7 @@ class ListTests(PySparkTestCase):
[-6, -7],
])
clusters = GaussianMixture.train(data, 2, convergenceTol=0.001,
- maxIterations=100, seed=56)
+ maxIterations=10, seed=56)
labels = clusters.predict(data).collect()
self.assertEquals(labels[0], labels[1])
self.assertEquals(labels[2], labels[3])
@@ -266,9 +272,9 @@ class ListTests(PySparkTestCase):
y = range(0, 100, 10)
data = self.sc.parallelize([[a, b] for a, b in zip(x, y)])
clusters1 = GaussianMixture.train(data, 5, convergenceTol=0.001,
- maxIterations=100, seed=63)
+ maxIterations=10, seed=63)
clusters2 = GaussianMixture.train(data, 5, convergenceTol=0.001,
- maxIterations=100, seed=63)
+ maxIterations=10, seed=63)
for c1, c2 in zip(clusters1.weights, clusters2.weights):
self.assertEquals(round(c1, 7), round(c2, 7))
@@ -287,13 +293,13 @@ class ListTests(PySparkTestCase):
temp_dir = tempfile.mkdtemp()
- lr_model = LogisticRegressionWithSGD.train(rdd)
+ lr_model = LogisticRegressionWithSGD.train(rdd, iterations=10)
self.assertTrue(lr_model.predict(features[0]) <= 0)
self.assertTrue(lr_model.predict(features[1]) > 0)
self.assertTrue(lr_model.predict(features[2]) <= 0)
self.assertTrue(lr_model.predict(features[3]) > 0)
- svm_model = SVMWithSGD.train(rdd)
+ svm_model = SVMWithSGD.train(rdd, iterations=10)
self.assertTrue(svm_model.predict(features[0]) <= 0)
self.assertTrue(svm_model.predict(features[1]) > 0)
self.assertTrue(svm_model.predict(features[2]) <= 0)
@@ -307,7 +313,7 @@ class ListTests(PySparkTestCase):
categoricalFeaturesInfo = {0: 3} # feature 0 has 3 categories
dt_model = DecisionTree.trainClassifier(
- rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo)
+ rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, maxBins=4)
self.assertTrue(dt_model.predict(features[0]) <= 0)
self.assertTrue(dt_model.predict(features[1]) > 0)
self.assertTrue(dt_model.predict(features[2]) <= 0)
@@ -319,7 +325,8 @@ class ListTests(PySparkTestCase):
self.assertEqual(same_dt_model.toDebugString(), dt_model.toDebugString())
rf_model = RandomForest.trainClassifier(
- rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=100)
+ rdd, numClasses=2, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=10,
+ maxBins=4, seed=1)
self.assertTrue(rf_model.predict(features[0]) <= 0)
self.assertTrue(rf_model.predict(features[1]) > 0)
self.assertTrue(rf_model.predict(features[2]) <= 0)
@@ -331,7 +338,7 @@ class ListTests(PySparkTestCase):
self.assertEqual(same_rf_model.toDebugString(), rf_model.toDebugString())
gbt_model = GradientBoostedTrees.trainClassifier(
- rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
+ rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4)
self.assertTrue(gbt_model.predict(features[0]) <= 0)
self.assertTrue(gbt_model.predict(features[1]) > 0)
self.assertTrue(gbt_model.predict(features[2]) <= 0)
@@ -360,19 +367,19 @@ class ListTests(PySparkTestCase):
rdd = self.sc.parallelize(data)
features = [p.features.tolist() for p in data]
- lr_model = LinearRegressionWithSGD.train(rdd)
+ lr_model = LinearRegressionWithSGD.train(rdd, iterations=10)
self.assertTrue(lr_model.predict(features[0]) <= 0)
self.assertTrue(lr_model.predict(features[1]) > 0)
self.assertTrue(lr_model.predict(features[2]) <= 0)
self.assertTrue(lr_model.predict(features[3]) > 0)
- lasso_model = LassoWithSGD.train(rdd)
+ lasso_model = LassoWithSGD.train(rdd, iterations=10)
self.assertTrue(lasso_model.predict(features[0]) <= 0)
self.assertTrue(lasso_model.predict(features[1]) > 0)
self.assertTrue(lasso_model.predict(features[2]) <= 0)
self.assertTrue(lasso_model.predict(features[3]) > 0)
- rr_model = RidgeRegressionWithSGD.train(rdd)
+ rr_model = RidgeRegressionWithSGD.train(rdd, iterations=10)
self.assertTrue(rr_model.predict(features[0]) <= 0)
self.assertTrue(rr_model.predict(features[1]) > 0)
self.assertTrue(rr_model.predict(features[2]) <= 0)
@@ -380,35 +387,35 @@ class ListTests(PySparkTestCase):
categoricalFeaturesInfo = {0: 2} # feature 0 has 2 categories
dt_model = DecisionTree.trainRegressor(
- rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
+ rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, maxBins=4)
self.assertTrue(dt_model.predict(features[0]) <= 0)
self.assertTrue(dt_model.predict(features[1]) > 0)
self.assertTrue(dt_model.predict(features[2]) <= 0)
self.assertTrue(dt_model.predict(features[3]) > 0)
rf_model = RandomForest.trainRegressor(
- rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=100, seed=1)
+ rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numTrees=10, maxBins=4, seed=1)
self.assertTrue(rf_model.predict(features[0]) <= 0)
self.assertTrue(rf_model.predict(features[1]) > 0)
self.assertTrue(rf_model.predict(features[2]) <= 0)
self.assertTrue(rf_model.predict(features[3]) > 0)
gbt_model = GradientBoostedTrees.trainRegressor(
- rdd, categoricalFeaturesInfo=categoricalFeaturesInfo)
+ rdd, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=4)
self.assertTrue(gbt_model.predict(features[0]) <= 0)
self.assertTrue(gbt_model.predict(features[1]) > 0)
self.assertTrue(gbt_model.predict(features[2]) <= 0)
self.assertTrue(gbt_model.predict(features[3]) > 0)
try:
- LinearRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]))
- LassoWithSGD.train(rdd, initialWeights=array([1.0, 1.0]))
- RidgeRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]))
+ LinearRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10)
+ LassoWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10)
+ RidgeRegressionWithSGD.train(rdd, initialWeights=array([1.0, 1.0]), iterations=10)
except ValueError:
self.fail()
-class StatTests(PySparkTestCase):
+class StatTests(MLlibTestCase):
# SPARK-4023
def test_col_with_different_rdds(self):
# numpy
@@ -438,7 +445,7 @@ class StatTests(PySparkTestCase):
self.assertTrue(math.fabs(summary2.normL2()[0] - expectedNormL2) < 1e-14)
-class VectorUDTTests(PySparkTestCase):
+class VectorUDTTests(MLlibTestCase):
dv0 = DenseVector([])
dv1 = DenseVector([1.0, 2.0])
@@ -472,7 +479,7 @@ class VectorUDTTests(PySparkTestCase):
@unittest.skipIf(not _have_scipy, "SciPy not installed")
-class SciPyTests(PySparkTestCase):
+class SciPyTests(MLlibTestCase):
"""
Test both vector operations and MLlib algorithms with SciPy sparse matrices,
@@ -613,7 +620,7 @@ class SciPyTests(PySparkTestCase):
self.assertTrue(dt_model.predict(features[3]) > 0)
-class ChiSqTestTests(PySparkTestCase):
+class ChiSqTestTests(MLlibTestCase):
def test_goodness_of_fit(self):
from numpy import inf
@@ -711,13 +718,13 @@ class ChiSqTestTests(PySparkTestCase):
self.assertIsNotNone(chi[1000])
-class SerDeTest(PySparkTestCase):
+class SerDeTest(MLlibTestCase):
def test_to_java_object_rdd(self): # SPARK-6660
data = RandomRDDs.uniformRDD(self.sc, 10, 5, seed=0)
self.assertEqual(_to_java_object_rdd(data).count(), 10)
-class FeatureTest(PySparkTestCase):
+class FeatureTest(MLlibTestCase):
def test_idf_model(self):
data = [
Vectors.dense([1, 2, 6, 0, 2, 3, 1, 1, 0, 0, 3]),
@@ -730,13 +737,8 @@ class FeatureTest(PySparkTestCase):
self.assertEqual(len(idf), 11)
-class Word2VecTests(PySparkTestCase):
+class Word2VecTests(MLlibTestCase):
def test_word2vec_setters(self):
- data = [
- ["I", "have", "a", "pen"],
- ["I", "like", "soccer", "very", "much"],
- ["I", "live", "in", "Tokyo"]
- ]
model = Word2Vec() \
.setVectorSize(2) \
.setLearningRate(0.01) \
@@ -765,7 +767,7 @@ class Word2VecTests(PySparkTestCase):
self.assertEquals(len(model.getVectors()), 3)
-class StandardScalerTests(PySparkTestCase):
+class StandardScalerTests(MLlibTestCase):
def test_model_setters(self):
data = [
[1.0, 2.0, 3.0],
@@ -793,3 +795,4 @@ if __name__ == "__main__":
unittest.main()
if not _have_scipy:
print("NOTE: SciPy tests were skipped as it does not seem to be installed")
+ sc.stop()
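The switch away from ReusedPySparkTestCase is the main saving in this file: instead of one SparkContext per test class, MLlibTestCase points every test at a single module-level context that is stopped only once, at exit. Starting the JVM-backed context is the slow part, so reusing it dominates. A minimal standalone sketch of the pattern (names illustrative):

    import unittest
    from pyspark import SparkContext

    sc = SparkContext('local[4]', "shared-context demo")

    class SharedContextCase(unittest.TestCase):
        def setUp(self):
            self.sc = sc          # reuse the module-level context

    class ExampleTests(SharedContextCase):
        def test_count(self):
            self.assertEqual(self.sc.parallelize(range(10)).count(), 10)

    if __name__ == "__main__":
        unittest.main(exit=False)  # don't sys.exit() before cleanup
        sc.stop()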
diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py
index 0fe6e4fabe..cfcbea573f 100644
--- a/python/pyspark/mllib/tree.py
+++ b/python/pyspark/mllib/tree.py
@@ -482,13 +482,13 @@ class GradientBoostedTrees(object):
... LabeledPoint(1.0, [3.0])
... ]
>>>
- >>> model = GradientBoostedTrees.trainClassifier(sc.parallelize(data), {})
+ >>> model = GradientBoostedTrees.trainClassifier(sc.parallelize(data), {}, numIterations=10)
>>> model.numTrees()
- 100
+ 10
>>> model.totalNumNodes()
- 300
+ 30
>>> print(model) # it already has newline
- TreeEnsembleModel classifier with 100 trees
+ TreeEnsembleModel classifier with 10 trees
<BLANKLINE>
>>> model.predict([2.0])
1.0
@@ -541,11 +541,12 @@ class GradientBoostedTrees(object):
... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
... ]
>>>
- >>> model = GradientBoostedTrees.trainRegressor(sc.parallelize(sparse_data), {})
+ >>> data = sc.parallelize(sparse_data)
+ >>> model = GradientBoostedTrees.trainRegressor(data, {}, numIterations=10)
>>> model.numTrees()
- 100
+ 10
>>> model.totalNumNodes()
- 102
+ 12
>>> model.predict(SparseVector(2, {1: 1.0}))
1.0
>>> model.predict(SparseVector(2, {0: 1.0}))
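The updated expected values follow from how MLlib's gradient boosting works: each boosting iteration adds one tree, so numIterations=10 yields numTrees() == 10, and the node totals imply very small trees; in the classifier doctest, 30 nodes over 10 trees is one root split plus two leaves per tree. A quick check of that arithmetic:

    num_iterations = 10            # one boosted tree per iteration
    classifier_nodes = 30          # totalNumNodes() in the doctest above
    assert classifier_nodes // num_iterations == 3   # root + 2 leaves each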
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index b54baa57ec..1d0b16cade 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -486,7 +486,7 @@ class ExternalSorter(object):
goes above the limit.
"""
global MemoryBytesSpilled, DiskBytesSpilled
- batch, limit = 100, self._next_limit()
+ batch, limit = 100, self.memory_limit
chunks, current_chunk = [], []
iterator = iter(iterator)
while True:
@@ -497,7 +497,7 @@ class ExternalSorter(object):
break
used_memory = get_used_memory()
- if used_memory > self.memory_limit:
+ if used_memory > limit:
# sort them inplace will save memory
current_chunk.sort(key=key, reverse=reverse)
path = self._get_path(len(chunks))
@@ -513,13 +513,14 @@ class ExternalSorter(object):
chunks.append(load(open(path, 'rb')))
current_chunk = []
gc.collect()
+ batch //= 2
limit = self._next_limit()
MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
DiskBytesSpilled += os.path.getsize(path)
os.unlink(path) # data will be deleted after close
elif not chunks:
- batch = min(batch * 2, 10000)
+ batch = min(int(batch * 1.5), 10000)
current_chunk.sort(key=key, reverse=reverse)
if not chunks:
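These hunks retune how ExternalSorter paces its memory checks: the first limit is the full memory_limit rather than the reduced _next_limit(), the sampling batch is halved after each spill so checks become more frequent under memory pressure, and it grows by 1.5x instead of 2x while memory stays free. A schematic, runnable sketch of that control loop; this is not the real sorter, and the disk spill is stubbed out:

    def paced_batches(items, memory_limit, get_used_memory):
        batch, limit = 100, memory_limit       # start from the full limit
        current = []
        for x in items:
            current.append(x)
            if len(current) < batch:
                continue
            if get_used_memory() > limit:
                yield sorted(current)          # stands in for a disk spill
                current = []
                batch = max(batch // 2, 1)     # probe memory more often
                # the real sorter also raises `limit` via _next_limit() here
            else:
                batch = min(int(batch * 1.5), 10000)   # back off slowly
        if current:
            yield sorted(current)

    # e.g. list(paced_batches(range(20), 100, lambda: 0)) -> [[0, 1, ..., 19]]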
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 23e8428367..fe43c374f1 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -109,7 +109,7 @@ class SQLTests(ReusedPySparkTestCase):
os.unlink(cls.tempdir.name)
cls.sqlCtx = SQLContext(cls.sc)
cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
- rdd = cls.sc.parallelize(cls.testData)
+ rdd = cls.sc.parallelize(cls.testData, 2)
cls.df = rdd.toDF()
@classmethod
@@ -303,7 +303,7 @@ class SQLTests(ReusedPySparkTestCase):
abstract = "byte1 short1 float1 time1 map1{} struct1(b) list1[]"
schema = _parse_schema_abstract(abstract)
typedSchema = _infer_schema_type(rdd.first(), schema)
- df = self.sqlCtx.applySchema(rdd, typedSchema)
+ df = self.sqlCtx.createDataFrame(rdd, typedSchema)
r = (127, -32768, 1.0, datetime(2010, 1, 1, 1, 1, 1), {"a": 1}, Row(b=2), [1, 2, 3])
self.assertEqual(r, tuple(df.first()))
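applySchema was deprecated in Spark 1.3 in favor of createDataFrame, which takes the data and the schema in a single call. A self-contained sketch of the replacement, with schema fields chosen to match the testData rows above:

    from pyspark import SparkContext
    from pyspark.sql import SQLContext, Row
    from pyspark.sql.types import (StructType, StructField,
                                   IntegerType, StringType)

    sc = SparkContext('local[2]', 'createDataFrame demo')
    sqlCtx = SQLContext(sc)
    rdd = sc.parallelize([Row(key=i, value=str(i)) for i in range(10)], 2)
    schema = StructType([
        StructField("key", IntegerType(), True),
        StructField("value", StringType(), True),
    ])
    df = sqlCtx.createDataFrame(rdd, schema)   # was: sqlCtx.applySchema(rdd, schema)
    assert df.count() == 10
    sc.stop()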
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 33f958a601..5fa1e5ef08 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -16,14 +16,23 @@
#
import os
+import sys
from itertools import chain
import time
import operator
-import unittest
import tempfile
import struct
from functools import reduce
+if sys.version_info[:2] <= (2, 6):
+ try:
+ import unittest2 as unittest
+ except ImportError:
+ sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier')
+ sys.exit(1)
+else:
+ import unittest
+
from pyspark.context import SparkConf, SparkContext, RDD
from pyspark.streaming.context import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
@@ -31,19 +40,25 @@ from pyspark.streaming.kafka import KafkaUtils
class PySparkStreamingTestCase(unittest.TestCase):
- timeout = 20 # seconds
- duration = 1
+ timeout = 4 # seconds
+ duration = .2
- def setUp(self):
- class_name = self.__class__.__name__
+ @classmethod
+ def setUpClass(cls):
+ class_name = cls.__name__
conf = SparkConf().set("spark.default.parallelism", 1)
- self.sc = SparkContext(appName=class_name, conf=conf)
- self.sc.setCheckpointDir("/tmp")
- # TODO: decrease duration to speed up tests
+ cls.sc = SparkContext(appName=class_name, conf=conf)
+ cls.sc.setCheckpointDir("/tmp")
+
+ @classmethod
+ def tearDownClass(cls):
+ cls.sc.stop()
+
+ def setUp(self):
self.ssc = StreamingContext(self.sc, self.duration)
def tearDown(self):
- self.ssc.stop()
+ self.ssc.stop(False)
def wait_for(self, result, n):
start_time = time.time()
@@ -363,13 +378,13 @@ class BasicOperationTests(PySparkStreamingTestCase):
class WindowFunctionTests(PySparkStreamingTestCase):
- timeout = 20
+ timeout = 5
def test_window(self):
input = [range(1), range(2), range(3), range(4), range(5)]
def func(dstream):
- return dstream.window(3, 1).count()
+ return dstream.window(.6, .2).count()
expected = [[1], [3], [6], [9], [12], [9], [5]]
self._test_func(input, func, expected)
@@ -378,7 +393,7 @@ class WindowFunctionTests(PySparkStreamingTestCase):
input = [range(1), range(2), range(3), range(4), range(5)]
def func(dstream):
- return dstream.countByWindow(3, 1)
+ return dstream.countByWindow(.6, .2)
expected = [[1], [3], [6], [9], [12], [9], [5]]
self._test_func(input, func, expected)
@@ -387,7 +402,7 @@ class WindowFunctionTests(PySparkStreamingTestCase):
input = [range(1), range(2), range(3), range(4), range(5), range(6)]
def func(dstream):
- return dstream.countByWindow(5, 1)
+ return dstream.countByWindow(1, .2)
expected = [[1], [3], [6], [10], [15], [20], [18], [15], [11], [6]]
self._test_func(input, func, expected)
@@ -396,7 +411,7 @@ class WindowFunctionTests(PySparkStreamingTestCase):
input = [range(1), range(2), range(3), range(4), range(5), range(6)]
def func(dstream):
- return dstream.countByValueAndWindow(5, 1)
+ return dstream.countByValueAndWindow(1, .2)
expected = [[1], [2], [3], [4], [5], [6], [6], [6], [6], [6]]
self._test_func(input, func, expected)
@@ -405,7 +420,7 @@ class WindowFunctionTests(PySparkStreamingTestCase):
input = [[('a', i)] for i in range(5)]
def func(dstream):
- return dstream.groupByKeyAndWindow(3, 1).mapValues(list)
+ return dstream.groupByKeyAndWindow(.6, .2).mapValues(list)
expected = [[('a', [0])], [('a', [0, 1])], [('a', [0, 1, 2])], [('a', [1, 2, 3])],
[('a', [2, 3, 4])], [('a', [3, 4])], [('a', [4])]]
@@ -436,8 +451,8 @@ class StreamingContextTests(PySparkStreamingTestCase):
def test_stop_multiple_times(self):
self._add_input_stream()
self.ssc.start()
- self.ssc.stop()
- self.ssc.stop()
+ self.ssc.stop(False)
+ self.ssc.stop(False)
def test_queue_stream(self):
input = [list(range(i + 1)) for i in range(3)]
@@ -495,10 +510,7 @@ class StreamingContextTests(PySparkStreamingTestCase):
self.assertEqual([2, 3, 1], self._take(dstream, 3))
-class CheckpointTests(PySparkStreamingTestCase):
-
- def setUp(self):
- pass
+class CheckpointTests(unittest.TestCase):
def test_get_or_create(self):
inputd = tempfile.mkdtemp()
@@ -518,12 +530,12 @@ class CheckpointTests(PySparkStreamingTestCase):
return ssc
cpd = tempfile.mkdtemp("test_streaming_cps")
- self.ssc = ssc = StreamingContext.getOrCreate(cpd, setup)
+ ssc = StreamingContext.getOrCreate(cpd, setup)
ssc.start()
def check_output(n):
while not os.listdir(outputd):
- time.sleep(0.1)
+ time.sleep(0.01)
time.sleep(1) # make sure mtime is larger than the previous one
with open(os.path.join(inputd, str(n)), 'w') as f:
f.writelines(["%d\n" % i for i in range(10)])
@@ -553,12 +565,15 @@ class CheckpointTests(PySparkStreamingTestCase):
ssc.stop(True, True)
time.sleep(1)
- self.ssc = ssc = StreamingContext.getOrCreate(cpd, setup)
+ ssc = StreamingContext.getOrCreate(cpd, setup)
ssc.start()
check_output(3)
+ ssc.stop(True, True)
class KafkaStreamTests(PySparkStreamingTestCase):
+ timeout = 20 # seconds
+ duration = 1
def setUp(self):
super(KafkaStreamTests, self).setUp()
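Two independent speedups are at work in this file: the SparkContext moves into setUpClass/tearDownClass so each test class pays JVM startup once, and the batch interval drops from 1s to 0.2s with every window and slide argument rescaled to keep the same size in batches, which is why the expected outputs are untouched. A quick check of that rescaling; the tolerance is needed because 0.6/0.2 is not exact in binary floating point:

    duration = 0.2                 # new batch interval, was 1 second
    window, slide = 0.6, 0.2       # new arguments, were 3 and 1
    assert abs(window / duration - 3) < 1e-9   # still a 3-batch window
    assert abs(slide / duration - 1) < 1e-9    # still slides every batch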
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 75f39d9e75..ea63a396da 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -31,7 +31,6 @@ import tempfile
import time
import zipfile
import random
-import itertools
import threading
import hashlib
@@ -49,6 +48,11 @@ else:
xrange = range
basestring = str
+if sys.version >= "3":
+ from io import StringIO
+else:
+ from StringIO import StringIO
+
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
@@ -196,7 +200,7 @@ class SorterTests(unittest.TestCase):
sc = SparkContext(conf=conf)
l = list(range(10240))
random.shuffle(l)
- rdd = sc.parallelize(l, 2)
+ rdd = sc.parallelize(l, 4)
self.assertEqual(sorted(l), rdd.sortBy(lambda x: x).collect())
sc.stop()
@@ -300,6 +304,18 @@ class SerializationTestCase(unittest.TestCase):
hash(FlattenedValuesSerializer(PickleSerializer()))
+class QuietTest(object):
+ def __init__(self, sc):
+ self.log4j = sc._jvm.org.apache.log4j
+
+ def __enter__(self):
+ self.old_level = self.log4j.LogManager.getRootLogger().getLevel()
+ self.log4j.LogManager.getRootLogger().setLevel(self.log4j.Level.FATAL)
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.log4j.LogManager.getRootLogger().setLevel(self.old_level)
+
+
class PySparkTestCase(unittest.TestCase):
def setUp(self):
@@ -371,15 +387,11 @@ class AddFileTests(PySparkTestCase):
# To ensure that we're actually testing addPyFile's effects, check that
# this job fails due to `userlibrary` not being on the Python path:
# disable logging in log4j temporarily
- log4j = self.sc._jvm.org.apache.log4j
- old_level = log4j.LogManager.getRootLogger().getLevel()
- log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL)
-
def func(x):
from userlibrary import UserClass
return UserClass().hello()
- self.assertRaises(Exception, self.sc.parallelize(range(2)).map(func).first)
- log4j.LogManager.getRootLogger().setLevel(old_level)
+ with QuietTest(self.sc):
+ self.assertRaises(Exception, self.sc.parallelize(range(2)).map(func).first)
# Add the file, so the job should now succeed:
path = os.path.join(SPARK_HOME, "python/test_support/userlibrary.py")
@@ -496,7 +508,8 @@ class RDDTests(ReusedPySparkTestCase):
filtered_data = data.filter(lambda x: True)
self.assertEqual(1, filtered_data.count())
os.unlink(tempFile.name)
- self.assertRaises(Exception, lambda: filtered_data.count())
+ with QuietTest(self.sc):
+ self.assertRaises(Exception, lambda: filtered_data.count())
def test_sampling_default_seed(self):
# Test for SPARK-3995 (default seed setting)
@@ -536,9 +549,9 @@ class RDDTests(ReusedPySparkTestCase):
self.assertEqual([jon, jane], theDoes.collect())
def test_large_broadcast(self):
- N = 100000
+ N = 10000
data = [[float(i) for i in range(300)] for i in range(N)]
- bdata = self.sc.broadcast(data) # 270MB
+ bdata = self.sc.broadcast(data) # 27MB
m = self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum()
self.assertEqual(N, m)
@@ -569,7 +582,7 @@ class RDDTests(ReusedPySparkTestCase):
self.assertEqual(checksum, csum)
def test_large_closure(self):
- N = 1000000
+ N = 200000
data = [float(i) for i in xrange(N)]
rdd = self.sc.parallelize(range(1), 1).map(lambda x: len(data))
self.assertEqual(N, rdd.first())
@@ -604,17 +617,18 @@ class RDDTests(ReusedPySparkTestCase):
# different number of partitions
b = self.sc.parallelize(range(100, 106), 3)
self.assertRaises(ValueError, lambda: a.zip(b))
- # different number of batched items in JVM
- b = self.sc.parallelize(range(100, 104), 2)
- self.assertRaises(Exception, lambda: a.zip(b).count())
- # different number of items in one pair
- b = self.sc.parallelize(range(100, 106), 2)
- self.assertRaises(Exception, lambda: a.zip(b).count())
- # same total number of items, but different distributions
- a = self.sc.parallelize([2, 3], 2).flatMap(range)
- b = self.sc.parallelize([3, 2], 2).flatMap(range)
- self.assertEqual(a.count(), b.count())
- self.assertRaises(Exception, lambda: a.zip(b).count())
+ with QuietTest(self.sc):
+ # different number of batched items in JVM
+ b = self.sc.parallelize(range(100, 104), 2)
+ self.assertRaises(Exception, lambda: a.zip(b).count())
+ # different number of items in one pair
+ b = self.sc.parallelize(range(100, 106), 2)
+ self.assertRaises(Exception, lambda: a.zip(b).count())
+ # same total number of items, but different distributions
+ a = self.sc.parallelize([2, 3], 2).flatMap(range)
+ b = self.sc.parallelize([3, 2], 2).flatMap(range)
+ self.assertEqual(a.count(), b.count())
+ self.assertRaises(Exception, lambda: a.zip(b).count())
def test_count_approx_distinct(self):
rdd = self.sc.parallelize(range(1000))
@@ -877,7 +891,12 @@ class ProfilerTests(PySparkTestCase):
func_names = [func_name for fname, n, func_name in stat_list]
self.assertTrue("heavy_foo" in func_names)
+ old_stdout = sys.stdout
+ sys.stdout = io = StringIO()
self.sc.show_profiles()
+ self.assertTrue("heavy_foo" in io.getvalue())
+ sys.stdout = old_stdout
+
d = tempfile.gettempdir()
self.sc.dump_profiles(d)
self.assertTrue("rdd_%d.pstats" % id in os.listdir(d))
@@ -901,7 +920,7 @@ class ProfilerTests(PySparkTestCase):
def do_computation(self):
def heavy_foo(x):
- for i in range(1 << 20):
+ for i in range(1 << 18):
x = 1
rdd = self.sc.parallelize(range(100))
@@ -1417,7 +1436,7 @@ class DaemonTests(unittest.TestCase):
self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
-class WorkerTests(PySparkTestCase):
+class WorkerTests(ReusedPySparkTestCase):
def test_cancel_task(self):
temp = tempfile.NamedTemporaryFile(delete=True)
temp.close()
@@ -1432,7 +1451,10 @@ class WorkerTests(PySparkTestCase):
# start job in background thread
def run():
- self.sc.parallelize(range(1), 1).foreach(sleep)
+ try:
+ self.sc.parallelize(range(1), 1).foreach(sleep)
+ except Exception:
+ pass
import threading
t = threading.Thread(target=run)
t.daemon = True
@@ -1473,7 +1495,8 @@ class WorkerTests(PySparkTestCase):
def raise_exception(_):
raise Exception()
rdd = self.sc.parallelize(range(100), 1)
- self.assertRaises(Exception, lambda: rdd.foreach(raise_exception))
+ with QuietTest(self.sc):
+ self.assertRaises(Exception, lambda: rdd.foreach(raise_exception))
self.assertEqual(100, rdd.map(str).count())
def test_after_jvm_exception(self):
@@ -1484,7 +1507,8 @@ class WorkerTests(PySparkTestCase):
filtered_data = data.filter(lambda x: True)
self.assertEqual(1, filtered_data.count())
os.unlink(tempFile.name)
- self.assertRaises(Exception, lambda: filtered_data.count())
+ with QuietTest(self.sc):
+ self.assertRaises(Exception, lambda: filtered_data.count())
rdd = self.sc.parallelize(range(100), 1)
self.assertEqual(100, rdd.map(str).count())
@@ -1522,14 +1546,11 @@ class WorkerTests(PySparkTestCase):
rdd.count()
version = sys.version_info
sys.version_info = (2, 0, 0)
- log4j = self.sc._jvm.org.apache.log4j
- old_level = log4j.LogManager.getRootLogger().getLevel()
- log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL)
try:
- self.assertRaises(Py4JJavaError, lambda: rdd.count())
+ with QuietTest(self.sc):
+ self.assertRaises(Py4JJavaError, lambda: rdd.count())
finally:
sys.version_info = version
- log4j.LogManager.getRootLogger().setLevel(old_level)
class SparkSubmitTests(unittest.TestCase):
@@ -1751,9 +1772,14 @@ class ContextTests(unittest.TestCase):
def test_progress_api(self):
with SparkContext() as sc:
sc.setJobGroup('test_progress_api', '', True)
-
rdd = sc.parallelize(range(10)).map(lambda x: time.sleep(100))
- t = threading.Thread(target=rdd.collect)
+
+ def run():
+ try:
+ rdd.count()
+ except Exception:
+ pass
+ t = threading.Thread(target=run)
t.daemon = True
t.start()
# wait for scheduler to start
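QuietTest factors out a save/override/restore of log4j's root logger that previously appeared inline wherever a test provoked executor errors on purpose. The same pattern as a runnable pure-Python analogue against the stdlib logging module; the real class goes through py4j to log4j instead:

    import logging

    class Quiet(object):
        """Silence a logger inside a with-block, then restore it."""
        def __init__(self, logger=None):
            self.logger = logger or logging.getLogger()

        def __enter__(self):
            self.old_level = self.logger.level
            self.logger.setLevel(logging.CRITICAL)

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.logger.setLevel(self.old_level)

    # usage: expected failures inside the block don't spam the test log
    with Quiet():
        logging.error("this is suppressed")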
diff --git a/python/run-tests b/python/run-tests
index ed3e819ef3..88b63b84fd 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -28,6 +28,7 @@ cd "$FWDIR/python"
FAILED=0
LOG_FILE=unit-tests.log
+START=$(date +"%s")
rm -f $LOG_FILE
@@ -35,8 +36,8 @@ rm -f $LOG_FILE
rm -rf metastore warehouse
function run_test() {
- echo "Running test: $1" | tee -a $LOG_FILE
-
+ echo -en "Running test: $1 ... " | tee -a $LOG_FILE
+ start=$(date +"%s")
SPARK_TESTING=1 time "$FWDIR"/bin/pyspark $1 > $LOG_FILE 2>&1
FAILED=$((PIPESTATUS[0]||$FAILED))
@@ -48,6 +49,9 @@ function run_test() {
echo "Had test failures; see logs."
echo -en "\033[0m" # No color
exit -1
+ else
+ now=$(date +"%s")
+ echo "ok ($(($now - $start))s)"
fi
}
@@ -161,9 +165,8 @@ if [ $(which pypy) ]; then
fi
if [[ $FAILED == 0 ]]; then
- echo -en "\033[32m" # Green
- echo "Tests passed."
- echo -en "\033[0m" # No color
+ now=$(date +"%s")
+ echo -e "\033[32mTests passed \033[0min $(($now - $START)) seconds"
fi
# TODO: in the long-run, it would be nice to use a test runner like `nose`.