author    Shixiong Zhu <shixiong@databricks.com>    2016-01-20 13:55:41 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>    2016-01-20 13:55:41 -0800
commit    b7d74a602f622d8e105b349bd6d17ba42e7668dc
tree      118deb532942513693e60f851dde638d7fa818cd
parent    944fdadf77523570f6b33544ad0b388031498952
[SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project
Include the following changes:

1. Add the "streaming-akka" project and org.apache.spark.streaming.akka.AkkaUtils for creating an actor stream
2. Remove "StreamingContext.actorStream" and "JavaStreamingContext.actorStream"
3. Update the ActorWordCount example and add the JavaActorWordCount example
4. Make "streaming-zeromq" depend on "streaming-akka" and update the code accordingly

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10744 from zsxwing/streaming-akka-2.
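For readers migrating an existing application, the change amounts to replacing the removed `StreamingContext.actorStream` call with `AkkaUtils.createStream` from the new `org.apache.spark.streaming.akka` package (and adding the `spark-streaming-akka` artifact as a dependency). A minimal before/after sketch in Scala, assuming a hypothetical `CustomActor` receiver and a word-count pipeline like the bundled example:

```scala
import akka.actor.Props

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

// Hypothetical receiver actor, used only to make the call sites concrete.
class CustomActor extends ActorReceiver {
  def receive = {
    case data: String => store(data)
  }
}

object MigrationSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "MigrationSketch", Seconds(2))

    // Before this commit (API removed here):
    //   val lines = ssc.actorStream[String](Props[CustomActor](), "CustomReceiver")

    // After this commit, the same stream is created through the streaming-akka module:
    val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver")

    lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```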
-rw-r--r--  dev/sparktestsupport/modules.py | 12
-rw-r--r--  docs/streaming-custom-receivers.md | 49
-rw-r--r--  docs/streaming-programming-guide.md | 4
-rw-r--r--  examples/pom.xml | 5
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java | 14
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala | 37
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala | 13
-rw-r--r--  external/akka/pom.xml | 73
-rw-r--r--  external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala (renamed from streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala) | 64
-rw-r--r--  external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala | 147
-rw-r--r--  external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java | 66
-rw-r--r--  external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala | 64
-rw-r--r--  external/zeromq/pom.xml | 5
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala | 2
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala | 76
-rw-r--r--  external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java | 31
-rw-r--r--  external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala | 16
-rw-r--r--  pom.xml | 1
-rw-r--r--  project/MimaExcludes.scala | 10
-rw-r--r--  project/SparkBuild.scala | 9
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 24
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 64
22 files changed, 601 insertions, 185 deletions
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 93a8c15e3e..efe58ea2e0 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -222,6 +222,18 @@ streaming_flume_sink = Module(
)
+streaming_akka = Module(
+ name="streaming-akka",
+ dependencies=[streaming],
+ source_file_regexes=[
+ "external/akka",
+ ],
+ sbt_test_goals=[
+ "streaming-akka/test",
+ ]
+)
+
+
streaming_flume = Module(
name="streaming-flume",
dependencies=[streaming],
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md
index 97db865daa..95b99862ec 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -257,25 +257,54 @@ The following table summarizes the characteristics of both types of receivers
## Implementing and Using a Custom Actor-based Receiver
+<div class="codetabs">
+<div data-lang="scala" markdown="1" >
+
Custom [Akka Actors](http://doc.akka.io/docs/akka/2.3.11/scala/actors.html) can also be used to
-receive data. The [`ActorHelper`](api/scala/index.html#org.apache.spark.streaming.receiver.ActorHelper)
-trait can be mixed in to any Akka actor, which allows received data to be stored in Spark using
- `store(...)` methods. The supervisor strategy of this actor can be configured to handle failures, etc.
+receive data. Extending [`ActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.ActorReceiver)
+allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of
+this actor can be configured to handle failures, etc.
{% highlight scala %}
-class CustomActor extends Actor with ActorHelper {
+
+class CustomActor extends ActorReceiver {
def receive = {
case data: String => store(data)
}
}
+
+// A new input stream can be created with this custom actor as
+val ssc: StreamingContext = ...
+val lines = AkkaUtils.createStream[String](ssc, Props[CustomActor](), "CustomReceiver")
+
{% endhighlight %}
-And a new input stream can be created with this custom actor as
+See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala) for an end-to-end example.
+</div>
+<div data-lang="java" markdown="1">
+
+Custom [Akka UntypedActors](http://doc.akka.io/docs/akka/2.3.11/java/untyped-actors.html) can also be used to
+receive data. Extending [`JavaActorReceiver`](api/scala/index.html#org.apache.spark.streaming.akka.JavaActorReceiver)
+allows received data to be stored in Spark using `store(...)` methods. The supervisor strategy of
+this actor can be configured to handle failures, etc.
+
+{% highlight java %}
+
+class CustomActor extends JavaActorReceiver {
+ @Override
+ public void onReceive(Object msg) throws Exception {
+ store((String) msg);
+ }
+}
+
+// A new input stream can be created with this custom actor as
+JavaStreamingContext jssc = ...;
+JavaDStream<String> lines = AkkaUtils.<String>createStream(jssc, Props.create(CustomActor.class), "CustomReceiver");
-{% highlight scala %}
-val ssc: StreamingContext = ...
-val lines = ssc.actorStream[String](Props[CustomActor], "CustomReceiver")
{% endhighlight %}
-See [ActorWordCount.scala](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala)
-for an end-to-end example.
+See [JavaActorWordCount.java](https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java) for an end-to-end example.
+</div>
+</div>
+
+<span class="badge" style="background-color: grey">Python API</span> Since actors are available only in the Java and Scala libraries, AkkaUtils is not available in the Python API.
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 8fd075d02b..93c34efb66 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -659,11 +659,11 @@ methods for creating DStreams from files and Akka actors as input sources.
<span class="badge" style="background-color: grey">Python API</span> `fileStream` is not available in the Python API, only `textFileStream` is available.
- **Streams based on Custom Actors:** DStreams can be created with data streams received through Akka
- actors by using `streamingContext.actorStream(actorProps, actor-name)`. See the [Custom Receiver
+ actors by using `AkkaUtils.createStream(ssc, actorProps, actor-name)`. See the [Custom Receiver
Guide](streaming-custom-receivers.html) for more details.
<span class="badge" style="background-color: grey">Python API</span> Since actors are available only in the Java and Scala
- libraries, `actorStream` is not available in the Python API.
+ libraries, `AkkaUtils.createStream` is not available in the Python API.
- **Queue of RDDs as a Stream:** For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream.
diff --git a/examples/pom.xml b/examples/pom.xml
index 1a0d5e5854..9437cee2ab 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -77,6 +77,11 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
index 2377207779..62e563380a 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaActorWordCount.java
@@ -31,7 +31,8 @@ import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.receiver.JavaActorReceiver;
+import org.apache.spark.streaming.akka.AkkaUtils;
+import org.apache.spark.streaming.akka.JavaActorReceiver;
/**
* A sample actor as receiver, is also simplest. This receiver actor
@@ -56,6 +57,7 @@ class JavaSampleActorReceiver<T> extends JavaActorReceiver {
remotePublisher.tell(new SubscribeReceiver(getSelf()), getSelf());
}
+ @Override
public void onReceive(Object msg) throws Exception {
store((T) msg);
}
@@ -100,18 +102,20 @@ public class JavaActorWordCount {
String feederActorURI = "akka.tcp://test@" + host + ":" + port + "/user/FeederActor";
/*
- * Following is the use of actorStream to plug in custom actor as receiver
+ * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
*
* An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
* to ensure the type safety, i.e type of data received and InputDstream
* should be same.
*
- * For example: Both actorStream and JavaSampleActorReceiver are parameterized
+ * For example: Both AkkaUtils.createStream and JavaSampleActorReceiver are parameterized
* to same type to ensure type safety.
*/
- JavaDStream<String> lines = jssc.actorStream(
- Props.create(JavaSampleActorReceiver.class, feederActorURI), "SampleReceiver");
+ JavaDStream<String> lines = AkkaUtils.createStream(
+ jssc,
+ Props.create(JavaSampleActorReceiver.class, feederActorURI),
+ "SampleReceiver");
// compute wordcount
lines.flatMap(new FlatMapFunction<String, String>() {
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
index 88cdc6bc14..8e88987439 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ActorWordCount.scala
@@ -22,12 +22,12 @@ import scala.collection.mutable.LinkedList
import scala.reflect.ClassTag
import scala.util.Random
-import akka.actor.{actorRef2Scala, Actor, ActorRef, Props}
+import akka.actor._
+import com.typesafe.config.ConfigFactory
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.streaming.receiver.ActorReceiver
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
case class SubscribeReceiver(receiverActor: ActorRef)
case class UnsubscribeReceiver(receiverActor: ActorRef)
@@ -78,8 +78,7 @@ class FeederActor extends Actor {
*
* @see [[org.apache.spark.examples.streaming.FeederActor]]
*/
-class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
-extends ActorReceiver {
+class SampleActorReceiver[T](urlOfPublisher: String) extends ActorReceiver {
lazy private val remotePublisher = context.actorSelection(urlOfPublisher)
@@ -108,9 +107,13 @@ object FeederActor {
}
val Seq(host, port) = args.toSeq
- val conf = new SparkConf
- val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt, conf = conf,
- securityManager = new SecurityManager(conf))._1
+ val akkaConf = ConfigFactory.parseString(
+ s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+ |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
+ |akka.remote.netty.tcp.hostname = "$host"
+ |akka.remote.netty.tcp.port = $port
+ |""".stripMargin)
+ val actorSystem = ActorSystem("test", akkaConf)
val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
println("Feeder started as:" + feeder)
@@ -121,6 +124,7 @@ object FeederActor {
/**
* A sample word count program demonstrating the use of plugging in
+ *
* Actor as Receiver
* Usage: ActorWordCount <hostname> <port>
* <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
@@ -146,20 +150,21 @@ object ActorWordCount {
val ssc = new StreamingContext(sparkConf, Seconds(2))
/*
- * Following is the use of actorStream to plug in custom actor as receiver
+ * Following is the use of AkkaUtils.createStream to plug in custom actor as receiver
*
* An important point to note:
* Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e type of data received and InputDstream
+ * to ensure the type safety, i.e type of data received and InputDStream
* should be same.
*
- * For example: Both actorStream and SampleActorReceiver are parameterized
+ * For example: Both AkkaUtils.createStream and SampleActorReceiver are parameterized
* to same type to ensure type safety.
*/
-
- val lines = ssc.actorStream[String](
- Props(new SampleActorReceiver[String]("akka.tcp://test@%s:%s/user/FeederActor".format(
- host, port.toInt))), "SampleReceiver")
+ val lines = AkkaUtils.createStream[String](
+ ssc,
+ Props(classOf[SampleActorReceiver[String]],
+ "akka.tcp://test@%s:%s/user/FeederActor".format(host, port.toInt)),
+ "SampleReceiver")
// compute wordcount
lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
index 9644890576..f612e508eb 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/ZeroMQWordCount.scala
@@ -25,8 +25,9 @@ import akka.actor.actorRef2Scala
import akka.util.ByteString
import akka.zeromq._
import akka.zeromq.Subscribe
+import com.typesafe.config.ConfigFactory
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.zeromq._
@@ -69,10 +70,10 @@ object SimpleZeroMQPublisher {
*
* To run this example locally, you may run publisher as
* `$ bin/run-example \
- * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar`
+ * org.apache.spark.examples.streaming.SimpleZeroMQPublisher tcp://127.0.0.1:1234 foo`
* and run the example as
* `$ bin/run-example \
- * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.1.1:1234 foo`
+ * org.apache.spark.examples.streaming.ZeroMQWordCount tcp://127.0.0.1:1234 foo`
*/
// scalastyle:on
object ZeroMQWordCount {
@@ -90,7 +91,11 @@ object ZeroMQWordCount {
def bytesToStringIterator(x: Seq[ByteString]): Iterator[String] = x.map(_.utf8String).iterator
// For this stream, a zeroMQ publisher should be running.
- val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
+ val lines = ZeroMQUtils.createStream(
+ ssc,
+ url,
+ Subscribe(topic),
+ bytesToStringIterator _)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
diff --git a/external/akka/pom.xml b/external/akka/pom.xml
new file mode 100644
index 0000000000..34de9bae00
--- /dev/null
+++ b/external/akka/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.10</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-akka_2.10</artifactId>
+ <properties>
+ <sbt.project.name>streaming-akka</sbt.project.name>
+ </properties>
+ <packaging>jar</packaging>
+ <name>Spark Project External Akka</name>
+ <url>http://spark.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${akka.group}</groupId>
+ <artifactId>akka-actor_${scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${akka.group}</groupId>
+ <artifactId>akka-remote_${scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ </build>
+</project>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
index 0eabf3d260..c75dc92445 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
+++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.receiver
+package org.apache.spark.streaming.akka
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger
@@ -26,23 +26,44 @@ import scala.reflect.ClassTag
import akka.actor._
import akka.actor.SupervisorStrategy.{Escalate, Restart}
+import com.typesafe.config.ConfigFactory
-import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.{Logging, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
/**
* :: DeveloperApi ::
* A helper with set of defaults for supervisor strategy
*/
@DeveloperApi
-object ActorSupervisorStrategy {
+object ActorReceiver {
- val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
+ /**
+ * A OneForOneStrategy supervisor strategy with `maxNrOfRetries = 10` and
+ * `withinTimeRange = 15 millis`. For RuntimeException, it will restart the ActorReceiver; for
+ * others, it just escalates the failure to the supervisor of the supervisor.
+ */
+ val defaultSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
15 millis) {
case _: RuntimeException => Restart
case _: Exception => Escalate
}
+
+ /**
+ * A default ActorSystem creator. It will use a unique system name
+ * (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote
+ * communication.
+ */
+ val defaultActorSystemCreator: () => ActorSystem = () => {
+ val uniqueSystemName = s"streaming-actor-system-${TaskContext.get().taskAttemptId()}"
+ val akkaConf = ConfigFactory.parseString(
+ s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+ |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
+ |""".stripMargin)
+ ActorSystem(uniqueSystemName, akkaConf)
+ }
}
/**
@@ -58,13 +79,12 @@ object ActorSupervisorStrategy {
* }
* }
*
- * // Can be used with an actorStream as follows
- * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
+ * AkkaUtils.createStream[String](ssc, Props[MyActor](),"MyActorReceiver")
*
* }}}
*
* @note Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e parametrized type of push block and InputDStream
+ * to ensure the type safety, i.e. parametrized type of push block and InputDStream
* should be same.
*/
@DeveloperApi
@@ -103,18 +123,18 @@ abstract class ActorReceiver extends Actor {
*
* @example {{{
* class MyActor extends JavaActorReceiver {
- * def receive {
- * case anything: String => store(anything)
+ * @Override
+ * public void onReceive(Object msg) throws Exception {
+ * store((String) msg);
* }
* }
*
- * // Can be used with an actorStream as follows
- * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
+ * AkkaUtils.<String>createStream(jssc, Props.create(MyActor.class), "MyActorReceiver");
*
* }}}
*
* @note Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e parametrized type of push block and InputDStream
+ * to ensure the type safety, i.e. parametrized type of push block and InputDStream
* should be same.
*/
@DeveloperApi
@@ -147,8 +167,8 @@ abstract class JavaActorReceiver extends UntypedActor {
/**
* :: DeveloperApi ::
* Statistics for querying the supervisor about state of workers. Used in
- * conjunction with `StreamingContext.actorStream` and
- * [[org.apache.spark.streaming.receiver.ActorReceiver]].
+ * conjunction with `AkkaUtils.createStream` and
+ * [[org.apache.spark.streaming.akka.ActorReceiverSupervisor]].
*/
@DeveloperApi
case class Statistics(numberOfMsgs: Int,
@@ -157,10 +177,10 @@ case class Statistics(numberOfMsgs: Int,
otherInfo: String)
/** Case class to receive data sent by child actors */
-private[streaming] sealed trait ActorReceiverData
-private[streaming] case class SingleItemData[T](item: T) extends ActorReceiverData
-private[streaming] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
-private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
+private[akka] sealed trait ActorReceiverData
+private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData
+private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
+private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
/**
* Provides Actors as receivers for receiving stream.
@@ -181,14 +201,16 @@ private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorRec
* context.parent ! Props(new Worker, "Worker")
* }}}
*/
-private[streaming] class ActorReceiverSupervisor[T: ClassTag](
+private[akka] class ActorReceiverSupervisor[T: ClassTag](
+ actorSystemCreator: () => ActorSystem,
props: Props,
name: String,
storageLevel: StorageLevel,
receiverSupervisorStrategy: SupervisorStrategy
) extends Receiver[T](storageLevel) with Logging {
- protected lazy val actorSupervisor = SparkEnv.get.actorSystem.actorOf(Props(new Supervisor),
+ private lazy val actorSystem = actorSystemCreator()
+ protected lazy val actorSupervisor = actorSystem.actorOf(Props(new Supervisor),
"Supervisor" + streamId)
class Supervisor extends Actor {
@@ -241,5 +263,7 @@ private[streaming] class ActorReceiverSupervisor[T: ClassTag](
def onStop(): Unit = {
actorSupervisor ! PoisonPill
+ actorSystem.shutdown()
+ actorSystem.awaitTermination()
}
}
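Because the receiver now creates its own `ActorSystem` through `actorSystemCreator` (and shuts it down in `onStop` above), applications that need non-default Akka settings can pass their own factory to `AkkaUtils.createStream` instead of relying on the default shown here. A minimal sketch, assuming hypothetical local-only settings:

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Hypothetical factory: a local-only ActorSystem for receivers that never
// need Akka remoting. It is invoked on the executor and the resulting system
// is shut down when the receiver stops.
val localOnlySystemCreator: () => ActorSystem = () => {
  val conf = ConfigFactory.parseString(
    """akka.actor.provider = "akka.actor.LocalActorRefProvider"
      |""".stripMargin)
  ActorSystem("streaming-local-actor-system", conf)
}
```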
diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
new file mode 100644
index 0000000000..38c35c5ae7
--- /dev/null
+++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka
+
+import scala.reflect.ClassTag
+
+import akka.actor.{ActorSystem, Props, SupervisorStrategy}
+
+import org.apache.spark.api.java.function.{Function0 => JFunction0}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+object AkkaUtils {
+
+ /**
+ * Create an input stream with a user-defined actor. See [[ActorReceiver]] for more details.
+ *
+ * @param ssc The StreamingContext instance
+ * @param propsForActor Props object defining creation of the actor
+ * @param actorName Name of the actor
+ * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+ * be shut down when the receiver is stopping (default:
+ * ActorReceiver.defaultActorSystemCreator)
+ * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
+ *
+ * @note An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e. parametrized type of data received and createStream
+ * should be same.
+ */
+ def createStream[T: ClassTag](
+ ssc: StreamingContext,
+ propsForActor: Props,
+ actorName: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
+ actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
+ supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
+ ): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") {
+ val cleanF = ssc.sc.clean(actorSystemCreator)
+ ssc.receiverStream(new ActorReceiverSupervisor[T](
+ cleanF,
+ propsForActor,
+ actorName,
+ storageLevel,
+ supervisorStrategy))
+ }
+
+ /**
+ * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
+ *
+ * @param jssc The StreamingContext instance
+ * @param propsForActor Props object defining creation of the actor
+ * @param actorName Name of the actor
+ * @param storageLevel Storage level to use for storing the received objects
+ * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+ * be shut down when the receiver is stopping.
+ * @param supervisorStrategy the supervisor strategy
+ *
+ * @note An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e. parametrized type of data received and createStream
+ * should be same.
+ */
+ def createStream[T](
+ jssc: JavaStreamingContext,
+ propsForActor: Props,
+ actorName: String,
+ storageLevel: StorageLevel,
+ actorSystemCreator: JFunction0[ActorSystem],
+ supervisorStrategy: SupervisorStrategy
+ ): JavaReceiverInputDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ createStream[T](
+ jssc.ssc,
+ propsForActor,
+ actorName,
+ storageLevel,
+ () => actorSystemCreator.call(),
+ supervisorStrategy)
+ }
+
+ /**
+ * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
+ *
+ * @param jssc The StreamingContext instance
+ * @param propsForActor Props object defining creation of the actor
+ * @param actorName Name of the actor
+ * @param storageLevel Storage level to use for storing the received objects
+ *
+ * @note An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e. parametrized type of data received and createStream
+ * should be same.
+ */
+ def createStream[T](
+ jssc: JavaStreamingContext,
+ propsForActor: Props,
+ actorName: String,
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ createStream[T](jssc.ssc, propsForActor, actorName, storageLevel)
+ }
+
+ /**
+ * Create an input stream with a user-defined actor. Storage level of the data will be the default
+ * StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details.
+ *
+ * @param jssc The StreamingContext instance
+ * @param propsForActor Props object defining creation of the actor
+ * @param actorName Name of the actor
+ *
+ * @note An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e. parametrized type of data received and createStream
+ * should be same.
+ */
+ def createStream[T](
+ jssc: JavaStreamingContext,
+ propsForActor: Props,
+ actorName: String
+ ): JavaReceiverInputDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ createStream[T](jssc.ssc, propsForActor, actorName)
+ }
+}
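The storage level, `ActorSystem` factory, and supervisor strategy are ordinary parameters of the Scala `createStream` above, so callers can override any of them. A minimal sketch of overriding the supervisor strategy, assuming a hypothetical `EchoActor` receiver and a relaxed strategy that stops (rather than escalates) on non-runtime exceptions:

```scala
import scala.concurrent.duration._

import akka.actor.{OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.{Restart, Stop}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

// Hypothetical receiver actor, shown only to make the call site concrete.
class EchoActor extends ActorReceiver {
  def receive = {
    case s: String => store(s)
  }
}

object CustomStrategySketch {
  // Hypothetical strategy: restart on RuntimeException like the default,
  // but stop the actor for any other exception instead of escalating.
  val lenientStrategy = OneForOneStrategy(maxNrOfRetries = 30, withinTimeRange = 1.minute) {
    case _: RuntimeException => Restart
    case _: Exception => Stop
  }

  def createEchoStream(ssc: StreamingContext): ReceiverInputDStream[String] =
    AkkaUtils.createStream[String](
      ssc,
      Props[EchoActor](),
      "EchoReceiver",
      StorageLevel.MEMORY_AND_DISK_SER_2,
      ActorReceiver.defaultActorSystemCreator,
      lenientStrategy)
}
```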
diff --git a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
new file mode 100644
index 0000000000..b732506767
--- /dev/null
+++ b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.SupervisorStrategy;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.Test;
+
+import org.apache.spark.api.java.function.Function0;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+
+public class JavaAkkaUtilsSuite {
+
+ @Test // tests the API, does not actually test data receiving
+ public void testAkkaUtils() {
+ JavaStreamingContext jsc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ try {
+ JavaReceiverInputDStream<String> test1 = AkkaUtils.<String>createStream(
+ jsc, Props.create(JavaTestActor.class), "test");
+ JavaReceiverInputDStream<String> test2 = AkkaUtils.<String>createStream(
+ jsc, Props.create(JavaTestActor.class), "test", StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaReceiverInputDStream<String> test3 = AkkaUtils.<String>createStream(
+ jsc,
+ Props.create(JavaTestActor.class),
+ "test", StorageLevel.MEMORY_AND_DISK_SER_2(),
+ new ActorSystemCreatorForTest(),
+ SupervisorStrategy.defaultStrategy());
+ } finally {
+ jsc.stop();
+ }
+ }
+}
+
+class ActorSystemCreatorForTest implements Function0<ActorSystem> {
+ @Override
+ public ActorSystem call() {
+ return null;
+ }
+}
+
+
+class JavaTestActor extends JavaActorReceiver {
+ @Override
+ public void onReceive(Object message) throws Exception {
+ store((String) message);
+ }
+}
diff --git a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
new file mode 100644
index 0000000000..f437585a98
--- /dev/null
+++ b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka
+
+import akka.actor.{Props, SupervisorStrategy}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+class AkkaUtilsSuite extends SparkFunSuite {
+
+ test("createStream") {
+ val ssc: StreamingContext = new StreamingContext("local[2]", "test", Seconds(1000))
+ try {
+ // tests the API, does not actually test data receiving
+ val test1: ReceiverInputDStream[String] = AkkaUtils.createStream(
+ ssc, Props[TestActor](), "test")
+ val test2: ReceiverInputDStream[String] = AkkaUtils.createStream(
+ ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test3: ReceiverInputDStream[String] = AkkaUtils.createStream(
+ ssc,
+ Props[TestActor](),
+ "test",
+ StorageLevel.MEMORY_AND_DISK_SER_2,
+ supervisorStrategy = SupervisorStrategy.defaultStrategy)
+ val test4: ReceiverInputDStream[String] = AkkaUtils.createStream(
+ ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
+ val test5: ReceiverInputDStream[String] = AkkaUtils.createStream(
+ ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
+ val test6: ReceiverInputDStream[String] = AkkaUtils.createStream(
+ ssc,
+ Props[TestActor](),
+ "test",
+ StorageLevel.MEMORY_AND_DISK_SER_2,
+ () => null,
+ SupervisorStrategy.defaultStrategy)
+ } finally {
+ ssc.stop()
+ }
+ }
+}
+
+class TestActor extends ActorReceiver {
+ override def receive: Receive = {
+ case m: String => store(m)
+ }
+}
diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml
index a725988449..7781aaeed9 100644
--- a/external/zeromq/pom.xml
+++ b/external/zeromq/pom.xml
@@ -43,6 +43,11 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 506ba8782d..dd367cd43b 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -23,7 +23,7 @@ import akka.util.ByteString
import akka.zeromq._
import org.apache.spark.Logging
-import org.apache.spark.streaming.receiver.ActorReceiver
+import org.apache.spark.streaming.akka.ActorReceiver
/**
* A receiver to subscribe to ZeroMQ stream.
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index 63cd8a2721..1784d6e862 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -20,29 +20,33 @@ package org.apache.spark.streaming.zeromq
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
-import akka.actor.{Props, SupervisorStrategy}
+import akka.actor.{ActorSystem, Props, SupervisorStrategy}
import akka.util.ByteString
import akka.zeromq.Subscribe
-import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.api.java.function.{Function => JFunction, Function0 => JFunction0}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.receiver.ActorSupervisorStrategy
object ZeroMQUtils {
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
- * @param ssc StreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
+ * @param ssc StreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
* and each frame has sequence of byte thus it needs the converter
* (which might be deserializer of bytes) to translate from sequence
* of sequence of bytes, where sequence refer to a frame
* and sub sequence refer to its payload.
* @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+ * be shut down when the receiver is stopping (default:
+ * ActorReceiver.defaultActorSystemCreator)
+ * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
*/
def createStream[T: ClassTag](
ssc: StreamingContext,
@@ -50,22 +54,31 @@ object ZeroMQUtils {
subscribe: Subscribe,
bytesToObjects: Seq[ByteString] => Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
- supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
+ actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
+ supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
): ReceiverInputDStream[T] = {
- ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
- "ZeroMQReceiver", storageLevel, supervisorStrategy)
+ AkkaUtils.createStream(
+ ssc,
+ Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
+ "ZeroMQReceiver",
+ storageLevel,
+ actorSystemCreator,
+ supervisorStrategy)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object
- * @param publisherUrl Url of remote ZeroMQ publisher
- * @param subscribe Topic to subscribe to
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote ZeroMQ publisher
+ * @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
* frame has sequence of byte thus it needs the converter(which might be
* deserializer of bytes) to translate from sequence of sequence of bytes,
* where sequence refer to a frame and sub sequence refer to its payload.
- * @param storageLevel Storage level to use for storing the received objects
+ * @param storageLevel Storage level to use for storing the received objects
+ * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+ * be shut down when the receiver is stopping.
+ * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy)
*/
def createStream[T](
jssc: JavaStreamingContext,
@@ -73,25 +86,33 @@ object ZeroMQUtils {
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
storageLevel: StorageLevel,
+ actorSystemCreator: JFunction0[ActorSystem],
supervisorStrategy: SupervisorStrategy
): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn =
(x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
- createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
+ createStream[T](
+ jssc.ssc,
+ publisherUrl,
+ subscribe,
+ fn,
+ storageLevel,
+ () => actorSystemCreator.call(),
+ supervisorStrategy)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
* frame has sequence of byte thus it needs the converter(which might be
* deserializer of bytes) to translate from sequence of sequence of bytes,
* where sequence refer to a frame and sub sequence refer to its payload.
- * @param storageLevel RDD storage level.
+ * @param storageLevel RDD storage level.
*/
def createStream[T](
jssc: JavaStreamingContext,
@@ -104,14 +125,19 @@ object ZeroMQUtils {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn =
(x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
- createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel)
+ createStream[T](
+ jssc.ssc,
+ publisherUrl,
+ subscribe,
+ fn,
+ storageLevel)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each
* frame has sequence of byte thus it needs the converter(which might
* be deserializer of bytes) to translate from sequence of sequence of
@@ -128,6 +154,10 @@ object ZeroMQUtils {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn =
(x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
- createStream[T](jssc.ssc, publisherUrl, subscribe, fn)
+ createStream[T](
+ jssc.ssc,
+ publisherUrl,
+ subscribe,
+ fn)
}
}
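On the ZeroMQ side, existing Scala call sites keep compiling because the new `actorSystemCreator` parameter has a default. A minimal usage sketch, assuming a hypothetical publisher at `tcp://127.0.0.1:1234` and topic `foo`, mirroring the `ZeroMQWordCount` example above:

```scala
import akka.util.ByteString
import akka.zeromq.Subscribe

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.zeromq.ZeroMQUtils

object ZeroMQSketch {
  // Converter from ZeroMQ frames to records, as required by createStream.
  def bytesToStrings(frames: Seq[ByteString]): Iterator[String] =
    frames.map(_.utf8String).iterator

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "ZeroMQSketch", Seconds(2))

    // actorSystemCreator and supervisorStrategy are omitted here, so the
    // defaults (ActorReceiver.defaultActorSystemCreator and
    // ActorReceiver.defaultSupervisorStrategy) apply.
    val lines = ZeroMQUtils.createStream(
      ssc,
      "tcp://127.0.0.1:1234",   // hypothetical publisher URL
      Subscribe("foo"),
      bytesToStrings _,
      StorageLevel.MEMORY_AND_DISK_SER_2)

    lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```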
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index 417b91eecb..9ff4b41f97 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,14 +17,17 @@
package org.apache.spark.streaming.zeromq;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
+import akka.actor.ActorSystem;
import akka.actor.SupervisorStrategy;
import akka.util.ByteString;
import akka.zeromq.Subscribe;
+import org.junit.Test;
+
import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function0;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
@@ -32,19 +35,29 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
public void testZeroMQStream() {
String publishUrl = "abc";
Subscribe subscribe = new Subscribe((ByteString)null);
- Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
- @Override
- public Iterable<String> call(byte[][] bytes) throws Exception {
- return null;
- }
- };
+ Function<byte[][], Iterable<String>> bytesToObjects = new BytesToObjects();
+ Function0<ActorSystem> actorSystemCreator = new ActorSystemCreatorForTest();
JavaReceiverInputDStream<String> test1 = ZeroMQUtils.<String>createStream(
ssc, publishUrl, subscribe, bytesToObjects);
JavaReceiverInputDStream<String> test2 = ZeroMQUtils.<String>createStream(
ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
JavaReceiverInputDStream<String> test3 = ZeroMQUtils.<String>createStream(
- ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), actorSystemCreator,
SupervisorStrategy.defaultStrategy());
}
}
+
+class BytesToObjects implements Function<byte[][], Iterable<String>> {
+ @Override
+ public Iterable<String> call(byte[][] bytes) throws Exception {
+ return null;
+ }
+}
+
+class ActorSystemCreatorForTest implements Function0<ActorSystem> {
+ @Override
+ public ActorSystem call() {
+ return null;
+ }
+}
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 35d2e62c68..bac2679cab 100644
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -42,14 +42,22 @@ class ZeroMQStreamSuite extends SparkFunSuite {
// tests the API, does not actually test data receiving
val test1: ReceiverInputDStream[String] =
- ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+ ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator = () => null)
val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
ssc, publishUrl, subscribe, bytesToObjects,
- StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
+ StorageLevel.MEMORY_AND_DISK_SER_2, () => null, SupervisorStrategy.defaultStrategy)
+ val test4: ReceiverInputDStream[String] =
+ ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+ val test5: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test6: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects,
+ StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy = SupervisorStrategy.defaultStrategy)
- // TODO: Actually test data receiving
+ // TODO: Actually test data receiving. A real test needs the native ZeroMQ library
ssc.stop()
}
}
diff --git a/pom.xml b/pom.xml
index fca6269913..43f08efaae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,6 +104,7 @@
<module>external/flume</module>
<module>external/flume-sink</module>
<module>external/flume-assembly</module>
+ <module>external/akka</module>
<module>external/mqtt</module>
<module>external/mqtt-assembly</module>
<module>external/zeromq</module>
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 6469201446..905fb4cd90 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -154,6 +154,16 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log_"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.sink.TransactionProcessor.org$apache$spark$streaming$flume$sink$Logging$$log__=")
) ++ Seq(
+ // SPARK-7799 Add "streaming-akka" project
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream"),
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream$default$6"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.zeromq.ZeroMQUtils.createStream$default$5"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream$default$4"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream$default$3"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.actorStream")
+ ) ++ Seq(
// SPARK-12847 Remove StreamingListenerBus and post all Streaming events to the same thread as Spark events
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.AsynchronousListenerBus$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.AsynchronousListenerBus")
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 06e561ae0d..3927b88fb0 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -35,11 +35,11 @@ object BuildCommons {
private val buildLocation = file(".").getAbsoluteFile.getParentFile
val allProjects@Seq(catalyst, core, graphx, hive, hiveThriftServer, mllib, repl,
- sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingKafka,
+ sql, networkCommon, networkShuffle, streaming, streamingFlumeSink, streamingFlume, streamingAkka, streamingKafka,
streamingMqtt, streamingTwitter, streamingZeromq, launcher, unsafe, testTags) =
Seq("catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl",
"sql", "network-common", "network-shuffle", "streaming", "streaming-flume-sink",
- "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
+ "streaming-flume", "streaming-akka", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
"streaming-zeromq", "launcher", "unsafe", "test-tags").map(ProjectRef(buildLocation, _))
val optionallyEnabledProjects@Seq(yarn, java8Tests, sparkGangliaLgpl,
@@ -232,8 +232,9 @@ object SparkBuild extends PomBuild {
/* Enable tests settings for all projects except examples, assembly and tools */
(allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings))
+ // TODO: remove streamingAkka from this list after 2.0.0
allProjects.filterNot(x => Seq(spark, hive, hiveThriftServer, catalyst, repl,
- networkCommon, networkShuffle, networkYarn, unsafe, testTags).contains(x)).foreach {
+ networkCommon, networkShuffle, networkYarn, unsafe, streamingAkka, testTags).contains(x)).foreach {
x => enable(MimaBuild.mimaSettings(sparkHome, x))(x)
}
@@ -649,7 +650,7 @@ object Unidoc {
"-public",
"-group", "Core Java API", packageList("api.java", "api.java.function"),
"-group", "Spark Streaming", packageList(
- "streaming.api.java", "streaming.flume", "streaming.kafka",
+ "streaming.api.java", "streaming.flume", "streaming.akka", "streaming.kafka",
"streaming.mqtt", "streaming.twitter", "streaming.zeromq", "streaming.kinesis"
),
"-group", "MLlib", packageList(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b7070dda99..ec57c05e3b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -25,7 +25,6 @@ import scala.collection.mutable.Queue
import scala.reflect.ClassTag
import scala.util.control.NonFatal
-import akka.actor.{Props, SupervisorStrategy}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
@@ -42,7 +41,7 @@ import org.apache.spark.serializer.SerializationDebugger
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContextState._
import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.{ActorReceiverSupervisor, ActorSupervisorStrategy, Receiver}
+import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, Utils}
@@ -296,27 +295,6 @@ class StreamingContext private[streaming] (
}
/**
- * Create an input stream with any arbitrary user implemented actor receiver.
- * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- * @param props Props object defining creation of the actor
- * @param name Name of the actor
- * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2)
- *
- * @note An important point to note:
- * Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e parametrized type of data received and actorStream
- * should be same.
- */
- def actorStream[T: ClassTag](
- props: Props,
- name: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
- supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
- ): ReceiverInputDStream[T] = withNamedScope("actor stream") {
- receiverStream(new ActorReceiverSupervisor[T](props, name, storageLevel, supervisorStrategy))
- }
-
- /**
* Create a input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
* lines.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 00f9d8a9e8..7a25ce54b6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -24,7 +24,6 @@ import java.util.{List => JList, Map => JMap}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
-import akka.actor.{Props, SupervisorStrategy}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
@@ -357,69 +356,6 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
}
/**
- * Create an input stream with any arbitrary user implemented actor receiver.
- * @param props Props object defining creation of the actor
- * @param name Name of the actor
- * @param storageLevel Storage level to use for storing the received objects
- *
- * @note An important point to note:
- * Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e parametrized type of data received and actorStream
- * should be same.
- */
- def actorStream[T](
- props: Props,
- name: String,
- storageLevel: StorageLevel,
- supervisorStrategy: SupervisorStrategy
- ): JavaReceiverInputDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- ssc.actorStream[T](props, name, storageLevel, supervisorStrategy)
- }
-
- /**
- * Create an input stream with any arbitrary user implemented actor receiver.
- * @param props Props object defining creation of the actor
- * @param name Name of the actor
- * @param storageLevel Storage level to use for storing the received objects
- *
- * @note An important point to note:
- * Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e parametrized type of data received and actorStream
- * should be same.
- */
- def actorStream[T](
- props: Props,
- name: String,
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- ssc.actorStream[T](props, name, storageLevel)
- }
-
- /**
- * Create an input stream with any arbitrary user implemented actor receiver.
- * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param props Props object defining creation of the actor
- * @param name Name of the actor
- *
- * @note An important point to note:
- * Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e parametrized type of data received and actorStream
- * should be same.
- */
- def actorStream[T](
- props: Props,
- name: String
- ): JavaReceiverInputDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- ssc.actorStream[T](props, name)
- }
-
- /**
* Create an input stream from an queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*