From 276c37a51c9a6188dbbe02754935540ace338dd1 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Sun, 22 Sep 2013 08:20:12 +0530 Subject: Akka 2.2 migration --- .../main/scala/org/apache/spark/streaming/StreamingContext.scala | 5 +++-- .../org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 7 ++++--- .../org/apache/spark/streaming/dstream/NetworkInputDStream.scala | 2 +- .../scala/org/apache/spark/streaming/receivers/ActorReceiver.scala | 4 ++-- .../org/apache/spark/streaming/receivers/ZeroMQReceiver.scala | 7 ++++--- .../src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 2 +- 6 files changed, 15 insertions(+), 12 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 9e14c8ace7..c722aa15ab 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat import org.apache.hadoop.fs.Path import twitter4j.Status import twitter4j.auth.Authorization +import akka.util.ByteString /** @@ -231,11 +232,11 @@ class StreamingContext private ( def zeroMQStream[T: ClassTag]( publisherUrl:String, subscribe: Subscribe, - bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T], + bytesToObjects: Seq[ByteString] ⇒ Iterator[T], storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2, supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy ): DStream[T] = { - actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)), + actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), "ZeroMQReceiver", storageLevel, supervisorStrategy) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 8135d2499e..8242af6d5f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -29,6 +29,7 @@ import twitter4j.Status import akka.actor.Props import akka.actor.SupervisorStrategy import akka.zeromq.Subscribe +import akka.util.ByteString import twitter4j.auth.Authorization @@ -475,7 +476,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { def zeroMQStream[T]( publisherUrl:String, subscribe: Subscribe, - bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T], + bytesToObjects: Seq[ByteString] ⇒ Iterator[T], storageLevel: StorageLevel, supervisorStrategy: SupervisorStrategy ): JavaDStream[T] = { @@ -502,7 +503,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { ): JavaDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel) } @@ -522,7 +523,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { ): JavaDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator ssc.zeroMQStream[T](publisherUrl, subscribe, fn) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index a61a1780f1..394a39fbb0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -177,7 +177,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging logInfo("Attempting to register with tracker") val ip = System.getProperty("spark.driver.host", "localhost") val port = System.getProperty("spark.driver.port", "7077").toInt - val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port) + val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port) val tracker = env.actorSystem.actorFor(url) val timeout = 5.seconds diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala index c220127c00..ee087a1cf0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala @@ -51,7 +51,7 @@ object ReceiverSupervisorStrategy { * @example {{{ * class MyActor extends Actor with Receiver{ * def receive { - * case anything :String ⇒ pushBlock(anything) + * case anything :String => pushBlock(anything) * } * } * //Can be plugged in actorStream as follows @@ -121,7 +121,7 @@ private[streaming] class ActorReceiver[T: ClassTag]( protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor), "Supervisor" + streamId) - private class Supervisor extends Actor { + class Supervisor extends Actor { override val supervisorStrategy = receiverSupervisorStrategy val worker = context.actorOf(props, name) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala index e009325b67..ce8c56fa8a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming.receivers import akka.actor.Actor +import akka.util.ByteString import akka.zeromq._ import org.apache.spark.Logging @@ -29,7 +30,7 @@ import scala.reflect.ClassTag */ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String, subscribe: Subscribe, - bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T]) + bytesToObjects: Seq[ByteString] ⇒ Iterator[T]) extends Actor with Receiver with Logging { override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self), @@ -40,10 +41,10 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String, case Connecting ⇒ logInfo("connecting ...") case m: ZMQMessage ⇒ - logDebug("Received message for:" + m.firstFrameAsString) + logDebug("Received message for:" + m.frame(0)) //We ignore first frame for processing as it is the topic - val bytes = m.frames.tail.map(_.payload) + val bytes = m.frames.tail pushBlock(bytesToObjects(bytes)) case Closed ⇒ logInfo("received closed ") diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index c0d729ff87..783b8dea31 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -48,7 +48,7 @@ import java.util.*; import akka.actor.Props; import akka.zeromq.Subscribe; - +import akka.util.ByteString; // The test suite itself is Serializable so that anonymous Function implementations can be -- cgit v1.2.3