Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/streaming/context.py  16
-rw-r--r--  python/pyspark/streaming/tests.py     15
2 files changed, 30 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index d48f3598e3..18aaae93b0 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -21,7 +21,7 @@ from py4j.java_collections import ListConverter
 from py4j.java_gateway import java_import, JavaObject
 
 from pyspark import RDD, SparkConf
-from pyspark.serializers import UTF8Deserializer, CloudPickleSerializer
+from pyspark.serializers import NoOpSerializer, UTF8Deserializer, CloudPickleSerializer
 from pyspark.context import SparkContext
 from pyspark.storagelevel import StorageLevel
 from pyspark.streaming.dstream import DStream
@@ -251,6 +251,20 @@ class StreamingContext(object):
         """
         return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
 
+    def binaryRecordsStream(self, directory, recordLength):
+        """
+        Create an input stream that monitors a Hadoop-compatible file system
+        for new files and reads them as flat binary files with records of
+        fixed length. Files must be written to the monitored directory by "moving"
+        them from another location within the same file system.
+        File names starting with . are ignored.
+
+        @param directory: Directory to load data from
+        @param recordLength: Length of each record in bytes
+        """
+        return DStream(self._jssc.binaryRecordsStream(directory, recordLength), self,
+                       NoOpSerializer())
+
     def _check_serializers(self, rdds):
         # make sure they have same serializer
         if len(set(rdd._jrdd_deserializer for rdd in rdds)) > 1:
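Usage note (not part of the patch): a minimal sketch of how the new binaryRecordsStream API could be driven, assuming the Python 2-era PySpark of this commit, where each record arrives as a raw str of exactly recordLength bytes. The directory path, app name, batch interval, and record length below are illustrative, not taken from the patch.

    import struct

    from pyspark.context import SparkContext
    from pyspark.streaming.context import StreamingContext

    sc = SparkContext(appName="BinaryRecordsExample")
    ssc = StreamingContext(sc, 2)  # 2-second batch interval

    # Watch a directory for new files made of fixed 8-byte records and
    # decode each record as a little-endian double.
    stream = ssc.binaryRecordsStream("hdfs:///tmp/binary-input", 8)
    doubles = stream.map(lambda rec: struct.unpack("<d", rec)[0])
    doubles.pprint()

    ssc.start()
    ssc.awaitTermination()

Because the stream is built with NoOpSerializer, no deserialization happens on the Python side; the caller is responsible for unpacking each fixed-length byte record, as the map() above does.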
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index a8d876d0fa..608f8e2647 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -21,6 +21,7 @@ import time
 import operator
 import unittest
 import tempfile
+import struct
 
 from pyspark.context import SparkConf, SparkContext, RDD
 from pyspark.streaming.context import StreamingContext
@@ -455,6 +456,20 @@ class StreamingContextTests(PySparkStreamingTestCase):
         self.wait_for(result, 2)
         self.assertEqual([range(10), range(10)], result)
 
+    def test_binary_records_stream(self):
+        d = tempfile.mkdtemp()
+        self.ssc = StreamingContext(self.sc, self.duration)
+        dstream = self.ssc.binaryRecordsStream(d, 10).map(
+            lambda v: struct.unpack("10b", str(v)))
+        result = self._collect(dstream, 2, block=False)
+        self.ssc.start()
+        for name in ('a', 'b'):
+            time.sleep(1)
+            with open(os.path.join(d, name), "wb") as f:
+                f.write(bytearray(range(10)))
+        self.wait_for(result, 2)
+        self.assertEqual([range(10), range(10)], map(lambda v: list(v[0]), result))
+
     def test_union(self):
         input = [range(i + 1) for i in range(3)]
         dstream = self.ssc.queueStream(input)
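For reference (not part of the patch): the test above writes the ten byte values 0 through 9 into each file, so every file holds exactly one 10-byte record. This standalone sketch, assuming the same Python 2 semantics as the str(v) call in the test, shows the struct round trip the test relies on.

    import struct

    # One record exactly as the test writes it: the ten byte values 0..9.
    record = bytes(bytearray(range(10)))

    # "10b" unpacks ten signed bytes, yielding the tuple (0, 1, ..., 9),
    # the same value the test's map() emits for each record.
    values = struct.unpack("10b", record)
    assert list(values) == list(range(10))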