From b7d74a602f622d8e105b349bd6d17ba42e7668dc Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Wed, 20 Jan 2016 13:55:41 -0800 Subject: [SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project Include the following changes: 1. Add "streaming-akka" project and org.apache.spark.streaming.akka.AkkaUtils for creating an actorStream 2. Remove "StreamingContext.actorStream" and "JavaStreamingContext.actorStream" 3. Update the ActorWordCount example and add the JavaActorWordCount example 4. Make "streaming-zeromq" depend on "streaming-akka" and update the codes accordingly Author: Shixiong Zhu Closes #10744 from zsxwing/streaming-akka-2. --- .../apache/spark/streaming/StreamingContext.scala | 24 +- .../streaming/api/java/JavaStreamingContext.scala | 64 ------ .../spark/streaming/receiver/ActorReceiver.scala | 245 --------------------- 3 files changed, 1 insertion(+), 332 deletions(-) delete mode 100644 streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index b7070dda99..ec57c05e3b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -25,7 +25,6 @@ import scala.collection.mutable.Queue import scala.reflect.ClassTag import scala.util.control.NonFatal -import akka.actor.{Props, SupervisorStrategy} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} @@ -42,7 +41,7 @@ import org.apache.spark.serializer.SerializationDebugger import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContextState._ import org.apache.spark.streaming.dstream._ -import org.apache.spark.streaming.receiver.{ActorReceiverSupervisor, ActorSupervisorStrategy, Receiver} +import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener} import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab} import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, Utils} @@ -295,27 +294,6 @@ class StreamingContext private[streaming] ( } } - /** - * Create an input stream with any arbitrary user implemented actor receiver. - * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html - * @param props Props object defining creation of the actor - * @param name Name of the actor - * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2) - * - * @note An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of data received and actorStream - * should be same. - */ - def actorStream[T: ClassTag]( - props: Props, - name: String, - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, - supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy - ): ReceiverInputDStream[T] = withNamedScope("actor stream") { - receiverStream(new ActorReceiverSupervisor[T](props, name, storageLevel, supervisorStrategy)) - } - /** * Create a input stream from TCP source hostname:port. Data is received using * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 00f9d8a9e8..7a25ce54b6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -24,7 +24,6 @@ import java.util.{List => JList, Map => JMap} import scala.collection.JavaConverters._ import scala.reflect.ClassTag -import akka.actor.{Props, SupervisorStrategy} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} @@ -356,69 +355,6 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { ssc.fileStream[K, V, F](directory, fn, newFilesOnly, conf) } - /** - * Create an input stream with any arbitrary user implemented actor receiver. - * @param props Props object defining creation of the actor - * @param name Name of the actor - * @param storageLevel Storage level to use for storing the received objects - * - * @note An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of data received and actorStream - * should be same. - */ - def actorStream[T]( - props: Props, - name: String, - storageLevel: StorageLevel, - supervisorStrategy: SupervisorStrategy - ): JavaReceiverInputDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - ssc.actorStream[T](props, name, storageLevel, supervisorStrategy) - } - - /** - * Create an input stream with any arbitrary user implemented actor receiver. - * @param props Props object defining creation of the actor - * @param name Name of the actor - * @param storageLevel Storage level to use for storing the received objects - * - * @note An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of data received and actorStream - * should be same. - */ - def actorStream[T]( - props: Props, - name: String, - storageLevel: StorageLevel - ): JavaReceiverInputDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - ssc.actorStream[T](props, name, storageLevel) - } - - /** - * Create an input stream with any arbitrary user implemented actor receiver. - * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2. - * @param props Props object defining creation of the actor - * @param name Name of the actor - * - * @note An important point to note: - * Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of data received and actorStream - * should be same. - */ - def actorStream[T]( - props: Props, - name: String - ): JavaReceiverInputDStream[T] = { - implicit val cm: ClassTag[T] = - implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - ssc.actorStream[T](props, name) - } - /** * Create an input stream from an queue of RDDs. In each batch, * it will process either one or all of the RDDs returned by the queue. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala deleted file mode 100644 index 0eabf3d260..0000000000 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.receiver - -import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicInteger - -import scala.concurrent.duration._ -import scala.language.postfixOps -import scala.reflect.ClassTag - -import akka.actor._ -import akka.actor.SupervisorStrategy.{Escalate, Restart} - -import org.apache.spark.{Logging, SparkEnv} -import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.storage.StorageLevel - -/** - * :: DeveloperApi :: - * A helper with set of defaults for supervisor strategy - */ -@DeveloperApi -object ActorSupervisorStrategy { - - val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = - 15 millis) { - case _: RuntimeException => Restart - case _: Exception => Escalate - } -} - -/** - * :: DeveloperApi :: - * A base Actor that provides APIs for pushing received data into Spark Streaming for processing. - * - * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html - * - * @example {{{ - * class MyActor extends ActorReceiver { - * def receive { - * case anything: String => store(anything) - * } - * } - * - * // Can be used with an actorStream as follows - * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver") - * - * }}} - * - * @note Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of push block and InputDStream - * should be same. - */ -@DeveloperApi -abstract class ActorReceiver extends Actor { - - /** Store an iterator of received data as a data block into Spark's memory. */ - def store[T](iter: Iterator[T]) { - context.parent ! IteratorData(iter) - } - - /** - * Store the bytes of received data as a data block into Spark's memory. Note - * that the data in the ByteBuffer must be serialized using the same serializer - * that Spark is configured to use. - */ - def store(bytes: ByteBuffer) { - context.parent ! ByteBufferData(bytes) - } - - /** - * Store a single item of received data to Spark's memory. - * These single items will be aggregated together into data blocks before - * being pushed into Spark's memory. - */ - def store[T](item: T) { - context.parent ! SingleItemData(item) - } -} - -/** - * :: DeveloperApi :: - * A Java UntypedActor that provides APIs for pushing received data into Spark Streaming for - * processing. - * - * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html - * - * @example {{{ - * class MyActor extends JavaActorReceiver { - * def receive { - * case anything: String => store(anything) - * } - * } - * - * // Can be used with an actorStream as follows - * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver") - * - * }}} - * - * @note Since Actor may exist outside the spark framework, It is thus user's responsibility - * to ensure the type safety, i.e parametrized type of push block and InputDStream - * should be same. - */ -@DeveloperApi -abstract class JavaActorReceiver extends UntypedActor { - - /** Store an iterator of received data as a data block into Spark's memory. */ - def store[T](iter: Iterator[T]) { - context.parent ! IteratorData(iter) - } - - /** - * Store the bytes of received data as a data block into Spark's memory. Note - * that the data in the ByteBuffer must be serialized using the same serializer - * that Spark is configured to use. - */ - def store(bytes: ByteBuffer) { - context.parent ! ByteBufferData(bytes) - } - - /** - * Store a single item of received data to Spark's memory. - * These single items will be aggregated together into data blocks before - * being pushed into Spark's memory. - */ - def store[T](item: T) { - context.parent ! SingleItemData(item) - } -} - -/** - * :: DeveloperApi :: - * Statistics for querying the supervisor about state of workers. Used in - * conjunction with `StreamingContext.actorStream` and - * [[org.apache.spark.streaming.receiver.ActorReceiver]]. - */ -@DeveloperApi -case class Statistics(numberOfMsgs: Int, - numberOfWorkers: Int, - numberOfHiccups: Int, - otherInfo: String) - -/** Case class to receive data sent by child actors */ -private[streaming] sealed trait ActorReceiverData -private[streaming] case class SingleItemData[T](item: T) extends ActorReceiverData -private[streaming] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData -private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData - -/** - * Provides Actors as receivers for receiving stream. - * - * As Actors can also be used to receive data from almost any stream source. - * A nice set of abstraction(s) for actors as receivers is already provided for - * a few general cases. It is thus exposed as an API where user may come with - * their own Actor to run as receiver for Spark Streaming input source. - * - * This starts a supervisor actor which starts workers and also provides - * [http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html fault-tolerance]. - * - * Here's a way to start more supervisor/workers as its children. - * - * @example {{{ - * context.parent ! Props(new Supervisor) - * }}} OR {{{ - * context.parent ! Props(new Worker, "Worker") - * }}} - */ -private[streaming] class ActorReceiverSupervisor[T: ClassTag]( - props: Props, - name: String, - storageLevel: StorageLevel, - receiverSupervisorStrategy: SupervisorStrategy - ) extends Receiver[T](storageLevel) with Logging { - - protected lazy val actorSupervisor = SparkEnv.get.actorSystem.actorOf(Props(new Supervisor), - "Supervisor" + streamId) - - class Supervisor extends Actor { - - override val supervisorStrategy = receiverSupervisorStrategy - private val worker = context.actorOf(props, name) - logInfo("Started receiver worker at:" + worker.path) - - private val n: AtomicInteger = new AtomicInteger(0) - private val hiccups: AtomicInteger = new AtomicInteger(0) - - override def receive: PartialFunction[Any, Unit] = { - - case IteratorData(iterator) => - logDebug("received iterator") - store(iterator.asInstanceOf[Iterator[T]]) - - case SingleItemData(msg) => - logDebug("received single") - store(msg.asInstanceOf[T]) - n.incrementAndGet - - case ByteBufferData(bytes) => - logDebug("received bytes") - store(bytes) - - case props: Props => - val worker = context.actorOf(props) - logInfo("Started receiver worker at:" + worker.path) - sender ! worker - - case (props: Props, name: String) => - val worker = context.actorOf(props, name) - logInfo("Started receiver worker at:" + worker.path) - sender ! worker - - case _: PossiblyHarmful => hiccups.incrementAndGet() - - case _: Statistics => - val workers = context.children - sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n")) - - } - } - - def onStart(): Unit = { - actorSupervisor - logInfo("Supervision tree for receivers initialized at:" + actorSupervisor.path) - } - - def onStop(): Unit = { - actorSupervisor ! PoisonPill - } -} -- cgit v1.2.3