diff options
author | Reynold Xin <rxin@databricks.com> | 2015-04-21 17:49:55 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-04-21 17:49:55 -0700 |
commit | 3134c3fe495862b7687b5aa00d3344d09cd5e08e (patch) | |
tree | ed556b21bbaad651c7893b6b2dcb53f304100785 /python/pyspark/tests.py | |
parent | e72c16e30d85cdc394d318b5551698885cfda9b8 (diff) | |
download | spark-3134c3fe495862b7687b5aa00d3344d09cd5e08e.tar.gz spark-3134c3fe495862b7687b5aa00d3344d09cd5e08e.tar.bz2 spark-3134c3fe495862b7687b5aa00d3344d09cd5e08e.zip |
[SPARK-6953] [PySpark] speed up python tests
This PR try to speed up some python tests:
```
tests.py 144s -> 103s -41s
mllib/classification.py 24s -> 17s -7s
mllib/regression.py 27s -> 15s -12s
mllib/tree.py 27s -> 13s -14s
mllib/tests.py 64s -> 31s -33s
streaming/tests.py 185s -> 84s -101s
```
Considering python3, the total saving will be 558s (almost 10 minutes) (core, and streaming run three times, mllib runs twice).
During testing, it will show used time for each test file:
```
Run core tests ...
Running test: pyspark/rdd.py ... ok (22s)
Running test: pyspark/context.py ... ok (16s)
Running test: pyspark/conf.py ... ok (4s)
Running test: pyspark/broadcast.py ... ok (4s)
Running test: pyspark/accumulators.py ... ok (4s)
Running test: pyspark/serializers.py ... ok (6s)
Running test: pyspark/profiler.py ... ok (5s)
Running test: pyspark/shuffle.py ... ok (1s)
Running test: pyspark/tests.py ... ok (103s) 144s
```
Author: Reynold Xin <rxin@databricks.com>
Author: Xiangrui Meng <meng@databricks.com>
Closes #5605 from rxin/python-tests-speed and squashes the following commits:
d08542d [Reynold Xin] Merge pull request #14 from mengxr/SPARK-6953
89321ee [Xiangrui Meng] fix seed in tests
3ad2387 [Reynold Xin] Merge pull request #5427 from davies/python_tests
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r-- | python/pyspark/tests.py | 96 |
1 files changed, 61 insertions, 35 deletions
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 75f39d9e75..ea63a396da 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -31,7 +31,6 @@ import tempfile import time import zipfile import random -import itertools import threading import hashlib @@ -49,6 +48,11 @@ else: xrange = range basestring = str +if sys.version >= "3": + from io import StringIO +else: + from StringIO import StringIO + from pyspark.conf import SparkConf from pyspark.context import SparkContext @@ -196,7 +200,7 @@ class SorterTests(unittest.TestCase): sc = SparkContext(conf=conf) l = list(range(10240)) random.shuffle(l) - rdd = sc.parallelize(l, 2) + rdd = sc.parallelize(l, 4) self.assertEqual(sorted(l), rdd.sortBy(lambda x: x).collect()) sc.stop() @@ -300,6 +304,18 @@ class SerializationTestCase(unittest.TestCase): hash(FlattenedValuesSerializer(PickleSerializer())) +class QuietTest(object): + def __init__(self, sc): + self.log4j = sc._jvm.org.apache.log4j + + def __enter__(self): + self.old_level = self.log4j.LogManager.getRootLogger().getLevel() + self.log4j.LogManager.getRootLogger().setLevel(self.log4j.Level.FATAL) + + def __exit__(self, exc_type, exc_val, exc_tb): + self.log4j.LogManager.getRootLogger().setLevel(self.old_level) + + class PySparkTestCase(unittest.TestCase): def setUp(self): @@ -371,15 +387,11 @@ class AddFileTests(PySparkTestCase): # To ensure that we're actually testing addPyFile's effects, check that # this job fails due to `userlibrary` not being on the Python path: # disable logging in log4j temporarily - log4j = self.sc._jvm.org.apache.log4j - old_level = log4j.LogManager.getRootLogger().getLevel() - log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL) - def func(x): from userlibrary import UserClass return UserClass().hello() - self.assertRaises(Exception, self.sc.parallelize(range(2)).map(func).first) - log4j.LogManager.getRootLogger().setLevel(old_level) + with QuietTest(self.sc): + self.assertRaises(Exception, self.sc.parallelize(range(2)).map(func).first) # Add the file, so the job should now succeed: path = os.path.join(SPARK_HOME, "python/test_support/userlibrary.py") @@ -496,7 +508,8 @@ class RDDTests(ReusedPySparkTestCase): filtered_data = data.filter(lambda x: True) self.assertEqual(1, filtered_data.count()) os.unlink(tempFile.name) - self.assertRaises(Exception, lambda: filtered_data.count()) + with QuietTest(self.sc): + self.assertRaises(Exception, lambda: filtered_data.count()) def test_sampling_default_seed(self): # Test for SPARK-3995 (default seed setting) @@ -536,9 +549,9 @@ class RDDTests(ReusedPySparkTestCase): self.assertEqual([jon, jane], theDoes.collect()) def test_large_broadcast(self): - N = 100000 + N = 10000 data = [[float(i) for i in range(300)] for i in range(N)] - bdata = self.sc.broadcast(data) # 270MB + bdata = self.sc.broadcast(data) # 27MB m = self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum() self.assertEqual(N, m) @@ -569,7 +582,7 @@ class RDDTests(ReusedPySparkTestCase): self.assertEqual(checksum, csum) def test_large_closure(self): - N = 1000000 + N = 200000 data = [float(i) for i in xrange(N)] rdd = self.sc.parallelize(range(1), 1).map(lambda x: len(data)) self.assertEqual(N, rdd.first()) @@ -604,17 +617,18 @@ class RDDTests(ReusedPySparkTestCase): # different number of partitions b = self.sc.parallelize(range(100, 106), 3) self.assertRaises(ValueError, lambda: a.zip(b)) - # different number of batched items in JVM - b = self.sc.parallelize(range(100, 104), 2) - self.assertRaises(Exception, lambda: a.zip(b).count()) - # different number of items in one pair - b = self.sc.parallelize(range(100, 106), 2) - self.assertRaises(Exception, lambda: a.zip(b).count()) - # same total number of items, but different distributions - a = self.sc.parallelize([2, 3], 2).flatMap(range) - b = self.sc.parallelize([3, 2], 2).flatMap(range) - self.assertEqual(a.count(), b.count()) - self.assertRaises(Exception, lambda: a.zip(b).count()) + with QuietTest(self.sc): + # different number of batched items in JVM + b = self.sc.parallelize(range(100, 104), 2) + self.assertRaises(Exception, lambda: a.zip(b).count()) + # different number of items in one pair + b = self.sc.parallelize(range(100, 106), 2) + self.assertRaises(Exception, lambda: a.zip(b).count()) + # same total number of items, but different distributions + a = self.sc.parallelize([2, 3], 2).flatMap(range) + b = self.sc.parallelize([3, 2], 2).flatMap(range) + self.assertEqual(a.count(), b.count()) + self.assertRaises(Exception, lambda: a.zip(b).count()) def test_count_approx_distinct(self): rdd = self.sc.parallelize(range(1000)) @@ -877,7 +891,12 @@ class ProfilerTests(PySparkTestCase): func_names = [func_name for fname, n, func_name in stat_list] self.assertTrue("heavy_foo" in func_names) + old_stdout = sys.stdout + sys.stdout = io = StringIO() self.sc.show_profiles() + self.assertTrue("heavy_foo" in io.getvalue()) + sys.stdout = old_stdout + d = tempfile.gettempdir() self.sc.dump_profiles(d) self.assertTrue("rdd_%d.pstats" % id in os.listdir(d)) @@ -901,7 +920,7 @@ class ProfilerTests(PySparkTestCase): def do_computation(self): def heavy_foo(x): - for i in range(1 << 20): + for i in range(1 << 18): x = 1 rdd = self.sc.parallelize(range(100)) @@ -1417,7 +1436,7 @@ class DaemonTests(unittest.TestCase): self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM)) -class WorkerTests(PySparkTestCase): +class WorkerTests(ReusedPySparkTestCase): def test_cancel_task(self): temp = tempfile.NamedTemporaryFile(delete=True) temp.close() @@ -1432,7 +1451,10 @@ class WorkerTests(PySparkTestCase): # start job in background thread def run(): - self.sc.parallelize(range(1), 1).foreach(sleep) + try: + self.sc.parallelize(range(1), 1).foreach(sleep) + except Exception: + pass import threading t = threading.Thread(target=run) t.daemon = True @@ -1473,7 +1495,8 @@ class WorkerTests(PySparkTestCase): def raise_exception(_): raise Exception() rdd = self.sc.parallelize(range(100), 1) - self.assertRaises(Exception, lambda: rdd.foreach(raise_exception)) + with QuietTest(self.sc): + self.assertRaises(Exception, lambda: rdd.foreach(raise_exception)) self.assertEqual(100, rdd.map(str).count()) def test_after_jvm_exception(self): @@ -1484,7 +1507,8 @@ class WorkerTests(PySparkTestCase): filtered_data = data.filter(lambda x: True) self.assertEqual(1, filtered_data.count()) os.unlink(tempFile.name) - self.assertRaises(Exception, lambda: filtered_data.count()) + with QuietTest(self.sc): + self.assertRaises(Exception, lambda: filtered_data.count()) rdd = self.sc.parallelize(range(100), 1) self.assertEqual(100, rdd.map(str).count()) @@ -1522,14 +1546,11 @@ class WorkerTests(PySparkTestCase): rdd.count() version = sys.version_info sys.version_info = (2, 0, 0) - log4j = self.sc._jvm.org.apache.log4j - old_level = log4j.LogManager.getRootLogger().getLevel() - log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL) try: - self.assertRaises(Py4JJavaError, lambda: rdd.count()) + with QuietTest(self.sc): + self.assertRaises(Py4JJavaError, lambda: rdd.count()) finally: sys.version_info = version - log4j.LogManager.getRootLogger().setLevel(old_level) class SparkSubmitTests(unittest.TestCase): @@ -1751,9 +1772,14 @@ class ContextTests(unittest.TestCase): def test_progress_api(self): with SparkContext() as sc: sc.setJobGroup('test_progress_api', '', True) - rdd = sc.parallelize(range(10)).map(lambda x: time.sleep(100)) - t = threading.Thread(target=rdd.collect) + + def run(): + try: + rdd.count() + except Exception: + pass + t = threading.Thread(target=run) t.daemon = True t.start() # wait for scheduler to start |