author    Reynold Xin <rxin@databricks.com>    2015-04-21 17:49:55 -0700
committer Reynold Xin <rxin@databricks.com>    2015-04-21 17:49:55 -0700
commit    3134c3fe495862b7687b5aa00d3344d09cd5e08e (patch)
tree      ed556b21bbaad651c7893b6b2dcb53f304100785 /python/pyspark/tests.py
parent    e72c16e30d85cdc394d318b5551698885cfda9b8 (diff)
[SPARK-6953] [PySpark] speed up python tests
This PR tries to speed up some of the Python tests:

```
tests.py                    144s -> 103s   -41s
mllib/classification.py      24s ->  17s    -7s
mllib/regression.py          27s ->  15s   -12s
mllib/tree.py                27s ->  13s   -14s
mllib/tests.py               64s ->  31s   -33s
streaming/tests.py          185s ->  84s  -101s
```

With the Python 3 runs included, the total saving will be 558s (almost 10 minutes), since core and streaming run three times and mllib runs twice.

During testing, the elapsed time for each test file is shown:

```
Run core tests ...
Running test: pyspark/rdd.py ... ok (22s)
Running test: pyspark/context.py ... ok (16s)
Running test: pyspark/conf.py ... ok (4s)
Running test: pyspark/broadcast.py ... ok (4s)
Running test: pyspark/accumulators.py ... ok (4s)
Running test: pyspark/serializers.py ... ok (6s)
Running test: pyspark/profiler.py ... ok (5s)
Running test: pyspark/shuffle.py ... ok (1s)
Running test: pyspark/tests.py ... ok (103s)
144s
```

Author: Reynold Xin <rxin@databricks.com>
Author: Xiangrui Meng <meng@databricks.com>

Closes #5605 from rxin/python-tests-speed and squashes the following commits:

d08542d [Reynold Xin] Merge pull request #14 from mengxr/SPARK-6953
89321ee [Xiangrui Meng] fix seed in tests
3ad2387 [Reynold Xin] Merge pull request #5427 from davies/python_tests
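The per-file timing lines shown above are printed by the Python test runner rather than by this diff. A rough sketch of how such output can be produced follows; the `run_test` helper is purely illustrative and is not the actual `python/run-tests` script:

```python
import subprocess
import sys
import time


def run_test(test_file):
    """Hypothetical helper: run one test file and report how long it took."""
    sys.stdout.write("Running test: %s ... " % test_file)
    sys.stdout.flush()
    start = time.time()
    retcode = subprocess.call([sys.executable, test_file])
    elapsed = int(time.time() - start)
    print(("ok (%ds)" if retcode == 0 else "FAILED (%ds)") % elapsed)
    return retcode
```

Printing the elapsed time per file makes it easy to see which suites dominate the overall runtime.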
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r--  python/pyspark/tests.py  96
1 file changed, 61 insertions, 35 deletions
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 75f39d9e75..ea63a396da 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -31,7 +31,6 @@ import tempfile
import time
import zipfile
import random
-import itertools
import threading
import hashlib
@@ -49,6 +48,11 @@ else:
xrange = range
basestring = str
+if sys.version >= "3":
+ from io import StringIO
+else:
+ from StringIO import StringIO
+
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
@@ -196,7 +200,7 @@ class SorterTests(unittest.TestCase):
sc = SparkContext(conf=conf)
l = list(range(10240))
random.shuffle(l)
- rdd = sc.parallelize(l, 2)
+ rdd = sc.parallelize(l, 4)
self.assertEqual(sorted(l), rdd.sortBy(lambda x: x).collect())
sc.stop()
@@ -300,6 +304,18 @@ class SerializationTestCase(unittest.TestCase):
hash(FlattenedValuesSerializer(PickleSerializer()))
+class QuietTest(object):
+ def __init__(self, sc):
+ self.log4j = sc._jvm.org.apache.log4j
+
+ def __enter__(self):
+ self.old_level = self.log4j.LogManager.getRootLogger().getLevel()
+ self.log4j.LogManager.getRootLogger().setLevel(self.log4j.Level.FATAL)
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.log4j.LogManager.getRootLogger().setLevel(self.old_level)
+
+
class PySparkTestCase(unittest.TestCase):
def setUp(self):
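The `QuietTest` context manager added above temporarily raises the root log4j level to FATAL, so tests that deliberately trigger failures do not flood the output with JVM stack traces; on exit it restores the previous level. A minimal usage sketch, assuming a local SparkContext and `QuietTest` in scope as defined in `pyspark/tests.py`:

```python
from pyspark import SparkContext

# Assumes QuietTest (defined in pyspark/tests.py by this patch) is in scope.
sc = SparkContext("local[2]", "quiet-test-demo")
try:
    with QuietTest(sc):  # root log4j logger is set to FATAL inside this block
        try:
            # An action that is expected to fail on the workers; the JVM error
            # logging is suppressed while QuietTest is active.
            sc.parallelize(range(10)).map(lambda x: 1 / 0).count()
        except Exception:
            pass  # the failure itself is the expected outcome
finally:
    sc.stop()
```

The same pattern replaces the repeated save/set/restore of the log4j level that the removed lines in the hunks below performed by hand.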
@@ -371,15 +387,11 @@ class AddFileTests(PySparkTestCase):
# To ensure that we're actually testing addPyFile's effects, check that
# this job fails due to `userlibrary` not being on the Python path:
# disable logging in log4j temporarily
- log4j = self.sc._jvm.org.apache.log4j
- old_level = log4j.LogManager.getRootLogger().getLevel()
- log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL)
-
def func(x):
from userlibrary import UserClass
return UserClass().hello()
- self.assertRaises(Exception, self.sc.parallelize(range(2)).map(func).first)
- log4j.LogManager.getRootLogger().setLevel(old_level)
+ with QuietTest(self.sc):
+ self.assertRaises(Exception, self.sc.parallelize(range(2)).map(func).first)
# Add the file, so the job should now succeed:
path = os.path.join(SPARK_HOME, "python/test_support/userlibrary.py")
@@ -496,7 +508,8 @@ class RDDTests(ReusedPySparkTestCase):
filtered_data = data.filter(lambda x: True)
self.assertEqual(1, filtered_data.count())
os.unlink(tempFile.name)
- self.assertRaises(Exception, lambda: filtered_data.count())
+ with QuietTest(self.sc):
+ self.assertRaises(Exception, lambda: filtered_data.count())
def test_sampling_default_seed(self):
# Test for SPARK-3995 (default seed setting)
@@ -536,9 +549,9 @@ class RDDTests(ReusedPySparkTestCase):
self.assertEqual([jon, jane], theDoes.collect())
def test_large_broadcast(self):
- N = 100000
+ N = 10000
data = [[float(i) for i in range(300)] for i in range(N)]
- bdata = self.sc.broadcast(data) # 270MB
+ bdata = self.sc.broadcast(data) # 27MB
m = self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum()
self.assertEqual(N, m)
@@ -569,7 +582,7 @@ class RDDTests(ReusedPySparkTestCase):
self.assertEqual(checksum, csum)
def test_large_closure(self):
- N = 1000000
+ N = 200000
data = [float(i) for i in xrange(N)]
rdd = self.sc.parallelize(range(1), 1).map(lambda x: len(data))
self.assertEqual(N, rdd.first())
@@ -604,17 +617,18 @@ class RDDTests(ReusedPySparkTestCase):
# different number of partitions
b = self.sc.parallelize(range(100, 106), 3)
self.assertRaises(ValueError, lambda: a.zip(b))
- # different number of batched items in JVM
- b = self.sc.parallelize(range(100, 104), 2)
- self.assertRaises(Exception, lambda: a.zip(b).count())
- # different number of items in one pair
- b = self.sc.parallelize(range(100, 106), 2)
- self.assertRaises(Exception, lambda: a.zip(b).count())
- # same total number of items, but different distributions
- a = self.sc.parallelize([2, 3], 2).flatMap(range)
- b = self.sc.parallelize([3, 2], 2).flatMap(range)
- self.assertEqual(a.count(), b.count())
- self.assertRaises(Exception, lambda: a.zip(b).count())
+ with QuietTest(self.sc):
+ # different number of batched items in JVM
+ b = self.sc.parallelize(range(100, 104), 2)
+ self.assertRaises(Exception, lambda: a.zip(b).count())
+ # different number of items in one pair
+ b = self.sc.parallelize(range(100, 106), 2)
+ self.assertRaises(Exception, lambda: a.zip(b).count())
+ # same total number of items, but different distributions
+ a = self.sc.parallelize([2, 3], 2).flatMap(range)
+ b = self.sc.parallelize([3, 2], 2).flatMap(range)
+ self.assertEqual(a.count(), b.count())
+ self.assertRaises(Exception, lambda: a.zip(b).count())
def test_count_approx_distinct(self):
rdd = self.sc.parallelize(range(1000))
@@ -877,7 +891,12 @@ class ProfilerTests(PySparkTestCase):
func_names = [func_name for fname, n, func_name in stat_list]
self.assertTrue("heavy_foo" in func_names)
+ old_stdout = sys.stdout
+ sys.stdout = io = StringIO()
self.sc.show_profiles()
+ self.assertTrue("heavy_foo" in io.getvalue())
+ sys.stdout = old_stdout
+
d = tempfile.gettempdir()
self.sc.dump_profiles(d)
self.assertTrue("rdd_%d.pstats" % id in os.listdir(d))
@@ -901,7 +920,7 @@ class ProfilerTests(PySparkTestCase):
def do_computation(self):
def heavy_foo(x):
- for i in range(1 << 20):
+ for i in range(1 << 18):
x = 1
rdd = self.sc.parallelize(range(100))
@@ -1417,7 +1436,7 @@ class DaemonTests(unittest.TestCase):
self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
-class WorkerTests(PySparkTestCase):
+class WorkerTests(ReusedPySparkTestCase):
def test_cancel_task(self):
temp = tempfile.NamedTemporaryFile(delete=True)
temp.close()
@@ -1432,7 +1451,10 @@ class WorkerTests(PySparkTestCase):
# start job in background thread
def run():
- self.sc.parallelize(range(1), 1).foreach(sleep)
+ try:
+ self.sc.parallelize(range(1), 1).foreach(sleep)
+ except Exception:
+ pass
import threading
t = threading.Thread(target=run)
t.daemon = True
@@ -1473,7 +1495,8 @@ class WorkerTests(PySparkTestCase):
def raise_exception(_):
raise Exception()
rdd = self.sc.parallelize(range(100), 1)
- self.assertRaises(Exception, lambda: rdd.foreach(raise_exception))
+ with QuietTest(self.sc):
+ self.assertRaises(Exception, lambda: rdd.foreach(raise_exception))
self.assertEqual(100, rdd.map(str).count())
def test_after_jvm_exception(self):
@@ -1484,7 +1507,8 @@ class WorkerTests(PySparkTestCase):
filtered_data = data.filter(lambda x: True)
self.assertEqual(1, filtered_data.count())
os.unlink(tempFile.name)
- self.assertRaises(Exception, lambda: filtered_data.count())
+ with QuietTest(self.sc):
+ self.assertRaises(Exception, lambda: filtered_data.count())
rdd = self.sc.parallelize(range(100), 1)
self.assertEqual(100, rdd.map(str).count())
@@ -1522,14 +1546,11 @@ class WorkerTests(PySparkTestCase):
rdd.count()
version = sys.version_info
sys.version_info = (2, 0, 0)
- log4j = self.sc._jvm.org.apache.log4j
- old_level = log4j.LogManager.getRootLogger().getLevel()
- log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL)
try:
- self.assertRaises(Py4JJavaError, lambda: rdd.count())
+ with QuietTest(self.sc):
+ self.assertRaises(Py4JJavaError, lambda: rdd.count())
finally:
sys.version_info = version
- log4j.LogManager.getRootLogger().setLevel(old_level)
class SparkSubmitTests(unittest.TestCase):
@@ -1751,9 +1772,14 @@ class ContextTests(unittest.TestCase):
def test_progress_api(self):
with SparkContext() as sc:
sc.setJobGroup('test_progress_api', '', True)
-
rdd = sc.parallelize(range(10)).map(lambda x: time.sleep(100))
- t = threading.Thread(target=rdd.collect)
+
+ def run():
+ try:
+ rdd.count()
+ except Exception:
+ pass
+ t = threading.Thread(target=run)
t.daemon = True
t.start()
# wait for scheduler to start