diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-07 14:37:21 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-07 14:37:21 -0800 |
commit | 237bac36e9dca8828192994dad323b8da1619267 (patch) | |
tree | f01be9fa6590b2e1604e4791dc720ccad28e2fea /streaming | |
parent | 1346126485444afc065bf4951c4bedebe5c95ce4 (diff) | |
download | spark-237bac36e9dca8828192994dad323b8da1619267.tar.gz spark-237bac36e9dca8828192994dad323b8da1619267.tar.bz2 spark-237bac36e9dca8828192994dad323b8da1619267.zip |
Renamed examples and added documentation.
Diffstat (limited to 'streaming')
3 files changed, 35 insertions, 21 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 7256e41af9..215246ba2e 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -154,7 +154,7 @@ class StreamingContext private ( storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2 ): DStream[T] = { val inputStream = new KafkaInputDStream[T](this, hostname, port, groupId, topics, initialOffsets, storageLevel) - graph.addInputStream(inputStream) + registerInputStream(inputStream) inputStream } @@ -192,7 +192,7 @@ class StreamingContext private ( storageLevel: StorageLevel ): DStream[T] = { val inputStream = new SocketInputDStream[T](this, hostname, port, converter, storageLevel) - graph.addInputStream(inputStream) + registerInputStream(inputStream) inputStream } @@ -208,7 +208,7 @@ class StreamingContext private ( storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[SparkFlumeEvent] = { val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel) - graph.addInputStream(inputStream) + registerInputStream(inputStream) inputStream } @@ -228,13 +228,14 @@ class StreamingContext private ( storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[T] = { val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel) - graph.addInputStream(inputStream) + registerInputStream(inputStream) inputStream } /** * Creates a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. + * File names starting with . are ignored. * @param directory HDFS directory to monitor for new file * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file @@ -244,16 +245,37 @@ class StreamingContext private ( K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K, V]: ClassManifest - ](directory: String): DStream[(K, V)] = { + ] (directory: String): DStream[(K, V)] = { val inputStream = new FileInputDStream[K, V, F](this, directory) - graph.addInputStream(inputStream) + registerInputStream(inputStream) + inputStream + } + + /** + * Creates a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * @param directory HDFS directory to monitor for new file + * @param filter Function to filter paths to process + * @param newFilesOnly Should process only new files and ignore existing files in the directory + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[ + K: ClassManifest, + V: ClassManifest, + F <: NewInputFormat[K, V]: ClassManifest + ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = { + val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly) + registerInputStream(inputStream) inputStream } + /** * Creates a input stream that monitors a Hadoop-compatible filesystem * for new files and reads them as text files (using key as LongWritable, value - * as Text and input format as TextInputFormat). + * as Text and input format as TextInputFormat). File names starting with . are ignored. * @param directory HDFS directory to monitor for new file */ def textFileStream(directory: String): DStream[String] = { @@ -274,7 +296,7 @@ class StreamingContext private ( defaultRDD: RDD[T] = null ): DStream[T] = { val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD) - graph.addInputStream(inputStream) + registerInputStream(inputStream) inputStream } diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala index cf72095324..1e6ad84b44 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala @@ -14,7 +14,7 @@ private[streaming] class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest]( @transient ssc_ : StreamingContext, directory: String, - filter: PathFilter = FileInputDStream.defaultPathFilter, + filter: Path => Boolean = FileInputDStream.defaultFilter, newFilesOnly: Boolean = true) extends InputDStream[(K, V)](ssc_) { @@ -60,7 +60,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K val latestModTimeFiles = new HashSet[String]() def accept(path: Path): Boolean = { - if (!filter.accept(path)) { + if (!filter(path)) { return false } else { val modTime = fs.getFileStatus(path).getModificationTime() @@ -95,16 +95,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K } } +private[streaming] object FileInputDStream { - val defaultPathFilter = new PathFilter with Serializable { - def accept(path: Path): Boolean = { - val file = path.getName() - if (file.startsWith(".") || file.endsWith("_tmp")) { - return false - } else { - return true - } - } - } + def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") } diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 76b528bec3..00ee903c1e 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -318,7 +318,7 @@ class TestServer(port: Int) extends Logging { } } } catch { - case e: SocketException => println(e) + case e: SocketException => logInfo(e) } finally { logInfo("Connection closed") if (!clientSocket.isClosed) clientSocket.close() |