author     Tathagata Das <tathagata.das1565@gmail.com>  2013-01-07 14:37:21 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2013-01-07 14:37:21 -0800
commit     237bac36e9dca8828192994dad323b8da1619267 (patch)
tree       f01be9fa6590b2e1604e4791dc720ccad28e2fea /streaming/src
parent     1346126485444afc065bf4951c4bedebe5c95ce4 (diff)
Renamed examples and added documentation.
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala          | 38
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala  | 16
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala         |  2
3 files changed, 35 insertions(+), 21 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 7256e41af9..215246ba2e 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -154,7 +154,7 @@ class StreamingContext private (
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
): DStream[T] = {
val inputStream = new KafkaInputDStream[T](this, hostname, port, groupId, topics, initialOffsets, storageLevel)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
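Every factory method below now routes through registerInputStream instead of calling graph.addInputStream directly. The helper itself is not shown in this diff; a minimal sketch of what it plausibly does, assuming it simply centralizes the graph bookkeeping each factory previously did inline:

    // Sketch only -- registerInputStream is not part of this commit's diff.
    // Assumed to wrap the graph call that each factory used to make itself.
    def registerInputStream(inputStream: InputDStream[_]) {
      graph.addInputStream(inputStream)
    }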
@@ -192,7 +192,7 @@ class StreamingContext private (
storageLevel: StorageLevel
): DStream[T] = {
val inputStream = new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
@@ -208,7 +208,7 @@ class StreamingContext private (
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
@@ -228,13 +228,14 @@ class StreamingContext private (
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[T] = {
val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
/**
* Creates an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
+ * File names starting with . are ignored.
* @param directory HDFS directory to monitor for new files
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
@@ -244,16 +245,37 @@ class StreamingContext private (
K: ClassManifest,
V: ClassManifest,
F <: NewInputFormat[K, V]: ClassManifest
- ](directory: String): DStream[(K, V)] = {
+ ] (directory: String): DStream[(K, V)] = {
val inputStream = new FileInputDStream[K, V, F](this, directory)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Creates an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them using the given key-value types and input format.
+ * @param directory HDFS directory to monitor for new files
+ * @param filter Function used to filter which paths to process
+ * @param newFilesOnly Whether to process only new files and ignore files already present in the directory
+ * @tparam K Key type for reading HDFS file
+ * @tparam V Value type for reading HDFS file
+ * @tparam F Input format for reading HDFS file
+ */
+ def fileStream[
+ K: ClassManifest,
+ V: ClassManifest,
+ F <: NewInputFormat[K, V]: ClassManifest
+ ] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = {
+ val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
+ registerInputStream(inputStream)
inputStream
}
+
/**
* Creates an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
- * as Text and input format as TextInputFormat).
+ * as Text and input format as TextInputFormat). File names starting with . are ignored.
* @param directory HDFS directory to monitor for new files
*/
def textFileStream(directory: String): DStream[String] = {
@@ -274,7 +296,7 @@ class StreamingContext private (
defaultRDD: RDD[T] = null
): DStream[T] = {
val inputStream = new QueueInputDStream(this, queue, oneAtATime, defaultRDD)
- graph.addInputStream(inputStream)
+ registerInputStream(inputStream)
inputStream
}
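The new fileStream overload accepts a custom path predicate and a flag controlling whether files already in the directory are processed. A usage sketch; the directory URI and the .tmp rule are illustrative values, not taken from this commit:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext("local[2]", "FileStreamExample", Seconds(1))
    val files = ssc.fileStream[LongWritable, Text, TextInputFormat](
      "hdfs://namenode:9000/incoming",                   // illustrative directory
      (path: Path) => !path.getName().endsWith(".tmp"),  // skip in-progress files
      newFilesOnly = true                                // ignore pre-existing files
    )
    files.map(_._2.toString).print()  // the Text value holds each line
    ssc.start()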
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
index cf72095324..1e6ad84b44 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FileInputDStream.scala
@@ -14,7 +14,7 @@ private[streaming]
class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
@transient ssc_ : StreamingContext,
directory: String,
- filter: PathFilter = FileInputDStream.defaultPathFilter,
+ filter: Path => Boolean = FileInputDStream.defaultFilter,
newFilesOnly: Boolean = true)
extends InputDStream[(K, V)](ssc_) {
@@ -60,7 +60,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
val latestModTimeFiles = new HashSet[String]()
def accept(path: Path): Boolean = {
- if (!filter.accept(path)) {
+ if (!filter(path)) {
return false
} else {
val modTime = fs.getFileStatus(path).getModificationTime()
@@ -95,16 +95,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
}
}
+private[streaming]
object FileInputDStream {
- val defaultPathFilter = new PathFilter with Serializable {
- def accept(path: Path): Boolean = {
- val file = path.getName()
- if (file.startsWith(".") || file.endsWith("_tmp")) {
- return false
- } else {
- return true
- }
- }
- }
+ def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
}
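Note that defaultFilter keeps only the dot-file rule; the old PathFilter also excluded names ending in _tmp, and that exclusion is gone. Callers that still rely on it can restore it through the new Path => Boolean parameter of fileStream; a sketch (the val name is made up):

    import org.apache.hadoop.fs.Path

    // Illustrative: reinstates the dropped "_tmp" exclusion on top of the
    // new default dot-file rule, for use as fileStream's filter argument.
    val legacyFilter: Path => Boolean =
      p => !p.getName().startsWith(".") && !p.getName().endsWith("_tmp")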
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 76b528bec3..00ee903c1e 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -318,7 +318,7 @@ class TestServer(port: Int) extends Logging {
}
}
} catch {
- case e: SocketException => println(e)
+ case e: SocketException => logInfo(e.toString)
} finally {
logInfo("Connection closed")
if (!clientSocket.isClosed) clientSocket.close()