diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-10-23 15:17:05 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-10-23 15:17:05 -0700 |
commit | 0e5d9be4dfe0d072db8410fe6d254555bba9367d (patch) | |
tree | 266493e7632c3ea1ca215ae218642555e0b23792 /streaming/src/main | |
parent | c2731dd3effe780d7f37487f8cbd27179055ebee (diff) | |
download | spark-0e5d9be4dfe0d072db8410fe6d254555bba9367d.tar.gz spark-0e5d9be4dfe0d072db8410fe6d254555bba9367d.tar.bz2 spark-0e5d9be4dfe0d072db8410fe6d254555bba9367d.zip |
Renamed APIs to create queueStream and fileStream.
Diffstat (limited to 'streaming/src/main')
5 files changed, 10 insertions, 10 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 90654cdad9..228f1a3616 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -131,7 +131,7 @@ class StreamingContext ( * This function creates a input stream that monitors a Hadoop-compatible * for new files and executes the necessary processing on them. */ - def createFileStream[ + def fileStream[ K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K, V]: ClassManifest @@ -141,15 +141,15 @@ class StreamingContext ( inputStream } - def createTextFileStream(directory: String): DStream[String] = { - createFileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) + def textFileStream(directory: String): DStream[String] = { + fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) } /** * This function create a input stream from an queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue */ - def createQueueStream[T: ClassManifest]( + def queueStream[T: ClassManifest]( queue: Queue[RDD[T]], oneAtATime: Boolean = true, defaultRDD: RDD[T] = null @@ -159,9 +159,9 @@ class StreamingContext ( inputStream } - def createQueueStream[T: ClassManifest](array: Array[RDD[T]]): DStream[T] = { + def queueStream[T: ClassManifest](array: Array[RDD[T]]): DStream[T] = { val queue = new Queue[RDD[T]] - val inputStream = createQueueStream(queue, true, null) + val inputStream = queueStream(queue, true, null) queue ++= array inputStream } diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala index 301da56014..d68611abd6 100644 --- a/streaming/src/main/scala/spark/streaming/examples/FileStream.scala +++ b/streaming/src/main/scala/spark/streaming/examples/FileStream.scala @@ -27,7 +27,7 @@ object FileStream { // Create the FileInputDStream on the directory and use the // stream to count words in new files created - val inputStream = ssc.createTextFileStream(directory.toString) + val inputStream = ssc.textFileStream(directory.toString) val words = inputStream.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() diff --git a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala index c725035a8a..df96a811da 100644 --- a/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala +++ b/streaming/src/main/scala/spark/streaming/examples/FileStreamWithCheckpoint.scala @@ -37,7 +37,7 @@ object FileStreamWithCheckpoint { ssc_.setCheckpointDetails(checkpointFile, Seconds(1)) // Setup the streaming computation - val inputStream = ssc_.createTextFileStream(directory.toString) + val inputStream = ssc_.textFileStream(directory.toString) val words = inputStream.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() diff --git a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala index ae701bba6d..2af51bad28 100644 --- a/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala +++ b/streaming/src/main/scala/spark/streaming/examples/QueueStream.scala @@ -24,7 +24,7 @@ object QueueStream { val rddQueue = new SynchronizedQueue[RDD[Int]]() // Create the QueueInputDStream and use it do some processing - val inputStream = ssc.createQueueStream(rddQueue) + val inputStream = ssc.queueStream(rddQueue) val mappedStream = inputStream.map(x => (x % 10, 1)) val reducedStream = mappedStream.reduceByKey(_ + _) reducedStream.print() diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala index 3b86948822..591cb141c3 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountHdfs.scala @@ -16,7 +16,7 @@ object WordCountHdfs { // Create the FileInputDStream on the directory and use the // stream to count words in new files created - val lines = ssc.createTextFileStream(args(1)) + val lines = ssc.textFileStream(args(1)) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() |