diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-18 15:18:34 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-18 15:18:34 -0800 |
commit | 12ea14c211da908a278ab19fd1e9f6acd45daae8 (patch) | |
tree | 4f76d48f589f23185b680164cedaa9204af8784d /streaming/src | |
parent | 6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1 (diff) | |
download | spark-12ea14c211da908a278ab19fd1e9f6acd45daae8.tar.gz spark-12ea14c211da908a278ab19fd1e9f6acd45daae8.tar.bz2 spark-12ea14c211da908a278ab19fd1e9f6acd45daae8.zip |
Changed networkStream to socketStream and pluggableNetworkStream to become networkStream, as a way to create streams from an arbitrary network receiver.
Diffstat (limited to 'streaming/src')
4 files changed, 22 insertions, 21 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 48d344f055..a426649726 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -146,7 +146,7 @@ class StreamingContext private ( * Create an input stream with any arbitrary user implemented network receiver. * @param receiver Custom implementation of NetworkReceiver */ - def pluggableNetworkStream[T: ClassManifest]( + def networkStream[T: ClassManifest]( receiver: NetworkReceiver[T]): DStream[T] = { val inputStream = new PluggableInputDStream[T](this, receiver) @@ -155,15 +155,16 @@ class StreamingContext private ( } /** - * Create an input stream with any arbitrary user implemented akka actor receiver. + * Create an input stream with any arbitrary user implemented Akka actor receiver. * @param props Props object defining creation of the actor * @param name Name of the actor - * @param storageLevel RDD storage level. Defaults to memory-only. + * @param storageLevel Storage level to use for storing the received objects + * (default: StorageLevel.MEMORY_AND_DISK_SER_2) */ - def pluggableActorStream[T: ClassManifest]( + def actorStream[T: ClassManifest]( props: Props, name: String, storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[T] = { - pluggableNetworkStream(new ActorReceiver(Settings(props, name, storageLevel))) + networkStream(new ActorReceiver(Settings(props, name, storageLevel))) } /** @@ -174,7 +175,8 @@ class StreamingContext private ( * in its own thread. * @param initialOffsets Optional initial offsets for each of the partitions to consume. * By default the value is pulled from zookeper. - * @param storageLevel RDD storage level. Defaults to memory-only. 
+ * @param storageLevel Storage level to use for storing the received objects + * (default: StorageLevel.MEMORY_AND_DISK_SER_2) */ def kafkaStream[T: ClassManifest]( zkQuorum: String, @@ -189,24 +191,24 @@ class StreamingContext private ( } /** - * Create a input stream from network source hostname:port. Data is received using - * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited + * Create a input stream from TCP source hostname:port. Data is received using + * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited * lines. * @param hostname Hostname to connect to for receiving data * @param port Port to connect to for receiving data * @param storageLevel Storage level to use for storing the received objects * (default: StorageLevel.MEMORY_AND_DISK_SER_2) */ - def networkTextStream( + def socketTextStream( hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[String] = { - networkStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) + socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel) } /** - * Create a input stream from network source hostname:port. Data is received using + * Create a input stream from TCP source hostname:port. Data is received using * a TCP socket and the receive bytes it interepreted as object using the given * converter. 
* @param hostname Hostname to connect to for receiving data @@ -215,7 +217,7 @@ class StreamingContext private ( * @param storageLevel Storage level to use for storing the received objects * @tparam T Type of the objects received (after converting bytes to objects) */ - def networkStream[T: ClassManifest]( + def socketStream[T: ClassManifest]( hostname: String, port: Int, converter: (InputStream) => Iterator[T], diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index 03933aae93..d9a676819a 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -130,7 +130,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { */ def networkTextStream(hostname: String, port: Int, storageLevel: StorageLevel) : JavaDStream[String] = { - ssc.networkTextStream(hostname, port, storageLevel) + ssc.socketTextStream(hostname, port, storageLevel) } /** @@ -140,8 +140,8 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param hostname Hostname to connect to for receiving data * @param port Port to connect to for receiving data */ - def networkTextStream(hostname: String, port: Int): JavaDStream[String] = { - ssc.networkTextStream(hostname, port) + def socketTextStream(hostname: String, port: Int): JavaDStream[String] = { + ssc.socketTextStream(hostname, port) } /** @@ -154,7 +154,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param storageLevel Storage level to use for storing the received objects * @tparam T Type of the objects received (after converting bytes to objects) */ - def networkStream[T]( + def socketStream[T]( hostname: String, port: Int, converter: JFunction[InputStream, java.lang.Iterable[T]], @@ -163,7 +163,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { def fn = (x: InputStream) => 
converter.apply(x).toIterator implicit val cmt: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] - ssc.networkStream(hostname, port, fn, storageLevel) + ssc.socketStream(hostname, port, fn, storageLevel) } /** diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 16bacffb92..5d510fd89f 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -23,7 +23,6 @@ import spark.streaming.JavaCheckpointTestUtils; import spark.streaming.dstream.KafkaPartitionKey; import java.io.*; -import java.text.Collator; import java.util.*; // The test suite itself is Serializable so that anonymous Function implementations can be @@ -984,7 +983,7 @@ public class JavaAPISuite implements Serializable { @Test public void testNetworkTextStream() { - JavaDStream test = ssc.networkTextStream("localhost", 12345); + JavaDStream test = ssc.socketTextStream("localhost", 12345); } @Test @@ -1004,7 +1003,7 @@ public class JavaAPISuite implements Serializable { } } - JavaDStream test = ssc.networkStream( + JavaDStream test = ssc.socketStream( "localhost", 12345, new Converter(), diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 7c1c2e1040..e6aecfbb76 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -41,7 +41,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // Set up the streaming context and input streams val ssc = new StreamingContext(master, framework, batchDuration) - val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) + val networkStream = ssc.socketTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK) val outputBuffer = new 
ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]] val outputStream = new TestOutputStream(networkStream, outputBuffer) def output = outputBuffer.flatMap(x => x) |