-rw-r--r--  docs/streaming-programming-guide.md                                       | 10
-rw-r--r--  examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java |  2
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala | 12
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala   |  2
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala           | 26
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala | 10
-rw-r--r--  streaming/src/test/java/spark/streaming/JavaAPISuite.java                 |  5
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala          |  2
9 files changed, 36 insertions(+), 35 deletions(-)
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index d408e80359..71e1bd4aab 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -34,16 +34,16 @@ The StreamingContext is used to create InputDStreams from input sources:
{% highlight scala %}
// Assuming ssc is the StreamingContext
-ssc.networkStream(hostname, port) // Creates a stream that uses a TCP socket to read data from hostname:port
-ssc.textFileStream(directory) // Creates a stream by monitoring and processing new files in a HDFS directory
+ssc.textFileStream(directory) // Creates a stream by monitoring and processing new files in an HDFS directory
+ssc.socketStream(hostname, port) // Creates a stream that uses a TCP socket to read data from hostname:port
{% endhighlight %}
-A complete list of input sources is available in the [StreamingContext API documentation](api/streaming/index.html#spark.streaming.StreamingContext). Data received from these sources can be processed using DStream operations, which are explained next.
+We also provide input streams for Kafka, Flume, Akka actors, etc. For a complete list of input streams, take a look at the [StreamingContext API documentation](api/streaming/index.html#spark.streaming.StreamingContext).
# DStream Operations
-Once an input DStream has been created, you can transform it using _DStream operators_. Most of these operators return new DStreams which you can further transform. Eventually, you'll need to call an _output operator_, which forces evaluation of the DStream by writing data out to an external source.
+Data received from the input streams can be processed using _DStream operations_. There are two kinds of operations - _transformations_ and _output operations_. Similar to RDD transformations, DStream transformations operate on one or more DStreams to create new DStreams with transformed data. After applying a sequence of transformations to the input streams, you'll need to call the output operations, which write data out to an external source.
## Transformations
@@ -452,4 +452,4 @@ If the driver had crashed in the middle of the processing of time 3, then it wil
# Where to Go from Here
* Documentation - [Scala and Java](api/streaming/index.html)
-* More examples - [Scala](https://github.com/mesos/spark/tree/master/examples/src/main/scala/spark/streaming/examples) and [Java](https://github.com/mesos/spark/tree/master/examples/src/main/java/spark/streaming/examples) \ No newline at end of file
+* More examples - [Scala](https://github.com/mesos/spark/tree/master/examples/src/main/scala/spark/streaming/examples) and [Java](https://github.com/mesos/spark/tree/master/examples/src/main/java/spark/streaming/examples)
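Read together, the two rewritten passages above describe the full lifecycle: create an input stream, transform it, then force evaluation with an output operation. A minimal Scala sketch of that flow, mirroring the NetworkWordCount example further down in this patch; the master string, application name, hostname, and port are placeholders:

import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._

// Create the context with a 1-second batch interval.
val ssc = new StreamingContext("local[2]", "GuideSketch", Seconds(1))

// Input stream: lines read over a TCP socket (formerly networkTextStream).
val lines = ssc.socketTextStream("localhost", 9999)

// Transformations: split each line into words and count them per batch.
val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)

// Output operation: forces evaluation by printing the counts of each batch.
wordCounts.print()
ssc.start()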
diff --git a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
index 4299febfd6..07342beb02 100644
--- a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
@@ -35,7 +35,7 @@ public class JavaNetworkWordCount {
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited text (e.g. generated by 'nc')
- JavaDStream<String> lines = ssc.networkTextStream(args[1], Integer.parseInt(args[2]));
+ JavaDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
diff --git a/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala
index ff05842c71..553afc2024 100644
--- a/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/AkkaActorWordCount.scala
@@ -36,8 +36,8 @@ class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
}
/**
- * A sample word count program demonstrating the use of plugging in
- * AkkaActor as Receiver
+ * A sample word count program demonstrating the use of the Akka actor stream.
+ *
*/
object AkkaActorWordCount {
def main(args: Array[String]) {
@@ -56,18 +56,18 @@ object AkkaActorWordCount {
Seconds(batchDuration.toLong))
/*
- * Following is the use of pluggableActorStream to plug in custom actor as receiver
+ * The following shows the use of actorStream to plug in a custom actor as the receiver
*
* An important point to note:
* Since the actor may exist outside the Spark framework, it is the user's responsibility
- * to ensure the type safety, i.e type of data received and PluggableInputDstream
+ * to ensure type safety, i.e. the type of the data received and of the actorStream
* should be the same.
*
- * For example: Both pluggableActorStream and SampleActorReceiver are parameterized
+ * For example: Both actorStream and SampleActorReceiver are parameterized
* to the same type to ensure type safety.
*/
- val lines = ssc.pluggableActorStream[String](
+ val lines = ssc.actorStream[String](
Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
remoteAkkaHost, remoteAkkaPort.toInt))), "SampleReceiver")
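The type-safety caveat above comes down to one rule: the type parameter given to actorStream must match the element type the receiving actor produces. A minimal sketch reusing SampleActorReceiver from this file; the host and port in the Akka URL are placeholders:

import akka.actor.Props

// actorStream[String] and SampleActorReceiver[String] are parameterized to the
// same type, so the elements of `lines` match what the actor actually receives.
val lines = ssc.actorStream[String](
  Props(new SampleActorReceiver[String]("akka://spark@host:9999/user/FeederActor")),
  "SampleReceiver")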
diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
index 32f7d57bea..7ff70ae2e5 100644
--- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
@@ -27,7 +27,7 @@ object NetworkWordCount {
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited text (e.g. generated by 'nc')
- val lines = ssc.networkTextStream(args(1), args(2).toInt)
+ val lines = ssc.socketTextStream(args(1), args(2).toInt)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index 60f228b8ad..fba72519a9 100644
--- a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -27,7 +27,7 @@ object PageViewStream {
val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1))
// Create a NetworkInputDStream on target host:port and convert each line to a PageView
- val pageViews = ssc.networkTextStream(host, port)
+ val pageViews = ssc.socketTextStream(host, port)
.flatMap(_.split("\n"))
.map(PageView.fromString(_))
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 48d344f055..a426649726 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -146,7 +146,7 @@ class StreamingContext private (
* Create an input stream with any arbitrary user-implemented network receiver.
* @param receiver Custom implementation of NetworkReceiver
*/
- def pluggableNetworkStream[T: ClassManifest](
+ def networkStream[T: ClassManifest](
receiver: NetworkReceiver[T]): DStream[T] = {
val inputStream = new PluggableInputDStream[T](this,
receiver)
@@ -155,15 +155,16 @@ class StreamingContext private (
}
/**
- * Create an input stream with any arbitrary user implemented akka actor receiver.
+ * Create an input stream with any arbitrary user-implemented Akka actor receiver.
* @param props Props object defining creation of the actor
* @param name Name of the actor
- * @param storageLevel RDD storage level. Defaults to memory-only.
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_ONLY_SER_2)
*/
- def pluggableActorStream[T: ClassManifest](
+ def actorStream[T: ClassManifest](
props: Props, name: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[T] = {
- pluggableNetworkStream(new ActorReceiver(Settings(props, name, storageLevel)))
+ networkStream(new ActorReceiver(Settings(props, name, storageLevel)))
}
/**
@@ -174,7 +175,8 @@ class StreamingContext private (
* in its own thread.
* @param initialOffsets Optional initial offsets for each of the partitions to consume.
* By default the value is pulled from ZooKeeper.
- * @param storageLevel RDD storage level. Defaults to memory-only.
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def kafkaStream[T: ClassManifest](
zkQuorum: String,
@@ -189,24 +191,24 @@ class StreamingContext private (
}
/**
- * Create a input stream from network source hostname:port. Data is received using
- * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
+ * Create an input stream from a TCP source hostname:port. Data is received using
+ * a TCP socket and the received bytes are interpreted as UTF8 encoded `\n` delimited
* lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
- def networkTextStream(
+ def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[String] = {
- networkStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
+ socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
/**
- * Create a input stream from network source hostname:port. Data is received using
+ * Create an input stream from a TCP source hostname:port. Data is received using
* a TCP socket and the received bytes are interpreted as objects using the given
* converter.
* @param hostname Hostname to connect to for receiving data
@@ -215,7 +217,7 @@ class StreamingContext private (
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
- def networkStream[T: ClassManifest](
+ def socketStream[T: ClassManifest](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
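The renamed socketStream generalizes socketTextStream: as the method body above shows, the text variant just fixes the converter to SocketReceiver.bytesToLines and the storage level to MEMORY_AND_DISK_SER_2. A minimal sketch of supplying a converter directly; the bytesToLines helper below is a hand-rolled stand-in for the library's, not the library's own:

import java.io.{BufferedReader, InputStream, InputStreamReader}
import spark.storage.StorageLevel

// Hypothetical converter: turn the socket's InputStream into an Iterator of
// '\n'-delimited UTF8 strings, standing in for SocketReceiver.bytesToLines.
def bytesToLines(in: InputStream): Iterator[String] = {
  val reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))
  Iterator.continually(reader.readLine()).takeWhile(_ != null)
}

// Equivalent to socketTextStream("localhost", 9999), spelled out via socketStream.
val lines = ssc.socketStream[String](
  "localhost", 9999,
  bytesToLines,
  StorageLevel.MEMORY_AND_DISK_SER_2)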
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 03933aae93..d9a676819a 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -130,7 +130,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
*/
def networkTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
: JavaDStream[String] = {
- ssc.networkTextStream(hostname, port, storageLevel)
+ ssc.socketTextStream(hostname, port, storageLevel)
}
/**
@@ -140,8 +140,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
*/
- def networkTextStream(hostname: String, port: Int): JavaDStream[String] = {
- ssc.networkTextStream(hostname, port)
+ def socketTextStream(hostname: String, port: Int): JavaDStream[String] = {
+ ssc.socketTextStream(hostname, port)
}
/**
@@ -154,7 +154,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
- def networkStream[T](
+ def socketStream[T](
hostname: String,
port: Int,
converter: JFunction[InputStream, java.lang.Iterable[T]],
@@ -163,7 +163,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def fn = (x: InputStream) => converter.apply(x).toIterator
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- ssc.networkStream(hostname, port, fn, storageLevel)
+ ssc.socketStream(hostname, port, fn, storageLevel)
}
/**
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 16bacffb92..5d510fd89f 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -23,7 +23,6 @@ import spark.streaming.JavaCheckpointTestUtils;
import spark.streaming.dstream.KafkaPartitionKey;
import java.io.*;
-import java.text.Collator;
import java.util.*;
// The test suite itself is Serializable so that anonymous Function implementations can be
@@ -984,7 +983,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void testNetworkTextStream() {
- JavaDStream test = ssc.networkTextStream("localhost", 12345);
+ JavaDStream test = ssc.socketTextStream("localhost", 12345);
}
@Test
@@ -1004,7 +1003,7 @@ public class JavaAPISuite implements Serializable {
}
}
- JavaDStream test = ssc.networkStream(
+ JavaDStream test = ssc.socketStream(
"localhost",
12345,
new Converter(),
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 7c1c2e1040..e6aecfbb76 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -41,7 +41,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Set up the streaming context and input streams
val ssc = new StreamingContext(master, framework, batchDuration)
- val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+ val networkStream = ssc.socketTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)