 core/src/main/scala/org/apache/spark/SparkContext.scala                                  |  9
 python/pyspark/streaming/context.py                                                      | 16
 python/pyspark/streaming/tests.py                                                        | 15
 streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala               | 59
 streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala  | 52
 streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala       | 17
 streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala              | 51
 7 files changed, 212 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 16c6fdbe52..eecfdd4222 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -657,6 +657,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* Load data from a flat binary file, assuming the length of each record is constant.
*
+ * '''Note:''' We ensure that the byte array for each record in the resulting RDD
+ * has the provided record length.
+ *
* @param path Directory to the input data files
* @param recordLength The length at which to split the records
* @return An RDD of data with values, represented as byte arrays
@@ -671,7 +674,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
classOf[LongWritable],
classOf[BytesWritable],
conf=conf)
- val data = br.map{ case (k, v) => v.getBytes}
+ val data = br.map { case (k, v) =>
+ val bytes = v.getBytes
+ assert(bytes.length == recordLength, "Byte array does not have correct length")
+ bytes
+ }
data
}
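For orientation, here is a minimal sketch of how the batch binaryRecords API touched above can be driven. The input path and the 10-byte record length are illustrative assumptions, not values from this patch; with the new assert, every element of the returned RDD is guaranteed to be exactly recordLength bytes long.

import org.apache.spark.{SparkConf, SparkContext}

object BinaryRecordsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("binary-records").setMaster("local[2]"))
    // Hypothetical directory of flat binary files made of back-to-back 10-byte records.
    val records = sc.binaryRecords("/tmp/fixed-length-data", recordLength = 10)
    // Each element is an Array[Byte]; the assert added above guarantees length == 10.
    println(records.map(_.length).distinct().collect().mkString(","))
    sc.stop()
  }
}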
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index d48f3598e3..18aaae93b0 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -21,7 +21,7 @@ from py4j.java_collections import ListConverter
from py4j.java_gateway import java_import, JavaObject
from pyspark import RDD, SparkConf
-from pyspark.serializers import UTF8Deserializer, CloudPickleSerializer
+from pyspark.serializers import NoOpSerializer, UTF8Deserializer, CloudPickleSerializer
from pyspark.context import SparkContext
from pyspark.storagelevel import StorageLevel
from pyspark.streaming.dstream import DStream
@@ -251,6 +251,20 @@ class StreamingContext(object):
"""
return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
+ def binaryRecordsStream(self, directory, recordLength):
+ """
+ Create an input stream that monitors a Hadoop-compatible file system
+ for new files and reads them as flat binary files with records of
+ fixed length. Files must be written to the monitored directory by "moving"
+ them from another location within the same file system.
+ File names starting with . are ignored.
+
+ @param directory: Directory to load data from
+ @param recordLength: Length of each record in bytes
+ """
+ return DStream(self._jssc.binaryRecordsStream(directory, recordLength), self,
+ NoOpSerializer())
+
def _check_serializers(self, rdds):
# make sure they have same serializer
if len(set(rdd._jrdd_deserializer for rdd in rdds)) > 1:
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index a8d876d0fa..608f8e2647 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -21,6 +21,7 @@ import time
import operator
import unittest
import tempfile
+import struct
from pyspark.context import SparkConf, SparkContext, RDD
from pyspark.streaming.context import StreamingContext
@@ -455,6 +456,20 @@ class StreamingContextTests(PySparkStreamingTestCase):
self.wait_for(result, 2)
self.assertEqual([range(10), range(10)], result)
+ def test_binary_records_stream(self):
+ d = tempfile.mkdtemp()
+ self.ssc = StreamingContext(self.sc, self.duration)
+ dstream = self.ssc.binaryRecordsStream(d, 10).map(
+ lambda v: struct.unpack("10b", str(v)))
+ result = self._collect(dstream, 2, block=False)
+ self.ssc.start()
+ for name in ('a', 'b'):
+ time.sleep(1)
+ with open(os.path.join(d, name), "wb") as f:
+ f.write(bytearray(range(10)))
+ self.wait_for(result, 2)
+ self.assertEqual([range(10), range(10)], map(lambda v: list(v[0]), result))
+
def test_union(self):
input = [range(i + 1) for i in range(3)]
dstream = self.ssc.queueStream(input)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 8ef0787137..ddc435cf1a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -27,10 +27,12 @@ import scala.reflect.ClassTag
import akka.actor.{Props, SupervisorStrategy}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark._
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.input.FixedLengthBinaryInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream._
@@ -361,6 +363,30 @@ class StreamingContext private[streaming] (
/**
* Create a input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them using the given key-value types and input format.
+ * Files must be written to the monitored directory by "moving" them from another
+ * location within the same file system. File names starting with . are ignored.
+ * @param directory HDFS directory to monitor for new files
+ * @param filter Function to filter paths to process
+ * @param newFilesOnly Should process only new files and ignore existing files in the directory
+ * @param conf Hadoop configuration
+ * @tparam K Key type for reading HDFS file
+ * @tparam V Value type for reading HDFS file
+ * @tparam F Input format for reading HDFS file
+ */
+ def fileStream[
+ K: ClassTag,
+ V: ClassTag,
+ F <: NewInputFormat[K, V]: ClassTag
+ ] (directory: String,
+ filter: Path => Boolean,
+ newFilesOnly: Boolean,
+ conf: Configuration): InputDStream[(K, V)] = {
+ new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly, Option(conf))
+ }
+
+ /**
+ * Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
* as Text and input format as TextInputFormat). Files must be written to the
* monitored directory by "moving" them from another location within the same
@@ -372,6 +398,37 @@ class StreamingContext private[streaming] (
}
/**
+ * :: Experimental ::
+ *
+ * Create an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them as flat binary files, assuming a fixed length per record,
+ * generating one byte array per record. Files must be written to the monitored directory
+ * by "moving" them from another location within the same file system. File names
+ * starting with . are ignored.
+ *
+ * '''Note:''' We ensure that the byte array for each record in the
+ * resulting RDDs of the DStream has the provided record length.
+ *
+ * @param directory HDFS directory to monitor for new files
+ * @param recordLength length of each record in bytes
+ */
+ @Experimental
+ def binaryRecordsStream(
+ directory: String,
+ recordLength: Int): DStream[Array[Byte]] = {
+ val conf = sc_.hadoopConfiguration
+ conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
+ val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
+ directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf)
+ val data = br.map { case (k, v) =>
+ val bytes = v.getBytes
+ assert(bytes.length == recordLength, "Byte array does not have correct length")
+ bytes
+ }
+ data
+ }
+
+ /**
* Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
* @param queue Queue of RDDs
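A short usage sketch of the new Scala binaryRecordsStream follows, assuming a local master and a hypothetical /tmp/stream-input directory; files dropped there must be moved in atomically and consist of whole 10-byte records.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BinaryRecordsStreamSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("binary-records-stream").setMaster("local[2]"), Seconds(2))
    // Hypothetical monitored directory; each new file is split into 10-byte records.
    val records = ssc.binaryRecordsStream("/tmp/stream-input", recordLength = 10)
    // Every record arrives as an Array[Byte] of exactly recordLength bytes.
    records.map(_.length).print()
    ssc.start()
    ssc.awaitTermination()
  }
}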
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 9a2254bcdc..0f7ae7a1c7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.spark.rdd.RDD
@@ -177,7 +178,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
/**
* Create an input stream from network source hostname:port. Data is received using
- * a TCP socket and the receive bytes it interepreted as object using the given
+ * a TCP socket and the received bytes are interpreted as objects using the given
* converter.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
@@ -210,6 +211,24 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
}
/**
+ * :: Experimental ::
+ *
+ * Create an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them as flat binary files with fixed record lengths,
+ * yielding byte arrays
+ *
+ * '''Note:''' We ensure that the byte array for each record in the
+ * resulting RDDs of the DStream has the provided record length.
+ *
+ * @param directory HDFS directory to monitor for new files
+ * @param recordLength The length at which to split the records
+ */
+ @Experimental
+ def binaryRecordsStream(directory: String, recordLength: Int): JavaDStream[Array[Byte]] = {
+ ssc.binaryRecordsStream(directory, recordLength)
+ }
+
+ /**
* Create an input stream from network source hostname:port, where data is received
* as serialized blocks (serialized using the Spark's serializer) that can be directly
* pushed into the block manager without deserializing them. This is the most efficient
@@ -299,6 +318,37 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
}
/**
+ * Create an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them using the given key-value types and input format.
+ * Files must be written to the monitored directory by "moving" them from another
+ * location within the same file system. File names starting with . are ignored.
+ * @param directory HDFS directory to monitor for new files
+ * @param kClass class of key for reading HDFS file
+ * @param vClass class of value for reading HDFS file
+ * @param fClass class of input format for reading HDFS file
+ * @param filter Function to filter paths to process
+ * @param newFilesOnly Should process only new files and ignore existing files in the directory
+ * @param conf Hadoop configuration
+ * @tparam K Key type for reading HDFS file
+ * @tparam V Value type for reading HDFS file
+ * @tparam F Input format for reading HDFS file
+ */
+ def fileStream[K, V, F <: NewInputFormat[K, V]](
+ directory: String,
+ kClass: Class[K],
+ vClass: Class[V],
+ fClass: Class[F],
+ filter: JFunction[Path, JBoolean],
+ newFilesOnly: Boolean,
+ conf: Configuration): JavaPairInputDStream[K, V] = {
+ implicit val cmk: ClassTag[K] = ClassTag(kClass)
+ implicit val cmv: ClassTag[V] = ClassTag(vClass)
+ implicit val cmf: ClassTag[F] = ClassTag(fClass)
+ def fn = (x: Path) => filter.call(x).booleanValue()
+ ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf)
+ }
+
+ /**
* Create an input stream with any arbitrary user implemented actor receiver.
* @param props Props object defining creation of the actor
* @param name Name of the actor
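The fileStream overload added here, like its Scala counterpart shown above in StreamingContext.scala, threads a Hadoop Configuration through to the underlying input format. Below is a hedged Scala sketch of how that hook might be used; the monitored directory and the custom record delimiter are assumptions for illustration only. The Java overload mirrors this call with explicit Class objects and a JFunction filter.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamWithConfSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("file-stream-conf").setMaster("local[2]"), Seconds(2))

    // Hypothetical per-stream Hadoop configuration: records are '|'-delimited.
    val hadoopConf = new Configuration()
    hadoopConf.set("textinputformat.record.delimiter", "|")

    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
      "/tmp/delimited-input",                        // monitored directory (illustrative)
      (path: Path) => !path.getName.startsWith("."), // same rule as the default filter
      newFilesOnly = true,
      conf = hadoopConf)

    lines.map { case (_, text) => text.toString }.print()
    ssc.start()
    ssc.awaitTermination()
  }
}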
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index e7c5639a63..6379b88527 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import scala.reflect.ClassTag
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
@@ -68,11 +69,13 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
* processing semantics are undefined.
*/
private[streaming]
-class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
+class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
@transient ssc_ : StreamingContext,
directory: String,
filter: Path => Boolean = FileInputDStream.defaultFilter,
- newFilesOnly: Boolean = true)
+ newFilesOnly: Boolean = true,
+ conf: Option[Configuration] = None)
+ (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
extends InputDStream[(K, V)](ssc_) {
// This is a def so that it works during checkpoint recovery:
@@ -237,7 +240,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
/** Generate one RDD from an array of files */
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
val fileRDDs = files.map(file =>{
- val rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)
+ val rdd = conf match {
+ case Some(config) => context.sparkContext.newAPIHadoopFile(
+ file,
+ fm.runtimeClass.asInstanceOf[Class[F]],
+ km.runtimeClass.asInstanceOf[Class[K]],
+ vm.runtimeClass.asInstanceOf[Class[V]],
+ config)
+ case None => context.sparkContext.newAPIHadoopFile[K, V, F](file)
+ }
if (rdd.partitions.size == 0) {
logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
"files that have been \"moved\" to the directory assigned to the file stream. " +
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index bddf51e130..01084a457d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -95,6 +95,57 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
}
+ test("binary records stream") {
+ var testDir: File = null
+ try {
+ val batchDuration = Seconds(2)
+ testDir = Utils.createTempDir()
+ // Create a file that exists before the StreamingContext is created:
+ val existingFile = new File(testDir, "0")
+ Files.write("0\n", existingFile, Charset.forName("UTF-8"))
+ assert(existingFile.setLastModified(10000) && existingFile.lastModified === 10000)
+
+ // Set up the streaming context and input streams
+ withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ // This `setTime` call ensures that the clock is past the creation time of `existingFile`
+ clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
+ val batchCounter = new BatchCounter(ssc)
+ val fileStream = ssc.binaryRecordsStream(testDir.toString, 1)
+ val outputBuffer = new ArrayBuffer[Seq[Array[Byte]]]
+ with SynchronizedBuffer[Seq[Array[Byte]]]
+ val outputStream = new TestOutputStream(fileStream, outputBuffer)
+ outputStream.register()
+ ssc.start()
+
+ // Advance the clock so that the files are created after StreamingContext starts, but
+ // not enough to trigger a batch
+ clock.addToTime(batchDuration.milliseconds / 2)
+
+ val input = Seq(1, 2, 3, 4, 5)
+ input.foreach { i =>
+ Thread.sleep(batchDuration.milliseconds)
+ val file = new File(testDir, i.toString)
+ Files.write(Array[Byte](i.toByte), file)
+ assert(file.setLastModified(clock.currentTime()))
+ assert(file.lastModified === clock.currentTime)
+ logInfo("Created file " + file)
+ // Advance the clock after creating the file to avoid a race when
+ // setting its modification time
+ clock.addToTime(batchDuration.milliseconds)
+ eventually(eventuallyTimeout) {
+ assert(batchCounter.getNumCompletedBatches === i)
+ }
+ }
+
+ val expectedOutput = input.map(i => i.toByte)
+ val obtainedOutput = outputBuffer.flatten.toList.map(i => i(0).toByte)
+ assert(obtainedOutput === expectedOutput)
+ }
+ } finally {
+ if (testDir != null) Utils.deleteRecursively(testDir)
+ }
+ }
test("file input stream - newFilesOnly = true") {
testFileStream(newFilesOnly = true)