aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r--python/pyspark/streaming/tests.py39
1 files changed, 20 insertions, 19 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 9b4635e490..06d2215437 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -22,6 +22,7 @@ import operator
import unittest
import tempfile
import struct
+from functools import reduce
from py4j.java_collections import MapConverter
@@ -51,7 +52,7 @@ class PySparkStreamingTestCase(unittest.TestCase):
while len(result) < n and time.time() - start_time < self.timeout:
time.sleep(0.01)
if len(result) < n:
- print "timeout after", self.timeout
+ print("timeout after", self.timeout)
def _take(self, dstream, n):
"""
@@ -131,7 +132,7 @@ class BasicOperationTests(PySparkStreamingTestCase):
def func(dstream):
return dstream.map(str)
- expected = map(lambda x: map(str, x), input)
+ expected = [list(map(str, x)) for x in input]
self._test_func(input, func, expected)
def test_flatMap(self):
@@ -140,8 +141,8 @@ class BasicOperationTests(PySparkStreamingTestCase):
def func(dstream):
return dstream.flatMap(lambda x: (x, x * 2))
- expected = map(lambda x: list(chain.from_iterable((map(lambda y: [y, y * 2], x)))),
- input)
+ expected = [list(chain.from_iterable((map(lambda y: [y, y * 2], x))))
+ for x in input]
self._test_func(input, func, expected)
def test_filter(self):
@@ -150,7 +151,7 @@ class BasicOperationTests(PySparkStreamingTestCase):
def func(dstream):
return dstream.filter(lambda x: x % 2 == 0)
- expected = map(lambda x: filter(lambda y: y % 2 == 0, x), input)
+ expected = [[y for y in x if y % 2 == 0] for x in input]
self._test_func(input, func, expected)
def test_count(self):
@@ -159,7 +160,7 @@ class BasicOperationTests(PySparkStreamingTestCase):
def func(dstream):
return dstream.count()
- expected = map(lambda x: [len(x)], input)
+ expected = [[len(x)] for x in input]
self._test_func(input, func, expected)
def test_reduce(self):
@@ -168,7 +169,7 @@ class BasicOperationTests(PySparkStreamingTestCase):
def func(dstream):
return dstream.reduce(operator.add)
- expected = map(lambda x: [reduce(operator.add, x)], input)
+ expected = [[reduce(operator.add, x)] for x in input]
self._test_func(input, func, expected)
def test_reduceByKey(self):
@@ -185,27 +186,27 @@ class BasicOperationTests(PySparkStreamingTestCase):
def test_mapValues(self):
"""Basic operation test for DStream.mapValues."""
input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)],
- [("", 4), (1, 1), (2, 2), (3, 3)],
+ [(0, 4), (1, 1), (2, 2), (3, 3)],
[(1, 1), (2, 1), (3, 1), (4, 1)]]
def func(dstream):
return dstream.mapValues(lambda x: x + 10)
expected = [[("a", 12), ("b", 12), ("c", 11), ("d", 11)],
- [("", 14), (1, 11), (2, 12), (3, 13)],
+ [(0, 14), (1, 11), (2, 12), (3, 13)],
[(1, 11), (2, 11), (3, 11), (4, 11)]]
self._test_func(input, func, expected, sort=True)
def test_flatMapValues(self):
"""Basic operation test for DStream.flatMapValues."""
input = [[("a", 2), ("b", 2), ("c", 1), ("d", 1)],
- [("", 4), (1, 1), (2, 1), (3, 1)],
+ [(0, 4), (1, 1), (2, 1), (3, 1)],
[(1, 1), (2, 1), (3, 1), (4, 1)]]
def func(dstream):
return dstream.flatMapValues(lambda x: (x, x + 10))
expected = [[("a", 2), ("a", 12), ("b", 2), ("b", 12),
("c", 1), ("c", 11), ("d", 1), ("d", 11)],
- [("", 4), ("", 14), (1, 1), (1, 11), (2, 1), (2, 11), (3, 1), (3, 11)],
+ [(0, 4), (0, 14), (1, 1), (1, 11), (2, 1), (2, 11), (3, 1), (3, 11)],
[(1, 1), (1, 11), (2, 1), (2, 11), (3, 1), (3, 11), (4, 1), (4, 11)]]
self._test_func(input, func, expected)
@@ -233,7 +234,7 @@ class BasicOperationTests(PySparkStreamingTestCase):
def test_countByValue(self):
"""Basic operation test for DStream.countByValue."""
- input = [range(1, 5) * 2, range(5, 7) + range(5, 9), ["a", "a", "b", ""]]
+ input = [list(range(1, 5)) * 2, list(range(5, 7)) + list(range(5, 9)), ["a", "a", "b", ""]]
def func(dstream):
return dstream.countByValue()
@@ -285,7 +286,7 @@ class BasicOperationTests(PySparkStreamingTestCase):
def func(d1, d2):
return d1.union(d2)
- expected = [range(6), range(6), range(6)]
+ expected = [list(range(6)), list(range(6)), list(range(6))]
self._test_func(input1, func, expected, input2=input2)
def test_cogroup(self):
@@ -424,7 +425,7 @@ class StreamingContextTests(PySparkStreamingTestCase):
duration = 0.1
def _add_input_stream(self):
- inputs = map(lambda x: range(1, x), range(101))
+ inputs = [range(1, x) for x in range(101)]
stream = self.ssc.queueStream(inputs)
self._collect(stream, 1, block=False)
@@ -441,7 +442,7 @@ class StreamingContextTests(PySparkStreamingTestCase):
self.ssc.stop()
def test_queue_stream(self):
- input = [range(i + 1) for i in range(3)]
+ input = [list(range(i + 1)) for i in range(3)]
dstream = self.ssc.queueStream(input)
result = self._collect(dstream, 3)
self.assertEqual(input, result)
@@ -457,13 +458,13 @@ class StreamingContextTests(PySparkStreamingTestCase):
with open(os.path.join(d, name), "w") as f:
f.writelines(["%d\n" % i for i in range(10)])
self.wait_for(result, 2)
- self.assertEqual([range(10), range(10)], result)
+ self.assertEqual([list(range(10)), list(range(10))], result)
def test_binary_records_stream(self):
d = tempfile.mkdtemp()
self.ssc = StreamingContext(self.sc, self.duration)
dstream = self.ssc.binaryRecordsStream(d, 10).map(
- lambda v: struct.unpack("10b", str(v)))
+ lambda v: struct.unpack("10b", bytes(v)))
result = self._collect(dstream, 2, block=False)
self.ssc.start()
for name in ('a', 'b'):
@@ -471,10 +472,10 @@ class StreamingContextTests(PySparkStreamingTestCase):
with open(os.path.join(d, name), "wb") as f:
f.write(bytearray(range(10)))
self.wait_for(result, 2)
- self.assertEqual([range(10), range(10)], map(lambda v: list(v[0]), result))
+ self.assertEqual([list(range(10)), list(range(10))], [list(v[0]) for v in result])
def test_union(self):
- input = [range(i + 1) for i in range(3)]
+ input = [list(range(i + 1)) for i in range(3)]
dstream = self.ssc.queueStream(input)
dstream2 = self.ssc.queueStream(input)
dstream3 = self.ssc.union(dstream, dstream2)