author     Tathagata Das <tathagata.das1565@gmail.com>   2012-10-23 14:40:24 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>   2012-10-23 14:40:24 -0700
commit     19191d178d194e4b57094ca868e1cc9c66b8d4a7 (patch)
tree       da3e49ec376bab5594cb8cc2212eba7564de45ad /streaming
parent     a6de5758f1a48e6c25b441440d8cd84546857326 (diff)
Renamed the network input streams.
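
In effect, this commit renames StreamingContext's socket-based stream constructors (createNetworkTextStream -> networkTextStream, createNetworkObjectStream -> networkStream, createRawNetworkStream -> rawNetworkStream) and threads an explicit StorageLevel through them. A minimal before/after sketch of a call site, assuming an already-constructed StreamingContext named ssc; the hostname and port are illustrative, not from the patch:

    // Before this commit:
    // val lines = ssc.createNetworkTextStream("localhost", 9999)

    // After this commit; the storage level defaults to DISK_AND_MEMORY_2
    // but can now be overridden per stream:
    val lines = ssc.networkTextStream("localhost", 9999, StorageLevel.MEMORY_ONLY_2)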
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/SocketInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/ObjectInputDStream.scala) |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala | 63
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/CountRaw.scala |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala |  2
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala |  2
7 files changed, 40 insertions, 35 deletions
diff --git a/streaming/src/main/scala/spark/streaming/ObjectInputDStream.scala b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
index 89aeeda8b3..4dbf421687 100644
--- a/streaming/src/main/scala/spark/streaming/ObjectInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
@@ -9,7 +9,7 @@ import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer
-class ObjectInputDStream[T: ClassManifest](
+class SocketInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
host: String,
port: Int,
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 1dc5614a5c..90654cdad9 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -86,21 +86,26 @@ class StreamingContext (
private[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
- def createNetworkTextStream(hostname: String, port: Int): DStream[String] = {
- createNetworkObjectStream[String](hostname, port, ObjectInputReceiver.bytesToLines)
+ def networkTextStream(
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel = StorageLevel.DISK_AND_MEMORY_2
+ ): DStream[String] = {
+ networkStream[String](hostname, port, ObjectInputReceiver.bytesToLines, storageLevel)
}
-
- def createNetworkObjectStream[T: ClassManifest](
- hostname: String,
- port: Int,
- converter: (InputStream) => Iterator[T]
+
+ def networkStream[T: ClassManifest](
+ hostname: String,
+ port: Int,
+ converter: (InputStream) => Iterator[T],
+ storageLevel: StorageLevel
): DStream[T] = {
- val inputStream = new ObjectInputDStream[T](this, hostname, port, converter, StorageLevel.DISK_AND_MEMORY_2)
+ val inputStream = new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
graph.addInputStream(inputStream)
inputStream
}
-
- def createRawNetworkStream[T: ClassManifest](
+
+ def rawNetworkStream[T: ClassManifest](
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_2
@@ -109,26 +114,26 @@ class StreamingContext (
graph.addInputStream(inputStream)
inputStream
}
-
+
/*
def createHttpTextStream(url: String): DStream[String] = {
createHttpStream(url, ObjectInputReceiver.bytesToLines)
}
-
+
def createHttpStream[T: ClassManifest](
- url: String,
+ url: String,
converter: (InputStream) => Iterator[T]
): DStream[T] = {
}
*/
- /**
+ /**
* This function creates a input stream that monitors a Hadoop-compatible
* for new files and executes the necessary processing on them.
- */
+ */
def createFileStream[
- K: ClassManifest,
- V: ClassManifest,
+ K: ClassManifest,
+ V: ClassManifest,
F <: NewInputFormat[K, V]: ClassManifest
](directory: String): DStream[(K, V)] = {
val inputStream = new FileInputDStream[K, V, F](this, directory)
@@ -139,13 +144,13 @@ class StreamingContext (
def createTextFileStream(directory: String): DStream[String] = {
createFileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
-
+
/**
* This function create a input stream from an queue of RDDs. In each batch,
- * it will process either one or all of the RDDs returned by the queue
+ * it will process either one or all of the RDDs returned by the queue
*/
def createQueueStream[T: ClassManifest](
- queue: Queue[RDD[T]],
+ queue: Queue[RDD[T]],
oneAtATime: Boolean = true,
defaultRDD: RDD[T] = null
): DStream[T] = {
@@ -153,7 +158,7 @@ class StreamingContext (
graph.addInputStream(inputStream)
inputStream
}
-
+
def createQueueStream[T: ClassManifest](array: Array[RDD[T]]): DStream[T] = {
val queue = new Queue[RDD[T]]
val inputStream = createQueueStream(queue, true, null)
@@ -172,27 +177,27 @@ class StreamingContext (
/**
* This function registers a DStream as an output stream that will be
* computed every interval.
- */
+ */
def registerOutputStream(outputStream: DStream[_]) {
graph.addOutputStream(outputStream)
}
-
+
def validate() {
assert(graph != null, "Graph is null")
graph.validate()
}
/**
- * This function starts the execution of the streams.
- */
+ * This function starts the execution of the streams.
+ */
def start() {
validate()
val networkInputStreams = graph.getInputStreams().filter(s => s match {
- case n: NetworkInputDStream[_] => true
+ case n: NetworkInputDStream[_] => true
case _ => false
}).map(_.asInstanceOf[NetworkInputDStream[_]]).toArray
-
+
if (networkInputStreams.length > 0) {
// Start the network input tracker (must start before receivers)
networkInputTracker = new NetworkInputTracker(this, networkInputStreams)
@@ -203,9 +208,9 @@ class StreamingContext (
// Start the scheduler
scheduler = new Scheduler(this)
- scheduler.start()
+ scheduler.start()
}
-
+
/**
* This function stops the execution of the streams.
*/
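
The generalized networkStream above now accepts both a deserialization function and a storage level, so a caller can pull arbitrary objects off a socket without being pinned to the previously hard-coded DISK_AND_MEMORY_2. A sketch of such a call site, assuming the same ssc as before; the bytesToInts converter here is hypothetical and not part of this patch:

    import java.io.InputStream
    import spark.storage.StorageLevel

    // Hypothetical converter: parse newline-delimited integers off the socket.
    def bytesToInts(in: InputStream): Iterator[Int] =
      scala.io.Source.fromInputStream(in).getLines().map(_.trim.toInt)

    val ints = ssc.networkStream[Int]("localhost", 9999, bytesToInts, StorageLevel.MEMORY_ONLY_2)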
diff --git a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala
index ed571d22e3..d2fdabd659 100644
--- a/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/CountRaw.scala
@@ -24,7 +24,7 @@ object CountRaw {
ssc.sc.parallelize(1 to 1000, 1000).count()
val rawStreams = (1 to numStreams).map(_ =>
- ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
+ ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
val union = new UnionDStream(rawStreams)
union.map(_.length + 2).reduce(_ + _).foreachRDD(r => println("Byte count: " + r.collect().mkString))
ssc.start()
diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
index 6af1c36891..b1e1a613fe 100644
--- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
@@ -24,7 +24,7 @@ object GrepRaw {
ssc.sc.parallelize(1 to 1000, 1000).count()
val rawStreams = (1 to numStreams).map(_ =>
- ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
+ ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
val union = new UnionDStream(rawStreams)
union.filter(_.contains("Culpepper")).count().foreachRDD(r =>
println("Grep count: " + r.collect().mkString))
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
index af0a3bf98a..9d1b0b9eb4 100644
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -36,7 +36,7 @@ object TopKWordCountRaw {
moreWarmup(ssc.sc)
val rawStreams = (1 to streams).map(_ =>
- ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
+ ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
val union = new UnionDStream(rawStreams)
val windowedCounts = union.mapPartitions(splitAndCountPartitions)
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
index 0aa5294a17..ba1bd1de7c 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountNetwork.scala
@@ -16,7 +16,7 @@ object WordCountNetwork {
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
- val lines = ssc.createNetworkTextStream(args(1), args(2).toInt)
+ val lines = ssc.networkTextStream(args(1), args(2).toInt)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
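
As a usage note, the text stream in this example expects newline-delimited input on the given host and port; locally one would typically feed it with a tool such as netcat (e.g. nc -lk 9999) before running WordCountNetwork against that address.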
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
index 98bafec529..d8a0664d7d 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
@@ -36,7 +36,7 @@ object WordCountRaw {
moreWarmup(ssc.sc)
val rawStreams = (1 to streams).map(_ =>
- ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
+ ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
val union = new UnionDStream(rawStreams)
val windowedCounts = union.mapPartitions(splitAndCountPartitions)