path: root/streaming
author     Prashant Sharma <scrapcodes@gmail.com>  2013-09-22 08:20:12 +0530
committer  Prashant Sharma <scrapcodes@gmail.com>  2013-09-22 08:20:12 +0530
commit     276c37a51c9a6188dbbe02754935540ace338dd1 (patch)
tree       42f08c5255bf7cb58e06580bd1812573bf487dbc /streaming
parent     69fd42aee3f3fed8dbb5f2933413cbf31cac74d1 (diff)
Akka 2.2 migration
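
This patch moves the streaming module's actor and ZeroMQ code from Akka 2.0 to Akka 2.2: ZeroMQ frame payloads become akka.util.ByteString instead of Seq[Byte], remote actor paths gain the explicit akka.tcp:// scheme, and the ZMQMessage accessors change. A minimal sketch of the new converter shape, relying only on akka.util.ByteString's utf8String; this is an illustration, not part of the patch:

import akka.util.ByteString

// Illustration only: with Akka 2.2 each ZeroMQ frame arrives as a ByteString,
// so user-supplied converters take Seq[ByteString] rather than Seq[Seq[Byte]].
def bytesToStrings(frames: Seq[ByteString]): Iterator[String] =
  frames.map(_.utf8String).iterator
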
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala              5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala  7
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala    2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala        4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala       7
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java                     2
6 files changed, 15 insertions, 12 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 9e14c8ace7..c722aa15ab 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import twitter4j.Status
import twitter4j.auth.Authorization
+import akka.util.ByteString
/**
@@ -231,11 +232,11 @@ class StreamingContext private (
def zeroMQStream[T: ClassTag](
publisherUrl:String,
subscribe: Subscribe,
- bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
): DStream[T] = {
- actorStream(Props(new ZeroMQReceiver(publisherUrl,subscribe,bytesToObjects)),
+ actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
"ZeroMQReceiver", storageLevel, supervisorStrategy)
}
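
A hedged usage sketch of the updated Scala API (not from the patch): the third argument is now a Seq[ByteString] => Iterator[T] converter. The URL, topic, and method name below are placeholders, and it assumes akka.zeromq.Subscribe accepts a topic string:

import akka.util.ByteString
import akka.zeromq.Subscribe
import org.apache.spark.streaming.StreamingContext

// Hypothetical caller: builds a DStream[String] from UTF-8 ZeroMQ frames.
// "tcp://127.0.0.1:1234" and "events" are placeholder values.
def stringStream(ssc: StreamingContext) =
  ssc.zeroMQStream[String](
    "tcp://127.0.0.1:1234",
    Subscribe("events"),
    (frames: Seq[ByteString]) => frames.map(_.utf8String).iterator)
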
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 8135d2499e..8242af6d5f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -29,6 +29,7 @@ import twitter4j.Status
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.zeromq.Subscribe
+import akka.util.ByteString
import twitter4j.auth.Authorization
@@ -475,7 +476,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
def zeroMQStream[T](
publisherUrl:String,
subscribe: Subscribe,
- bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T],
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
storageLevel: StorageLevel,
supervisorStrategy: SupervisorStrategy
): JavaDStream[T] = {
@@ -502,7 +503,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
}
@@ -522,7 +523,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
}
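
For the Java-facing wrapper, the bridging function fn above now flattens Seq[ByteString] into byte[][] before calling the user's function. A minimal sketch of that conversion, relying only on ByteString.toArray; illustration, not part of the patch:

import akka.util.ByteString

// Illustration of the bridge performed by fn: each ByteString frame is copied
// to an Array[Byte], producing the byte[][] the Java-side function expects.
def toByteArrays(frames: Seq[ByteString]): Array[Array[Byte]] =
  frames.map(_.toArray).toArray
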
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
index a61a1780f1..394a39fbb0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -177,7 +177,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
logInfo("Attempting to register with tracker")
val ip = System.getProperty("spark.driver.host", "localhost")
val port = System.getProperty("spark.driver.port", "7077").toInt
- val url = "akka://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
+ val url = "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(ip, port)
val tracker = env.actorSystem.actorFor(url)
val timeout = 5.seconds
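
Akka 2.2 remote actor paths name the transport explicitly, hence the akka.tcp:// scheme above. A hedged sketch of the path construction, with the host and port parameters standing in for the spark.driver.* properties read above; not part of the patch:

// Illustration only: Akka 2.0 paths used "akka://system@host:port/...",
// while Akka 2.2 puts the transport in the scheme, e.g. "akka.tcp://".
def trackerUrl(host: String, port: Int): String =
  "akka.tcp://spark@%s:%s/user/NetworkInputTracker".format(host, port)
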
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
index c220127c00..ee087a1cf0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
@@ -51,7 +51,7 @@ object ReceiverSupervisorStrategy {
* @example {{{
* class MyActor extends Actor with Receiver{
* def receive {
- * case anything :String ⇒ pushBlock(anything)
+ * case anything :String => pushBlock(anything)
* }
* }
* //Can be plugged in actorStream as follows
@@ -121,7 +121,7 @@ private[streaming] class ActorReceiver[T: ClassTag](
protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor),
"Supervisor" + streamId)
- private class Supervisor extends Actor {
+ class Supervisor extends Actor {
override val supervisorStrategy = receiverSupervisorStrategy
val worker = context.actorOf(props, name)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
index e009325b67..ce8c56fa8a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ZeroMQReceiver.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.receivers
import akka.actor.Actor
+import akka.util.ByteString
import akka.zeromq._
import org.apache.spark.Logging
@@ -29,7 +30,7 @@ import scala.reflect.ClassTag
*/
private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
subscribe: Subscribe,
- bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T])
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T])
extends Actor with Receiver with Logging {
override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
@@ -40,10 +41,10 @@ private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
case Connecting ⇒ logInfo("connecting ...")
case m: ZMQMessage ⇒
- logDebug("Received message for:" + m.firstFrameAsString)
+ logDebug("Received message for:" + m.frame(0))
//We ignore first frame for processing as it is the topic
- val bytes = m.frames.tail.map(_.payload)
+ val bytes = m.frames.tail
pushBlock(bytesToObjects(bytes))
case Closed ⇒ logInfo("received closed ")
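
In Akka 2.2 the frames of a ZMQMessage are already ByteStrings, which is why the per-frame .payload call disappears above. A minimal sketch of the payload extraction, assuming only that frames returns Seq[ByteString] (as the call to bytesToObjects implies); not part of the patch:

import akka.util.ByteString
import akka.zeromq.ZMQMessage

// Illustration: the first frame is the topic and is skipped; the remaining
// frames are handed to the converter unchanged.
def payloadFrames(m: ZMQMessage): Seq[ByteString] =
  m.frames.tail
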
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index c0d729ff87..783b8dea31 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -48,7 +48,7 @@ import java.util.*;
import akka.actor.Props;
import akka.zeromq.Subscribe;
-
+import akka.util.ByteString;
// The test suite itself is Serializable so that anonymous Function implementations can be