author     Tathagata Das <tathagata.das1565@gmail.com>   2012-10-23 15:17:05 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2012-10-23 15:17:05 -0700
commit     0e5d9be4dfe0d072db8410fe6d254555bba9367d
tree       266493e7632c3ea1ca215ae218642555e0b23792 /streaming/src/main
parent     c2731dd3effe780d7f37487f8cbd27179055ebee
Renamed the createQueueStream and createFileStream APIs to queueStream and fileStream.
Diffstat (limited to 'streaming/src/main')
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala                  | 12 ++++++------
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/FileStream.scala               |  2 +-
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala |  2 +-
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/QueueStream.scala              |  2 +-
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala            |  2 +-
5 files changed, 10 insertions(+), 10 deletions(-)
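
In short, the commit is a pure rename of the stream-creation methods on StreamingContext; behavior is unchanged. A minimal before/after sketch (ssc is assumed to be an existing StreamingContext; the path and rddQueue are hypothetical):

    // Before this commit:
    //   val lines  = ssc.createTextFileStream("hdfs://localhost:9000/in")
    //   val queued = ssc.createQueueStream(rddQueue)
    // After this commit:
    val lines  = ssc.textFileStream("hdfs://localhost:9000/in")  // monitor a directory for new files
    val queued = ssc.queueStream(rddQueue)                       // pull RDDs from a queue
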
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 90654cdad9..228f1a3616 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -131,7 +131,7 @@ class StreamingContext (
* This function creates an input stream that monitors a Hadoop-compatible filesystem
* for new files and executes the necessary processing on them.
*/
- def createFileStream[
+ def fileStream[
K: ClassManifest,
V: ClassManifest,
F <: NewInputFormat[K, V]: ClassManifest
@@ -141,15 +141,15 @@ class StreamingContext (
inputStream
}
- def createTextFileStream(directory: String): DStream[String] = {
- createFileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
+ def textFileStream(directory: String): DStream[String] = {
+ fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
/**
* This function creates an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*/
- def createQueueStream[T: ClassManifest](
+ def queueStream[T: ClassManifest](
queue: Queue[RDD[T]],
oneAtATime: Boolean = true,
defaultRDD: RDD[T] = null
@@ -159,9 +159,9 @@ class StreamingContext (
inputStream
}
- def createQueueStream[T: ClassManifest](array: Array[RDD[T]]): DStream[T] = {
+ def queueStream[T: ClassManifest](array: Array[RDD[T]]): DStream[T] = {
val queue = new Queue[RDD[T]]
- val inputStream = createQueueStream(queue, true, null)
+ val inputStream = queueStream(queue, true, null)
queue ++= array
inputStream
}
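
The generic fileStream keeps its three type parameters (key type, value type, and the Hadoop InputFormat used to parse new files); textFileStream is just the text specialization built on top of it, as the diff shows. A minimal sketch calling both under the new names (ssc is again an assumed StreamingContext; the directory is hypothetical):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    // Fully generic form: yields (key, value) pairs from each new file.
    val pairs = ssc.fileStream[LongWritable, Text, TextInputFormat]("hdfs://localhost:9000/in")
    // Convenience form: the same stream reduced to its text values.
    val lines = ssc.textFileStream("hdfs://localhost:9000/in")
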
diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
index 301da56014..d68611abd6 100644
--- a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala
@@ -27,7 +27,7 @@ object FileStream {
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
- val inputStream = ssc.createTextFileStream(directory.toString)
+ val inputStream = ssc.textFileStream(directory.toString)
val words = inputStream.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
index c725035a8a..df96a811da 100644
--- a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala
@@ -37,7 +37,7 @@ object FileStreamWithCheckpoint {
ssc_.setCheckpointDetails(checkpointFile, Seconds(1))
// Setup the streaming computation
- val inputStream = ssc_.createTextFileStream(directory.toString)
+ val inputStream = ssc_.textFileStream(directory.toString)
val words = inputStream.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
diff --git a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
index ae701bba6d..2af51bad28 100644
--- a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala
@@ -24,7 +24,7 @@ object QueueStream {
val rddQueue = new SynchronizedQueue[RDD[Int]]()
// Create the QueueInputDStream and use it to do some processing
- val inputStream = ssc.createQueueStream(rddQueue)
+ val inputStream = ssc.queueStream(rddQueue)
val mappedStream = inputStream.map(x => (x % 10, 1))
val reducedStream = mappedStream.reduceByKey(_ + _)
reducedStream.print()
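
The queue-backed stream is mostly a testing hook: with the default oneAtATime = true, each batch interval dequeues and processes a single RDD. A sketch of the feeding loop the example relies on (truncated in the hunk above); the ssc.sc accessor, the one-second batch duration, and the loop bounds are assumptions here:

    // Push one fresh RDD per second while the computation runs;
    // each batch then consumes exactly one RDD from the queue.
    for (i <- 1 to 30) {
      rddQueue += ssc.sc.makeRDD(1 to 1000, 10)
      Thread.sleep(1000)
    }
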
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
index 3b86948822..591cb141c3 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala
@@ -16,7 +16,7 @@ object WordCountHdfs {
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
- val lines = ssc.createTextFileStream(args(1))
+ val lines = ssc.textFileStream(args(1))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()