aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-18 15:18:34 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-18 15:18:34 -0800
commit12ea14c211da908a278ab19fd1e9f6acd45daae8 (patch)
tree4f76d48f589f23185b680164cedaa9204af8784d /streaming
parent6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1 (diff)
downloadspark-12ea14c211da908a278ab19fd1e9f6acd45daae8.tar.gz
spark-12ea14c211da908a278ab19fd1e9f6acd45daae8.tar.bz2
spark-12ea14c211da908a278ab19fd1e9f6acd45daae8.zip
Changed networkStream to socketStream and pluggableNetworkStream to become networkStream as a way to create streams from arbitrary network receiver.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/StreamingContext.scala26
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala10
-rw-r--r--streaming/src/test/java/spark/streaming/JavaAPISuite.java5
-rw-r--r--streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala2
4 files changed, 22 insertions, 21 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 48d344f055..a426649726 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -146,7 +146,7 @@ class StreamingContext private (
* Create an input stream with any arbitrary user implemented network receiver.
* @param receiver Custom implementation of NetworkReceiver
*/
- def pluggableNetworkStream[T: ClassManifest](
+ def networkStream[T: ClassManifest](
receiver: NetworkReceiver[T]): DStream[T] = {
val inputStream = new PluggableInputDStream[T](this,
receiver)
@@ -155,15 +155,16 @@ class StreamingContext private (
}
/**
- * Create an input stream with any arbitrary user implemented akka actor receiver.
+ * Create an input stream with any arbitrary user implemented Akka actor receiver.
* @param props Props object defining creation of the actor
* @param name Name of the actor
- * @param storageLevel RDD storage level. Defaults to memory-only.
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
- def pluggableActorStream[T: ClassManifest](
+ def actorStream[T: ClassManifest](
props: Props, name: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[T] = {
- pluggableNetworkStream(new ActorReceiver(Settings(props, name, storageLevel)))
+ networkStream(new ActorReceiver(Settings(props, name, storageLevel)))
}
/**
@@ -174,7 +175,8 @@ class StreamingContext private (
* in its own thread.
* @param initialOffsets Optional initial offsets for each of the partitions to consume.
* By default the value is pulled from zookeper.
- * @param storageLevel RDD storage level. Defaults to memory-only.
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def kafkaStream[T: ClassManifest](
zkQuorum: String,
@@ -189,24 +191,24 @@ class StreamingContext private (
}
/**
- * Create a input stream from network source hostname:port. Data is received using
- * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited
+ * Create a input stream from TCP source hostname:port. Data is received using
+ * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
* lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
- def networkTextStream(
+ def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[String] = {
- networkStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
+ socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
/**
- * Create a input stream from network source hostname:port. Data is received using
+ * Create a input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes it interepreted as object using the given
* converter.
* @param hostname Hostname to connect to for receiving data
@@ -215,7 +217,7 @@ class StreamingContext private (
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
- def networkStream[T: ClassManifest](
+ def socketStream[T: ClassManifest](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 03933aae93..d9a676819a 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -130,7 +130,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
*/
def networkTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
: JavaDStream[String] = {
- ssc.networkTextStream(hostname, port, storageLevel)
+ ssc.socketTextStream(hostname, port, storageLevel)
}
/**
@@ -140,8 +140,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
*/
- def networkTextStream(hostname: String, port: Int): JavaDStream[String] = {
- ssc.networkTextStream(hostname, port)
+ def socketTextStream(hostname: String, port: Int): JavaDStream[String] = {
+ ssc.socketTextStream(hostname, port)
}
/**
@@ -154,7 +154,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
- def networkStream[T](
+ def socketStream[T](
hostname: String,
port: Int,
converter: JFunction[InputStream, java.lang.Iterable[T]],
@@ -163,7 +163,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def fn = (x: InputStream) => converter.apply(x).toIterator
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- ssc.networkStream(hostname, port, fn, storageLevel)
+ ssc.socketStream(hostname, port, fn, storageLevel)
}
/**
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 16bacffb92..5d510fd89f 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -23,7 +23,6 @@ import spark.streaming.JavaCheckpointTestUtils;
import spark.streaming.dstream.KafkaPartitionKey;
import java.io.*;
-import java.text.Collator;
import java.util.*;
// The test suite itself is Serializable so that anonymous Function implementations can be
@@ -984,7 +983,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void testNetworkTextStream() {
- JavaDStream test = ssc.networkTextStream("localhost", 12345);
+ JavaDStream test = ssc.socketTextStream("localhost", 12345);
}
@Test
@@ -1004,7 +1003,7 @@ public class JavaAPISuite implements Serializable {
}
}
- JavaDStream test = ssc.networkStream(
+ JavaDStream test = ssc.socketStream(
"localhost",
12345,
new Converter(),
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 7c1c2e1040..e6aecfbb76 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -41,7 +41,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Set up the streaming context and input streams
val ssc = new StreamingContext(master, framework, batchDuration)
- val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+ val networkStream = ssc.socketTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)