aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorLin Zhao <lin@exabeam.com>2016-02-25 12:32:17 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-02-25 12:32:24 -0800
commitfb8bb04766005e8935607069c0155d639f407e8a (patch)
treee6cd21a7ab2fabc25ddb0078ffa4ba3e6e374cfb /external
parent751724b1320d38fd94186df3d8f1ca887f21947a (diff)
downloadspark-fb8bb04766005e8935607069c0155d639f407e8a.tar.gz
spark-fb8bb04766005e8935607069c0155d639f407e8a.tar.bz2
spark-fb8bb04766005e8935607069c0155d639f407e8a.zip
[SPARK-13069][STREAMING] Add "ask" style store() to ActorReciever
Introduces a "ask" style ```store``` in ```ActorReceiver``` as a way to allow actor receiver blocked by back pressure or maxRate. Author: Lin Zhao <lin@exabeam.com> Closes #11176 from lin-zhao/SPARK-13069.
Diffstat (limited to 'external')
-rw-r--r--external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala39
-rw-r--r--external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java2
-rw-r--r--external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala3
3 files changed, 43 insertions, 1 deletions
diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
index c75dc92445..33415c15be 100644
--- a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
+++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
@@ -20,12 +20,15 @@ package org.apache.spark.streaming.akka
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger
+import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.ClassTag
import akka.actor._
import akka.actor.SupervisorStrategy.{Escalate, Restart}
+import akka.pattern.ask
+import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.apache.spark.{Logging, TaskContext}
@@ -105,13 +108,26 @@ abstract class ActorReceiver extends Actor {
}
/**
- * Store a single item of received data to Spark's memory.
+ * Store a single item of received data to Spark's memory asynchronously.
* These single items will be aggregated together into data blocks before
* being pushed into Spark's memory.
*/
def store[T](item: T) {
context.parent ! SingleItemData(item)
}
+
+ /**
+ * Store a single item of received data to Spark's memory and returns a `Future`.
+ * The `Future` will be completed when the operator finishes, or with an
+ * `akka.pattern.AskTimeoutException` after the given timeout has expired.
+ * These single items will be aggregated together into data blocks before
+ * being pushed into Spark's memory.
+ *
+ * This method allows the user to control the flow speed using `Future`
+ */
+ def store[T](item: T, timeout: Timeout): Future[Unit] = {
+ context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher)
+ }
}
/**
@@ -162,6 +178,19 @@ abstract class JavaActorReceiver extends UntypedActor {
def store[T](item: T) {
context.parent ! SingleItemData(item)
}
+
+ /**
+ * Store a single item of received data to Spark's memory and returns a `Future`.
+ * The `Future` will be completed when the operator finishes, or with an
+ * `akka.pattern.AskTimeoutException` after the given timeout has expired.
+ * These single items will be aggregated together into data blocks before
+ * being pushed into Spark's memory.
+ *
+ * This method allows the user to control the flow speed using `Future`
+ */
+ def store[T](item: T, timeout: Timeout): Future[Unit] = {
+ context.parent.ask(AskStoreSingleItemData(item))(timeout).map(_ => ())(context.dispatcher)
+ }
}
/**
@@ -179,8 +208,10 @@ case class Statistics(numberOfMsgs: Int,
/** Case class to receive data sent by child actors */
private[akka] sealed trait ActorReceiverData
private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData
+private[akka] case class AskStoreSingleItemData[T](item: T) extends ActorReceiverData
private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
+private[akka] object Ack extends ActorReceiverData
/**
* Provides Actors as receivers for receiving stream.
@@ -233,6 +264,12 @@ private[akka] class ActorReceiverSupervisor[T: ClassTag](
store(msg.asInstanceOf[T])
n.incrementAndGet
+ case AskStoreSingleItemData(msg) =>
+ logDebug("received single sync")
+ store(msg.asInstanceOf[T])
+ n.incrementAndGet
+ sender() ! Ack
+
case ByteBufferData(bytes) =>
logDebug("received bytes")
store(bytes)
diff --git a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
index b732506767..ac5ef31c8b 100644
--- a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
+++ b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.akka;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
+import akka.util.Timeout;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.junit.Test;
@@ -62,5 +63,6 @@ class JavaTestActor extends JavaActorReceiver {
@Override
public void onReceive(Object message) throws Exception {
store((String) message);
+ store((String) message, new Timeout(1000));
}
}
diff --git a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
index f437585a98..ce95d9dd72 100644
--- a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
+++ b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.streaming.akka
+import scala.concurrent.duration._
+
import akka.actor.{Props, SupervisorStrategy}
import org.apache.spark.SparkFunSuite
@@ -60,5 +62,6 @@ class AkkaUtilsSuite extends SparkFunSuite {
class TestActor extends ActorReceiver {
override def receive: Receive = {
case m: String => store(m)
+ case m => store(m, 10.seconds)
}
}