diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-04-24 18:08:26 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-04-24 18:08:26 +0530 |
commit | ad88f083a627ba38e99b1b135a82a1fcfd107444 (patch) | |
tree | bd6571711c8a1983360e4a61b40f1d917f690e17 /streaming/src | |
parent | 185bb9525a3a48313cd5e446e1b80d2d697465d8 (diff) | |
download | spark-ad88f083a627ba38e99b1b135a82a1fcfd107444.tar.gz spark-ad88f083a627ba38e99b1b135a82a1fcfd107444.tar.bz2 spark-ad88f083a627ba38e99b1b135a82a1fcfd107444.zip |
scala 2.10 and master merge
Diffstat (limited to 'streaming/src')
4 files changed, 16 insertions, 15 deletions
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala index b159d26c02..e5bb654578 100644 --- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala +++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala @@ -11,8 +11,8 @@ import scala.collection.mutable.Queue import akka.actor._ import akka.pattern.ask -import akka.util.duration._ -import akka.dispatch._ +import scala.concurrent.duration._ +// import akka.dispatch._ private[streaming] sealed trait NetworkInputTrackerMessage private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) extends NetworkInputTrackerMessage diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala index 7385474963..5347374730 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala @@ -7,13 +7,15 @@ import spark.rdd.BlockRDD import spark.storage.StorageLevel import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration._ import java.nio.ByteBuffer import akka.actor.{Props, Actor} import akka.pattern.ask -import akka.dispatch.Await -import akka.util.duration._ +import scala.concurrent.Await +import akka.util.Timeout + import spark.streaming.util.{RecurringTimer, SystemClock} import java.util.concurrent.ArrayBlockingQueue diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala index b3201d0b28..6c9e373de3 100644 --- a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala +++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala @@ -3,6 +3,8 @@ package spark.streaming.receivers import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy } import akka.actor.{ actorRef2Scala, ActorRef } import akka.actor.{ PossiblyHarmful, OneForOneStrategy } +import akka.actor.SupervisorStrategy._ +import scala.concurrent.duration._ import spark.storage.StorageLevel import spark.streaming.dstream.NetworkReceiver @@ -12,9 +14,6 @@ import java.util.concurrent.atomic.AtomicInteger /** A helper with set of defaults for supervisor strategy **/ object ReceiverSupervisorStrategy { - import akka.util.duration._ - import akka.actor.SupervisorStrategy._ - val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 15 millis) { case _: RuntimeException ⇒ Restart @@ -27,10 +26,10 @@ object ReceiverSupervisorStrategy { * pushBlock API. * * @example {{{ - * class MyActor extends Actor with Receiver{ - * def receive { - * case anything :String ⇒ pushBlock(anything) - * } + * class MyActor extends Actor with Receiver{ + * def receive { + * case anything :String ⇒ pushBlock(anything) + * } * } * //Can be plugged in actorStream as follows * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver") @@ -74,12 +73,12 @@ private[streaming] case class Data[T: ClassManifest](data: T) * his own Actor to run as receiver for Spark Streaming input source. * * This starts a supervisor actor which starts workers and also provides - * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance]. - * + * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance]. + * * Here's a way to start more supervisor/workers as its children. * * @example {{{ - * context.parent ! Props(new Supervisor) + * context.parent ! Props(new Supervisor) * }}} OR {{{ * context.parent ! Props(new Worker,"Worker") * }}} diff --git a/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala index 5533c3cf1e..e7608f08ae 100644 --- a/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala +++ b/streaming/src/main/scala/spark/streaming/receivers/ZeroMQReceiver.scala @@ -13,7 +13,7 @@ private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String, bytesToObjects: Seq[Seq[Byte]] ⇒ Iterator[T]) extends Actor with Receiver with Logging { - override def preStart() = context.system.newSocket(SocketType.Sub, Listener(self), + override def preStart() = ZeroMQExtension(context.system).newSocket(SocketType.Sub, Listener(self), Connect(publisherUrl), subscribe) def receive: Receive = { |