From 41285eaae3642b73b3ac5007a35cc4e8f1d7d084 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Sat, 23 Feb 2013 16:25:07 -0800
Subject: Fixed differences in APIs of StreamingContext and
 JavaStreamingContext. Changed rawNetworkStream to rawSocketStream, and added
 twitter, actor, and zeroMQ streams to JavaStreamingContext. Also added them
 to JavaAPISuite.

---
 .../spark/streaming/examples/RawNetworkGrep.scala  |   2 +-
 .../scala/spark/streaming/StreamingContext.scala   |  16 +-
 .../streaming/api/java/JavaStreamingContext.scala  | 201 +++++++++++++++++++--
 .../test/java/spark/streaming/JavaAPISuite.java    |  37 +++-
 .../scala/spark/streaming/InputStreamsSuite.scala  |   2 +-
 5 files changed, 232 insertions(+), 26 deletions(-)

diff --git a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
index 2eec777c54..66e709b7a3 100644
--- a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
@@ -37,7 +37,7 @@ object RawNetworkGrep {
     RawTextHelper.warmUp(ssc.sc)
 
     val rawStreams = (1 to numStreams).map(_ =>
-      ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
+      ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
     val union = ssc.union(rawStreams)
     union.filter(_.contains("the")).count().foreach(r =>
       println("Grep count: " + r.collect().mkString))
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index d0430b3f3e..25c67b279b 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -170,7 +170,8 @@ class StreamingContext private (
    * should be same.
    */
   def actorStream[T: ClassManifest](
-    props: Props, name: String,
+    props: Props,
+    name: String,
     storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
     supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
     networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
@@ -179,19 +180,20 @@ class StreamingContext private (
   /**
    * Create an input stream that receives messages pushed by a zeromq publisher.
    * @param publisherUrl Url of remote zeromq publisher
-   * @param zeroMQ topic to subscribe to
+   * @param subscribe topic to subscribe to
   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
   *                       of byte thus it needs the converter(which might be deserializer of bytes)
   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
   *                       and sub sequence refer to its payload.
   * @param storageLevel RDD storage level. Defaults to memory-only.
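+   * @example A minimal usage sketch (the endpoint, topic, and decoder below are
+   *          illustrative placeholders, not part of this API):
+   * {{{
+   *   val strings = ssc.zeroMQStream[String](
+   *     "tcp://127.0.0.1:1234",
+   *     Subscribe("topic"),
+   *     (frames: Seq[Seq[Byte]]) => frames.map(f => new String(f.toArray)).iterator)
+   * }}}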
   */
-  def zeroMQStream[T: ClassManifest](publisherUrl:String,
+  def zeroMQStream[T: ClassManifest](
+      publisherUrl: String,
       subscribe: Subscribe,
       bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
       storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
-      supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
-
+      supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
+    ): DStream[T] = {
     actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
         "ZeroMQReceiver", storageLevel, supervisorStrategy)
   }
@@ -283,7 +285,7 @@ class StreamingContext private (
    * @param storageLevel Storage level to use for storing the received objects
    * @tparam T Type of the objects in the received blocks
    */
-  def rawNetworkStream[T: ClassManifest](
+  def rawSocketStream[T: ClassManifest](
       hostname: String,
       port: Int,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
@@ -352,7 +354,7 @@ class StreamingContext private (
   def twitterStream(
       username: String,
       password: String,
-      filters: Seq[String],
+      filters: Seq[String] = Nil,
       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
     ): DStream[Status] = {
     val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel)
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index d2a0ba725f..f3b40b5b88 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -1,16 +1,26 @@
 package spark.streaming.api.java
 
-import scala.collection.JavaConversions._
-import java.lang.{Long => JLong, Integer => JInt}
-
 import spark.streaming._
-import dstream._
+import receivers.{ActorReceiver, ReceiverSupervisorStrategy}
+import spark.streaming.dstream._
 import spark.storage.StorageLevel
+
 import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import spark.api.java.{JavaSparkContext, JavaRDD}
+
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+
+import twitter4j.Status
+
+import akka.actor.Props
+import akka.actor.SupervisorStrategy
+import akka.zeromq.Subscribe
+
+import scala.collection.JavaConversions._
+
+import java.lang.{Long => JLong, Integer => JInt}
 import java.io.InputStream
 import java.util.{Map => JMap}
-import spark.api.java.{JavaSparkContext, JavaRDD}
 
 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -128,7 +138,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param storageLevel Storage level to use for storing the received objects
    *                     (default: StorageLevel.MEMORY_AND_DISK_SER_2)
    */
-  def networkTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
+  def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
   : JavaDStream[String] = {
     ssc.socketTextStream(hostname, port, storageLevel)
   }
@@ -186,13 +196,13 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param storageLevel Storage level to use for storing the received objects
    * @tparam T Type of the objects in the received blocks
    */
-  def rawNetworkStream[T](
+  def rawSocketStream[T](
     hostname: String,
     port: Int,
     storageLevel: StorageLevel): JavaDStream[T] = {
     implicit val cmt: ClassManifest[T] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
-    JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port, storageLevel))
+    JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port, storageLevel))
   }
 
   /**
@@ -204,10 +214,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param port Port to connect to for receiving data
    * @tparam T Type of the objects in the received blocks
    */
-  def rawNetworkStream[T](hostname: String, port: Int): JavaDStream[T] = {
+  def rawSocketStream[T](hostname: String, port: Int): JavaDStream[T] = {
     implicit val cmt: ClassManifest[T] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
-    JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port))
+    JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port))
   }
 
   /**
@@ -246,11 +256,178 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param hostname Hostname of the slave machine to which the flume data will be sent
    * @param port     Port of the slave machine to which the flume data will be sent
    */
-  def flumeStream(hostname: String, port: Int):
-  JavaDStream[SparkFlumeEvent] = {
+  def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
     ssc.flumeStream(hostname, port)
   }
 
+  /**
+   * Create an input stream that returns tweets received from Twitter.
+   * @param username Twitter username
+   * @param password Twitter password
+   * @param filters Set of filter strings to get only those tweets that match them
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def twitterStream(
+      username: String,
+      password: String,
+      filters: Array[String],
+      storageLevel: StorageLevel
+    ): JavaDStream[Status] = {
+    ssc.twitterStream(username, password, filters, storageLevel)
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter.
+   * @param username Twitter username
+   * @param password Twitter password
+   * @param filters Set of filter strings to get only those tweets that match them
+   */
+  def twitterStream(
+      username: String,
+      password: String,
+      filters: Array[String]
+    ): JavaDStream[Status] = {
+    ssc.twitterStream(username, password, filters)
+  }
+
+  /**
+   * Create an input stream that returns tweets received from Twitter.
+   * @param username Twitter username
+   * @param password Twitter password
+   */
+  def twitterStream(
+      username: String,
+      password: String
+    ): JavaDStream[Status] = {
+    ssc.twitterStream(username, password)
+  }
+
+  /**
+   * Create an input stream with an arbitrary user-implemented actor receiver.
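+   * A minimal usage sketch from Java (`jssc` is assumed to be a JavaStreamingContext,
+   * and `CustomActor` a hypothetical user-defined actor class; neither name is part
+   * of this API):
+   * {{{
+   *   JavaDStream<String> lines = jssc.actorStream(
+   *       new Props(CustomActor.class), "CustomReceiver", StorageLevel.MEMORY_ONLY());
+   * }}}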
+   * @param props Props object defining creation of the actor
+   * @param name Name of the actor
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param supervisorStrategy Supervisor strategy to use for the receiver actor
+   *
+   * @note Since the actor may exist outside the Spark framework, it is the user's
+   *       responsibility to ensure type safety, i.e. that the parametrized type of
+   *       data received and the parametrized type of the actorStream match.
+   */
+  def actorStream[T](
+      props: Props,
+      name: String,
+      storageLevel: StorageLevel,
+      supervisorStrategy: SupervisorStrategy
+    ): JavaDStream[T] = {
+    implicit val cm: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    ssc.actorStream[T](props, name, storageLevel, supervisorStrategy)
+  }
+
+  /**
+   * Create an input stream with an arbitrary user-implemented actor receiver.
+   * @param props Props object defining creation of the actor
+   * @param name Name of the actor
+   * @param storageLevel Storage level to use for storing the received objects
+   *
+   * @note Since the actor may exist outside the Spark framework, it is the user's
+   *       responsibility to ensure type safety, i.e. that the parametrized type of
+   *       data received and the parametrized type of the actorStream match.
+   */
+  def actorStream[T](
+      props: Props,
+      name: String,
+      storageLevel: StorageLevel
+    ): JavaDStream[T] = {
+    implicit val cm: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    ssc.actorStream[T](props, name, storageLevel)
+  }
+
+  /**
+   * Create an input stream with an arbitrary user-implemented actor receiver.
+   * @param props Props object defining creation of the actor
+   * @param name Name of the actor
+   *
+   * @note Since the actor may exist outside the Spark framework, it is the user's
+   *       responsibility to ensure type safety, i.e. that the parametrized type of
+   *       data received and the parametrized type of the actorStream match.
+   */
+  def actorStream[T](
+      props: Props,
+      name: String
+    ): JavaDStream[T] = {
+    implicit val cm: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    ssc.actorStream[T](props, name)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a zeromq publisher.
+   * @param publisherUrl Url of remote zeromq publisher
+   * @param subscribe topic to subscribe to
+   * @param bytesToObjects A zeroMQ stream publishes a sequence of frames for each topic,
+   *                       and each frame is a sequence of bytes. This converter (typically
+   *                       a deserializer) translates that sequence of frames, each frame
+   *                       being a sequence of payload bytes, into an iterator of objects.
+   * @param storageLevel Storage level to use for storing the received objects
+   * @param supervisorStrategy Supervisor strategy to use for the receiver actor
+   */
+  def zeroMQStream[T](
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+      storageLevel: StorageLevel,
+      supervisorStrategy: SupervisorStrategy
+    ): JavaDStream[T] = {
+    implicit val cm: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a zeromq publisher.
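+   * A minimal usage sketch from Java (`jssc` is assumed to be a JavaStreamingContext
+   * and `subscription` an akka.zeromq.Subscribe message; the URL and the decoding
+   * function are illustrative placeholders):
+   * {{{
+   *   JavaDStream<String> strings = jssc.zeroMQStream(
+   *       "tcp://127.0.0.1:1234",
+   *       subscription,
+   *       new Function<byte[][], Iterable<String>>() {
+   *         public Iterable<String> call(byte[][] frames) {
+   *           List<String> out = new ArrayList<String>();
+   *           for (byte[] frame : frames) {
+   *             out.add(new String(frame));
+   *           }
+   *           return out;
+   *         }
+   *       });
+   * }}}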
+   * @param publisherUrl Url of remote zeromq publisher
+   * @param subscribe topic to subscribe to
+   * @param bytesToObjects A zeroMQ stream publishes a sequence of frames for each topic,
+   *                       and each frame is a sequence of bytes. This converter (typically
+   *                       a deserializer) translates that sequence of frames, each frame
+   *                       being an array of payload bytes, into an iterable of objects.
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def zeroMQStream[T](
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+      storageLevel: StorageLevel
+    ): JavaDStream[T] = {
+    implicit val cm: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
+  }
+
+  /**
+   * Create an input stream that receives messages pushed by a zeromq publisher.
+   * @param publisherUrl Url of remote zeromq publisher
+   * @param subscribe topic to subscribe to
+   * @param bytesToObjects A zeroMQ stream publishes a sequence of frames for each topic,
+   *                       and each frame is a sequence of bytes. This converter (typically
+   *                       a deserializer) translates that sequence of frames, each frame
+   *                       being an array of payload bytes, into an iterable of objects.
+   */
+  def zeroMQStream[T](
+      publisherUrl: String,
+      subscribe: Subscribe,
+      bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
+    ): JavaDStream[T] = {
+    implicit val cm: ClassManifest[T] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+    def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+    ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
+  }
+
   /**
    * Registers an output stream that will be computed every interval
    */
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 4530af5f6a..3bed500f73 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -24,10 +24,16 @@ import spark.streaming.api.java.JavaStreamingContext;
 import spark.streaming.JavaTestUtils;
 import spark.streaming.JavaCheckpointTestUtils;
 import spark.streaming.dstream.KafkaPartitionKey;
+import spark.streaming.InputStreamsSuite;
 
 import java.io.*;
 import java.util.*;
 
+import akka.actor.Props;
+import akka.zeromq.Subscribe;
+
+
+
 // The test suite itself is Serializable so that anonymous Function implementations can be
 // serialized, as an alternative to converting these anonymous classes to static inner classes;
 // see http://stackoverflow.com/questions/758570/.
@@ -1205,12 +1211,12 @@ public class JavaAPISuite implements Serializable {
   }
 
   @Test
-  public void testNetworkTextStream() {
+  public void testSocketTextStream() {
     JavaDStream<String> test = ssc.socketTextStream("localhost", 12345);
   }
 
   @Test
-  public void testNetworkString() {
+  public void testSocketString() {
     class Converter extends Function<InputStream, Iterable<String>> {
       public Iterable<String> call(InputStream in) {
         BufferedReader reader = new BufferedReader(new InputStreamReader(in));
@@ -1239,13 +1245,13 @@ public class JavaAPISuite implements Serializable {
   }
 
   @Test
-  public void testRawNetworkStream() {
-    JavaDStream<String> test = ssc.rawNetworkStream("localhost", 12345);
+  public void testRawSocketStream() {
+    JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345);
   }
 
   @Test
   public void testFlumeStream() {
-    JavaDStream<SparkFlumeEvent> test = ssc.flumeStream("localhost", 12345);
+    JavaDStream<SparkFlumeEvent> test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY());
   }
 
   @Test
@@ -1253,4 +1259,25 @@ public class JavaAPISuite implements Serializable {
     JavaPairDStream<String, String> foo = ssc.fileStream("/tmp/foo");
   }
 
+
+  @Test
+  public void testTwitterStream() {
+    String[] filters = new String[] { "good", "bad", "ugly" };
+    JavaDStream<Status> test = ssc.twitterStream("username", "password", filters, StorageLevel.MEMORY_ONLY());
+  }
+
+  @Test
+  public void testActorStream() {
+    JavaDStream<String> test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY());
+  }
+
+  @Test
+  public void testZeroMQStream() {
+    JavaDStream<String> test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() {
+      @Override
+      public Iterable<String> call(byte[][] b) throws Exception {
+        return null;
+      }
+    });
+  }
 }
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index c9f941c5b8..1024d3ac97 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -42,7 +42,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
   }
 
 
-  test("network input stream") {
+  test("socket input stream") {
     // Start the server
     val testServer = new TestServer(testPort)
     testServer.start()
-- 
cgit v1.2.3