aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorfreeman <the.freeman.lab@gmail.com>2015-02-03 22:24:30 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2015-02-03 22:24:30 -0800
commit242b4f02df7f71ebcfa86a85c9ed39e40750a7fd (patch)
tree446473ec70562e0a0d40a1bab1b53f08ac974a05
parent40c4cb2fe79ceac0d656be7b72cb2ee8d7db7258 (diff)
downloadspark-242b4f02df7f71ebcfa86a85c9ed39e40750a7fd.tar.gz
spark-242b4f02df7f71ebcfa86a85c9ed39e40750a7fd.tar.bz2
spark-242b4f02df7f71ebcfa86a85c9ed39e40750a7fd.zip
[SPARK-4969][STREAMING][PYTHON] Add binaryRecords to streaming
In Spark 1.2 we added a `binaryRecords` input method for loading flat binary data. This format is useful for numerical array data, e.g. in scientific computing applications. This PR adds support for the same format in Streaming applications, where it is similarly useful, especially for streaming time series or sensor data. Summary of additions - adding `binaryRecordsStream` to Spark Streaming - exposing `binaryRecordsStream` in the new PySpark Streaming - new unit tests in Scala and Python This required adding an optional Hadoop configuration param to `fileStream` and `FileInputStream`, but was otherwise straightforward. tdas davies Author: freeman <the.freeman.lab@gmail.com> Closes #3803 from freeman-lab/streaming-binary-records and squashes the following commits: b676534 [freeman] Clarify note 5ff1b75 [freeman] Add note to java streaming context eba925c [freeman] Simplify notes c4237b8 [freeman] Add experimental tag 30eba67 [freeman] Add filter and newFilesOnly alongside conf c2cfa6d [freeman] Expose new version of fileStream with conf in java 34d20ef [freeman] Add experimental tag 14bca9a [freeman] Add experimental tag b85bffc [freeman] Formatting 47560f4 [freeman] Space formatting 9a3715a [freeman] Refactor to reflect changes to FileInputSuite 7373f73 [freeman] Add note and defensive assertion for byte length 3ceb684 [freeman] Merge remote-tracking branch 'upstream/master' into streaming-binary-records 317b6d1 [freeman] Make test inline fcb915c [freeman] Formatting becb344 [freeman] Formatting d3e75b2 [freeman] Add tests in python a4324a3 [freeman] Line length 029d49c [freeman] Formatting 1c739aa [freeman] Simpler default arg handling 94d90d0 [freeman] Spelling 2843e9d [freeman] Add params to docstring 8b70fbc [freeman] Reorganization 28bff9b [freeman] Fix missing arg 9398bcb [freeman] Expose optional hadoop configuration 23dd69f [freeman] Tests for binaryRecordsStream 36cb0fd [freeman] Add binaryRecordsStream to scala fe4e803 [freeman] Add binaryRecordStream to Java API ecef0eb [freeman] Add binaryRecordsStream to python 8550c26 [freeman] Expose additional argument combination
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala9
-rw-r--r--python/pyspark/streaming/context.py16
-rw-r--r--python/pyspark/streaming/tests.py15
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala59
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala52
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala17
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala51
7 files changed, 212 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 16c6fdbe52..eecfdd4222 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -657,6 +657,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* Load data from a flat binary file, assuming the length of each record is constant.
*
+ * '''Note:''' We ensure that the byte array for each record in the resulting RDD
+ * has the provided record length.
+ *
* @param path Directory to the input data files
* @param recordLength The length at which to split the records
* @return An RDD of data with values, represented as byte arrays
@@ -671,7 +674,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
classOf[LongWritable],
classOf[BytesWritable],
conf=conf)
- val data = br.map{ case (k, v) => v.getBytes}
+ val data = br.map { case (k, v) =>
+ val bytes = v.getBytes
+ assert(bytes.length == recordLength, "Byte array does not have correct length")
+ bytes
+ }
data
}
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index d48f3598e3..18aaae93b0 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -21,7 +21,7 @@ from py4j.java_collections import ListConverter
from py4j.java_gateway import java_import, JavaObject
from pyspark import RDD, SparkConf
-from pyspark.serializers import UTF8Deserializer, CloudPickleSerializer
+from pyspark.serializers import NoOpSerializer, UTF8Deserializer, CloudPickleSerializer
from pyspark.context import SparkContext
from pyspark.storagelevel import StorageLevel
from pyspark.streaming.dstream import DStream
@@ -251,6 +251,20 @@ class StreamingContext(object):
"""
return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
+ def binaryRecordsStream(self, directory, recordLength):
+ """
+ Create an input stream that monitors a Hadoop-compatible file system
+ for new files and reads them as flat binary files with records of
+ fixed length. Files must be written to the monitored directory by "moving"
+ them from another location within the same file system.
+ File names starting with . are ignored.
+
+ @param directory: Directory to load data from
+ @param recordLength: Length of each record in bytes
+ """
+ return DStream(self._jssc.binaryRecordsStream(directory, recordLength), self,
+ NoOpSerializer())
+
def _check_serializers(self, rdds):
# make sure they have same serializer
if len(set(rdd._jrdd_deserializer for rdd in rdds)) > 1:
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index a8d876d0fa..608f8e2647 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -21,6 +21,7 @@ import time
import operator
import unittest
import tempfile
+import struct
from pyspark.context import SparkConf, SparkContext, RDD
from pyspark.streaming.context import StreamingContext
@@ -455,6 +456,20 @@ class StreamingContextTests(PySparkStreamingTestCase):
self.wait_for(result, 2)
self.assertEqual([range(10), range(10)], result)
+ def test_binary_records_stream(self):
+ d = tempfile.mkdtemp()
+ self.ssc = StreamingContext(self.sc, self.duration)
+ dstream = self.ssc.binaryRecordsStream(d, 10).map(
+ lambda v: struct.unpack("10b", str(v)))
+ result = self._collect(dstream, 2, block=False)
+ self.ssc.start()
+ for name in ('a', 'b'):
+ time.sleep(1)
+ with open(os.path.join(d, name), "wb") as f:
+ f.write(bytearray(range(10)))
+ self.wait_for(result, 2)
+ self.assertEqual([range(10), range(10)], map(lambda v: list(v[0]), result))
+
def test_union(self):
input = [range(i + 1) for i in range(3)]
dstream = self.ssc.queueStream(input)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 8ef0787137..ddc435cf1a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -27,10 +27,12 @@ import scala.reflect.ClassTag
import akka.actor.{Props, SupervisorStrategy}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark._
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.input.FixedLengthBinaryInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream._
@@ -361,6 +363,30 @@ class StreamingContext private[streaming] (
/**
* Create a input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them using the given key-value types and input format.
+ * Files must be written to the monitored directory by "moving" them from another
+ * location within the same file system. File names starting with . are ignored.
+ * @param directory HDFS directory to monitor for new file
+ * @param filter Function to filter paths to process
+ * @param newFilesOnly Should process only new files and ignore existing files in the directory
+ * @param conf Hadoop configuration
+ * @tparam K Key type for reading HDFS file
+ * @tparam V Value type for reading HDFS file
+ * @tparam F Input format for reading HDFS file
+ */
+ def fileStream[
+ K: ClassTag,
+ V: ClassTag,
+ F <: NewInputFormat[K, V]: ClassTag
+ ] (directory: String,
+ filter: Path => Boolean,
+ newFilesOnly: Boolean,
+ conf: Configuration): InputDStream[(K, V)] = {
+ new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf))
+ }
+
+ /**
+ * Create a input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
* as Text and input format as TextInputFormat). Files must be written to the
* monitored directory by "moving" them from another location within the same
@@ -372,6 +398,37 @@ class StreamingContext private[streaming] (
}
/**
+ * :: Experimental ::
+ *
+ * Create an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them as flat binary files, assuming a fixed length per record,
+ * generating one byte array per record. Files must be written to the monitored directory
+ * by "moving" them from another location within the same file system. File names
+ * starting with . are ignored.
+ *
+ * '''Note:''' We ensure that the byte array for each record in the
+ * resulting RDDs of the DStream has the provided record length.
+ *
+ * @param directory HDFS directory to monitor for new file
+ * @param recordLength length of each record in bytes
+ */
+ @Experimental
+ def binaryRecordsStream(
+ directory: String,
+ recordLength: Int): DStream[Array[Byte]] = {
+ val conf = sc_.hadoopConfiguration
+ conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
+ val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
+ directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf)
+ val data = br.map { case (k, v) =>
+ val bytes = v.getBytes
+ assert(bytes.length == recordLength, "Byte array does not have correct length")
+ bytes
+ }
+ data
+ }
+
+ /**
* Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
* @param queue Queue of RDDs
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 9a2254bcdc..0f7ae7a1c7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.rdd.RDD
@@ -177,7 +178,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
/**
* Create an input stream from network source hostname:port. Data is received using
- * a TCP socket and the receive bytes it interepreted as object using the given
+ * a TCP socket and the receive bytes it interpreted as object using the given
* converter.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
@@ -210,6 +211,24 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
}
/**
+ * :: Experimental ::
+ *
+ * Create an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them as flat binary files with fixed record lengths,
+ * yielding byte arrays
+ *
+ * '''Note:''' We ensure that the byte array for each record in the
+ * resulting RDDs of the DStream has the provided record length.
+ *
+ * @param directory HDFS directory to monitor for new files
+ * @param recordLength The length at which to split the records
+ */
+ @Experimental
+ def binaryRecordsStream(directory: String, recordLength: Int): JavaDStream[Array[Byte]] = {
+ ssc.binaryRecordsStream(directory, recordLength)
+ }
+
+ /**
* Create an input stream from network source hostname:port, where data is received
* as serialized blocks (serialized using the Spark's serializer) that can be directly
* pushed into the block manager without deserializing them. This is the most efficient
@@ -299,6 +318,37 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
}
/**
+ * Create an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them using the given key-value types and input format.
+ * Files must be written to the monitored directory by "moving" them from another
+ * location within the same file system. File names starting with . are ignored.
+ * @param directory HDFS directory to monitor for new file
+ * @param kClass class of key for reading HDFS file
+ * @param vClass class of value for reading HDFS file
+ * @param fClass class of input format for reading HDFS file
+ * @param filter Function to filter paths to process
+ * @param newFilesOnly Should process only new files and ignore existing files in the directory
+ * @param conf Hadoop configuration
+ * @tparam K Key type for reading HDFS file
+ * @tparam V Value type for reading HDFS file
+ * @tparam F Input format for reading HDFS file
+ */
+ def fileStream[K, V, F <: NewInputFormat[K, V]](
+ directory: String,
+ kClass: Class[K],
+ vClass: Class[V],
+ fClass: Class[F],
+ filter: JFunction[Path, JBoolean],
+ newFilesOnly: Boolean,
+ conf: Configuration): JavaPairInputDStream[K, V] = {
+ implicit val cmk: ClassTag[K] = ClassTag(kClass)
+ implicit val cmv: ClassTag[V] = ClassTag(vClass)
+ implicit val cmf: ClassTag[F] = ClassTag(fClass)
+ def fn = (x: Path) => filter.call(x).booleanValue()
+ ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf)
+ }
+
+ /**
* Create an input stream with any arbitrary user implemented actor receiver.
* @param props Props object defining creation of the actor
* @param name Name of the actor
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index e7c5639a63..6379b88527 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import scala.reflect.ClassTag
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
@@ -68,11 +69,13 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
* processing semantics are undefined.
*/
private[streaming]
-class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
+class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
@transient ssc_ : StreamingContext,
directory: String,
filter: Path => Boolean = FileInputDStream.defaultFilter,
- newFilesOnly: Boolean = true)
+ newFilesOnly: Boolean = true,
+ conf: Option[Configuration] = None)
+ (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
extends InputDStream[(K, V)](ssc_) {
// This is a def so that it works during checkpoint recovery:
@@ -237,7 +240,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
/** Generate one RDD from an array of files */
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
val fileRDDs = files.map(file =>{
- val rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)
+ val rdd = conf match {
+ case Some(config) => context.sparkContext.newAPIHadoopFile(
+ file,
+ fm.runtimeClass.asInstanceOf[Class[F]],
+ km.runtimeClass.asInstanceOf[Class[K]],
+ vm.runtimeClass.asInstanceOf[Class[V]],
+ config)
+ case None => context.sparkContext.newAPIHadoopFile[K, V, F](file)
+ }
if (rdd.partitions.size == 0) {
logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
"files that have been \"moved\" to the directory assigned to the file stream. " +
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index bddf51e130..01084a457d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -95,6 +95,57 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
}
+ test("binary records stream") {
+ val testDir: File = null
+ try {
+ val batchDuration = Seconds(2)
+ val testDir = Utils.createTempDir()
+ // Create a file that exists before the StreamingContext is created:
+ val existingFile = new File(testDir, "0")
+ Files.write("0\n", existingFile, Charset.forName("UTF-8"))
+ assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)
+
+ // Set up the streaming context and input streams
+ withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ // This `setTime` call ensures that the clock is past the creation time of `existingFile`
+ clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
+ val batchCounter = new BatchCounter(ssc)
+ val fileStream = ssc.binaryRecordsStream(testDir.toString, 1)
+ val outputBuffer = new ArrayBuffer[Seq[Array[Byte]]]
+ with SynchronizedBuffer[Seq[Array[Byte]]]
+ val outputStream = new TestOutputStream(fileStream, outputBuffer)
+ outputStream.register()
+ ssc.start()
+
+ // Advance the clock so that the files are created after StreamingContext starts, but
+ // not enough to trigger a batch
+ clock.addToTime(batchDuration.milliseconds / 2)
+
+ val input = Seq(1, 2, 3, 4, 5)
+ input.foreach { i =>
+ Thread.sleep(batchDuration.milliseconds)
+ val file = new File(testDir, i.toString)
+ Files.write(Array[Byte](i.toByte), file)
+ assert(file.setLastModified(clock.currentTime()))
+ assert(file.lastModified === clock.currentTime)
+ logInfo("Created file " + file)
+ // Advance the clock after creating the file to avoid a race when
+ // setting its modification time
+ clock.addToTime(batchDuration.milliseconds)
+ eventually(eventuallyTimeout) {
+ assert(batchCounter.getNumCompletedBatches === i)
+ }
+ }
+
+ val expectedOutput = input.map(i => i.toByte)
+ val obtainedOutput = outputBuffer.flatten.toList.map(i => i(0).toByte)
+ assert(obtainedOutput === expectedOutput)
+ }
+ } finally {
+ if (testDir != null) Utils.deleteRecursively(testDir)
+ }
+ }
test("file input stream - newFilesOnly = true") {
testFileStream(newFilesOnly = true)