author    Tathagata Das <tathagata.das1565@gmail.com>  2013-02-23 16:25:07 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2013-02-23 16:25:07 -0800
commit    41285eaae3642b73b3ac5007a35cc4e8f1d7d084 (patch)
tree      7347664e7474271a1ca534b95c8b731f28045082 /streaming
parent    d8cee52d526497efdc02ea39e2fb721321ec0b4e (diff)
Fixed differences between the APIs of StreamingContext and JavaStreamingContext. Changed rawNetworkStream to rawSocketStream, and added twitter, actor, and zeroMQ streams to JavaStreamingContext. Also added tests for them to JavaAPISuite.
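For context, a minimal sketch of the renamed Scala API in use (the master URL, batch interval, host, and port are illustrative placeholders, not taken from this commit):

import spark.streaming.{Seconds, StreamingContext}
import spark.storage.StorageLevel

// Hypothetical setup; "local[2]" and the endpoint are placeholders.
val ssc = new StreamingContext("local[2]", "RawSocketExample", Seconds(1))

// Previously rawNetworkStream[T]; renamed to rawSocketStream[T] so the Scala
// and Java APIs agree with the socketTextStream/socketStream naming.
val events = ssc.rawSocketStream[String]("localhost", 9999,
  StorageLevel.MEMORY_AND_DISK_SER_2)

events.print()
ssc.start()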
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala              |  16
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala | 201
-rw-r--r--  streaming/src/test/java/spark/streaming/JavaAPISuite.java                    |  37
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala             |   2
4 files changed, 231 insertions(+), 25 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index d0430b3f3e..25c67b279b 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -170,7 +170,8 @@ class StreamingContext private (
* should be same.
*/
def actorStream[T: ClassManifest](
- props: Props, name: String,
+ props: Props,
+ name: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
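A hedged sketch of how the reformatted signature is called, reusing the ssc from the sketch above (CustomActor and its forwarding logic are hypothetical; a real receiver actor must hand its data to the ActorReceiver plumbing this hunk touches):

import akka.actor.{Actor, Props}
import spark.storage.StorageLevel

// Hypothetical actor; the body that pushes received items into Spark is elided.
class CustomActor extends Actor {
  def receive = { case msg: String => () }
}

// One argument per line, matching the reformatted declaration above.
val custom = ssc.actorStream[String](
  Props(new CustomActor),
  name = "CustomReceiver",
  storageLevel = StorageLevel.MEMORY_ONLY_SER_2)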
@@ -179,19 +180,20 @@ class StreamingContext private (
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param publisherUrl Url of remote zeromq publisher
- * @param zeroMQ topic to subscribe to
+ * @param subscribe topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes a sequence of frames for each topic, and
*                       each frame carries a sequence of bytes. This converter (which may
*                       deserialize those bytes) translates a sequence of byte sequences,
*                       where each outer sequence is a frame and each inner one its payload.
* @param storageLevel RDD storage level. Defaults to memory-only.
*/
- def zeroMQStream[T: ClassManifest](publisherUrl:String,
+ def zeroMQStream[T: ClassManifest](
+ publisherUrl:String,
subscribe: Subscribe,
bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
- supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
-
+ supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
+ ): DStream[T] = {
actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
"ZeroMQReceiver", storageLevel, supervisorStrategy)
}
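To make the converter contract concrete, a sketch of a caller under the corrected scaladoc (the UTF-8 decoding, URL, and topic are illustrative assumptions):

import akka.zeromq.Subscribe

// Each inner Seq[Byte] is one ZeroMQ frame payload; decode every frame to a String.
def bytesToStrings(frames: Seq[Seq[Byte]]): Iterator[String] =
  frames.map(frame => new String(frame.toArray, "UTF-8")).iterator

val zmqLines = ssc.zeroMQStream[String](
  "tcp://127.0.0.1:1234",   // placeholder publisher URL
  Subscribe("topic"),       // placeholder topic
  bytesToStrings)           // storageLevel and supervisorStrategy use the defaults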
@@ -283,7 +285,7 @@ class StreamingContext private (
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects in the received blocks
*/
- def rawNetworkStream[T: ClassManifest](
+ def rawSocketStream[T: ClassManifest](
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
@@ -352,7 +354,7 @@ class StreamingContext private (
def twitterStream(
username: String,
password: String,
- filters: Seq[String],
+ filters: Seq[String] = Nil,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[Status] = {
val inputStream = new TwitterInputDStream(this, username, password, filters, storageLevel)
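With filters now defaulting to Nil, Scala callers can omit them entirely; a quick sketch (the credentials are placeholders, and this era of the API used basic auth):

// No filter terms: the stream carries a sample of all public statuses.
val tweets = ssc.twitterStream("username", "password")

// Filter terms still work as before.
val sparkTweets = ssc.twitterStream("username", "password", Seq("spark"))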
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index d2a0ba725f..f3b40b5b88 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -1,16 +1,26 @@
package spark.streaming.api.java
-import scala.collection.JavaConversions._
-import java.lang.{Long => JLong, Integer => JInt}
-
import spark.streaming._
-import dstream._
+import receivers.{ActorReceiver, ReceiverSupervisorStrategy}
+import spark.streaming.dstream._
import spark.storage.StorageLevel
+
import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import spark.api.java.{JavaSparkContext, JavaRDD}
+
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+
+import twitter4j.Status
+
+import akka.actor.Props
+import akka.actor.SupervisorStrategy
+import akka.zeromq.Subscribe
+
+import scala.collection.JavaConversions._
+
+import java.lang.{Long => JLong, Integer => JInt}
import java.io.InputStream
import java.util.{Map => JMap}
-import spark.api.java.{JavaSparkContext, JavaRDD}
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -128,7 +138,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
- def networkTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
+ def socketTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
: JavaDStream[String] = {
ssc.socketTextStream(hostname, port, storageLevel)
}
@@ -186,13 +196,13 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects in the received blocks
*/
- def rawNetworkStream[T](
+ def rawSocketStream[T](
hostname: String,
port: Int,
storageLevel: StorageLevel): JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port, storageLevel))
+ JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port, storageLevel))
}
/**
@@ -204,10 +214,10 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param port Port to connect to for receiving data
* @tparam T Type of the objects in the received blocks
*/
- def rawNetworkStream[T](hostname: String, port: Int): JavaDStream[T] = {
+ def rawSocketStream[T](hostname: String, port: Int): JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port))
+ JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port))
}
/**
@@ -246,12 +256,179 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
*/
- def flumeStream(hostname: String, port: Int):
- JavaDStream[SparkFlumeEvent] = {
+ def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
ssc.flumeStream(hostname, port)
}
/**
+ * Create an input stream that returns tweets received from Twitter.
+ * @param username Twitter username
+ * @param password Twitter password
+ * @param filters Set of filter strings to get only those tweets that match them
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def twitterStream(
+ username: String,
+ password: String,
+ filters: Array[String],
+ storageLevel: StorageLevel
+ ): JavaDStream[Status] = {
+ ssc.twitterStream(username, password, filters, storageLevel)
+ }
+
+ /**
+ * Create an input stream that returns tweets received from Twitter.
+ * @param username Twitter username
+ * @param password Twitter password
+ * @param filters Set of filter strings to get only those tweets that match them
+ */
+ def twitterStream(
+ username: String,
+ password: String,
+ filters: Array[String]
+ ): JavaDStream[Status] = {
+ ssc.twitterStream(username, password, filters)
+ }
+
+ /**
+ * Create an input stream that returns tweets received from Twitter.
+ * @param username Twitter username
+ * @param password Twitter password
+ */
+ def twitterStream(
+ username: String,
+ password: String
+ ): JavaDStream[Status] = {
+ ssc.twitterStream(username, password)
+ }
+
+ /**
+ * Create an input stream with any arbitrary user implemented actor receiver.
+ * @param props Props object defining creation of the actor
+ * @param name Name of the actor
+ * @param storageLevel Storage level to use for storing the received objects
+ *
+ * @note Since the actor may exist outside the Spark framework, it is the user's
+ * responsibility to ensure type safety, i.e. that the parametrized type of the
+ * data received matches the type parameter of actorStream.
+ */
+ def actorStream[T](
+ props: Props,
+ name: String,
+ storageLevel: StorageLevel,
+ supervisorStrategy: SupervisorStrategy
+ ): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.actorStream[T](props, name, storageLevel, supervisorStrategy)
+ }
+
+ /**
+ * Create an input stream with any arbitrary user implemented actor receiver.
+ * @param props Props object defining creation of the actor
+ * @param name Name of the actor
+ * @param storageLevel Storage level to use for storing the received objects
+ *
+ * @note Since the actor may exist outside the Spark framework, it is the user's
+ * responsibility to ensure type safety, i.e. that the parametrized type of the
+ * data received matches the type parameter of actorStream.
+ */
+ def actorStream[T](
+ props: Props,
+ name: String,
+ storageLevel: StorageLevel
+ ): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.actorStream[T](props, name, storageLevel)
+ }
+
+ /**
+ * Create an input stream with any arbitrary user implemented actor receiver.
+ * @param props Props object defining creation of the actor
+ * @param name Name of the actor
+ *
+ * @note Since the actor may exist outside the Spark framework, it is the user's
+ * responsibility to ensure type safety, i.e. that the parametrized type of the
+ * data received matches the type parameter of actorStream.
+ */
+ def actorStream[T](
+ props: Props,
+ name: String
+ ): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.actorStream[T](props, name)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes a sequence of frames for each topic, and
+ *                       each frame carries a sequence of bytes. This converter (which may
+ *                       deserialize those bytes) translates a sequence of byte sequences,
+ *                       where each outer sequence is a frame and each inner one its payload.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def zeroMQStream[T](
+ publisherUrl:String,
+ subscribe: Subscribe,
+ bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+ storageLevel: StorageLevel,
+ supervisorStrategy: SupervisorStrategy
+ ): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes a sequence of frames for each topic, and
+ *                       each frame carries a sequence of bytes. This converter (which may
+ *                       deserialize those bytes) translates a sequence of byte sequences,
+ *                       where each outer sequence is a frame and each inner one its payload.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def zeroMQStream[T](
+ publisherUrl:String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
+ storageLevel: StorageLevel
+ ): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
+ }
+
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes a sequence of frames for each topic, and
+ *                       each frame carries a sequence of bytes. This converter (which may
+ *                       deserialize those bytes) translates a sequence of byte sequences,
+ *                       where each outer sequence is a frame and each inner one its payload.
+ */
+ def zeroMQStream[T](
+ publisherUrl:String,
+ subscribe: Subscribe,
+ bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
+ ): JavaDStream[T] = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
+ }
+
+ /**
* Registers an output stream that will be computed every interval
*/
def registerOutputStream(outputStream: JavaDStreamLike[_, _, _]) {
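The new JFunction-based zeroMQStream overloads above exist so Java callers can supply the frame converter without touching Scala collections. A hedged Scala sketch of that same overload, assuming a JavaStreamingContext named jssc (the URL and topic are placeholders, and the Function subclass shape mirrors the anonymous class in the JavaAPISuite test below):

import spark.api.java.function.{Function => JFunction}
import akka.zeromq.Subscribe
import scala.collection.JavaConversions._

// Java-friendly converter: Array[Array[Byte]] in, java.lang.Iterable out.
val toStrings = new JFunction[Array[Array[Byte]], java.lang.Iterable[String]] {
  def call(frames: Array[Array[Byte]]): java.lang.Iterable[String] =
    frames.map(frame => new String(frame, "UTF-8")).toIterable
}

val jLines = jssc.zeroMQStream[String]("tcp://127.0.0.1:1234", Subscribe("topic"), toStrings)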
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 4530af5f6a..3bed500f73 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -24,10 +24,16 @@ import spark.streaming.api.java.JavaStreamingContext;
import spark.streaming.JavaTestUtils;
import spark.streaming.JavaCheckpointTestUtils;
import spark.streaming.dstream.KafkaPartitionKey;
+import spark.streaming.InputStreamsSuite;
import java.io.*;
import java.util.*;
+import akka.actor.Props;
+import akka.zeromq.Subscribe;
+
+
+
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
// see http://stackoverflow.com/questions/758570/.
@@ -1205,12 +1211,12 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testNetworkTextStream() {
+ public void testSocketTextStream() {
JavaDStream test = ssc.socketTextStream("localhost", 12345);
}
@Test
- public void testNetworkString() {
+ public void testSocketString() {
class Converter extends Function<InputStream, Iterable<String>> {
public Iterable<String> call(InputStream in) {
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
@@ -1239,13 +1245,13 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testRawNetworkStream() {
- JavaDStream test = ssc.rawNetworkStream("localhost", 12345);
+ public void testRawSocketStream() {
+ JavaDStream test = ssc.rawSocketStream("localhost", 12345);
}
@Test
public void testFlumeStream() {
- JavaDStream test = ssc.flumeStream("localhost", 12345);
+ JavaDStream test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY());
}
@Test
@@ -1253,4 +1259,25 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, String> foo =
ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
}
+
+ @Test
+ public void testTwitterStream() {
+ String[] filters = new String[] { "good", "bad", "ugly" };
+ JavaDStream test = ssc.twitterStream("username", "password", filters, StorageLevel.MEMORY_ONLY());
+ }
+
+ @Test
+ public void testActorStream() {
+ JavaDStream test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY());
+ }
+
+ @Test
+ public void testZeroMQStream() {
+ JavaDStream test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() {
+ @Override
+ public Iterable<String> call(byte[][] b) throws Exception {
+ return null;
+ }
+ });
+ }
}
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index c9f941c5b8..1024d3ac97 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -42,7 +42,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
- test("network input stream") {
+ test("socket input stream") {
// Start the server
val testServer = new TestServer(testPort)
testServer.start()