author    Davies Liu <davies@databricks.com>    2015-04-16 16:20:57 -0700
committer Josh Rosen <joshrosen@databricks.com>    2015-04-16 16:20:57 -0700
commit    04e44b37cc04f62fbf9e08c7076349e0a4d12ea8 (patch)
tree      b6429253955210445ddc37faa4d5166ea25a91e2 /python/pyspark/streaming/tests.py
parent    55f553a979db925aa0c3559f7e80b99d2bf3feb4 (diff)
[SPARK-4897] [PySpark] Python 3 support
This PR updates PySpark to support Python 3 (tested with 3.4).

Known issue: unpickling arrays from Pyrolite is broken in Python 3; those tests are skipped.

TODO: ec2/spark-ec2.py is not fully tested with Python 3.

Author: Davies Liu <davies@databricks.com>
Author: twneale <twneale@gmail.com>
Author: Josh Rosen <joshrosen@databricks.com>

Closes #5173 from davies/python3 and squashes the following commits:

d7d6323 [Davies Liu] fix tests
6c52a98 [Davies Liu] fix mllib test
99e334f [Davies Liu] update timeout
b716610 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
cafd5ec [Davies Liu] adddress comments from @mengxr
bf225d7 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
179fc8d [Davies Liu] tuning flaky tests
8c8b957 [Davies Liu] fix ResourceWarning in Python 3
5c57c95 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
4006829 [Davies Liu] fix test
2fc0066 [Davies Liu] add python3 path
71535e9 [Davies Liu] fix xrange and divide
5a55ab4 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
125f12c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ed498c8 [Davies Liu] fix compatibility with python 3
820e649 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
e8ce8c9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ad7c374 [Davies Liu] fix mllib test and warning
ef1fc2f [Davies Liu] fix tests
4eee14a [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
20112ff [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
59bb492 [Davies Liu] fix tests
1da268c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ca0fdd3 [Davies Liu] fix code style
9563a15 [Davies Liu] add imap back for python 2
0b1ec04 [Davies Liu] make python examples work with Python 3
d2fd566 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
a716d34 [Davies Liu] test with python 3.4
f1700e8 [Davies Liu] fix test in python3
671b1db [Davies Liu] fix test in python3
692ff47 [Davies Liu] fix flaky test
7b9699f [Davies Liu] invalidate import cache for Python 3.3+
9c58497 [Davies Liu] fix kill worker
309bfbf [Davies Liu] keep compatibility
5707476 [Davies Liu] cleanup, fix hash of string in 3.3+
8662d5b [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
f53e1f0 [Davies Liu] fix tests
70b6b73 [Davies Liu] compile ec2/spark_ec2.py in python 3
a39167e [Davies Liu] support customize class in __main__
814c77b [Davies Liu] run unittests with python 3
7f4476e [Davies Liu] mllib tests passed
d737924 [Davies Liu] pass ml tests
375ea17 [Davies Liu] SQL tests pass
6cc42a9 [Davies Liu] rename
431a8de [Davies Liu] streaming tests pass
78901a7 [Davies Liu] fix hash of serializer in Python 3
24b2f2e [Davies Liu] pass all RDD tests
35f48fe [Davies Liu] run future again
1eebac2 [Davies Liu] fix conflict in ec2/spark_ec2.py
6e3c21d [Davies Liu] make cloudpickle work with Python3
2fb2db3 [Josh Rosen] Guard more changes behind sys.version; still doesn't run
1aa5e8f [twneale] Turned out `pickle.DictionaryType is dict` == True, so swapped it out
7354371 [twneale] buffer --> memoryview I'm not super sure if this a valid change, but the 2.7 docs recommend using memoryview over buffer where possible, so hoping it'll work.
b69ccdf [twneale] Uses the pure python pickle._Pickler instead of c-extension _pickle.Pickler. It appears pyspark 2.7 uses the pure python pickler as well, so this shouldn't degrade pickling performance (?).
f40d925 [twneale] xrange --> range
e104215 [twneale] Replaces 2.7 types.InstsanceType with 3.4 `object`....could be horribly wrong depending on how types.InstanceType is used elsewhere in the package--see http://bugs.python.org/issue8206
79de9d0 [twneale] Replaces python2.7 `file` with 3.4 _io.TextIOWrapper
2adb42d [Josh Rosen] Fix up some import differences between Python 2 and 3
854be27 [Josh Rosen] Run `futurize` on Python code
7c5b4ce [Josh Rosen] Remove Python 3 check in shell.py
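The changes to this test file follow a handful of recurring Python 2/3 compatibility idioms. A minimal sketch of those idioms (illustrative only, not taken verbatim from the patch; the sample input list is made up):

from __future__ import print_function  # makes print() a function on Python 2

import sys

if sys.version_info[0] >= 3:
    from functools import reduce  # reduce() moved to functools in Python 3
    xrange = range                # xrange was renamed to range

# map() and filter() return lazy iterators in Python 3, so tests that
# compare results against concrete lists must materialize them explicitly:
input = [[1, 2], [3, 4]]
expected = [list(map(str, batch)) for batch in input]
print(expected)  # [['1', '2'], ['3', '4']]

This is why most hunks below replace bare map()/filter()/range() expressions with list comprehensions or list(...) wrappers.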
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r-- python/pyspark/streaming/tests.py | 39
1 file changed, 20 insertions(+), 19 deletions(-)
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 9b4635e490..06d2215437 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -22,6 +22,7 @@ import operator
import unittest
import tempfile
import struct
+from functools import reduce
from py4j.java_collections import MapConverter
@@ -51,7 +52,7 @@ class PySparkStreamingTestCase(unittest.TestCase):
while len(result) < n and time.time() - start_time < self.timeout:
time.sleep(0.01)
if len(result) < n:
- print "timeout after", self.timeout
+ print("timeout after", self.timeout)
def _take(self, dstream, n):
"""
@@ -131,7 +132,7 @@ class BasicOperationTests(PySparkStreamingTestCase):
def func(dstream):
return dstream.map(str)
- expected = map(lambda x: map(str, x), input)
+ expected = [list(map(str, x)) for x in input]
self._test_func(input, func, expected)
def test_flatMap(self):
@@ -140,8 +141,8 @@ class BasicOperationTests(PySparkStreamingTestCase):
def func(dstream):
return dstream.flatMap(lambda x: (x, x * 2))
- expected = map(lambda x: list(chain.from_iterable((map(lambda y: [y, y * 2], x)))),
- input)
+ expected = [list(chain.from_iterable((map(lambda y: [y, y * 2], x))))
+ for x in input]
self._test_func(input, func, expected)
def test_filter(self):
@@ -150,7 +151,7 @@ class BasicOperationTests(PySparkStreamingTestCase):
def func(dstream):
return dstream.filter(lambda x: x % 2 == 0)
- expected = map(lambda x: filter(lambda y: y % 2 == 0, x), input)
+ expected = [[y for y in x if y % 2 == 0] for x in input]
self._test_func(input, func, expected)
def test_count(self):
@@ -159,7 +160,7 @@ class BasicOperationTests(PySparkStreamingTestCase):
def func(dstream):
return dstream.count()
- expected = map(lambda x: [len(x)], input)
+ expected = [[len(x)] for x in input]
self._test_func(input, func, expected)
def test_reduce(self):
@@ -168,7 +169,7 @@ class BasicOperationTests(PySparkStreamingTestCase):
def func(dstream):
return dstream.reduce(operator.add)
- expected = map(lambda x: [reduce(operator.add, x)], input)
+ expected = [[reduce(operator.add, x)] for x in input]
self._test_func(input, func, expected)
def test_reduceByKey(self):
@@ -185,27 +186,27 @@ class BasicOperationTests(PySparkStreamingTestCase):
def test_mapValues(self):
"""Basic operation test for DStream.mapValues."""
input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)],
- [("", 4), (1, 1), (2, 2), (3, 3)],
+ [(0, 4), (1, 1), (2, 2), (3, 3)],
[(1, 1), (2, 1), (3, 1), (4, 1)]]
def func(dstream):
return dstream.mapValues(lambda x: x + 10)
expected = [[("a", 12), ("b", 12), ("c", 11), ("d", 11)],
- [("", 14), (1, 11), (2, 12), (3, 13)],
+ [(0, 14), (1, 11), (2, 12), (3, 13)],
[(1, 11), (2, 11), (3, 11), (4, 11)]]
self._test_func(input, func, expected, sort=True)
def test_flatMapValues(self):
"""Basic operation test for DStream.flatMapValues."""
input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)],
- [("", 4), (1, 1), (2, 1), (3, 1)],
+ [(0, 4), (1, 1), (2, 1), (3, 1)],
[(1, 1), (2, 1), (3, 1), (4, 1)]]
def func(dstream):
return dstream.flatMapValues(lambda x: (x, x + 10))
expected = [[("a", 2), ("a", 12), ("b", 2), ("b", 12),
("c", 1), ("c", 11), ("d", 1), ("d", 11)],
- [("", 4), ("", 14), (1, 1), (1, 11), (2, 1), (2, 11), (3, 1), (3, 11)],
+ [(0, 4), (0, 14), (1, 1), (1, 11), (2, 1), (2, 11), (3, 1), (3, 11)],
[(1, 1), (1, 11), (2, 1), (2, 11), (3, 1), (3, 11), (4, 1), (4, 11)]]
self._test_func(input, func, expected)
@@ -233,7 +234,7 @@ class BasicOperationTests(PySparkStreamingTestCase):
def test_countByValue(self):
"""Basic operation test for DStream.countByValue."""
- input = [range(1, 5) * 2, range(5, 7) + range(5, 9), ["a", "a", "b", ""]]
+ input = [list(range(1, 5)) * 2, list(range(5, 7)) + list(range(5, 9)), ["a", "a", "b", ""]]
def func(dstream):
return dstream.countByValue()
@@ -285,7 +286,7 @@ class BasicOperationTests(PySparkStreamingTestCase):
def func(d1, d2):
return d1.union(d2)
- expected = [range(6), range(6), range(6)]
+ expected = [list(range(6)), list(range(6)), list(range(6))]
self._test_func(input1, func, expected, input2=input2)
def test_cogroup(self):
@@ -424,7 +425,7 @@ class StreamingContextTests(PySparkStreamingTestCase):
duration = 0.1
def _add_input_stream(self):
- inputs = map(lambda x: range(1, x), range(101))
+ inputs = [range(1, x) for x in range(101)]
stream = self.ssc.queueStream(inputs)
self._collect(stream, 1, block=False)
@@ -441,7 +442,7 @@ class StreamingContextTests(PySparkStreamingTestCase):
self.ssc.stop()
def test_queue_stream(self):
- input = [range(i + 1) for i in range(3)]
+ input = [list(range(i + 1)) for i in range(3)]
dstream = self.ssc.queueStream(input)
result = self._collect(dstream, 3)
self.assertEqual(input, result)
@@ -457,13 +458,13 @@ class StreamingContextTests(PySparkStreamingTestCase):
with open(os.path.join(d, name), "w") as f:
f.writelines(["%d\n" % i for i in range(10)])
self.wait_for(result, 2)
- self.assertEqual([range(10), range(10)], result)
+ self.assertEqual([list(range(10)), list(range(10))], result)
def test_binary_records_stream(self):
d = tempfile.mkdtemp()
self.ssc = StreamingContext(self.sc, self.duration)
dstream = self.ssc.binaryRecordsStream(d, 10).map(
- lambda v: struct.unpack("10b", str(v)))
+ lambda v: struct.unpack("10b", bytes(v)))
result = self._collect(dstream, 2, block=False)
self.ssc.start()
for name in ('a', 'b'):
@@ -471,10 +472,10 @@ class StreamingContextTests(PySparkStreamingTestCase):
with open(os.path.join(d, name), "wb") as f:
f.write(bytearray(range(10)))
self.wait_for(result, 2)
- self.assertEqual([range(10), range(10)], map(lambda v: list(v[0]), result))
+ self.assertEqual([list(range(10)), list(range(10))], [list(v[0]) for v in result])
def test_union(self):
- input = [range(i + 1) for i in range(3)]
+ input = [list(range(i + 1)) for i in range(3)]
dstream = self.ssc.queueStream(input)
dstream2 = self.ssc.queueStream(input)
dstream3 = self.ssc.union(dstream, dstream2)
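For context on the binaryRecordsStream hunk above, a quick illustration (assuming Python 3) of why str(v) had to become bytes(v) before struct.unpack:

import struct

data = bytearray(range(10))
# On Python 3, str(data) yields the repr string "bytearray(b'\x00...')"
# rather than the raw bytes, so struct.unpack("10b", str(data)) raises
# TypeError. bytes(data) hands struct.unpack the buffer it expects:
print(struct.unpack("10b", bytes(data)))  # (0, 1, 2, 3, 4, 5, 6, 7, 8, 9)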