diff options
Diffstat (limited to 'examples/src')
3 files changed, 39 insertions, 25 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java index 2377207779..62e563380a 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java @@ -31,7 +31,8 @@ import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.receiver.JavaActorReceiver; +import org.apache.spark.streaming.akka.AkkaUtils; +import org.apache.spark.streaming.akka.JavaActorReceiver; /** * A sample actor as receiver, is also simplest. This receiver actor @@ -56,6 +57,7 @@ class JavaSampleActorReceiver<T> extends JavaActorReceiver { remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf()); } + @Override public void onReceive(Object msg) throws Exception { store((T) msg); } @@ -100,18 +102,20 @@ public class JavaActorWordCount { String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor"; /* - * Following is the use of actorStream to plug in custom actor as receiver + * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver * * An important point to note: * Since Actor may exist outside the spark framework, It is thus user's responsibility * to ensure the type safety, i.e type of data received and InputDstream * should be same. * - * For example: Both actorStream and JavaSampleActorReceiver are parameterized + * For example: Both AkkaUtils.createStream and JavaSampleActorReceiver are parameterized * to same type to ensure type safety. */ - JavaDStream<String> lines = jssc.actorStream( - Props.create(JavaSampleActorReceiver.class, feederActorURI), "SampleReceiver"); + JavaDStream<String> lines = AkkaUtils.createStream( + jssc, + Props.create(JavaSampleActorReceiver.class, feederActorURI), + "SampleReceiver"); // compute wordcount lines.flatMap(new FlatMapFunction<String, String>() { diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index 88cdc6bc14..8e88987439 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -22,12 +22,12 @@ import scala.collection.mutable.LinkedList import scala.reflect.ClassTag import scala.util.Random -import akka.actor.{actorRef2Scala, Actor, ActorRef, Props} +import akka.actor._ +import com.typesafe.config.ConfigFactory -import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.receiver.ActorReceiver -import org.apache.spark.util.AkkaUtils +import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils} case class SubscribeReceiver(receiverActor: ActorRef) case class UnsubscribeReceiver(receiverActor: ActorRef) @@ -78,8 +78,7 @@ class FeederActor extends Actor { * * @see [[org.apache.spark.examples.streaming.FeederActor]] */ -class SampleActorReceiver[T: ClassTag](urlOfPublisher: String) -extends ActorReceiver { +class SampleActorReceiver[T](urlOfPublisher: String) extends ActorReceiver { lazy private val remotePublisher = context.actorSelection(urlOfPublisher) @@ -108,9 +107,13 @@ object FeederActor { } val Seq(host, port) = args.toSeq - val conf = new SparkConf - val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf, - securityManager = new SecurityManager(conf))._1 + val akkaConf = ConfigFactory.parseString( + s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" + |akka.remote.enabled-transports = ["akka.remote.netty.tcp"] + |akka.remote.netty.tcp.hostname = "$host" + |akka.remote.netty.tcp.port = $port + |""".stripMargin) + val actorSystem = ActorSystem("test", akkaConf) val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor") println("Feeder started as:" + feeder) @@ -121,6 +124,7 @@ object FeederActor { /** * A sample word count program demonstrating the use of plugging in + * * Actor as Receiver * Usage: ActorWordCount <hostname> <port> * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on. @@ -146,20 +150,21 @@ object ActorWordCount { val ssc = new StreamingContext(sparkConf, Seconds(2)) /* - * Following is the use of actorStream to plug in custom actor as receiver + * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver * * An important point to note: * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e type of data received and InputDstream + * to ensure the type safety, i.e type of data received and InputDStream * should be same. * - * For example: Both actorStream and SampleActorReceiver are parameterized + * For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized * to same type to ensure type safety. */ - - val lines = ssc.actorStream[String]( - Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format( - host, port.toInt))), "SampleReceiver") + val lines = AkkaUtils.createStream[String]( + ssc, + Props(classOf[SampleActorReceiver[String]], + "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)), + "SampleReceiver") // compute wordcount lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print() diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala index 9644890576..f612e508eb 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala @@ -25,8 +25,9 @@ import akka.actor.actorRef2Scala import akka.util.ByteString import akka.zeromq._ import akka.zeromq.Subscribe +import com.typesafe.config.ConfigFactory -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.zeromq._ @@ -69,10 +70,10 @@ object SimpleZeroMQPublisher { * * To run this example locally, you may run publisher as * `$ bin/run-example \ - * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` + * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo` * and run the example as * `$ bin/run-example \ - * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo` + * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.0.1:1234 foo` */ // scalastyle:on object ZeroMQWordCount { @@ -90,7 +91,11 @@ object ZeroMQWordCount { def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator // For this stream, a zeroMQ publisher should be running. - val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _) + val lines = ZeroMQUtils.createStream( + ssc, + url, + Subscribe(topic), + bytesToStringIterator _) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() |