aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-04-24 18:08:26 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2013-04-24 18:08:26 +0530
commitad88f083a627ba38e99b1b135a82a1fcfd107444 (patch)
treebd6571711c8a1983360e4a61b40f1d917f690e17 /streaming/src
parent185bb9525a3a48313cd5e446e1b80d2d697465d8 (diff)
downloadspark-ad88f083a627ba38e99b1b135a82a1fcfd107444.tar.gz
spark-ad88f083a627ba38e99b1b135a82a1fcfd107444.tar.bz2
spark-ad88f083a627ba38e99b1b135a82a1fcfd107444.zip
scala 2.10 and master merge
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala4
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala6
-rw-r--r--streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala19
-rw-r--r--streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala2
4 files changed, 16 insertions, 15 deletions
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index b159d26c02..e5bb654578 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -11,8 +11,8 @@ import scala.collection.mutable.Queue
import akka.actor._
import akka.pattern.ask
-import akka.util.duration._
-import akka.dispatch._
+import scala.concurrent.duration._
+// import akka.dispatch._
private[streaming] sealed trait NetworkInputTrackerMessage
private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 7385474963..5347374730 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -7,13 +7,15 @@ import spark.rdd.BlockRDD
import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
import java.nio.ByteBuffer
import akka.actor.{Props, Actor}
import akka.pattern.ask
-import akka.dispatch.Await
-import akka.util.duration._
+import scala.concurrent.Await
+import akka.util.Timeout
+
import spark.streaming.util.{RecurringTimer, SystemClock}
import java.util.concurrent.ArrayBlockingQueue
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
index b3201d0b28..6c9e373de3 100644
--- a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
@@ -3,6 +3,8 @@ package spark.streaming.receivers
import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy }
import akka.actor.{ actorRef2Scala, ActorRef }
import akka.actor.{ PossiblyHarmful, OneForOneStrategy }
+import akka.actor.SupervisorStrategy._
+import scala.concurrent.duration._
import spark.storage.StorageLevel
import spark.streaming.dstream.NetworkReceiver
@@ -12,9 +14,6 @@ import java.util.concurrent.atomic.AtomicInteger
/** A helper with set of defaults for supervisor strategy **/
object ReceiverSupervisorStrategy {
- import akka.util.duration._
- import akka.actor.SupervisorStrategy._
-
val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
15 millis) {
case _: RuntimeException ⇒ Restart
@@ -27,10 +26,10 @@ object ReceiverSupervisorStrategy {
* pushBlock API.
*
* @example {{{
- * class MyActor extends Actor with Receiver{
- * def receive {
- * case anything :String ⇒ pushBlock(anything)
- * }
+ * class MyActor extends Actor with Receiver{
+ * def receive {
+ * case anything :String ⇒ pushBlock(anything)
+ * }
* }
* //Can be plugged in actorStream as follows
* ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
@@ -74,12 +73,12 @@ private[streaming] case class Data[T: ClassManifest](data: T)
* his own Actor to run as receiver for Spark Streaming input source.
*
* This starts a supervisor actor which starts workers and also provides
- * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance].
- *
+ * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance].
+ *
* Here's a way to start more supervisor/workers as its children.
*
* @example {{{
- * context.parent ! Props(new Supervisor)
+ * context.parent ! Props(new Supervisor)
* }}} OR {{{
* context.parent ! Props(new Worker,"Worker")
* }}}
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
index 5533c3cf1e..e7608f08ae 100644
--- a/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
+++ b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala
@@ -13,7 +13,7 @@ private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String,
bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T])
extends Actor with Receiver with Logging {
- override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self),
+ override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self),
Connect(publisherUrl), subscribe)
def receive: Receive = {