aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/context.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/streaming/context.py')
-rw-r--r--python/pyspark/streaming/context.py16
1 file changed, 15 insertions, 1 deletion
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index d48f3598e3..18aaae93b0 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -21,7 +21,7 @@ from py4j.java_collections import ListConverter
from py4j.java_gateway import java_import, JavaObject
from pyspark import RDD, SparkConf
-from pyspark.serializers import UTF8Deserializer, CloudPickleSerializer
+from pyspark.serializers import NoOpSerializer, UTF8Deserializer, CloudPickleSerializer
from pyspark.context import SparkContext
from pyspark.storagelevel import StorageLevel
from pyspark.streaming.dstream import DStream
@@ -251,6 +251,20 @@ class StreamingContext(object):
"""
return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
+ def binaryRecordsStream(self, directory, recordLength):
+ """
+ Create an input stream that monitors a Hadoop-compatible file system
+ for new files and reads them as flat binary files with records of
+ fixed length. Files must be written to the monitored directory by "moving"
+ them from another location within the same file system.
+ File names starting with . are ignored.
+
+ @param directory: Directory to load data from
+ @param recordLength: Length of each record in bytes
+ """
+ return DStream(self._jssc.binaryRecordsStream(directory, recordLength), self,
+ NoOpSerializer())
+
def _check_serializers(self, rdds):
# make sure they have same serializer
if len(set(rdd._jrdd_deserializer for rdd in rdds)) > 1: