aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorfreeman <the.freeman.lab@gmail.com>2015-02-03 22:24:30 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-02-03 22:24:30 -0800
commit242b4f02df7f71ebcfa86a85c9ed39e40750a7fd (patch)
tree446473ec70562e0a0d40a1bab1b53f08ac974a05 /core
parent40c4cb2fe79ceac0d656be7b72cb2ee8d7db7258 (diff)
downloadspark-242b4f02df7f71ebcfa86a85c9ed39e40750a7fd.tar.gz
spark-242b4f02df7f71ebcfa86a85c9ed39e40750a7fd.tar.bz2
spark-242b4f02df7f71ebcfa86a85c9ed39e40750a7fd.zip
[SPARK-4969][STREAMING][PYTHON] Add binaryRecords to streaming
In Spark 1.2 we added a `binaryRecords` input method for loading flat binary data. This format is useful for numerical array data, e.g. in scientific computing applications. This PR adds support for the same format in Streaming applications, where it is similarly useful, especially for streaming time series or sensor data. Summary of additions - adding `binaryRecordsStream` to Spark Streaming - exposing `binaryRecordsStream` in the new PySpark Streaming - new unit tests in Scala and Python This required adding an optional Hadoop configuration param to `fileStream` and `FileInputStream`, but was otherwise straightforward. tdas davies Author: freeman <the.freeman.lab@gmail.com> Closes #3803 from freeman-lab/streaming-binary-records and squashes the following commits: b676534 [freeman] Clarify note 5ff1b75 [freeman] Add note to java streaming context eba925c [freeman] Simplify notes c4237b8 [freeman] Add experimental tag 30eba67 [freeman] Add filter and newFilesOnly alongside conf c2cfa6d [freeman] Expose new version of fileStream with conf in java 34d20ef [freeman] Add experimental tag 14bca9a [freeman] Add experimental tag b85bffc [freeman] Formatting 47560f4 [freeman] Space formatting 9a3715a [freeman] Refactor to reflect changes to FileInputSuite 7373f73 [freeman] Add note and defensive assertion for byte length 3ceb684 [freeman] Merge remote-tracking branch 'upstream/master' into streaming-binary-records 317b6d1 [freeman] Make test inline fcb915c [freeman] Formatting becb344 [freeman] Formatting d3e75b2 [freeman] Add tests in python a4324a3 [freeman] Line length 029d49c [freeman] Formatting 1c739aa [freeman] Simpler default arg handling 94d90d0 [freeman] Spelling 2843e9d [freeman] Add params to docstring 8b70fbc [freeman] Reorganization 28bff9b [freeman] Fix missing arg 9398bcb [freeman] Expose optional hadoop configuration 23dd69f [freeman] Tests for binaryRecordsStream 36cb0fd [freeman] Add binaryRecordsStream to scala fe4e803 [freeman] Add binaryRecordStream to Java API ecef0eb [freeman] Add binaryRecordsStream to python 8550c26 [freeman] Expose additional argument combination
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala9
1 files changed, 8 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 16c6fdbe52..eecfdd4222 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -657,6 +657,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* Load data from a flat binary file, assuming the length of each record is constant.
*
+ * '''Note:''' We ensure that the byte array for each record in the resulting RDD
+ * has the provided record length.
+ *
* @param path Directory to the input data files
* @param recordLength The length at which to split the records
* @return An RDD of data with values, represented as byte arrays
@@ -671,7 +674,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
classOf[LongWritable],
classOf[BytesWritable],
conf=conf)
- val data = br.map{ case (k, v) => v.getBytes}
+ val data = br.map { case (k, v) =>
+ val bytes = v.getBytes
+ assert(bytes.length == recordLength, "Byte array does not have correct length")
+ bytes
+ }
data
}