author    Shixiong Zhu <shixiong@databricks.com>  2016-01-20 13:55:41 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2016-01-20 13:55:41 -0800
commit    b7d74a602f622d8e105b349bd6d17ba42e7668dc (patch)
tree      118deb532942513693e60f851dde638d7fa818cd /external
parent    944fdadf77523570f6b33544ad0b388031498952 (diff)
[SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project
Include the following changes:

1. Add "streaming-akka" project and org.apache.spark.streaming.akka.AkkaUtils for creating an actorStream
2. Remove "StreamingContext.actorStream" and "JavaStreamingContext.actorStream"
3. Update the ActorWordCount example and add the JavaActorWordCount example
4. Make "streaming-zeromq" depend on "streaming-akka" and update the code accordingly

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10744 from zsxwing/streaming-akka-2.
Diffstat (limited to 'external')
-rw-r--r--  external/akka/pom.xml  66
-rw-r--r--  external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala  269
-rw-r--r--  external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala  147
-rw-r--r--  external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java  66
-rw-r--r--  external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala  64
-rw-r--r--  external/zeromq/pom.xml  5
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala  2
-rw-r--r--  external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala  76
-rw-r--r--  external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java  31
-rw-r--r--  external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala  16
10 files changed, 705 insertions, 37 deletions
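The headline API change is item 2 above: actor streams are no longer created through StreamingContext itself. A minimal migration sketch, assuming an existing StreamingContext `ssc` and a trivial string-handling actor (MyActor and the receiver name are illustrative, not part of this patch):

    // Before this commit (API removed by this change):
    //   val lines = ssc.actorStream[String](Props(new MyActor), "MyActorReceiver")

    // After this commit: use the helper from the new streaming-akka project.
    import akka.actor.Props
    import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

    class MyActor extends ActorReceiver {
      def receive: Receive = {
        case s: String => store(s)  // push each received string into Spark
      }
    }

    val lines = AkkaUtils.createStream[String](ssc, Props(new MyActor), "MyActorReceiver")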
diff --git a/external/akka/pom.xml b/external/akka/pom.xml
new file mode 100644
index 0000000000..34de9bae00
--- /dev/null
+++ b/external/akka/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-parent_2.10</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-akka_2.10</artifactId>
+ <properties>
+ <sbt.project.name>streaming-akka</sbt.project.name>
+ </properties>
+ <packaging>jar</packaging>
+ <name>Spark Project External Akka</name>
+ <url>http://spark.apache.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>${akka.group}</groupId>
+ <artifactId>akka-actor_${scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${akka.group}</groupId>
+ <artifactId>akka-remote_${scala.binary.version}</artifactId>
+ <version>${akka.version}</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ </build>
+</project>
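Because the actor-based receiver now lives in its own module, applications must declare this dependency explicitly; it is no longer pulled in through spark-streaming. A hypothetical sbt coordinate, assuming the artifact is published under the version in this pom:

    // Hypothetical sbt dependency; substitute the Spark release actually in use.
    libraryDependencies += "org.apache.spark" %% "spark-streaming-akka" % "2.0.0-SNAPSHOT"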
diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
new file mode 100644
index 0000000000..c75dc92445
--- /dev/null
+++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.reflect.ClassTag
+
+import akka.actor._
+import akka.actor.SupervisorStrategy.{Escalate, Restart}
+import com.typesafe.config.ConfigFactory
+
+import org.apache.spark.{Logging, TaskContext}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+
+/**
+ * :: DeveloperApi ::
+ * A helper with a set of defaults for the supervisor strategy.
+ */
+@DeveloperApi
+object ActorReceiver {
+
+ /**
+ * A OneForOneStrategy supervisor strategy with `maxNrOfRetries = 10` and
+ * `withinTimeRange = 15 millis`. For RuntimeException, it will restart the ActorReceiver; for
+ * others, it just escalates the failure to the supervisor of the supervisor.
+ */
+ val defaultSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
+ 15 millis) {
+ case _: RuntimeException => Restart
+ case _: Exception => Escalate
+ }
+
+ /**
+ * A default ActorSystem creator. It will use a unique system name
+ * (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote
+ * communication.
+ */
+ val defaultActorSystemCreator: () => ActorSystem = () => {
+ val uniqueSystemName = s"streaming-actor-system-${TaskContext.get().taskAttemptId()}"
+ val akkaConf = ConfigFactory.parseString(
+ s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+ |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
+ |""".stripMargin)
+ ActorSystem(uniqueSystemName, akkaConf)
+ }
+}
+
+/**
+ * :: DeveloperApi ::
+ * A base Actor that provides APIs for pushing received data into Spark Streaming for processing.
+ *
+ * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ *
+ * @example {{{
+ * class MyActor extends ActorReceiver {
+ * def receive {
+ * case anything: String => store(anything)
+ * }
+ * }
+ *
+ * AkkaUtils.createStream[String](ssc, Props[MyActor](), "MyActorReceiver")
+ *
+ * }}}
+ *
+ * @note Since the Actor may exist outside the Spark framework, it is the user's responsibility
+ *       to ensure type safety, i.e. the parametrized type of the pushed block and the
+ *       InputDStream should be the same.
+ */
+@DeveloperApi
+abstract class ActorReceiver extends Actor {
+
+ /** Store an iterator of received data as a data block into Spark's memory. */
+ def store[T](iter: Iterator[T]) {
+ context.parent ! IteratorData(iter)
+ }
+
+ /**
+ * Store the bytes of received data as a data block into Spark's memory. Note
+ * that the data in the ByteBuffer must be serialized using the same serializer
+ * that Spark is configured to use.
+ */
+ def store(bytes: ByteBuffer) {
+ context.parent ! ByteBufferData(bytes)
+ }
+
+ /**
+ * Store a single item of received data to Spark's memory.
+ * These single items will be aggregated together into data blocks before
+ * being pushed into Spark's memory.
+ */
+ def store[T](item: T) {
+ context.parent ! SingleItemData(item)
+ }
+}
+
+/**
+ * :: DeveloperApi ::
+ * A Java UntypedActor that provides APIs for pushing received data into Spark Streaming for
+ * processing.
+ *
+ * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
+ *
+ * @example {{{
+ * class MyActor extends JavaActorReceiver {
+ * @Override
+ * public void onReceive(Object msg) throws Exception {
+ * store((String) msg);
+ * }
+ * }
+ *
+ * AkkaUtils.<String>createStream(jssc, Props.create(MyActor.class), "MyActorReceiver");
+ *
+ * }}}
+ *
+ * @note Since the Actor may exist outside the Spark framework, it is the user's responsibility
+ *       to ensure type safety, i.e. the parametrized type of the pushed block and the
+ *       InputDStream should be the same.
+ */
+@DeveloperApi
+abstract class JavaActorReceiver extends UntypedActor {
+
+ /** Store an iterator of received data as a data block into Spark's memory. */
+ def store[T](iter: Iterator[T]) {
+ context.parent ! IteratorData(iter)
+ }
+
+ /**
+ * Store the bytes of received data as a data block into Spark's memory. Note
+ * that the data in the ByteBuffer must be serialized using the same serializer
+ * that Spark is configured to use.
+ */
+ def store(bytes: ByteBuffer) {
+ context.parent ! ByteBufferData(bytes)
+ }
+
+ /**
+ * Store a single item of received data to Spark's memory.
+ * These single items will be aggregated together into data blocks before
+ * being pushed into Spark's memory.
+ */
+ def store[T](item: T) {
+ context.parent ! SingleItemData(item)
+ }
+}
+
+/**
+ * :: DeveloperApi ::
+ * Statistics for querying the supervisor about the state of workers. Used in
+ * conjunction with `AkkaUtils.createStream` and
+ * [[org.apache.spark.streaming.akka.ActorReceiverSupervisor]].
+ */
+@DeveloperApi
+case class Statistics(numberOfMsgs: Int,
+ numberOfWorkers: Int,
+ numberOfHiccups: Int,
+ otherInfo: String)
+
+/** Messages used by child actors to send received data to the supervisor */
+private[akka] sealed trait ActorReceiverData
+private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData
+private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
+private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
+
+/**
+ * Provides Actors as receivers for a receiving stream.
+ *
+ * Actors can be used to receive data from almost any stream source.
+ * A set of abstractions for actors as receivers is already provided
+ * for a few general cases. It is exposed as an API so that users can plug in
+ * their own Actor to run as a receiver for a Spark Streaming input source.
+ *
+ * This starts a supervisor actor which starts workers and also provides
+ * [http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html fault-tolerance].
+ *
+ * Here is how to start more supervisors/workers as its children.
+ *
+ * @example {{{
+ * context.parent ! Props(new Supervisor)
+ * }}} OR {{{
+ * context.parent ! (Props(new Worker), "Worker")
+ * }}}
+ */
+private[akka] class ActorReceiverSupervisor[T: ClassTag](
+ actorSystemCreator: () => ActorSystem,
+ props: Props,
+ name: String,
+ storageLevel: StorageLevel,
+ receiverSupervisorStrategy: SupervisorStrategy
+ ) extends Receiver[T](storageLevel) with Logging {
+
+ private lazy val actorSystem = actorSystemCreator()
+ protected lazy val actorSupervisor = actorSystem.actorOf(Props(new Supervisor),
+ "Supervisor" + streamId)
+
+ class Supervisor extends Actor {
+
+ override val supervisorStrategy = receiverSupervisorStrategy
+ private val worker = context.actorOf(props, name)
+ logInfo("Started receiver worker at:" + worker.path)
+
+ private val n: AtomicInteger = new AtomicInteger(0)
+ private val hiccups: AtomicInteger = new AtomicInteger(0)
+
+ override def receive: PartialFunction[Any, Unit] = {
+
+ case IteratorData(iterator) =>
+ logDebug("received iterator")
+ store(iterator.asInstanceOf[Iterator[T]])
+
+ case SingleItemData(msg) =>
+ logDebug("received single")
+ store(msg.asInstanceOf[T])
+ n.incrementAndGet()
+
+ case ByteBufferData(bytes) =>
+ logDebug("received bytes")
+ store(bytes)
+
+ case props: Props =>
+ val worker = context.actorOf(props)
+ logInfo("Started receiver worker at:" + worker.path)
+ sender ! worker
+
+ case (props: Props, name: String) =>
+ val worker = context.actorOf(props, name)
+ logInfo("Started receiver worker at:" + worker.path)
+ sender ! worker
+
+ case _: PossiblyHarmful => hiccups.incrementAndGet()
+
+ case _: Statistics =>
+ val workers = context.children
+ sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n"))
+
+ }
+ }
+
+ def onStart(): Unit = {
+ actorSupervisor
+ logInfo("Supervision tree for receivers initialized at:" + actorSupervisor.path)
+ }
+
+ def onStop(): Unit = {
+ actorSupervisor ! PoisonPill
+ actorSystem.shutdown()
+ actorSystem.awaitTermination()
+ }
+}
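To make the supervisor protocol above concrete, here is a sketch of a receiver actor in the style of the updated ActorWordCount example: it registers with a remote publisher and forwards every string that arrives. The publisher path and the SubscribeReceiver handshake message are illustrative assumptions, not part of this file:

    import akka.actor.{ActorRef, ActorSelection}
    import org.apache.spark.streaming.akka.ActorReceiver

    // Hypothetical handshake message the remote publisher is assumed to understand.
    case class SubscribeReceiver(receiver: ActorRef)

    class RemoteForwarder(publisherPath: String) extends ActorReceiver {
      lazy val publisher: ActorSelection = context.actorSelection(publisherPath)

      override def preStart(): Unit = publisher ! SubscribeReceiver(context.self)

      def receive: Receive = {
        case s: String => store(s)  // relayed to the parent supervisor as SingleItemData
      }
    }

Each store call sends the item to the parent ActorReceiverSupervisor, which aggregates single items into blocks before handing them to Spark's memory.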
diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
new file mode 100644
index 0000000000..38c35c5ae7
--- /dev/null
+++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka
+
+import scala.reflect.ClassTag
+
+import akka.actor.{ActorSystem, Props, SupervisorStrategy}
+
+import org.apache.spark.api.java.function.{Function0 => JFunction0}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+object AkkaUtils {
+
+ /**
+ * Create an input stream with a user-defined actor. See [[ActorReceiver]] for more details.
+ *
+ * @param ssc The StreamingContext instance
+ * @param propsForActor Props object defining creation of the actor
+ * @param actorName Name of the actor
+ * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+ * be shut down when the receiver is stopping (default:
+ * ActorReceiver.defaultActorSystemCreator)
+ * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultSupervisorStrategy)
+ *
+ * @note An important point to note:
+ *       since the Actor may exist outside the Spark framework, it is the user's
+ *       responsibility to ensure type safety, i.e. the parametrized type of the data
+ *       received and the type parameter of createStream should be the same.
+ */
+ def createStream[T: ClassTag](
+ ssc: StreamingContext,
+ propsForActor: Props,
+ actorName: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
+ actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
+ supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
+ ): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") {
+ val cleanF = ssc.sc.clean(actorSystemCreator)
+ ssc.receiverStream(new ActorReceiverSupervisor[T](
+ cleanF,
+ propsForActor,
+ actorName,
+ storageLevel,
+ supervisorStrategy))
+ }
+
+ /**
+ * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
+ *
+ * @param jssc The JavaStreamingContext instance
+ * @param propsForActor Props object defining creation of the actor
+ * @param actorName Name of the actor
+ * @param storageLevel Storage level to use for storing the received objects
+ * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+ * be shut down when the receiver is stopping.
+ * @param supervisorStrategy the supervisor strategy
+ *
+ * @note An important point to note:
+ *       since the Actor may exist outside the Spark framework, it is the user's
+ *       responsibility to ensure type safety, i.e. the parametrized type of the data
+ *       received and the type parameter of createStream should be the same.
+ */
+ def createStream[T](
+ jssc: JavaStreamingContext,
+ propsForActor: Props,
+ actorName: String,
+ storageLevel: StorageLevel,
+ actorSystemCreator: JFunction0[ActorSystem],
+ supervisorStrategy: SupervisorStrategy
+ ): JavaReceiverInputDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ createStream[T](
+ jssc.ssc,
+ propsForActor,
+ actorName,
+ storageLevel,
+ () => actorSystemCreator.call(),
+ supervisorStrategy)
+ }
+
+ /**
+ * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details.
+ *
+ * @param jssc The JavaStreamingContext instance
+ * @param propsForActor Props object defining creation of the actor
+ * @param actorName Name of the actor
+ * @param storageLevel Storage level to use for storing the received objects
+ *
+ * @note An important point to note:
+ *       since the Actor may exist outside the Spark framework, it is the user's
+ *       responsibility to ensure type safety, i.e. the parametrized type of the data
+ *       received and the type parameter of createStream should be the same.
+ */
+ def createStream[T](
+ jssc: JavaStreamingContext,
+ propsForActor: Props,
+ actorName: String,
+ storageLevel: StorageLevel
+ ): JavaReceiverInputDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ createStream[T](jssc.ssc, propsForActor, actorName, storageLevel)
+ }
+
+ /**
+ * Create an input stream with a user-defined actor. The storage level of the data will be
+ * the default StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details.
+ *
+ * @param jssc The JavaStreamingContext instance
+ * @param propsForActor Props object defining creation of the actor
+ * @param actorName Name of the actor
+ *
+ * @note An important point to note:
+ *       since the Actor may exist outside the Spark framework, it is the user's
+ *       responsibility to ensure type safety, i.e. the parametrized type of the data
+ *       received and the type parameter of createStream should be the same.
+ */
+ def createStream[T](
+ jssc: JavaStreamingContext,
+ propsForActor: Props,
+ actorName: String
+ ): JavaReceiverInputDStream[T] = {
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
+ createStream[T](jssc.ssc, propsForActor, actorName)
+ }
+}
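Of the parameters above, actorSystemCreator is the main new knob: the function is closure-cleaned (ssc.sc.clean) and shipped to the executor, where it is invoked lazily when the receiver starts, so no ActorSystem is ever serialized. A sketch of supplying a custom creator, reusing the illustrative MyActor from the earlier sketch; note that the default creator in ActorReceiver derives a unique system name from the task attempt id, which a fixed name like this one does not:

    import akka.actor.{ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    import org.apache.spark.streaming.akka.AkkaUtils

    // Hypothetical creator mirroring the default remoting setup.
    val creator: () => ActorSystem = () => {
      val conf = ConfigFactory.parseString(
        """akka.actor.provider = "akka.remote.RemoteActorRefProvider"
          |akka.remote.enabled-transports = ["akka.remote.netty.tcp"]
          |""".stripMargin)
      ActorSystem("my-receiver-system", conf)
    }

    val stream = AkkaUtils.createStream[String](
      ssc, Props(new MyActor), "MyActorReceiver", actorSystemCreator = creator)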
diff --git a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
new file mode 100644
index 0000000000..b732506767
--- /dev/null
+++ b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.SupervisorStrategy;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.junit.Test;
+
+import org.apache.spark.api.java.function.Function0;
+import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
+
+public class JavaAkkaUtilsSuite {
+
+ @Test // tests the API, does not actually test data receiving
+ public void testAkkaUtils() {
+ JavaStreamingContext jsc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ try {
+ JavaReceiverInputDStream<String> test1 = AkkaUtils.<String>createStream(
+ jsc, Props.create(JavaTestActor.class), "test");
+ JavaReceiverInputDStream<String> test2 = AkkaUtils.<String>createStream(
+ jsc, Props.create(JavaTestActor.class), "test", StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaReceiverInputDStream<String> test3 = AkkaUtils.<String>createStream(
+ jsc,
+ Props.create(JavaTestActor.class),
+ "test", StorageLevel.MEMORY_AND_DISK_SER_2(),
+ new ActorSystemCreatorForTest(),
+ SupervisorStrategy.defaultStrategy());
+ } finally {
+ jsc.stop();
+ }
+ }
+}
+
+class ActorSystemCreatorForTest implements Function0<ActorSystem> {
+ @Override
+ public ActorSystem call() {
+ return null;
+ }
+}
+
+
+class JavaTestActor extends JavaActorReceiver {
+ @Override
+ public void onReceive(Object message) throws Exception {
+ store((String) message);
+ }
+}
diff --git a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
new file mode 100644
index 0000000000..f437585a98
--- /dev/null
+++ b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.akka
+
+import akka.actor.{Props, SupervisorStrategy}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.dstream.ReceiverInputDStream
+
+class AkkaUtilsSuite extends SparkFunSuite {
+
+ test("createStream") {
+ val ssc: StreamingContext = new StreamingContext("local[2]", "test", Seconds(1000))
+ try {
+ // tests the API, does not actually test data receiving
+ val test1: ReceiverInputDStream[String] = AkkaUtils.createStream(
+ ssc, Props[TestActor](), "test")
+ val test2: ReceiverInputDStream[String] = AkkaUtils.createStream(
+ ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test3: ReceiverInputDStream[String] = AkkaUtils.createStream(
+ ssc,
+ Props[TestActor](),
+ "test",
+ StorageLevel.MEMORY_AND_DISK_SER_2,
+ supervisorStrategy = SupervisorStrategy.defaultStrategy)
+ val test4: ReceiverInputDStream[String] = AkkaUtils.createStream(
+ ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
+ val test5: ReceiverInputDStream[String] = AkkaUtils.createStream(
+ ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
+ val test6: ReceiverInputDStream[String] = AkkaUtils.createStream(
+ ssc,
+ Props[TestActor](),
+ "test",
+ StorageLevel.MEMORY_AND_DISK_SER_2,
+ () => null,
+ SupervisorStrategy.defaultStrategy)
+ } finally {
+ ssc.stop()
+ }
+ }
+}
+
+class TestActor extends ActorReceiver {
+ override def receive: Receive = {
+ case m: String => store(m)
+ }
+}
diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml
index a725988449..7781aaeed9 100644
--- a/external/zeromq/pom.xml
+++ b/external/zeromq/pom.xml
@@ -43,6 +43,11 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-akka_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
index 506ba8782d..dd367cd43b 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQReceiver.scala
@@ -23,7 +23,7 @@ import akka.util.ByteString
import akka.zeromq._
import org.apache.spark.Logging
-import org.apache.spark.streaming.receiver.ActorReceiver
+import org.apache.spark.streaming.akka.ActorReceiver
/**
* A receiver to subscribe to ZeroMQ stream.
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index 63cd8a2721..1784d6e862 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -20,29 +20,33 @@ package org.apache.spark.streaming.zeromq
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
-import akka.actor.{Props, SupervisorStrategy}
+import akka.actor.{ActorSystem, Props, SupervisorStrategy}
import akka.util.ByteString
import akka.zeromq.Subscribe
-import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.api.java.function.{Function => JFunction, Function0 => JFunction0}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
-import org.apache.spark.streaming.receiver.ActorSupervisorStrategy
object ZeroMQUtils {
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
- * @param ssc StreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
+ * @param ssc StreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
* @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic,
*                       and each frame is a sequence of bytes; this converter
*                       (which might be a deserializer) translates a sequence of
*                       sequences of bytes into objects, where each outer sequence
*                       is a frame and each inner sequence is its payload.
* @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+ * be shut down when the receiver is stopping (default:
+ * ActorReceiver.defaultActorSystemCreator)
+ * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultSupervisorStrategy)
*/
def createStream[T: ClassTag](
ssc: StreamingContext,
@@ -50,22 +54,31 @@ object ZeroMQUtils {
subscribe: Subscribe,
bytesToObjects: Seq[ByteString] => Iterator[T],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
- supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
+ actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator,
+ supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy
): ReceiverInputDStream[T] = {
- ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
- "ZeroMQReceiver", storageLevel, supervisorStrategy)
+ AkkaUtils.createStream(
+ ssc,
+ Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
+ "ZeroMQReceiver",
+ storageLevel,
+ actorSystemCreator,
+ supervisorStrategy)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object
- * @param publisherUrl Url of remote ZeroMQ publisher
- * @param subscribe Topic to subscribe to
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote ZeroMQ publisher
+ * @param subscribe Topic to subscribe to
* @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and
*                       each frame is a sequence of bytes; this converter (which might be a
*                       deserializer) translates a sequence of sequences of bytes into
*                       objects, where each outer sequence is a frame and each inner one its payload.
- * @param storageLevel Storage level to use for storing the received objects
+ * @param storageLevel Storage level to use for storing the received objects
+ * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will
+ * be shut down when the receiver is stopping.
+ * @param supervisorStrategy the supervisor strategy
*/
def createStream[T](
jssc: JavaStreamingContext,
@@ -73,25 +86,33 @@ object ZeroMQUtils {
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
storageLevel: StorageLevel,
+ actorSystemCreator: JFunction0[ActorSystem],
supervisorStrategy: SupervisorStrategy
): JavaReceiverInputDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn =
(x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
- createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
+ createStream[T](
+ jssc.ssc,
+ publisherUrl,
+ subscribe,
+ fn,
+ storageLevel,
+ () => actorSystemCreator.call(),
+ supervisorStrategy)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
* @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and
*                       each frame is a sequence of bytes; this converter (which might be a
*                       deserializer) translates a sequence of sequences of bytes into
*                       objects, where each outer sequence is a frame and each inner one its payload.
- * @param storageLevel RDD storage level.
+ * @param storageLevel RDD storage level.
*/
def createStream[T](
jssc: JavaStreamingContext,
@@ -104,14 +125,19 @@ object ZeroMQUtils {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn =
(x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
- createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel)
+ createStream[T](
+ jssc.ssc,
+ publisherUrl,
+ subscribe,
+ fn,
+ storageLevel)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
- * @param jssc JavaStreamingContext object
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe Topic to subscribe to
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
* @param bytesToObjects A ZeroMQ stream publishes a sequence of frames for each topic, and
*                        each frame is a sequence of bytes; this converter (which might
*                        be a deserializer) translates a sequence of sequences of
@@ -128,6 +154,10 @@ object ZeroMQUtils {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val fn =
(x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala
- createStream[T](jssc.ssc, publisherUrl, subscribe, fn)
+ createStream[T](
+ jssc.ssc,
+ publisherUrl,
+ subscribe,
+ fn)
}
}
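After this rewiring, ZeroMQUtils is a thin adapter: it wraps the converter in a ZeroMQReceiver Props and delegates to AkkaUtils.createStream. A minimal sketch of the Scala API as it now stands, assuming an illustrative endpoint whose frames carry UTF-8 text:

    import akka.util.ByteString
    import akka.zeromq.Subscribe
    import org.apache.spark.streaming.zeromq.ZeroMQUtils

    // Illustrative endpoint and topic; each frame payload is decoded as UTF-8 text.
    val bytesToStrings = (frames: Seq[ByteString]) => frames.map(_.utf8String).iterator
    val lines = ZeroMQUtils.createStream(
      ssc, "tcp://127.0.0.1:1234", Subscribe(ByteString("topic")), bytesToStrings)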
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index 417b91eecb..9ff4b41f97 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,14 +17,17 @@
package org.apache.spark.streaming.zeromq;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
-import org.junit.Test;
+import akka.actor.ActorSystem;
import akka.actor.SupervisorStrategy;
import akka.util.ByteString;
import akka.zeromq.Subscribe;
+import org.junit.Test;
+
import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function0;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
@@ -32,19 +35,29 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
public void testZeroMQStream() {
String publishUrl = "abc";
Subscribe subscribe = new Subscribe((ByteString)null);
- Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
- @Override
- public Iterable<String> call(byte[][] bytes) throws Exception {
- return null;
- }
- };
+ Function<byte[][], Iterable<String>> bytesToObjects = new BytesToObjects();
+ Function0<ActorSystem> actorSystemCreator = new ActorSystemCreatorForTest();
JavaReceiverInputDStream<String> test1 = ZeroMQUtils.<String>createStream(
ssc, publishUrl, subscribe, bytesToObjects);
JavaReceiverInputDStream<String> test2 = ZeroMQUtils.<String>createStream(
ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
JavaReceiverInputDStream<String> test3 = ZeroMQUtils.<String>createStream(
- ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), actorSystemCreator,
SupervisorStrategy.defaultStrategy());
}
}
+
+class BytesToObjects implements Function<byte[][], Iterable<String>> {
+ @Override
+ public Iterable<String> call(byte[][] bytes) throws Exception {
+ return null;
+ }
+}
+
+class ActorSystemCreatorForTest implements Function0<ActorSystem> {
+ @Override
+ public ActorSystem call() {
+ return null;
+ }
+}
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 35d2e62c68..bac2679cab 100644
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -42,14 +42,22 @@ class ZeroMQStreamSuite extends SparkFunSuite {
// tests the API, does not actually test data receiving
val test1: ReceiverInputDStream[String] =
- ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+ ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects, actorSystemCreator = () => null)
val test2: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
- ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, () => null)
val test3: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
ssc, publishUrl, subscribe, bytesToObjects,
- StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
+ StorageLevel.MEMORY_AND_DISK_SER_2, () => null, SupervisorStrategy.defaultStrategy)
+ val test4: ReceiverInputDStream[String] =
+ ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+ val test5: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test6: ReceiverInputDStream[String] = ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects,
+ StorageLevel.MEMORY_AND_DISK_SER_2, supervisorStrategy = SupervisorStrategy.defaultStrategy)
- // TODO: Actually test data receiving
+ // TODO: Actually test data receiving. A real test needs the native ZeroMQ library
ssc.stop()
}
}
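Both suites exercise only SupervisorStrategy.defaultStrategy. When the restart-on-RuntimeException policy of ActorReceiver.defaultSupervisorStrategy is not what a job wants, a caller can pass its own strategy; a sketch with illustrative retry settings, again reusing the hypothetical MyActor from the sketches above:

    import scala.concurrent.duration._

    import akka.actor.{OneForOneStrategy, Props}
    import akka.actor.SupervisorStrategy.{Restart, Stop}

    import org.apache.spark.streaming.akka.AkkaUtils

    // Hypothetical policy: retry transient IO failures, stop the worker on anything else.
    val strategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: java.io.IOException => Restart
      case _: Exception           => Stop
    }

    val stream = AkkaUtils.createStream[String](
      ssc, Props(new MyActor), "MyActorReceiver", supervisorStrategy = strategy)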