diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-01-20 13:55:41 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2016-01-20 13:55:41 -0800 |
commit | b7d74a602f622d8e105b349bd6d17ba42e7668dc (patch) | |
tree | 118deb532942513693e60f851dde638d7fa818cd /examples/src/main/scala | |
parent | 944fdadf77523570f6b33544ad0b388031498952 (diff) | |
download | spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.tar.gz spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.tar.bz2 spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.zip |
[SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project
Include the following changes:
1. Add "streaming-akka" project and org.apache.spark.streaming.akka.AkkaUtils for creating an actorStream
2. Remove "StreamingContext.actorStream" and "JavaStreamingContext.actorStream"
3. Update the ActorWordCount example and add the JavaActorWordCount example
4. Make "streaming-zeromq" depend on "streaming-akka" and update the codes accordingly
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #10744 from zsxwing/streaming-akka-2.
Diffstat (limited to 'examples/src/main/scala')
-rw-r--r-- | examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala | 37 | ||||
-rw-r--r-- | examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala | 13 |
2 files changed, 30 insertions, 20 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala index 88cdc6bc14..8e88987439 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala @@ -22,12 +22,12 @@ import scala.collection.mutable.LinkedList import scala.reflect.ClassTag import scala.util.Random -import akka.actor.{actorRef2Scala, Actor, ActorRef, Props} +import akka.actor._ +import com.typesafe.config.ConfigFactory -import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} -import org.apache.spark.streaming.receiver.ActorReceiver -import org.apache.spark.util.AkkaUtils +import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils} case class SubscribeReceiver(receiverActor: ActorRef) case class UnsubscribeReceiver(receiverActor: ActorRef) @@ -78,8 +78,7 @@ class FeederActor extends Actor { * * @see [[org.apache.spark.examples.streaming.FeederActor]] */ -class SampleActorReceiver[T: ClassTag](urlOfPublisher: String) -extends ActorReceiver { +class SampleActorReceiver[T](urlOfPublisher: String) extends ActorReceiver { lazy private val remotePublisher = context.actorSelection(urlOfPublisher) @@ -108,9 +107,13 @@ object FeederActor { } val Seq(host, port) = args.toSeq - val conf = new SparkConf - val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf, - securityManager = new SecurityManager(conf))._1 + val akkaConf = ConfigFactory.parseString( + s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" + |akka.remote.enabled-transports = ["akka.remote.netty.tcp"] + |akka.remote.netty.tcp.hostname = "$host" + |akka.remote.netty.tcp.port = $port + |""".stripMargin) + val actorSystem = ActorSystem("test", akkaConf) val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor") println("Feeder started as:" + feeder) @@ -121,6 +124,7 @@ object FeederActor { /** * A sample word count program demonstrating the use of plugging in + * * Actor as Receiver * Usage: ActorWordCount <hostname> <port> * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on. @@ -146,20 +150,21 @@ object ActorWordCount { val ssc = new StreamingContext(sparkConf, Seconds(2)) /* - * Following is the use of actorStream to plug in custom actor as receiver + * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver * * An important point to note: * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e type of data received and InputDstream + * to ensure the type safety, i.e type of data received and InputDStream * should be same. * - * For example: Both actorStream and SampleActorReceiver are parameterized + * For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized * to same type to ensure type safety. */ - - val lines = ssc.actorStream[String]( - Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format( - host, port.toInt))), "SampleReceiver") + val lines = AkkaUtils.createStream[String]( + ssc, + Props(classOf[SampleActorReceiver[String]], + "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)), + "SampleReceiver") // compute wordcount lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print() diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala index 9644890576..f612e508eb 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala @@ -25,8 +25,9 @@ import akka.actor.actorRef2Scala import akka.util.ByteString import akka.zeromq._ import akka.zeromq.Subscribe +import com.typesafe.config.ConfigFactory -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.zeromq._ @@ -69,10 +70,10 @@ object SimpleZeroMQPublisher { * * To run this example locally, you may run publisher as * `$ bin/run-example \ - * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` + * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo` * and run the example as * `$ bin/run-example \ - * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo` + * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.0.1:1234 foo` */ // scalastyle:on object ZeroMQWordCount { @@ -90,7 +91,11 @@ object ZeroMQWordCount { def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator // For this stream, a zeroMQ publisher should be running. - val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _) + val lines = ZeroMQUtils.createStream( + ssc, + url, + Subscribe(topic), + bytesToStringIterator _) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() |