path: root/streaming
author     Shixiong Zhu <shixiong@databricks.com>        2016-01-20 13:55:41 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>   2016-01-20 13:55:41 -0800
commit     b7d74a602f622d8e105b349bd6d17ba42e7668dc (patch)
tree       118deb532942513693e60f851dde638d7fa818cd /streaming
parent     944fdadf77523570f6b33544ad0b388031498952 (diff)
download   spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.tar.gz
           spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.tar.bz2
           spark-b7d74a602f622d8e105b349bd6d17ba42e7668dc.zip
[SPARK-7799][SPARK-12786][STREAMING] Add "streaming-akka" project
Include the following changes:

1. Add "streaming-akka" project and org.apache.spark.streaming.akka.AkkaUtils for creating an actorStream
2. Remove "StreamingContext.actorStream" and "JavaStreamingContext.actorStream"
3. Update the ActorWordCount example and add the JavaActorWordCount example
4. Make "streaming-zeromq" depend on "streaming-akka" and update the code accordingly

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10744 from zsxwing/streaming-akka-2.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala              |  24
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala |  64
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala        | 245
3 files changed, 1 insertion, 332 deletions
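
The replacement entry point lives in the new streaming-akka module and is therefore not visible in this streaming/-only diff. The snippet below is only a rough sketch of how an actor stream would be created after this change, assuming the new org.apache.spark.streaming.akka package exposes an ActorReceiver base class and an AkkaUtils.createStream helper mirroring the removed StreamingContext.actorStream; the actor and application names are illustrative only.

    import akka.actor.Props
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.akka.{ActorReceiver, AkkaUtils}

    // Illustrative actor that pushes every String it receives into Spark Streaming.
    class EchoActor extends ActorReceiver {
      def receive: Receive = {
        case s: String => store(s)
      }
    }

    object ActorStreamExample {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext("local[2]", "ActorStreamExample", Seconds(1))
        // Replaces the removed ssc.actorStream(Props(new EchoActor), "EchoReceiver"):
        // the stream is now created through the helper in the streaming-akka module.
        val lines = AkkaUtils.createStream[String](ssc, Props(new EchoActor), "EchoReceiver")
        lines.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
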
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b7070dda99..ec57c05e3b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -25,7 +25,6 @@ import scala.collection.mutable.Queue
import scala.reflect.ClassTag
import scala.util.control.NonFatal
-import akka.actor.{Props, SupervisorStrategy}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
@@ -42,7 +41,7 @@ import org.apache.spark.serializer.SerializationDebugger
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContextState._
import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receiver.{ActorReceiverSupervisor, ActorSupervisorStrategy, Receiver}
+import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, Utils}
@@ -296,27 +295,6 @@ class StreamingContext private[streaming] (
}
/**
- * Create an input stream with any arbitrary user implemented actor receiver.
- * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- * @param props Props object defining creation of the actor
- * @param name Name of the actor
- * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2)
- *
- * @note An important point to note:
- * Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e parametrized type of data received and actorStream
- * should be same.
- */
- def actorStream[T: ClassTag](
- props: Props,
- name: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
- supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
- ): ReceiverInputDStream[T] = withNamedScope("actor stream") {
- receiverStream(new ActorReceiverSupervisor[T](props, name, storageLevel, supervisorStrategy))
- }
-
- /**
* Create a input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
* lines.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 00f9d8a9e8..7a25ce54b6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -24,7 +24,6 @@ import java.util.{List => JList, Map => JMap}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
-import akka.actor.{Props, SupervisorStrategy}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
@@ -357,69 +356,6 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
}
/**
- * Create an input stream with any arbitrary user implemented actor receiver.
- * @param props Props object defining creation of the actor
- * @param name Name of the actor
- * @param storageLevel Storage level to use for storing the received objects
- *
- * @note An important point to note:
- * Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e parametrized type of data received and actorStream
- * should be same.
- */
- def actorStream[T](
- props: Props,
- name: String,
- storageLevel: StorageLevel,
- supervisorStrategy: SupervisorStrategy
- ): JavaReceiverInputDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- ssc.actorStream[T](props, name, storageLevel, supervisorStrategy)
- }
-
- /**
- * Create an input stream with any arbitrary user implemented actor receiver.
- * @param props Props object defining creation of the actor
- * @param name Name of the actor
- * @param storageLevel Storage level to use for storing the received objects
- *
- * @note An important point to note:
- * Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e parametrized type of data received and actorStream
- * should be same.
- */
- def actorStream[T](
- props: Props,
- name: String,
- storageLevel: StorageLevel
- ): JavaReceiverInputDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- ssc.actorStream[T](props, name, storageLevel)
- }
-
- /**
- * Create an input stream with any arbitrary user implemented actor receiver.
- * Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
- * @param props Props object defining creation of the actor
- * @param name Name of the actor
- *
- * @note An important point to note:
- * Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e parametrized type of data received and actorStream
- * should be same.
- */
- def actorStream[T](
- props: Props,
- name: String
- ): JavaReceiverInputDStream[T] = {
- implicit val cm: ClassTag[T] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- ssc.actorStream[T](props, name)
- }
-
- /**
* Create an input stream from an queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
deleted file mode 100644
index 0eabf3d260..0000000000
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.receiver
-
-import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
-import scala.reflect.ClassTag
-
-import akka.actor._
-import akka.actor.SupervisorStrategy.{Escalate, Restart}
-
-import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.storage.StorageLevel
-
-/**
- * :: DeveloperApi ::
- * A helper with set of defaults for supervisor strategy
- */
-@DeveloperApi
-object ActorSupervisorStrategy {
-
- val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
- 15 millis) {
- case _: RuntimeException => Restart
- case _: Exception => Escalate
- }
-}
-
-/**
- * :: DeveloperApi ::
- * A base Actor that provides APIs for pushing received data into Spark Streaming for processing.
- *
- * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- *
- * @example {{{
- * class MyActor extends ActorReceiver {
- * def receive {
- * case anything: String => store(anything)
- * }
- * }
- *
- * // Can be used with an actorStream as follows
- * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
- *
- * }}}
- *
- * @note Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e parametrized type of push block and InputDStream
- * should be same.
- */
-@DeveloperApi
-abstract class ActorReceiver extends Actor {
-
- /** Store an iterator of received data as a data block into Spark's memory. */
- def store[T](iter: Iterator[T]) {
- context.parent ! IteratorData(iter)
- }
-
- /**
- * Store the bytes of received data as a data block into Spark's memory. Note
- * that the data in the ByteBuffer must be serialized using the same serializer
- * that Spark is configured to use.
- */
- def store(bytes: ByteBuffer) {
- context.parent ! ByteBufferData(bytes)
- }
-
- /**
- * Store a single item of received data to Spark's memory.
- * These single items will be aggregated together into data blocks before
- * being pushed into Spark's memory.
- */
- def store[T](item: T) {
- context.parent ! SingleItemData(item)
- }
-}
-
-/**
- * :: DeveloperApi ::
- * A Java UntypedActor that provides APIs for pushing received data into Spark Streaming for
- * processing.
- *
- * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
- *
- * @example {{{
- * class MyActor extends JavaActorReceiver {
- * def receive {
- * case anything: String => store(anything)
- * }
- * }
- *
- * // Can be used with an actorStream as follows
- * ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
- *
- * }}}
- *
- * @note Since Actor may exist outside the spark framework, It is thus user's responsibility
- * to ensure the type safety, i.e parametrized type of push block and InputDStream
- * should be same.
- */
-@DeveloperApi
-abstract class JavaActorReceiver extends UntypedActor {
-
- /** Store an iterator of received data as a data block into Spark's memory. */
- def store[T](iter: Iterator[T]) {
- context.parent ! IteratorData(iter)
- }
-
- /**
- * Store the bytes of received data as a data block into Spark's memory. Note
- * that the data in the ByteBuffer must be serialized using the same serializer
- * that Spark is configured to use.
- */
- def store(bytes: ByteBuffer) {
- context.parent ! ByteBufferData(bytes)
- }
-
- /**
- * Store a single item of received data to Spark's memory.
- * These single items will be aggregated together into data blocks before
- * being pushed into Spark's memory.
- */
- def store[T](item: T) {
- context.parent ! SingleItemData(item)
- }
-}
-
-/**
- * :: DeveloperApi ::
- * Statistics for querying the supervisor about state of workers. Used in
- * conjunction with `StreamingContext.actorStream` and
- * [[org.apache.spark.streaming.receiver.ActorReceiver]].
- */
-@DeveloperApi
-case class Statistics(numberOfMsgs: Int,
- numberOfWorkers: Int,
- numberOfHiccups: Int,
- otherInfo: String)
-
-/** Case class to receive data sent by child actors */
-private[streaming] sealed trait ActorReceiverData
-private[streaming] case class SingleItemData[T](item: T) extends ActorReceiverData
-private[streaming] case class IteratorData[T](iterator: Iterator[T]) extends ActorReceiverData
-private[streaming] case class ByteBufferData(bytes: ByteBuffer) extends ActorReceiverData
-
-/**
- * Provides Actors as receivers for receiving stream.
- *
- * As Actors can also be used to receive data from almost any stream source.
- * A nice set of abstraction(s) for actors as receivers is already provided for
- * a few general cases. It is thus exposed as an API where user may come with
- * their own Actor to run as receiver for Spark Streaming input source.
- *
- * This starts a supervisor actor which starts workers and also provides
- * [http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html fault-tolerance].
- *
- * Here's a way to start more supervisor/workers as its children.
- *
- * @example {{{
- * context.parent ! Props(new Supervisor)
- * }}} OR {{{
- * context.parent ! Props(new Worker, "Worker")
- * }}}
- */
-private[streaming] class ActorReceiverSupervisor[T: ClassTag](
- props: Props,
- name: String,
- storageLevel: StorageLevel,
- receiverSupervisorStrategy: SupervisorStrategy
- ) extends Receiver[T](storageLevel) with Logging {
-
- protected lazy val actorSupervisor = SparkEnv.get.actorSystem.actorOf(Props(new Supervisor),
- "Supervisor" + streamId)
-
- class Supervisor extends Actor {
-
- override val supervisorStrategy = receiverSupervisorStrategy
- private val worker = context.actorOf(props, name)
- logInfo("Started receiver worker at:" + worker.path)
-
- private val n: AtomicInteger = new AtomicInteger(0)
- private val hiccups: AtomicInteger = new AtomicInteger(0)
-
- override def receive: PartialFunction[Any, Unit] = {
-
- case IteratorData(iterator) =>
- logDebug("received iterator")
- store(iterator.asInstanceOf[Iterator[T]])
-
- case SingleItemData(msg) =>
- logDebug("received single")
- store(msg.asInstanceOf[T])
- n.incrementAndGet
-
- case ByteBufferData(bytes) =>
- logDebug("received bytes")
- store(bytes)
-
- case props: Props =>
- val worker = context.actorOf(props)
- logInfo("Started receiver worker at:" + worker.path)
- sender ! worker
-
- case (props: Props, name: String) =>
- val worker = context.actorOf(props, name)
- logInfo("Started receiver worker at:" + worker.path)
- sender ! worker
-
- case _: PossiblyHarmful => hiccups.incrementAndGet()
-
- case _: Statistics =>
- val workers = context.children
- sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n"))
-
- }
- }
-
- def onStart(): Unit = {
- actorSupervisor
- logInfo("Supervision tree for receivers initialized at:" + actorSupervisor.path)
- }
-
- def onStop(): Unit = {
- actorSupervisor ! PoisonPill
- }
-}
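
For orientation, the deleted ActorReceiverSupervisor wires a user-supplied actor underneath a supervisor: the child pushes data upward as SingleItemData / IteratorData / ByteBufferData messages, and the supervisor can also spawn extra workers when it is sent a Props (optionally paired with a name). Below is a minimal plain-Akka sketch of that same message protocol, with no Spark dependencies and illustrative names only.

    import akka.actor.{Actor, ActorSystem, Props}

    // Envelope a worker uses to hand records to its parent, mirroring the
    // deleted SingleItemData message.
    case class SingleItemData[T](item: T)

    // Worker: forwards whatever it receives upward, the way ActorReceiver.store(item)
    // sent data to context.parent in the deleted code.
    class Worker extends Actor {
      def receive: Receive = {
        case s: String => context.parent ! SingleItemData(s)
      }
    }

    // Supervisor: consumes forwarded data (the deleted code called store() here and
    // counted messages) and spawns additional workers on request.
    class Supervisor extends Actor {
      context.actorOf(Props[Worker], "worker-0")  // initial worker

      def receive: Receive = {
        case SingleItemData(item) =>
          println(s"would store: $item")
        case props: Props =>
          sender() ! context.actorOf(props)        // spawn an anonymous worker
        case (props: Props, name: String) =>
          sender() ! context.actorOf(props, name)  // spawn a named worker
      }
    }

    object SupervisorDemo extends App {
      val system = ActorSystem("demo")
      system.actorOf(Props[Supervisor], "supervisor")
      // Route a record through the worker; it is forwarded up to the supervisor.
      system.actorSelection("/user/supervisor/worker-0") ! "hello"
      // (Actor system intentionally left running; shut it down as appropriate
      // for the Akka version in use.)
    }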