aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/java
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-01-20 13:55:41 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2016-01-20 13:55:41 -0800
commitb7d74a602f622d8e105b349bd6d17ba42e7668dc (patch)
tree118deb532942513693e60f851dde638d7fa818cd /examples/src/main/java
parent944fdadf77523570f6b33544ad0b388031498952 (diff)
downloadspark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.tar.gz
spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.tar.bz2
spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.zip
[SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project
Include the following changes: 1. Add "streaming-akka" project and org.apache.spark.streaming.akka.AkkaUtils for creating an actorStream 2. Remove "StreamingContext.actorStream" and "JavaStreamingContext.actorStream" 3. Update the ActorWordCount example and add the JavaActorWordCount example 4. Make "streaming-zeromq" depend on "streaming-akka" and update the codes accordingly Author: Shixiong Zhu <shixiong@databricks.com> Closes #10744 from zsxwing/streaming-akka-2.
Diffstat (limited to 'examples/src/main/java')
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java14
1 files changed, 9 insertions, 5 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
index 2377207779..62e563380a 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
@@ -31,7 +31,8 @@ import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.receiver.JavaActorReceiver;
+import org.apache.spark.streaming.akka.AkkaUtils;
+import org.apache.spark.streaming.akka.JavaActorReceiver;
/**
* A sample actor as receiver, is also simplest. This receiver actor
@@ -56,6 +57,7 @@ class JavaSampleActorReceiver<T> extends JavaActorReceiver {
remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf());
}
+ @Override
public void onReceive(Object msg) throws Exception {
store((T) msg);
}
@@ -100,18 +102,20 @@ public class JavaActorWordCount {
String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor";
/*
- * Following is the use of actorStream to plug in custom actor as receiver
+ * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
*
* An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e type of data received and InputDstream
* should be same.
*
- * For example: Both actorStream and JavaSampleActorReceiver are parameterized
+ * For example: Both AkkaUtils.createStream and JavaSampleActorReceiver are parameterized
* to same type to ensure type safety.
*/
- JavaDStream<String> lines = jssc.actorStream(
- Props.create(JavaSampleActorReceiver.class, feederActorURI), "SampleReceiver");
+ JavaDStream<String> lines = AkkaUtils.createStream(
+ jssc,
+ Props.create(JavaSampleActorReceiver.class, feederActorURI),
+ "SampleReceiver");
// compute wordcount
lines.flatMap(new FlatMapFunction<String, String>() {