aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r--python/pyspark/streaming/tests.py15
1 files changed, 15 insertions, 0 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index a8d876d0fa..608f8e2647 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -21,6 +21,7 @@ import time
import operator
import unittest
import tempfile
+import struct
from pyspark.context import SparkConf, SparkContext, RDD
from pyspark.streaming.context import StreamingContext
@@ -455,6 +456,20 @@ class StreamingContextTests(PySparkStreamingTestCase):
self.wait_for(result, 2)
self.assertEqual([range(10), range(10)], result)
+ def test_binary_records_stream(self):
+ d = tempfile.mkdtemp()
+ self.ssc = StreamingContext(self.sc, self.duration)
+ dstream = self.ssc.binaryRecordsStream(d, 10).map(
+ lambda v: struct.unpack("10b", str(v)))
+ result = self._collect(dstream, 2, block=False)
+ self.ssc.start()
+ for name in ('a', 'b'):
+ time.sleep(1)
+ with open(os.path.join(d, name), "wb") as f:
+ f.write(bytearray(range(10)))
+ self.wait_for(result, 2)
+ self.assertEqual([range(10), range(10)], map(lambda v: list(v[0]), result))
+
def test_union(self):
input = [range(i + 1) for i in range(3)]
dstream = self.ssc.queueStream(input)