diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-10-23 14:40:24 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-10-23 14:40:24 -0700 |
commit | 19191d178d194e4b57094ca868e1cc9c66b8d4a7 (patch) | |
tree | da3e49ec376bab5594cb8cc2212eba7564de45ad /streaming | |
parent | a6de5758f1a48e6c25b441440d8cd84546857326 (diff) | |
download | spark-19191d178d194e4b57094ca868e1cc9c66b8d4a7.tar.gz spark-19191d178d194e4b57094ca868e1cc9c66b8d4a7.tar.bz2 spark-19191d178d194e4b57094ca868e1cc9c66b8d4a7.zip |
Renamed the network input streams.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/SocketInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/ObjectInputDStream.scala) | 2 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/StreamingContext.scala | 63 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/CountRaw.scala | 2 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala | 2 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala | 2 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala | 2 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala | 2 |
7 files changed, 40 insertions, 35 deletions
diff --git a/streaming/src/main/scala/spark/streaming/ObjectInputDStream.scala b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala index 89aeeda8b3..4dbf421687 100644 --- a/streaming/src/main/scala/spark/streaming/ObjectInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala @@ -9,7 +9,7 @@ import java.util.concurrent.ArrayBlockingQueue import scala.collection.mutable.ArrayBuffer -class ObjectInputDStream[T: ClassManifest]( +class SocketInputDStream[T: ClassManifest]( @transient ssc_ : StreamingContext, host: String, port: Int, diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 1dc5614a5c..90654cdad9 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -86,21 +86,26 @@ class StreamingContext ( private[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement() - def createNetworkTextStream(hostname: String, port: Int): DStream[String] = { - createNetworkObjectStream[String](hostname, port, ObjectInputReceiver.bytesToLines) + def networkTextStream( + hostname: String, + port: Int, + storageLevel: StorageLevel = StorageLevel.DISK_AND_MEMORY_2 + ): DStream[String] = { + networkStream[String](hostname, port, ObjectInputReceiver.bytesToLines, storageLevel) } - - def createNetworkObjectStream[T: ClassManifest]( - hostname: String, - port: Int, - converter: (InputStream) => Iterator[T] + + def networkStream[T: ClassManifest]( + hostname: String, + port: Int, + converter: (InputStream) => Iterator[T], + storageLevel: StorageLevel ): DStream[T] = { - val inputStream = new ObjectInputDStream[T](this, hostname, port, converter, StorageLevel.DISK_AND_MEMORY_2) + val inputStream = new SocketInputDStream[T](this, hostname, port, converter, storageLevel) graph.addInputStream(inputStream) inputStream } - - def createRawNetworkStream[T: ClassManifest]( + + def rawNetworkStream[T: ClassManifest]( hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_2 @@ -109,26 +114,26 @@ class StreamingContext ( graph.addInputStream(inputStream) inputStream } - + /* def createHttpTextStream(url: String): DStream[String] = { createHttpStream(url, ObjectInputReceiver.bytesToLines) } - + def createHttpStream[T: ClassManifest]( - url: String, + url: String, converter: (InputStream) => Iterator[T] ): DStream[T] = { } */ - /** + /** * This function creates a input stream that monitors a Hadoop-compatible * for new files and executes the necessary processing on them. - */ + */ def createFileStream[ - K: ClassManifest, - V: ClassManifest, + K: ClassManifest, + V: ClassManifest, F <: NewInputFormat[K, V]: ClassManifest ](directory: String): DStream[(K, V)] = { val inputStream = new FileInputDStream[K, V, F](this, directory) @@ -139,13 +144,13 @@ class StreamingContext ( def createTextFileStream(directory: String): DStream[String] = { createFileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString) } - + /** * This function create a input stream from an queue of RDDs. In each batch, - * it will process either one or all of the RDDs returned by the queue + * it will process either one or all of the RDDs returned by the queue */ def createQueueStream[T: ClassManifest]( - queue: Queue[RDD[T]], + queue: Queue[RDD[T]], oneAtATime: Boolean = true, defaultRDD: RDD[T] = null ): DStream[T] = { @@ -153,7 +158,7 @@ class StreamingContext ( graph.addInputStream(inputStream) inputStream } - + def createQueueStream[T: ClassManifest](array: Array[RDD[T]]): DStream[T] = { val queue = new Queue[RDD[T]] val inputStream = createQueueStream(queue, true, null) @@ -172,27 +177,27 @@ class StreamingContext ( /** * This function registers a DStream as an output stream that will be * computed every interval. - */ + */ def registerOutputStream(outputStream: DStream[_]) { graph.addOutputStream(outputStream) } - + def validate() { assert(graph != null, "Graph is null") graph.validate() } /** - * This function starts the execution of the streams. - */ + * This function starts the execution of the streams. + */ def start() { validate() val networkInputStreams = graph.getInputStreams().filter(s => s match { - case n: NetworkInputDStream[_] => true + case n: NetworkInputDStream[_] => true case _ => false }).map(_.asInstanceOf[NetworkInputDStream[_]]).toArray - + if (networkInputStreams.length > 0) { // Start the network input tracker (must start before receivers) networkInputTracker = new NetworkInputTracker(this, networkInputStreams) @@ -203,9 +208,9 @@ class StreamingContext ( // Start the scheduler scheduler = new Scheduler(this) - scheduler.start() + scheduler.start() } - + /** * This function stops the execution of the streams. */ diff --git a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala index ed571d22e3..d2fdabd659 100644 --- a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala @@ -24,7 +24,7 @@ object CountRaw { ssc.sc.parallelize(1 to 1000, 1000).count() val rawStreams = (1 to numStreams).map(_ => - ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray + ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray val union = new UnionDStream(rawStreams) union.map(_.length + 2).reduce(_ + _).foreachRDD(r => println("Byte count: " + r.collect().mkString)) ssc.start() diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala index 6af1c36891..b1e1a613fe 100644 --- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala @@ -24,7 +24,7 @@ object GrepRaw { ssc.sc.parallelize(1 to 1000, 1000).count() val rawStreams = (1 to numStreams).map(_ => - ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray + ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray val union = new UnionDStream(rawStreams) union.filter(_.contains("Culpepper")).count().foreachRDD(r => println("Grep count: " + r.collect().mkString)) diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala index af0a3bf98a..9d1b0b9eb4 100644 --- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala @@ -36,7 +36,7 @@ object TopKWordCountRaw { moreWarmup(ssc.sc) val rawStreams = (1 to streams).map(_ => - ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray + ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray val union = new UnionDStream(rawStreams) val windowedCounts = union.mapPartitions(splitAndCountPartitions) diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala index 0aa5294a17..ba1bd1de7c 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala @@ -16,7 +16,7 @@ object WordCountNetwork { // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. generated by 'nc') - val lines = ssc.createNetworkTextStream(args(1), args(2).toInt) + val lines = ssc.networkTextStream(args(1), args(2).toInt) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala index 98bafec529..d8a0664d7d 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala @@ -36,7 +36,7 @@ object WordCountRaw { moreWarmup(ssc.sc) val rawStreams = (1 to streams).map(_ => - ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray + ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray val union = new UnionDStream(rawStreams) val windowedCounts = union.mapPartitions(splitAndCountPartitions) |