diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-01-20 13:55:41 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2016-01-20 13:55:41 -0800 |
commit | b7d74a602f622d8e105b349bd6d17ba42e7668dc (patch) | |
tree | 118deb532942513693e60f851dde638d7fa818cd /external/akka | |
parent | 944fdadf77523570f6b33544ad0b388031498952 (diff) | |
download | spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.tar.gz spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.tar.bz2 spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.zip |
[SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project
Include the following changes:
1. Add "streaming-akka" project and org.apache.spark.streaming.akka.AkkaUtils for creating an actorStream
2. Remove "StreamingContext.actorStream" and "JavaStreamingContext.actorStream"
3. Update the ActorWordCount example and add the JavaActorWordCount example
4. Make "streaming-zeromq" depend on "streaming-akka" and update the codes accordingly
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #10744 from zsxwing/streaming-akka-2.
Diffstat (limited to 'external/akka')
5 files changed, 619 insertions, 0 deletions
diff --git a/external/akka/pom.xml b/external/akka/pom.xml new file mode 100644 index 0000000000..34de9bae00 --- /dev/null +++ b/external/akka/pom.xml @@ -0,0 +1,73 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent_2.10</artifactId> + <version>2.0.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-akka_2.10</artifactId> + <properties> + <sbt.project.name>streaming-akka</sbt.project.name> + </properties> + <packaging>jar</packaging> + <name>Spark Project External Akka</name> + <url>http://spark.apache.org/</url> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${akka.group}</groupId> + <artifactId>akka-actor_${scala.binary.version}</artifactId> + <version>${akka.version}</version> + </dependency> + <dependency> + <groupId>${akka.group}</groupId> + <artifactId>akka-remote_${scala.binary.version}</artifactId> + <version>${akka.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + </build> +</project> diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala new file mode 100644 index 0000000000..c75dc92445 --- /dev/null +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/ActorReceiver.scala @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.akka + +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicInteger + +import scala.concurrent.duration._ +import scala.language.postfixOps +import scala.reflect.ClassTag + +import akka.actor._ +import akka.actor.SupervisorStrategy.{Escalate, Restart} +import com.typesafe.config.ConfigFactory + +import org.apache.spark.{Logging, TaskContext} +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver + +/** + * :: DeveloperApi :: + * A helper with set of defaults for supervisor strategy + */ +@DeveloperApi +object ActorReceiver { + + /** + * A OneForOneStrategy supervisor strategy with `maxNrOfRetries = 10` and + * `withinTimeRange = 15 millis`. For RuntimeException, it will restart the ActorReceiver; for + * others, it just escalates the failure to the supervisor of the supervisor. + */ + val defaultSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = + 15 millis) { + case _: RuntimeException => Restart + case _: Exception => Escalate + } + + /** + * A default ActorSystem creator. It will use a unique system name + * (streaming-actor-system-<spark-task-attempt-id>) to start an ActorSystem that supports remote + * communication. + */ + val defaultActorSystemCreator: () => ActorSystem = () => { + val uniqueSystemName = s"streaming-actor-system-${TaskContext.get().taskAttemptId()}" + val akkaConf = ConfigFactory.parseString( + s"""akka.actor.provider = "akka.remote.RemoteActorRefProvider" + |akka.remote.enabled-transports = ["akka.remote.netty.tcp"] + |""".stripMargin) + ActorSystem(uniqueSystemName, akkaConf) + } +} + +/** + * :: DeveloperApi :: + * A base Actor that provides APIs for pushing received data into Spark Streaming for processing. + * + * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * + * @example {{{ + * class MyActor extends ActorReceiver { + * def receive { + * case anything: String => store(anything) + * } + * } + * + * AkkaUtils.createStream[String](ssc, Props[MyActor](),"MyActorReceiver") + * + * }}} + * + * @note Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e. parametrized type of push block and InputDStream + * should be same. + */ +@DeveloperApi +abstract class ActorReceiver extends Actor { + + /** Store an iterator of received data as a data block into Spark's memory. */ + def store[T](iter: Iterator[T]) { + context.parent ! IteratorData(iter) + } + + /** + * Store the bytes of received data as a data block into Spark's memory. Note + * that the data in the ByteBuffer must be serialized using the same serializer + * that Spark is configured to use. + */ + def store(bytes: ByteBuffer) { + context.parent ! ByteBufferData(bytes) + } + + /** + * Store a single item of received data to Spark's memory. + * These single items will be aggregated together into data blocks before + * being pushed into Spark's memory. + */ + def store[T](item: T) { + context.parent ! SingleItemData(item) + } +} + +/** + * :: DeveloperApi :: + * A Java UntypedActor that provides APIs for pushing received data into Spark Streaming for + * processing. + * + * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html + * + * @example {{{ + * class MyActor extends JavaActorReceiver { + * @Override + * public void onReceive(Object msg) throws Exception { + * store((String) msg); + * } + * } + * + * AkkaUtils.<String>createStream(jssc, Props.create(MyActor.class), "MyActorReceiver"); + * + * }}} + * + * @note Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e. parametrized type of push block and InputDStream + * should be same. + */ +@DeveloperApi +abstract class JavaActorReceiver extends UntypedActor { + + /** Store an iterator of received data as a data block into Spark's memory. */ + def store[T](iter: Iterator[T]) { + context.parent ! IteratorData(iter) + } + + /** + * Store the bytes of received data as a data block into Spark's memory. Note + * that the data in the ByteBuffer must be serialized using the same serializer + * that Spark is configured to use. + */ + def store(bytes: ByteBuffer) { + context.parent ! ByteBufferData(bytes) + } + + /** + * Store a single item of received data to Spark's memory. + * These single items will be aggregated together into data blocks before + * being pushed into Spark's memory. + */ + def store[T](item: T) { + context.parent ! SingleItemData(item) + } +} + +/** + * :: DeveloperApi :: + * Statistics for querying the supervisor about state of workers. Used in + * conjunction with `AkkaUtils.createStream` and + * [[org.apache.spark.streaming.akka.ActorReceiverSupervisor]]. + */ +@DeveloperApi +case class Statistics(numberOfMsgs: Int, + numberOfWorkers: Int, + numberOfHiccups: Int, + otherInfo: String) + +/** Case class to receive data sent by child actors */ +private[akka] sealed trait ActorReceiverData +private[akka] case class SingleItemData[T](item: T) extends ActorReceiverData +private[akka] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData +private[akka] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData + +/** + * Provides Actors as receivers for receiving stream. + * + * As Actors can also be used to receive data from almost any stream source. + * A nice set of abstraction(s) for actors as receivers is already provided for + * a few general cases. It is thus exposed as an API where user may come with + * their own Actor to run as receiver for Spark Streaming input source. + * + * This starts a supervisor actor which starts workers and also provides + * [http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html fault-tolerance]. + * + * Here's a way to start more supervisor/workers as its children. + * + * @example {{{ + * context.parent ! Props(new Supervisor) + * }}} OR {{{ + * context.parent ! Props(new Worker, "Worker") + * }}} + */ +private[akka] class ActorReceiverSupervisor[T: ClassTag]( + actorSystemCreator: () => ActorSystem, + props: Props, + name: String, + storageLevel: StorageLevel, + receiverSupervisorStrategy: SupervisorStrategy + ) extends Receiver[T](storageLevel) with Logging { + + private lazy val actorSystem = actorSystemCreator() + protected lazy val actorSupervisor = actorSystem.actorOf(Props(new Supervisor), + "Supervisor" + streamId) + + class Supervisor extends Actor { + + override val supervisorStrategy = receiverSupervisorStrategy + private val worker = context.actorOf(props, name) + logInfo("Started receiver worker at:" + worker.path) + + private val n: AtomicInteger = new AtomicInteger(0) + private val hiccups: AtomicInteger = new AtomicInteger(0) + + override def receive: PartialFunction[Any, Unit] = { + + case IteratorData(iterator) => + logDebug("received iterator") + store(iterator.asInstanceOf[Iterator[T]]) + + case SingleItemData(msg) => + logDebug("received single") + store(msg.asInstanceOf[T]) + n.incrementAndGet + + case ByteBufferData(bytes) => + logDebug("received bytes") + store(bytes) + + case props: Props => + val worker = context.actorOf(props) + logInfo("Started receiver worker at:" + worker.path) + sender ! worker + + case (props: Props, name: String) => + val worker = context.actorOf(props, name) + logInfo("Started receiver worker at:" + worker.path) + sender ! worker + + case _: PossiblyHarmful => hiccups.incrementAndGet() + + case _: Statistics => + val workers = context.children + sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n")) + + } + } + + def onStart(): Unit = { + actorSupervisor + logInfo("Supervision tree for receivers initialized at:" + actorSupervisor.path) + } + + def onStop(): Unit = { + actorSupervisor ! PoisonPill + actorSystem.shutdown() + actorSystem.awaitTermination() + } +} diff --git a/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala new file mode 100644 index 0000000000..38c35c5ae7 --- /dev/null +++ b/external/akka/src/main/scala/org/apache/spark/streaming/akka/AkkaUtils.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.akka + +import scala.reflect.ClassTag + +import akka.actor.{ActorSystem, Props, SupervisorStrategy} + +import org.apache.spark.api.java.function.{Function0 => JFunction0} +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +object AkkaUtils { + + /** + * Create an input stream with a user-defined actor. See [[ActorReceiver]] for more details. + * + * @param ssc The StreamingContext instance + * @param propsForActor Props object defining creation of the actor + * @param actorName Name of the actor + * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2) + * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will + * be shut down when the receiver is stopping (default: + * ActorReceiver.defaultActorSystemCreator) + * @param supervisorStrategy the supervisor strategy (default: ActorReceiver.defaultStrategy) + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e. parametrized type of data received and createStream + * should be same. + */ + def createStream[T: ClassTag]( + ssc: StreamingContext, + propsForActor: Props, + actorName: String, + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, + actorSystemCreator: () => ActorSystem = ActorReceiver.defaultActorSystemCreator, + supervisorStrategy: SupervisorStrategy = ActorReceiver.defaultSupervisorStrategy + ): ReceiverInputDStream[T] = ssc.withNamedScope("actor stream") { + val cleanF = ssc.sc.clean(actorSystemCreator) + ssc.receiverStream(new ActorReceiverSupervisor[T]( + cleanF, + propsForActor, + actorName, + storageLevel, + supervisorStrategy)) + } + + /** + * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details. + * + * @param jssc The StreamingContext instance + * @param propsForActor Props object defining creation of the actor + * @param actorName Name of the actor + * @param storageLevel Storage level to use for storing the received objects + * @param actorSystemCreator A function to create ActorSystem in executors. `ActorSystem` will + * be shut down when the receiver is stopping. + * @param supervisorStrategy the supervisor strategy + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e. parametrized type of data received and createStream + * should be same. + */ + def createStream[T]( + jssc: JavaStreamingContext, + propsForActor: Props, + actorName: String, + storageLevel: StorageLevel, + actorSystemCreator: JFunction0[ActorSystem], + supervisorStrategy: SupervisorStrategy + ): JavaReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + createStream[T]( + jssc.ssc, + propsForActor, + actorName, + storageLevel, + () => actorSystemCreator.call(), + supervisorStrategy) + } + + /** + * Create an input stream with a user-defined actor. See [[JavaActorReceiver]] for more details. + * + * @param jssc The StreamingContext instance + * @param propsForActor Props object defining creation of the actor + * @param actorName Name of the actor + * @param storageLevel Storage level to use for storing the received objects + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e. parametrized type of data received and createStream + * should be same. + */ + def createStream[T]( + jssc: JavaStreamingContext, + propsForActor: Props, + actorName: String, + storageLevel: StorageLevel + ): JavaReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + createStream[T](jssc.ssc, propsForActor, actorName, storageLevel) + } + + /** + * Create an input stream with a user-defined actor. Storage level of the data will be the default + * StorageLevel.MEMORY_AND_DISK_SER_2. See [[JavaActorReceiver]] for more details. + * + * @param jssc The StreamingContext instance + * @param propsForActor Props object defining creation of the actor + * @param actorName Name of the actor + * + * @note An important point to note: + * Since Actor may exist outside the spark framework, It is thus user's responsibility + * to ensure the type safety, i.e. parametrized type of data received and createStream + * should be same. + */ + def createStream[T]( + jssc: JavaStreamingContext, + propsForActor: Props, + actorName: String + ): JavaReceiverInputDStream[T] = { + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] + createStream[T](jssc.ssc, propsForActor, actorName) + } +} diff --git a/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java new file mode 100644 index 0000000000..b732506767 --- /dev/null +++ b/external/akka/src/test/java/org/apache/spark/streaming/akka/JavaAkkaUtilsSuite.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.akka; + +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.SupervisorStrategy; +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.junit.Test; + +import org.apache.spark.api.java.function.Function0; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; + +public class JavaAkkaUtilsSuite { + + @Test // tests the API, does not actually test data receiving + public void testAkkaUtils() { + JavaStreamingContext jsc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + try { + JavaReceiverInputDStream<String> test1 = AkkaUtils.<String>createStream( + jsc, Props.create(JavaTestActor.class), "test"); + JavaReceiverInputDStream<String> test2 = AkkaUtils.<String>createStream( + jsc, Props.create(JavaTestActor.class), "test", StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaReceiverInputDStream<String> test3 = AkkaUtils.<String>createStream( + jsc, + Props.create(JavaTestActor.class), + "test", StorageLevel.MEMORY_AND_DISK_SER_2(), + new ActorSystemCreatorForTest(), + SupervisorStrategy.defaultStrategy()); + } finally { + jsc.stop(); + } + } +} + +class ActorSystemCreatorForTest implements Function0<ActorSystem> { + @Override + public ActorSystem call() { + return null; + } +} + + +class JavaTestActor extends JavaActorReceiver { + @Override + public void onReceive(Object message) throws Exception { + store((String) message); + } +} diff --git a/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala new file mode 100644 index 0000000000..f437585a98 --- /dev/null +++ b/external/akka/src/test/scala/org/apache/spark/streaming/akka/AkkaUtilsSuite.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.akka + +import akka.actor.{Props, SupervisorStrategy} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.dstream.ReceiverInputDStream + +class AkkaUtilsSuite extends SparkFunSuite { + + test("createStream") { + val ssc: StreamingContext = new StreamingContext("local[2]", "test", Seconds(1000)) + try { + // tests the API, does not actually test data receiving + val test1: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, Props[TestActor](), "test") + val test2: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2) + val test3: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, + Props[TestActor](), + "test", + StorageLevel.MEMORY_AND_DISK_SER_2, + supervisorStrategy = SupervisorStrategy.defaultStrategy) + val test4: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null) + val test5: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, Props[TestActor](), "test", StorageLevel.MEMORY_AND_DISK_SER_2, () => null) + val test6: ReceiverInputDStream[String] = AkkaUtils.createStream( + ssc, + Props[TestActor](), + "test", + StorageLevel.MEMORY_AND_DISK_SER_2, + () => null, + SupervisorStrategy.defaultStrategy) + } finally { + ssc.stop() + } + } +} + +class TestActor extends ActorReceiver { + override def receive: Receive = { + case m: String => store(m) + } +} |