about summary refs log tree commit diff
path: root/examples
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-01-20 13:55:41 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2016-01-20 13:55:41 -0800
commitb7d74a602f622d8e105b349bd6d17ba42e7668dc (patch)
tree118deb532942513693e60f851dde638d7fa818cd /examples
parent944fdadf77523570f6b33544ad0b388031498952 (diff)
downloadspark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.tar.gz
spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.tar.bz2
spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.zip
[SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project
Include the following changes: 1. Add "streaming-akka" project and org.apache.spark.streaming.akka.AkkaUtils for creating an actorStream 2. Remove "StreamingContext.actorStream" and "JavaStreamingContext.actorStream" 3. Update the ActorWordCount example and add the JavaActorWordCount example 4. Make "streaming-zeromq" depend on "streaming-akka" and update the codes accordingly Author: Shixiong Zhu <shixiong@databricks.com> Closes #10744 from zsxwing/streaming-akka-2.
Diffstat (limited to 'examples')
-rw-r--r--examples/pom.xml5
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java14
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala37
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala13
4 files changed, 44 insertions, 25 deletions
diff --git a/examples/pom.xml b/examples/pom.xml
index 1a0d5e5854..9437cee2ab 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -77,6 +77,11 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
index 2377207779..62e563380a 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
@@ -31,7 +31,8 @@ import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.receiver.JavaActorReceiver;
+import org.apache.spark.streaming.akka.AkkaUtils;
+import org.apache.spark.streaming.akka.JavaActorReceiver;
/**
* A sample actor as receiver, is also simplest. This receiver actor
@@ -56,6 +57,7 @@ class JavaSampleActorReceiver<T> extends JavaActorReceiver {
remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf());
}
+ @Override
public void onReceive(Object msg) throws Exception {
store((T) msg);
}
@@ -100,18 +102,20 @@ public class JavaActorWordCount {
String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor";
/*
- * Following is the use of actorStream to plug in custom actor as receiver
+ * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
*
* An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e type of data received and InputDstream
* should be same.
*
- * For example: Both actorStream and JavaSampleActorReceiver are parameterized
+ * For example: Both AkkaUtils.createStream and JavaSampleActorReceiver are parameterized
* to same type to ensure type safety.
*/
- JavaDStream<String> lines = jssc.actorStream(
- Props.create(JavaSampleActorReceiver.class, feederActorURI), "SampleReceiver");
+ JavaDStream<String> lines = AkkaUtils.createStream(
+ jssc,
+ Props.create(JavaSampleActorReceiver.class, feederActorURI),
+ "SampleReceiver");
// compute wordcount
lines.flatMap(new FlatMapFunction<String, String>() {
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
index 88cdc6bc14..8e88987439 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
@@ -22,12 +22,12 @@ import scala.collection.mutable.LinkedList
import scala.reflect.ClassTag
import scala.util.Random
-import akka.actor.{actorRef2Scala, Actor, ActorRef, Props}
+import akka.actor._
+import com.typesafe.config.ConfigFactory
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.receiver.ActorReceiver
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
case class SubscribeReceiver(receiverActor: ActorRef)
case class UnsubscribeReceiver(receiverActor: ActorRef)
@@ -78,8 +78,7 @@ class FeederActor extends Actor {
*
* @see [[org.apache.spark.examples.streaming.FeederActor]]
*/
-class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
-extends ActorReceiver {
+class SampleActorReceiver[T](urlOfPublisher: String) extends ActorReceiver {
lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
@@ -108,9 +107,13 @@ object FeederActor {
}
val Seq(host, port) = args.toSeq
- val conf = new SparkConf
- val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf,
- securityManager = new SecurityManager(conf))._1
+ val akkaConf = ConfigFactory.parseString(
+ s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+ |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
+ |akka.remote.netty.tcp.hostname = "$host"
+ |akka.remote.netty.tcp.port = $port
+ |""".stripMargin)
+ val actorSystem = ActorSystem("test", akkaConf)
val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
println("Feeder started as:" + feeder)
@@ -121,6 +124,7 @@ object FeederActor {
/**
* A sample word count program demonstrating the use of plugging in
+ *
* Actor as Receiver
* Usage: ActorWordCount <hostname> <port>
* <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
@@ -146,20 +150,21 @@ object ActorWordCount {
val ssc = new StreamingContext(sparkConf, Seconds(2))
/*
- * Following is the use of actorStream to plug in custom actor as receiver
+ * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
*
* An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e type of data received and InputDstream
+ * to ensure the type safety, i.e type of data received and InputDStream
* should be same.
*
- * For example: Both actorStream and SampleActorReceiver are parameterized
+ * For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized
* to same type to ensure type safety.
*/
-
- val lines = ssc.actorStream[String](
- Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
- host, port.toInt))), "SampleReceiver")
+ val lines = AkkaUtils.createStream[String](
+ ssc,
+ Props(classOf[SampleActorReceiver[String]],
+ "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)),
+ "SampleReceiver")
// compute wordcount
lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
index 9644890576..f612e508eb 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
@@ -25,8 +25,9 @@ import akka.actor.actorRef2Scala
import akka.util.ByteString
import akka.zeromq._
import akka.zeromq.Subscribe
+import com.typesafe.config.ConfigFactory
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.zeromq._
@@ -69,10 +70,10 @@ object SimpleZeroMQPublisher {
*
* To run this example locally, you may run publisher as
* `$ bin/run-example \
- * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
+ * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo`
* and run the example as
* `$ bin/run-example \
- * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo`
+ * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.0.1:1234 foo`
*/
// scalastyle:on
object ZeroMQWordCount {
@@ -90,7 +91,11 @@ object ZeroMQWordCount {
def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator
// For this stream, a zeroMQ publisher should be running.
- val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
+ val lines = ZeroMQUtils.createStream(
+ ssc,
+ url,
+ Subscribe(topic),
+ bytesToStringIterator _)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()