diff options
author | Prashant Sharma <scrapcodes@gmail.com> | 2013-09-22 08:20:12 +0530 |
---|---|---|
committer | Prashant Sharma <scrapcodes@gmail.com> | 2013-09-22 08:20:12 +0530 |
commit | 276c37a51c9a6188dbbe02754935540ace338dd1 (patch) | |
tree | 42f08c5255bf7cb58e06580bd1812573bf487dbc /examples | |
parent | 69fd42aee3f3fed8dbb5f2933413cbf31cac74d1 (diff) | |
download | spark-276c37a51c9a6188dbbe02754935540ace338dd1.tar.gz spark-276c37a51c9a6188dbbe02754935540ace338dd1.tar.bz2 spark-276c37a51c9a6188dbbe02754935540ace338dd1.zip |
Akka 2.2 migration
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala | 2 | ||||
-rw-r--r-- | examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala | 8 |
2 files changed, 6 insertions, 4 deletions
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala index 13aa24fa1a..08e399f9ee 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala @@ -165,7 +165,7 @@ object ActorWordCount { */ val lines = ssc.actorStream[String]( - Props(new SampleActorReceiver[String]("akka://test@%s:%s/user/FeederActor".format( + Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format( host, port.toInt))), "SampleReceiver") //compute wordcount diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala index c8743b9e25..e83ce78aa5 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala @@ -23,6 +23,7 @@ import akka.zeromq._ import org.apache.spark.streaming.{ Seconds, StreamingContext } import org.apache.spark.streaming.StreamingContext._ import akka.zeromq.Subscribe +import akka.util.ByteString /** * A simple publisher for demonstration purposes, repeatedly publishes random Messages @@ -40,10 +41,11 @@ object SimpleZeroMQPublisher { val acs: ActorSystem = ActorSystem() val pubSocket = ZeroMQExtension(acs).newSocket(SocketType.Pub, Bind(url)) - val messages: Array[String] = Array("words ", "may ", "count ") + implicit def stringToByteString(x: String) = ByteString(x) + val messages: List[ByteString] = List("words ", "may ", "count ") while (true) { Thread.sleep(1000) - pubSocket ! ZMQMessage(Frame(topic) :: messages.map(x => Frame(x.getBytes)).toList) + pubSocket ! ZMQMessage(ByteString(topic) :: messages) } acs.awaitTermination() } @@ -78,7 +80,7 @@ object ZeroMQWordCount { val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2), System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) - def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator + def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator //For this stream, a zeroMQ publisher should be running. val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator) |