diff options
author | Lin Zhao <lin@exabeam.com> | 2016-02-25 12:32:17 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-02-25 12:32:24 -0800 |
commit | fb8bb04766005e8935607069c0155d639f407e8a (patch) | |
tree | e6cd21a7ab2fabc25ddb0078ffa4ba3e6e374cfb /external | |
parent | 751724b1320d38fd94186df3d8f1ca887f21947a (diff) | |
download | spark-fb8bb04766005e8935607069c0155d639f407e8a.tar.gz spark-fb8bb04766005e8935607069c0155d639f407e8a.tar.bz2 spark-fb8bb04766005e8935607069c0155d639f407e8a.zip |
[SPARK-13069][STREAMING] Add "ask" style store() to ActorReciever
Introduces a "ask" style ```store``` in ```ActorReceiver``` as a way to allow actor receiver blocked by back pressure or maxRate.
Author: Lin Zhao <lin@exabeam.com>
Closes #11176 from lin-zhao/SPARK-13069.
Diffstat (limited to 'external')
3 files changed, 43 insertions, 1 deletions
diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala index c75dc92445..33415c15be 100644 --- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala @@ -20,12 +20,15 @@ package org.apache.spark.streaming.akka import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicInteger +import scala.concurrent.Future import scala.concurrent.duration._ import scala.language.postfixOps import scala.reflect.ClassTag import akka.actor._ import akka.actor.SupervisorStrategy.{Escalate, Restart} +import akka.pattern.ask +import akka.util.Timeout import com.typesafe.config.ConfigFactory import org.apache.spark.{Logging, TaskContext} @@ -105,13 +108,26 @@ abstract class ActorReceiver extends Actor { } /** - * Store a single item of received data to Spark's memory. + * Store a single item of received data to Spark's memory asynchronously. * These single items will be aggregated together into data blocks before * being pushed into Spark's memory. */ def store[T](item: T) { context.parent ! SingleItemData(item) } + + /** + * Store a single item of received data to Spark's memory and returns a `Future`. + * The `Future` will be completed when the operator finishes, or with an + * `akka.pattern.AskTimeoutException` after the given timeout has expired. + * These single items will be aggregated together into data blocks before + * being pushed into Spark's memory. + * + * This method allows the user to control the flow speed using `Future` + */ + def store[T](item: T, timeout: Timeout): Future[Unit] = { + context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher) + } } /** @@ -162,6 +178,19 @@ abstract class JavaActorReceiver extends UntypedActor { def store[T](item: T) { context.parent ! SingleItemData(item) } + + /** + * Store a single item of received data to Spark's memory and returns a `Future`. + * The `Future` will be completed when the operator finishes, or with an + * `akka.pattern.AskTimeoutException` after the given timeout has expired. + * These single items will be aggregated together into data blocks before + * being pushed into Spark's memory. + * + * This method allows the user to control the flow speed using `Future` + */ + def store[T](item: T, timeout: Timeout): Future[Unit] = { + context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher) + } } /** @@ -179,8 +208,10 @@ case class Statistics(numberOfMsgs: Int, /** Case class to receive data sent by child actors */ private[akka] sealed trait ActorReceiverData private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData +private[akka] case class AskStoreSingleItemData[T](item: T) extends ActorReceiverData private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData +private[akka] object Ack extends ActorReceiverData /** * Provides Actors as receivers for receiving stream. @@ -233,6 +264,12 @@ private[akka] class ActorReceiverSupervisor[T: ClassTag]( store(msg.asInstanceOf[T]) n.incrementAndGet + case AskStoreSingleItemData(msg) => + logDebug("received single sync") + store(msg.asInstanceOf[T]) + n.incrementAndGet + sender() ! Ack + case ByteBufferData(bytes) => logDebug("received bytes") store(bytes) diff --git a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java index b732506767..ac5ef31c8b 100644 --- a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java +++ b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java @@ -20,6 +20,7 @@ package org.apache.spark.streaming.akka; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.SupervisorStrategy; +import akka.util.Timeout; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.junit.Test; @@ -62,5 +63,6 @@ class JavaTestActor extends JavaActorReceiver { @Override public void onReceive(Object message) throws Exception { store((String) message); + store((String) message, new Timeout(1000)); } } diff --git a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala index f437585a98..ce95d9dd72 100644 --- a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala +++ b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.streaming.akka +import scala.concurrent.duration._ + import akka.actor.{Props, SupervisorStrategy} import org.apache.spark.SparkFunSuite @@ -60,5 +62,6 @@ class AkkaUtilsSuite extends SparkFunSuite { class TestActor extends ActorReceiver { override def receive: Receive = { case m: String => store(m) + case m => store(m, 10.seconds) } } |