From 242b4f02df7f71ebcfa86a85c9ed39e40750a7fd Mon Sep 17 00:00:00 2001 From: freeman Date: Tue, 3 Feb 2015 22:24:30 -0800 Subject: [SPARK-4969][STREAMING][PYTHON] Add binaryRecords to streaming In Spark 1.2 we added a `binaryRecords` input method for loading flat binary data. This format is useful for numerical array data, e.g. in scientific computing applications. This PR adds support for the same format in Streaming applications, where it is similarly useful, especially for streaming time series or sensor data. Summary of additions - adding `binaryRecordsStream` to Spark Streaming - exposing `binaryRecordsStream` in the new PySpark Streaming - new unit tests in Scala and Python This required adding an optional Hadoop configuration param to `fileStream` and `FileInputStream`, but was otherwise straightforward. tdas davies Author: freeman Closes #3803 from freeman-lab/streaming-binary-records and squashes the following commits: b676534 [freeman] Clarify note 5ff1b75 [freeman] Add note to java streaming context eba925c [freeman] Simplify notes c4237b8 [freeman] Add experimental tag 30eba67 [freeman] Add filter and newFilesOnly alongside conf c2cfa6d [freeman] Expose new version of fileStream with conf in java 34d20ef [freeman] Add experimental tag 14bca9a [freeman] Add experimental tag b85bffc [freeman] Formatting 47560f4 [freeman] Space formatting 9a3715a [freeman] Refactor to reflect changes to FileInputSuite 7373f73 [freeman] Add note and defensive assertion for byte length 3ceb684 [freeman] Merge remote-tracking branch 'upstream/master' into streaming-binary-records 317b6d1 [freeman] Make test inline fcb915c [freeman] Formatting becb344 [freeman] Formatting d3e75b2 [freeman] Add tests in python a4324a3 [freeman] Line length 029d49c [freeman] Formatting 1c739aa [freeman] Simpler default arg handling 94d90d0 [freeman] Spelling 2843e9d [freeman] Add params to docstring 8b70fbc [freeman] Reorganization 28bff9b [freeman] Fix missing arg 9398bcb [freeman] Expose optional hadoop configuration 23dd69f [freeman] Tests for binaryRecordsStream 36cb0fd [freeman] Add binaryRecordsStream to scala fe4e803 [freeman] Add binaryRecordStream to Java API ecef0eb [freeman] Add binaryRecordsStream to python 8550c26 [freeman] Expose additional argument combination --- python/pyspark/streaming/context.py | 16 +++++++++++++++- python/pyspark/streaming/tests.py | 15 +++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) (limited to 'python/pyspark') diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index d48f3598e3..18aaae93b0 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -21,7 +21,7 @@ from py4j.java_collections import ListConverter from py4j.java_gateway import java_import, JavaObject from pyspark import RDD, SparkConf -from pyspark.serializers import UTF8Deserializer, CloudPickleSerializer +from pyspark.serializers import NoOpSerializer, UTF8Deserializer, CloudPickleSerializer from pyspark.context import SparkContext from pyspark.storagelevel import StorageLevel from pyspark.streaming.dstream import DStream @@ -251,6 +251,20 @@ class StreamingContext(object): """ return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer()) + def binaryRecordsStream(self, directory, recordLength): + """ + Create an input stream that monitors a Hadoop-compatible file system + for new files and reads them as flat binary files with records of + fixed length. Files must be written to the monitored directory by "moving" + them from another location within the same file system. + File names starting with . are ignored. + + @param directory: Directory to load data from + @param recordLength: Length of each record in bytes + """ + return DStream(self._jssc.binaryRecordsStream(directory, recordLength), self, + NoOpSerializer()) + def _check_serializers(self, rdds): # make sure they have same serializer if len(set(rdd._jrdd_deserializer for rdd in rdds)) > 1: diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index a8d876d0fa..608f8e2647 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -21,6 +21,7 @@ import time import operator import unittest import tempfile +import struct from pyspark.context import SparkConf, SparkContext, RDD from pyspark.streaming.context import StreamingContext @@ -455,6 +456,20 @@ class StreamingContextTests(PySparkStreamingTestCase): self.wait_for(result, 2) self.assertEqual([range(10), range(10)], result) + def test_binary_records_stream(self): + d = tempfile.mkdtemp() + self.ssc = StreamingContext(self.sc, self.duration) + dstream = self.ssc.binaryRecordsStream(d, 10).map( + lambda v: struct.unpack("10b", str(v))) + result = self._collect(dstream, 2, block=False) + self.ssc.start() + for name in ('a', 'b'): + time.sleep(1) + with open(os.path.join(d, name), "wb") as f: + f.write(bytearray(range(10))) + self.wait_for(result, 2) + self.assertEqual([range(10), range(10)], map(lambda v: list(v[0]), result)) + def test_union(self): input = [range(i + 1) for i in range(3)] dstream = self.ssc.queueStream(input) -- cgit v1.2.3