aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/tests.py
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-04-16 16:20:57 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-04-16 16:20:57 -0700
commit04e44b37cc04f62fbf9e08c7076349e0a4d12ea8 (patch)
treeb6429253955210445ddc37faa4d5166ea25a91e2 /python/pyspark/tests.py
parent55f553a979db925aa0c3559f7e80b99d2bf3feb4 (diff)
downloadspark-04e44b37cc04f62fbf9e08c7076349e0a4d12ea8.tar.gz
spark-04e44b37cc04f62fbf9e08c7076349e0a4d12ea8.tar.bz2
spark-04e44b37cc04f62fbf9e08c7076349e0a4d12ea8.zip
[SPARK-4897] [PySpark] Python 3 support
This PR update PySpark to support Python 3 (tested with 3.4). Known issue: unpickle array from Pyrolite is broken in Python 3, those tests are skipped. TODO: ec2/spark-ec2.py is not fully tested with python3. Author: Davies Liu <davies@databricks.com> Author: twneale <twneale@gmail.com> Author: Josh Rosen <joshrosen@databricks.com> Closes #5173 from davies/python3 and squashes the following commits: d7d6323 [Davies Liu] fix tests 6c52a98 [Davies Liu] fix mllib test 99e334f [Davies Liu] update timeout b716610 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 cafd5ec [Davies Liu] adddress comments from @mengxr bf225d7 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 179fc8d [Davies Liu] tuning flaky tests 8c8b957 [Davies Liu] fix ResourceWarning in Python 3 5c57c95 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 4006829 [Davies Liu] fix test 2fc0066 [Davies Liu] add python3 path 71535e9 [Davies Liu] fix xrange and divide 5a55ab4 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 125f12c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ed498c8 [Davies Liu] fix compatibility with python 3 820e649 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 e8ce8c9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ad7c374 [Davies Liu] fix mllib test and warning ef1fc2f [Davies Liu] fix tests 4eee14a [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 20112ff [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 59bb492 [Davies Liu] fix tests 1da268c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ca0fdd3 [Davies Liu] fix code style 9563a15 [Davies Liu] add imap back for python 2 0b1ec04 [Davies Liu] make python examples work with Python 3 d2fd566 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 a716d34 [Davies Liu] test with python 3.4 f1700e8 [Davies Liu] fix test in python3 671b1db [Davies Liu] fix test in python3 692ff47 [Davies Liu] fix flaky test 7b9699f [Davies Liu] invalidate import cache for Python 3.3+ 9c58497 [Davies Liu] fix kill worker 309bfbf [Davies Liu] keep compatibility 5707476 [Davies Liu] cleanup, fix hash of string in 3.3+ 8662d5b [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 f53e1f0 [Davies Liu] fix tests 70b6b73 [Davies Liu] compile ec2/spark_ec2.py in python 3 a39167e [Davies Liu] support customize class in __main__ 814c77b [Davies Liu] run unittests with python 3 7f4476e [Davies Liu] mllib tests passed d737924 [Davies Liu] pass ml tests 375ea17 [Davies Liu] SQL tests pass 6cc42a9 [Davies Liu] rename 431a8de [Davies Liu] streaming tests pass 78901a7 [Davies Liu] fix hash of serializer in Python 3 24b2f2e [Davies Liu] pass all RDD tests 35f48fe [Davies Liu] run future again 1eebac2 [Davies Liu] fix conflict in ec2/spark_ec2.py 6e3c21d [Davies Liu] make cloudpickle work with Python3 2fb2db3 [Josh Rosen] Guard more changes behind sys.version; still doesn't run 1aa5e8f [twneale] Turned out `pickle.DictionaryType is dict` == True, so swapped it out 7354371 [twneale] buffer --> memoryview I'm not super sure if this a valid change, but the 2.7 docs recommend using memoryview over buffer where possible, so hoping it'll work. b69ccdf [twneale] Uses the pure python pickle._Pickler instead of c-extension _pickle.Pickler. It appears pyspark 2.7 uses the pure python pickler as well, so this shouldn't degrade pickling performance (?). f40d925 [twneale] xrange --> range e104215 [twneale] Replaces 2.7 types.InstsanceType with 3.4 `object`....could be horribly wrong depending on how types.InstanceType is used elsewhere in the package--see http://bugs.python.org/issue8206 79de9d0 [twneale] Replaces python2.7 `file` with 3.4 _io.TextIOWrapper 2adb42d [Josh Rosen] Fix up some import differences between Python 2 and 3 854be27 [Josh Rosen] Run `futurize` on Python code: 7c5b4ce [Josh Rosen] Remove Python 3 check in shell.py.
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r--python/pyspark/tests.py327
1 files changed, 175 insertions, 152 deletions
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index ee67e80d53..75f39d9e75 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -19,8 +19,8 @@
Unit tests for PySpark; additional tests are implemented as doctests in
individual modules.
"""
+
from array import array
-from fileinput import input
from glob import glob
import os
import re
@@ -45,6 +45,9 @@ if sys.version_info[:2] <= (2, 6):
sys.exit(1)
else:
import unittest
+ if sys.version_info[0] >= 3:
+ xrange = range
+ basestring = str
from pyspark.conf import SparkConf
@@ -52,7 +55,9 @@ from pyspark.context import SparkContext
from pyspark.rdd import RDD
from pyspark.files import SparkFiles
from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer, PickleSerializer, \
- CloudPickleSerializer, CompressedSerializer, UTF8Deserializer, NoOpSerializer
+ CloudPickleSerializer, CompressedSerializer, UTF8Deserializer, NoOpSerializer, \
+ PairDeserializer, CartesianDeserializer, AutoBatchedSerializer, AutoSerializer, \
+ FlattenedValuesSerializer
from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, ExternalSorter
from pyspark import shuffle
from pyspark.profiler import BasicProfiler
@@ -81,7 +86,7 @@ class MergerTests(unittest.TestCase):
def setUp(self):
self.N = 1 << 12
self.l = [i for i in xrange(self.N)]
- self.data = zip(self.l, self.l)
+ self.data = list(zip(self.l, self.l))
self.agg = Aggregator(lambda x: [x],
lambda x, y: x.append(y) or x,
lambda x, y: x.extend(y) or x)
@@ -89,45 +94,45 @@ class MergerTests(unittest.TestCase):
def test_in_memory(self):
m = InMemoryMerger(self.agg)
m.mergeValues(self.data)
- self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
+ self.assertEqual(sum(sum(v) for k, v in m.items()),
sum(xrange(self.N)))
m = InMemoryMerger(self.agg)
- m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data))
- self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
+ m.mergeCombiners(map(lambda x_y: (x_y[0], [x_y[1]]), self.data))
+ self.assertEqual(sum(sum(v) for k, v in m.items()),
sum(xrange(self.N)))
def test_small_dataset(self):
m = ExternalMerger(self.agg, 1000)
m.mergeValues(self.data)
self.assertEqual(m.spills, 0)
- self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
+ self.assertEqual(sum(sum(v) for k, v in m.items()),
sum(xrange(self.N)))
m = ExternalMerger(self.agg, 1000)
- m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data))
+ m.mergeCombiners(map(lambda x_y1: (x_y1[0], [x_y1[1]]), self.data))
self.assertEqual(m.spills, 0)
- self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
+ self.assertEqual(sum(sum(v) for k, v in m.items()),
sum(xrange(self.N)))
def test_medium_dataset(self):
- m = ExternalMerger(self.agg, 30)
+ m = ExternalMerger(self.agg, 20)
m.mergeValues(self.data)
self.assertTrue(m.spills >= 1)
- self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
+ self.assertEqual(sum(sum(v) for k, v in m.items()),
sum(xrange(self.N)))
m = ExternalMerger(self.agg, 10)
- m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data * 3))
+ m.mergeCombiners(map(lambda x_y2: (x_y2[0], [x_y2[1]]), self.data * 3))
self.assertTrue(m.spills >= 1)
- self.assertEqual(sum(sum(v) for k, v in m.iteritems()),
+ self.assertEqual(sum(sum(v) for k, v in m.items()),
sum(xrange(self.N)) * 3)
def test_huge_dataset(self):
- m = ExternalMerger(self.agg, 10, partitions=3)
- m.mergeCombiners(map(lambda (k, v): (k, [str(v)]), self.data * 10))
+ m = ExternalMerger(self.agg, 5, partitions=3)
+ m.mergeCombiners(map(lambda k_v: (k_v[0], [str(k_v[1])]), self.data * 10))
self.assertTrue(m.spills >= 1)
- self.assertEqual(sum(len(v) for k, v in m.iteritems()),
+ self.assertEqual(sum(len(v) for k, v in m.items()),
self.N * 10)
m._cleanup()
@@ -144,55 +149,55 @@ class MergerTests(unittest.TestCase):
self.assertEqual(1, len(list(gen_gs(1))))
self.assertEqual(2, len(list(gen_gs(2))))
self.assertEqual(100, len(list(gen_gs(100))))
- self.assertEqual(range(1, 101), [k for k, _ in gen_gs(100)])
- self.assertTrue(all(range(k) == list(vs) for k, vs in gen_gs(100)))
+ self.assertEqual(list(range(1, 101)), [k for k, _ in gen_gs(100)])
+ self.assertTrue(all(list(range(k)) == list(vs) for k, vs in gen_gs(100)))
for k, vs in gen_gs(50002, 10000):
self.assertEqual(k, len(vs))
- self.assertEqual(range(k), list(vs))
+ self.assertEqual(list(range(k)), list(vs))
ser = PickleSerializer()
l = ser.loads(ser.dumps(list(gen_gs(50002, 30000))))
for k, vs in l:
self.assertEqual(k, len(vs))
- self.assertEqual(range(k), list(vs))
+ self.assertEqual(list(range(k)), list(vs))
class SorterTests(unittest.TestCase):
def test_in_memory_sort(self):
- l = range(1024)
+ l = list(range(1024))
random.shuffle(l)
sorter = ExternalSorter(1024)
- self.assertEquals(sorted(l), list(sorter.sorted(l)))
- self.assertEquals(sorted(l, reverse=True), list(sorter.sorted(l, reverse=True)))
- self.assertEquals(sorted(l, key=lambda x: -x), list(sorter.sorted(l, key=lambda x: -x)))
- self.assertEquals(sorted(l, key=lambda x: -x, reverse=True),
- list(sorter.sorted(l, key=lambda x: -x, reverse=True)))
+ self.assertEqual(sorted(l), list(sorter.sorted(l)))
+ self.assertEqual(sorted(l, reverse=True), list(sorter.sorted(l, reverse=True)))
+ self.assertEqual(sorted(l, key=lambda x: -x), list(sorter.sorted(l, key=lambda x: -x)))
+ self.assertEqual(sorted(l, key=lambda x: -x, reverse=True),
+ list(sorter.sorted(l, key=lambda x: -x, reverse=True)))
def test_external_sort(self):
- l = range(1024)
+ l = list(range(1024))
random.shuffle(l)
sorter = ExternalSorter(1)
- self.assertEquals(sorted(l), list(sorter.sorted(l)))
+ self.assertEqual(sorted(l), list(sorter.sorted(l)))
self.assertGreater(shuffle.DiskBytesSpilled, 0)
last = shuffle.DiskBytesSpilled
- self.assertEquals(sorted(l, reverse=True), list(sorter.sorted(l, reverse=True)))
+ self.assertEqual(sorted(l, reverse=True), list(sorter.sorted(l, reverse=True)))
self.assertGreater(shuffle.DiskBytesSpilled, last)
last = shuffle.DiskBytesSpilled
- self.assertEquals(sorted(l, key=lambda x: -x), list(sorter.sorted(l, key=lambda x: -x)))
+ self.assertEqual(sorted(l, key=lambda x: -x), list(sorter.sorted(l, key=lambda x: -x)))
self.assertGreater(shuffle.DiskBytesSpilled, last)
last = shuffle.DiskBytesSpilled
- self.assertEquals(sorted(l, key=lambda x: -x, reverse=True),
- list(sorter.sorted(l, key=lambda x: -x, reverse=True)))
+ self.assertEqual(sorted(l, key=lambda x: -x, reverse=True),
+ list(sorter.sorted(l, key=lambda x: -x, reverse=True)))
self.assertGreater(shuffle.DiskBytesSpilled, last)
def test_external_sort_in_rdd(self):
conf = SparkConf().set("spark.python.worker.memory", "1m")
sc = SparkContext(conf=conf)
- l = range(10240)
+ l = list(range(10240))
random.shuffle(l)
- rdd = sc.parallelize(l, 10)
- self.assertEquals(sorted(l), rdd.sortBy(lambda x: x).collect())
+ rdd = sc.parallelize(l, 2)
+ self.assertEqual(sorted(l), rdd.sortBy(lambda x: x).collect())
sc.stop()
@@ -200,11 +205,11 @@ class SerializationTestCase(unittest.TestCase):
def test_namedtuple(self):
from collections import namedtuple
- from cPickle import dumps, loads
+ from pickle import dumps, loads
P = namedtuple("P", "x y")
p1 = P(1, 3)
p2 = loads(dumps(p1, 2))
- self.assertEquals(p1, p2)
+ self.assertEqual(p1, p2)
def test_itemgetter(self):
from operator import itemgetter
@@ -246,7 +251,7 @@ class SerializationTestCase(unittest.TestCase):
ser = CloudPickleSerializer()
out1 = sys.stderr
out2 = ser.loads(ser.dumps(out1))
- self.assertEquals(out1, out2)
+ self.assertEqual(out1, out2)
def test_func_globals(self):
@@ -263,19 +268,36 @@ class SerializationTestCase(unittest.TestCase):
def foo():
sys.exit(0)
- self.assertTrue("exit" in foo.func_code.co_names)
+ self.assertTrue("exit" in foo.__code__.co_names)
ser.dumps(foo)
def test_compressed_serializer(self):
ser = CompressedSerializer(PickleSerializer())
- from StringIO import StringIO
+ try:
+ from StringIO import StringIO
+ except ImportError:
+ from io import BytesIO as StringIO
io = StringIO()
ser.dump_stream(["abc", u"123", range(5)], io)
io.seek(0)
self.assertEqual(["abc", u"123", range(5)], list(ser.load_stream(io)))
ser.dump_stream(range(1000), io)
io.seek(0)
- self.assertEqual(["abc", u"123", range(5)] + range(1000), list(ser.load_stream(io)))
+ self.assertEqual(["abc", u"123", range(5)] + list(range(1000)), list(ser.load_stream(io)))
+ io.close()
+
+ def test_hash_serializer(self):
+ hash(NoOpSerializer())
+ hash(UTF8Deserializer())
+ hash(PickleSerializer())
+ hash(MarshalSerializer())
+ hash(AutoSerializer())
+ hash(BatchedSerializer(PickleSerializer()))
+ hash(AutoBatchedSerializer(MarshalSerializer()))
+ hash(PairDeserializer(NoOpSerializer(), UTF8Deserializer()))
+ hash(CartesianDeserializer(NoOpSerializer(), UTF8Deserializer()))
+ hash(CompressedSerializer(PickleSerializer()))
+ hash(FlattenedValuesSerializer(PickleSerializer()))
class PySparkTestCase(unittest.TestCase):
@@ -340,7 +362,7 @@ class CheckpointTests(ReusedPySparkTestCase):
self.assertTrue(flatMappedRDD.getCheckpointFile() is not None)
recovered = self.sc._checkpointFile(flatMappedRDD.getCheckpointFile(),
flatMappedRDD._jrdd_deserializer)
- self.assertEquals([1, 2, 3, 4], recovered.collect())
+ self.assertEqual([1, 2, 3, 4], recovered.collect())
class AddFileTests(PySparkTestCase):
@@ -356,8 +378,7 @@ class AddFileTests(PySparkTestCase):
def func(x):
from userlibrary import UserClass
return UserClass().hello()
- self.assertRaises(Exception,
- self.sc.parallelize(range(2)).map(func).first)
+ self.assertRaises(Exception, self.sc.parallelize(range(2)).map(func).first)
log4j.LogManager.getRootLogger().setLevel(old_level)
# Add the file, so the job should now succeed:
@@ -372,7 +393,7 @@ class AddFileTests(PySparkTestCase):
download_path = SparkFiles.get("hello.txt")
self.assertNotEqual(path, download_path)
with open(download_path) as test_file:
- self.assertEquals("Hello World!\n", test_file.readline())
+ self.assertEqual("Hello World!\n", test_file.readline())
def test_add_py_file_locally(self):
# To ensure that we're actually testing addPyFile's effects, check that
@@ -381,7 +402,7 @@ class AddFileTests(PySparkTestCase):
from userlibrary import UserClass
self.assertRaises(ImportError, func)
path = os.path.join(SPARK_HOME, "python/test_support/userlibrary.py")
- self.sc.addFile(path)
+ self.sc.addPyFile(path)
from userlibrary import UserClass
self.assertEqual("Hello World!", UserClass().hello())
@@ -391,7 +412,7 @@ class AddFileTests(PySparkTestCase):
def func():
from userlib import UserClass
self.assertRaises(ImportError, func)
- path = os.path.join(SPARK_HOME, "python/test_support/userlib-0.1-py2.7.egg")
+ path = os.path.join(SPARK_HOME, "python/test_support/userlib-0.1.zip")
self.sc.addPyFile(path)
from userlib import UserClass
self.assertEqual("Hello World from inside a package!", UserClass().hello())
@@ -427,8 +448,9 @@ class RDDTests(ReusedPySparkTestCase):
tempFile = tempfile.NamedTemporaryFile(delete=True)
tempFile.close()
data.saveAsTextFile(tempFile.name)
- raw_contents = ''.join(input(glob(tempFile.name + "/part-0000*")))
- self.assertEqual(x, unicode(raw_contents.strip(), "utf-8"))
+ raw_contents = b''.join(open(p, 'rb').read()
+ for p in glob(tempFile.name + "/part-0000*"))
+ self.assertEqual(x, raw_contents.strip().decode("utf-8"))
def test_save_as_textfile_with_utf8(self):
x = u"\u00A1Hola, mundo!"
@@ -436,19 +458,20 @@ class RDDTests(ReusedPySparkTestCase):
tempFile = tempfile.NamedTemporaryFile(delete=True)
tempFile.close()
data.saveAsTextFile(tempFile.name)
- raw_contents = ''.join(input(glob(tempFile.name + "/part-0000*")))
- self.assertEqual(x, unicode(raw_contents.strip(), "utf-8"))
+ raw_contents = b''.join(open(p, 'rb').read()
+ for p in glob(tempFile.name + "/part-0000*"))
+ self.assertEqual(x, raw_contents.strip().decode('utf8'))
def test_transforming_cartesian_result(self):
# Regression test for SPARK-1034
rdd1 = self.sc.parallelize([1, 2])
rdd2 = self.sc.parallelize([3, 4])
cart = rdd1.cartesian(rdd2)
- result = cart.map(lambda (x, y): x + y).collect()
+ result = cart.map(lambda x_y3: x_y3[0] + x_y3[1]).collect()
def test_transforming_pickle_file(self):
# Regression test for SPARK-2601
- data = self.sc.parallelize(["Hello", "World!"])
+ data = self.sc.parallelize([u"Hello", u"World!"])
tempFile = tempfile.NamedTemporaryFile(delete=True)
tempFile.close()
data.saveAsPickleFile(tempFile.name)
@@ -461,13 +484,13 @@ class RDDTests(ReusedPySparkTestCase):
a = self.sc.textFile(path)
result = a.cartesian(a).collect()
(x, y) = result[0]
- self.assertEqual("Hello World!", x.strip())
- self.assertEqual("Hello World!", y.strip())
+ self.assertEqual(u"Hello World!", x.strip())
+ self.assertEqual(u"Hello World!", y.strip())
def test_deleting_input_files(self):
# Regression test for SPARK-1025
tempFile = tempfile.NamedTemporaryFile(delete=False)
- tempFile.write("Hello World!")
+ tempFile.write(b"Hello World!")
tempFile.close()
data = self.sc.textFile(tempFile.name)
filtered_data = data.filter(lambda x: True)
@@ -510,21 +533,21 @@ class RDDTests(ReusedPySparkTestCase):
jon = Person(1, "Jon", "Doe")
jane = Person(2, "Jane", "Doe")
theDoes = self.sc.parallelize([jon, jane])
- self.assertEquals([jon, jane], theDoes.collect())
+ self.assertEqual([jon, jane], theDoes.collect())
def test_large_broadcast(self):
N = 100000
data = [[float(i) for i in range(300)] for i in range(N)]
bdata = self.sc.broadcast(data) # 270MB
m = self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum()
- self.assertEquals(N, m)
+ self.assertEqual(N, m)
def test_multiple_broadcasts(self):
N = 1 << 21
b1 = self.sc.broadcast(set(range(N))) # multiple blocks in JVM
- r = range(1 << 15)
+ r = list(range(1 << 15))
random.shuffle(r)
- s = str(r)
+ s = str(r).encode()
checksum = hashlib.md5(s).hexdigest()
b2 = self.sc.broadcast(s)
r = list(set(self.sc.parallelize(range(10), 10).map(
@@ -535,7 +558,7 @@ class RDDTests(ReusedPySparkTestCase):
self.assertEqual(checksum, csum)
random.shuffle(r)
- s = str(r)
+ s = str(r).encode()
checksum = hashlib.md5(s).hexdigest()
b2 = self.sc.broadcast(s)
r = list(set(self.sc.parallelize(range(10), 10).map(
@@ -549,7 +572,7 @@ class RDDTests(ReusedPySparkTestCase):
N = 1000000
data = [float(i) for i in xrange(N)]
rdd = self.sc.parallelize(range(1), 1).map(lambda x: len(data))
- self.assertEquals(N, rdd.first())
+ self.assertEqual(N, rdd.first())
# regression test for SPARK-6886
self.assertEqual(1, rdd.map(lambda x: (x, 1)).groupByKey().count())
@@ -590,15 +613,15 @@ class RDDTests(ReusedPySparkTestCase):
# same total number of items, but different distributions
a = self.sc.parallelize([2, 3], 2).flatMap(range)
b = self.sc.parallelize([3, 2], 2).flatMap(range)
- self.assertEquals(a.count(), b.count())
+ self.assertEqual(a.count(), b.count())
self.assertRaises(Exception, lambda: a.zip(b).count())
def test_count_approx_distinct(self):
rdd = self.sc.parallelize(range(1000))
- self.assertTrue(950 < rdd.countApproxDistinct(0.04) < 1050)
- self.assertTrue(950 < rdd.map(float).countApproxDistinct(0.04) < 1050)
- self.assertTrue(950 < rdd.map(str).countApproxDistinct(0.04) < 1050)
- self.assertTrue(950 < rdd.map(lambda x: (x, -x)).countApproxDistinct(0.04) < 1050)
+ self.assertTrue(950 < rdd.countApproxDistinct(0.03) < 1050)
+ self.assertTrue(950 < rdd.map(float).countApproxDistinct(0.03) < 1050)
+ self.assertTrue(950 < rdd.map(str).countApproxDistinct(0.03) < 1050)
+ self.assertTrue(950 < rdd.map(lambda x: (x, -x)).countApproxDistinct(0.03) < 1050)
rdd = self.sc.parallelize([i % 20 for i in range(1000)], 7)
self.assertTrue(18 < rdd.countApproxDistinct() < 22)
@@ -612,59 +635,59 @@ class RDDTests(ReusedPySparkTestCase):
def test_histogram(self):
# empty
rdd = self.sc.parallelize([])
- self.assertEquals([0], rdd.histogram([0, 10])[1])
- self.assertEquals([0, 0], rdd.histogram([0, 4, 10])[1])
+ self.assertEqual([0], rdd.histogram([0, 10])[1])
+ self.assertEqual([0, 0], rdd.histogram([0, 4, 10])[1])
self.assertRaises(ValueError, lambda: rdd.histogram(1))
# out of range
rdd = self.sc.parallelize([10.01, -0.01])
- self.assertEquals([0], rdd.histogram([0, 10])[1])
- self.assertEquals([0, 0], rdd.histogram((0, 4, 10))[1])
+ self.assertEqual([0], rdd.histogram([0, 10])[1])
+ self.assertEqual([0, 0], rdd.histogram((0, 4, 10))[1])
# in range with one bucket
rdd = self.sc.parallelize(range(1, 5))
- self.assertEquals([4], rdd.histogram([0, 10])[1])
- self.assertEquals([3, 1], rdd.histogram([0, 4, 10])[1])
+ self.assertEqual([4], rdd.histogram([0, 10])[1])
+ self.assertEqual([3, 1], rdd.histogram([0, 4, 10])[1])
# in range with one bucket exact match
- self.assertEquals([4], rdd.histogram([1, 4])[1])
+ self.assertEqual([4], rdd.histogram([1, 4])[1])
# out of range with two buckets
rdd = self.sc.parallelize([10.01, -0.01])
- self.assertEquals([0, 0], rdd.histogram([0, 5, 10])[1])
+ self.assertEqual([0, 0], rdd.histogram([0, 5, 10])[1])
# out of range with two uneven buckets
rdd = self.sc.parallelize([10.01, -0.01])
- self.assertEquals([0, 0], rdd.histogram([0, 4, 10])[1])
+ self.assertEqual([0, 0], rdd.histogram([0, 4, 10])[1])
# in range with two buckets
rdd = self.sc.parallelize([1, 2, 3, 5, 6])
- self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1])
+ self.assertEqual([3, 2], rdd.histogram([0, 5, 10])[1])
# in range with two bucket and None
rdd = self.sc.parallelize([1, 2, 3, 5, 6, None, float('nan')])
- self.assertEquals([3, 2], rdd.histogram([0, 5, 10])[1])
+ self.assertEqual([3, 2], rdd.histogram([0, 5, 10])[1])
# in range with two uneven buckets
rdd = self.sc.parallelize([1, 2, 3, 5, 6])
- self.assertEquals([3, 2], rdd.histogram([0, 5, 11])[1])
+ self.assertEqual([3, 2], rdd.histogram([0, 5, 11])[1])
# mixed range with two uneven buckets
rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.0, 11.01])
- self.assertEquals([4, 3], rdd.histogram([0, 5, 11])[1])
+ self.assertEqual([4, 3], rdd.histogram([0, 5, 11])[1])
# mixed range with four uneven buckets
rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0, 200.0, 200.1])
- self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1])
+ self.assertEqual([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1])
# mixed range with uneven buckets and NaN
rdd = self.sc.parallelize([-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0,
199.0, 200.0, 200.1, None, float('nan')])
- self.assertEquals([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1])
+ self.assertEqual([4, 2, 1, 3], rdd.histogram([0.0, 5.0, 11.0, 12.0, 200.0])[1])
# out of range with infinite buckets
rdd = self.sc.parallelize([10.01, -0.01, float('nan'), float("inf")])
- self.assertEquals([1, 2], rdd.histogram([float('-inf'), 0, float('inf')])[1])
+ self.assertEqual([1, 2], rdd.histogram([float('-inf'), 0, float('inf')])[1])
# invalid buckets
self.assertRaises(ValueError, lambda: rdd.histogram([]))
@@ -674,25 +697,25 @@ class RDDTests(ReusedPySparkTestCase):
# without buckets
rdd = self.sc.parallelize(range(1, 5))
- self.assertEquals(([1, 4], [4]), rdd.histogram(1))
+ self.assertEqual(([1, 4], [4]), rdd.histogram(1))
# without buckets single element
rdd = self.sc.parallelize([1])
- self.assertEquals(([1, 1], [1]), rdd.histogram(1))
+ self.assertEqual(([1, 1], [1]), rdd.histogram(1))
# without bucket no range
rdd = self.sc.parallelize([1] * 4)
- self.assertEquals(([1, 1], [4]), rdd.histogram(1))
+ self.assertEqual(([1, 1], [4]), rdd.histogram(1))
# without buckets basic two
rdd = self.sc.parallelize(range(1, 5))
- self.assertEquals(([1, 2.5, 4], [2, 2]), rdd.histogram(2))
+ self.assertEqual(([1, 2.5, 4], [2, 2]), rdd.histogram(2))
# without buckets with more requested than elements
rdd = self.sc.parallelize([1, 2])
buckets = [1 + 0.2 * i for i in range(6)]
hist = [1, 0, 0, 0, 1]
- self.assertEquals((buckets, hist), rdd.histogram(5))
+ self.assertEqual((buckets, hist), rdd.histogram(5))
# invalid RDDs
rdd = self.sc.parallelize([1, float('inf')])
@@ -702,15 +725,8 @@ class RDDTests(ReusedPySparkTestCase):
# string
rdd = self.sc.parallelize(["ab", "ac", "b", "bd", "ef"], 2)
- self.assertEquals([2, 2], rdd.histogram(["a", "b", "c"])[1])
- self.assertEquals((["ab", "ef"], [5]), rdd.histogram(1))
- self.assertRaises(TypeError, lambda: rdd.histogram(2))
-
- # mixed RDD
- rdd = self.sc.parallelize([1, 4, "ab", "ac", "b"], 2)
- self.assertEquals([1, 1], rdd.histogram([0, 4, 10])[1])
- self.assertEquals([2, 1], rdd.histogram(["a", "b", "c"])[1])
- self.assertEquals(([1, "b"], [5]), rdd.histogram(1))
+ self.assertEqual([2, 2], rdd.histogram(["a", "b", "c"])[1])
+ self.assertEqual((["ab", "ef"], [5]), rdd.histogram(1))
self.assertRaises(TypeError, lambda: rdd.histogram(2))
def test_repartitionAndSortWithinPartitions(self):
@@ -718,31 +734,31 @@ class RDDTests(ReusedPySparkTestCase):
repartitioned = rdd.repartitionAndSortWithinPartitions(2, lambda key: key % 2)
partitions = repartitioned.glom().collect()
- self.assertEquals(partitions[0], [(0, 5), (0, 8), (2, 6)])
- self.assertEquals(partitions[1], [(1, 3), (3, 8), (3, 8)])
+ self.assertEqual(partitions[0], [(0, 5), (0, 8), (2, 6)])
+ self.assertEqual(partitions[1], [(1, 3), (3, 8), (3, 8)])
def test_distinct(self):
rdd = self.sc.parallelize((1, 2, 3)*10, 10)
- self.assertEquals(rdd.getNumPartitions(), 10)
- self.assertEquals(rdd.distinct().count(), 3)
+ self.assertEqual(rdd.getNumPartitions(), 10)
+ self.assertEqual(rdd.distinct().count(), 3)
result = rdd.distinct(5)
- self.assertEquals(result.getNumPartitions(), 5)
- self.assertEquals(result.count(), 3)
+ self.assertEqual(result.getNumPartitions(), 5)
+ self.assertEqual(result.count(), 3)
def test_external_group_by_key(self):
- self.sc._conf.set("spark.python.worker.memory", "5m")
+ self.sc._conf.set("spark.python.worker.memory", "1m")
N = 200001
kv = self.sc.parallelize(range(N)).map(lambda x: (x % 3, x))
gkv = kv.groupByKey().cache()
self.assertEqual(3, gkv.count())
- filtered = gkv.filter(lambda (k, vs): k == 1)
+ filtered = gkv.filter(lambda kv: kv[0] == 1)
self.assertEqual(1, filtered.count())
- self.assertEqual([(1, N/3)], filtered.mapValues(len).collect())
- self.assertEqual([(N/3, N/3)],
+ self.assertEqual([(1, N // 3)], filtered.mapValues(len).collect())
+ self.assertEqual([(N // 3, N // 3)],
filtered.values().map(lambda x: (len(x), len(list(x)))).collect())
result = filtered.collect()[0][1]
- self.assertEqual(N/3, len(result))
- self.assertTrue(isinstance(result.data, shuffle.ExternalList))
+ self.assertEqual(N // 3, len(result))
+ self.assertTrue(isinstance(result.data, shuffle.ExternalListOfList))
def test_sort_on_empty_rdd(self):
self.assertEqual([], self.sc.parallelize(zip([], [])).sortByKey().collect())
@@ -767,7 +783,7 @@ class RDDTests(ReusedPySparkTestCase):
rdd = RDD(jrdd, self.sc, UTF8Deserializer())
self.assertEqual([u"a", None, u"b"], rdd.collect())
rdd = RDD(jrdd, self.sc, NoOpSerializer())
- self.assertEqual(["a", None, "b"], rdd.collect())
+ self.assertEqual([b"a", None, b"b"], rdd.collect())
def test_multiple_python_java_RDD_conversions(self):
# Regression test for SPARK-5361
@@ -813,14 +829,14 @@ class RDDTests(ReusedPySparkTestCase):
self.sc.setJobGroup("test3", "test", True)
d = sorted(parted.cogroup(parted).collect())
self.assertEqual(10, len(d))
- self.assertEqual([[0], [0]], map(list, d[0][1]))
+ self.assertEqual([[0], [0]], list(map(list, d[0][1])))
jobId = tracker.getJobIdsForGroup("test3")[0]
self.assertEqual(2, len(tracker.getJobInfo(jobId).stageIds))
self.sc.setJobGroup("test4", "test", True)
d = sorted(parted.cogroup(rdd).collect())
self.assertEqual(10, len(d))
- self.assertEqual([[0], [0]], map(list, d[0][1]))
+ self.assertEqual([[0], [0]], list(map(list, d[0][1])))
jobId = tracker.getJobIdsForGroup("test4")[0]
self.assertEqual(3, len(tracker.getJobInfo(jobId).stageIds))
@@ -906,6 +922,7 @@ class InputFormatTests(ReusedPySparkTestCase):
ReusedPySparkTestCase.tearDownClass()
shutil.rmtree(cls.tempdir.name)
+ @unittest.skipIf(sys.version >= "3", "serialize array of byte")
def test_sequencefiles(self):
basepath = self.tempdir.name
ints = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfint/",
@@ -954,15 +971,16 @@ class InputFormatTests(ReusedPySparkTestCase):
en = [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)]
self.assertEqual(nulls, en)
- maps = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfmap/",
- "org.apache.hadoop.io.IntWritable",
- "org.apache.hadoop.io.MapWritable").collect())
+ maps = self.sc.sequenceFile(basepath + "/sftestdata/sfmap/",
+ "org.apache.hadoop.io.IntWritable",
+ "org.apache.hadoop.io.MapWritable").collect()
em = [(1, {}),
(1, {3.0: u'bb'}),
(2, {1.0: u'aa'}),
(2, {1.0: u'cc'}),
(3, {2.0: u'dd'})]
- self.assertEqual(maps, em)
+ for v in maps:
+ self.assertTrue(v in em)
# arrays get pickled to tuples by default
tuples = sorted(self.sc.sequenceFile(
@@ -1089,8 +1107,8 @@ class InputFormatTests(ReusedPySparkTestCase):
def test_binary_files(self):
path = os.path.join(self.tempdir.name, "binaryfiles")
os.mkdir(path)
- data = "short binary data"
- with open(os.path.join(path, "part-0000"), 'w') as f:
+ data = b"short binary data"
+ with open(os.path.join(path, "part-0000"), 'wb') as f:
f.write(data)
[(p, d)] = self.sc.binaryFiles(path).collect()
self.assertTrue(p.endswith("part-0000"))
@@ -1103,7 +1121,7 @@ class InputFormatTests(ReusedPySparkTestCase):
for i in range(100):
f.write('%04d' % i)
result = self.sc.binaryRecords(path, 4).map(int).collect()
- self.assertEqual(range(100), result)
+ self.assertEqual(list(range(100)), result)
class OutputFormatTests(ReusedPySparkTestCase):
@@ -1115,6 +1133,7 @@ class OutputFormatTests(ReusedPySparkTestCase):
def tearDown(self):
shutil.rmtree(self.tempdir.name, ignore_errors=True)
+ @unittest.skipIf(sys.version >= "3", "serialize array of byte")
def test_sequencefiles(self):
basepath = self.tempdir.name
ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
@@ -1155,8 +1174,9 @@ class OutputFormatTests(ReusedPySparkTestCase):
(2, {1.0: u'cc'}),
(3, {2.0: u'dd'})]
self.sc.parallelize(em).saveAsSequenceFile(basepath + "/sfmap/")
- maps = sorted(self.sc.sequenceFile(basepath + "/sfmap/").collect())
- self.assertEqual(maps, em)
+ maps = self.sc.sequenceFile(basepath + "/sfmap/").collect()
+ for v in maps:
+ self.assertTrue(v, em)
def test_oldhadoop(self):
basepath = self.tempdir.name
@@ -1168,12 +1188,13 @@ class OutputFormatTests(ReusedPySparkTestCase):
"org.apache.hadoop.mapred.SequenceFileOutputFormat",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.MapWritable")
- result = sorted(self.sc.hadoopFile(
+ result = self.sc.hadoopFile(
basepath + "/oldhadoop/",
"org.apache.hadoop.mapred.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
- "org.apache.hadoop.io.MapWritable").collect())
- self.assertEqual(result, dict_data)
+ "org.apache.hadoop.io.MapWritable").collect()
+ for v in result:
+ self.assertTrue(v, dict_data)
conf = {
"mapred.output.format.class": "org.apache.hadoop.mapred.SequenceFileOutputFormat",
@@ -1183,12 +1204,13 @@ class OutputFormatTests(ReusedPySparkTestCase):
}
self.sc.parallelize(dict_data).saveAsHadoopDataset(conf)
input_conf = {"mapred.input.dir": basepath + "/olddataset/"}
- old_dataset = sorted(self.sc.hadoopRDD(
+ result = self.sc.hadoopRDD(
"org.apache.hadoop.mapred.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.MapWritable",
- conf=input_conf).collect())
- self.assertEqual(old_dataset, dict_data)
+ conf=input_conf).collect()
+ for v in result:
+ self.assertTrue(v, dict_data)
def test_newhadoop(self):
basepath = self.tempdir.name
@@ -1223,6 +1245,7 @@ class OutputFormatTests(ReusedPySparkTestCase):
conf=input_conf).collect())
self.assertEqual(new_dataset, data)
+ @unittest.skipIf(sys.version >= "3", "serialize of array")
def test_newhadoop_with_array(self):
basepath = self.tempdir.name
# use custom ArrayWritable types and converters to handle arrays
@@ -1303,7 +1326,7 @@ class OutputFormatTests(ReusedPySparkTestCase):
basepath = self.tempdir.name
x = range(1, 5)
y = range(1001, 1005)
- data = zip(x, y)
+ data = list(zip(x, y))
rdd = self.sc.parallelize(x).zip(self.sc.parallelize(y))
rdd.saveAsSequenceFile(basepath + "/reserialize/sequence")
result1 = sorted(self.sc.sequenceFile(basepath + "/reserialize/sequence").collect())
@@ -1354,7 +1377,7 @@ class DaemonTests(unittest.TestCase):
sock = socket(AF_INET, SOCK_STREAM)
sock.connect(('127.0.0.1', port))
# send a split index of -1 to shutdown the worker
- sock.send("\xFF\xFF\xFF\xFF")
+ sock.send(b"\xFF\xFF\xFF\xFF")
sock.close()
return True
@@ -1395,7 +1418,6 @@ class DaemonTests(unittest.TestCase):
class WorkerTests(PySparkTestCase):
-
def test_cancel_task(self):
temp = tempfile.NamedTemporaryFile(delete=True)
temp.close()
@@ -1410,7 +1432,7 @@ class WorkerTests(PySparkTestCase):
# start job in background thread
def run():
- self.sc.parallelize(range(1)).foreach(sleep)
+ self.sc.parallelize(range(1), 1).foreach(sleep)
import threading
t = threading.Thread(target=run)
t.daemon = True
@@ -1419,7 +1441,8 @@ class WorkerTests(PySparkTestCase):
daemon_pid, worker_pid = 0, 0
while True:
if os.path.exists(path):
- data = open(path).read().split(' ')
+ with open(path) as f:
+ data = f.read().split(' ')
daemon_pid, worker_pid = map(int, data)
break
time.sleep(0.1)
@@ -1455,7 +1478,7 @@ class WorkerTests(PySparkTestCase):
def test_after_jvm_exception(self):
tempFile = tempfile.NamedTemporaryFile(delete=False)
- tempFile.write("Hello World!")
+ tempFile.write(b"Hello World!")
tempFile.close()
data = self.sc.textFile(tempFile.name, 1)
filtered_data = data.filter(lambda x: True)
@@ -1577,12 +1600,12 @@ class SparkSubmitTests(unittest.TestCase):
|from pyspark import SparkContext
|
|sc = SparkContext()
- |print sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect()
+ |print(sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect())
""")
proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
- self.assertIn("[2, 4, 6]", out)
+ self.assertIn("[2, 4, 6]", out.decode('utf-8'))
def test_script_with_local_functions(self):
"""Submit and test a single script file calling a global function"""
@@ -1593,12 +1616,12 @@ class SparkSubmitTests(unittest.TestCase):
| return x * 3
|
|sc = SparkContext()
- |print sc.parallelize([1, 2, 3]).map(foo).collect()
+ |print(sc.parallelize([1, 2, 3]).map(foo).collect())
""")
proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
- self.assertIn("[3, 6, 9]", out)
+ self.assertIn("[3, 6, 9]", out.decode('utf-8'))
def test_module_dependency(self):
"""Submit and test a script with a dependency on another module"""
@@ -1607,7 +1630,7 @@ class SparkSubmitTests(unittest.TestCase):
|from mylib import myfunc
|
|sc = SparkContext()
- |print sc.parallelize([1, 2, 3]).map(myfunc).collect()
+ |print(sc.parallelize([1, 2, 3]).map(myfunc).collect())
""")
zip = self.createFileInZip("mylib.py", """
|def myfunc(x):
@@ -1617,7 +1640,7 @@ class SparkSubmitTests(unittest.TestCase):
stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
- self.assertIn("[2, 3, 4]", out)
+ self.assertIn("[2, 3, 4]", out.decode('utf-8'))
def test_module_dependency_on_cluster(self):
"""Submit and test a script with a dependency on another module on a cluster"""
@@ -1626,7 +1649,7 @@ class SparkSubmitTests(unittest.TestCase):
|from mylib import myfunc
|
|sc = SparkContext()
- |print sc.parallelize([1, 2, 3]).map(myfunc).collect()
+ |print(sc.parallelize([1, 2, 3]).map(myfunc).collect())
""")
zip = self.createFileInZip("mylib.py", """
|def myfunc(x):
@@ -1637,7 +1660,7 @@ class SparkSubmitTests(unittest.TestCase):
stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
- self.assertIn("[2, 3, 4]", out)
+ self.assertIn("[2, 3, 4]", out.decode('utf-8'))
def test_package_dependency(self):
"""Submit and test a script with a dependency on a Spark Package"""
@@ -1646,14 +1669,14 @@ class SparkSubmitTests(unittest.TestCase):
|from mylib import myfunc
|
|sc = SparkContext()
- |print sc.parallelize([1, 2, 3]).map(myfunc).collect()
+ |print(sc.parallelize([1, 2, 3]).map(myfunc).collect())
""")
self.create_spark_package("a:mylib:0.1")
proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories",
"file:" + self.programDir, script], stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
- self.assertIn("[2, 3, 4]", out)
+ self.assertIn("[2, 3, 4]", out.decode('utf-8'))
def test_package_dependency_on_cluster(self):
"""Submit and test a script with a dependency on a Spark Package on a cluster"""
@@ -1662,7 +1685,7 @@ class SparkSubmitTests(unittest.TestCase):
|from mylib import myfunc
|
|sc = SparkContext()
- |print sc.parallelize([1, 2, 3]).map(myfunc).collect()
+ |print(sc.parallelize([1, 2, 3]).map(myfunc).collect())
""")
self.create_spark_package("a:mylib:0.1")
proc = subprocess.Popen([self.sparkSubmit, "--packages", "a:mylib:0.1", "--repositories",
@@ -1670,7 +1693,7 @@ class SparkSubmitTests(unittest.TestCase):
"local-cluster[1,1,512]", script], stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
- self.assertIn("[2, 3, 4]", out)
+ self.assertIn("[2, 3, 4]", out.decode('utf-8'))
def test_single_script_on_cluster(self):
"""Submit and test a single script on a cluster"""
@@ -1681,7 +1704,7 @@ class SparkSubmitTests(unittest.TestCase):
| return x * 2
|
|sc = SparkContext()
- |print sc.parallelize([1, 2, 3]).map(foo).collect()
+ |print(sc.parallelize([1, 2, 3]).map(foo).collect())
""")
# this will fail if you have different spark.executor.memory
# in conf/spark-defaults.conf
@@ -1690,7 +1713,7 @@ class SparkSubmitTests(unittest.TestCase):
stdout=subprocess.PIPE)
out, err = proc.communicate()
self.assertEqual(0, proc.returncode)
- self.assertIn("[2, 4, 6]", out)
+ self.assertIn("[2, 4, 6]", out.decode('utf-8'))
class ContextTests(unittest.TestCase):
@@ -1765,7 +1788,7 @@ class SciPyTests(PySparkTestCase):
def test_serialize(self):
from scipy.special import gammaln
x = range(1, 5)
- expected = map(gammaln, x)
+ expected = list(map(gammaln, x))
observed = self.sc.parallelize(x).map(gammaln).collect()
self.assertEqual(expected, observed)
@@ -1786,11 +1809,11 @@ class NumPyTests(PySparkTestCase):
if __name__ == "__main__":
if not _have_scipy:
- print "NOTE: Skipping SciPy tests as it does not seem to be installed"
+ print("NOTE: Skipping SciPy tests as it does not seem to be installed")
if not _have_numpy:
- print "NOTE: Skipping NumPy tests as it does not seem to be installed"
+ print("NOTE: Skipping NumPy tests as it does not seem to be installed")
unittest.main()
if not _have_scipy:
- print "NOTE: SciPy tests were skipped as it does not seem to be installed"
+ print("NOTE: SciPy tests were skipped as it does not seem to be installed")
if not _have_numpy:
- print "NOTE: NumPy tests were skipped as it does not seem to be installed"
+ print("NOTE: NumPy tests were skipped as it does not seem to be installed")