Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala | 14
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala | 29
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala | 11
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala | 8
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala | 12
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala | 111
10 files changed, 174 insertions, 21 deletions
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index ca5f11fdba..64972fd5cd 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -24,7 +24,7 @@ private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) ext
*/
private[streaming]
class NetworkInputTracker(
- @transient ssc: StreamingContext,
+ @transient ssc: StreamingContext,
@transient networkInputStreams: Array[NetworkInputDStream[_]])
extends Logging {
@@ -66,12 +66,12 @@ class NetworkInputTracker(
def receive = {
case RegisterReceiver(streamId, receiverActor) => {
if (!networkInputStreamMap.contains(streamId)) {
- throw new Exception("Register received for unexpected id " + streamId)
+ throw new Exception("Register received for unexpected id " + streamId)
}
receiverInfo += ((streamId, receiverActor))
logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address)
sender ! true
- }
+ }
case AddBlocks(streamId, blockIds, metadata) => {
val tmp = receivedBlockIds.synchronized {
if (!receivedBlockIds.contains(streamId)) {
@@ -96,8 +96,8 @@ class NetworkInputTracker(
/** This thread class runs all the receivers on the cluster. */
class ReceiverExecutor extends Thread {
val env = ssc.env
-
- override def run() {
+
+ override def run() {
try {
SparkEnv.set(env)
startReceivers()
@@ -114,7 +114,7 @@ class NetworkInputTracker(
*/
def startReceivers() {
val receivers = networkInputStreams.map(nis => {
- val rcvr = nis.createReceiver()
+ val rcvr = nis.getReceiver()
rcvr.setStreamId(nis.id)
rcvr
})
@@ -146,7 +146,7 @@ class NetworkInputTracker(
// Distribute the receivers and start them
ssc.sparkContext.runJob(tempRDD, startReceiver)
}
-
+
/** Stops the receivers. */
def stopReceivers() {
// Signal the receivers to stop
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index d1407b7869..48d344f055 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -1,10 +1,15 @@
package spark.streaming
+import akka.actor.Props
+
import spark.streaming.dstream._
import spark.{RDD, Logging, SparkEnv, SparkContext}
import spark.storage.StorageLevel
import spark.util.MetadataCleaner
+import spark.streaming.receivers.ActorReceiver
+import spark.streaming.receivers.Settings
+
import scala.collection.mutable.Queue
@@ -138,6 +143,30 @@ class StreamingContext private (
protected[streaming] def getNewNetworkStreamId() = nextNetworkInputStreamId.getAndIncrement()
/**
+ * Create an input stream with any arbitrary user-implemented network receiver.
+ * @param receiver Custom implementation of NetworkReceiver
+ */
+ def pluggableNetworkStream[T: ClassManifest](
+ receiver: NetworkReceiver[T]): DStream[T] = {
+ val inputStream = new PluggableInputDStream[T](this,
+ receiver)
+ graph.addInputStream(inputStream)
+ inputStream
+ }
+
+ /**
+ * Create an input stream with any arbitrary user-implemented Akka actor receiver.
+ * @param props Props object defining creation of the actor
+ * @param name Name of the actor
+ * @param storageLevel RDD storage level. Defaults to memory-only.
+ */
+ def pluggableActorStream[T: ClassManifest](
+ props: Props, name: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2): DStream[T] = {
+ pluggableNetworkStream(new ActorReceiver(Settings(props, name, storageLevel)))
+ }
+
+ /**
* Create an input stream that pulls messages from a Kafka broker.
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
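The two methods above are the public entry points added by this change: pluggableNetworkStream wraps any user-supplied NetworkReceiver in a PluggableInputDStream, while pluggableActorStream does the same for an Akka actor by wrapping it in an ActorReceiver. A minimal driver-side sketch of the actor variant follows; the SimpleActor class, master string, and batch interval are hypothetical, invented for illustration, and the StreamingContext constructor is assumed from the surrounding codebase of this era:

import akka.actor.{Actor, Props}
import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.receivers.Data

// Hypothetical worker actor: forwards every String it receives to its
// supervising ActorReceiver as a Data message (see ActorReceiver.scala below).
class SimpleActor extends Actor {
  def receive = {
    case s: String => context.parent ! Data(s)
  }
}

val ssc = new StreamingContext("local[2]", "PluggableReceiverExample", Seconds(1))

// storageLevel is omitted, so it defaults to StorageLevel.MEMORY_ONLY_SER_2
// as declared in pluggableActorStream above.
val lines = ssc.pluggableActorStream[String](Props(new SimpleActor), "SimpleReceiver")
lines.print()
ssc.start()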
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
index efc7058480..c9644b3a83 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
@@ -25,7 +25,7 @@ class FlumeInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
- override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = {
+ override def getReceiver(): NetworkReceiver[SparkFlumeEvent] = {
new FlumeReceiver(host, port, storageLevel)
}
}
@@ -134,4 +134,4 @@ class FlumeReceiver(
}
override def getLocationPreference = Some(host)
-}
\ No newline at end of file
+}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 4f8c8b9d10..dc7139cc27 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -41,7 +41,8 @@ class KafkaInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_ ) with Logging {
- def createReceiver(): NetworkReceiver[T] = {
+
+ def getReceiver(): NetworkReceiver[T] = {
new KafkaReceiver(zkQuorum, groupId, topics, initialOffsets, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
@@ -73,7 +74,7 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
logInfo("Starting Kafka Consumer Stream with group: " + groupId)
logInfo("Initial offsets: " + initialOffsets.toString)
-
+
// Zookeeper connection properties
val props = new Properties()
props.put("zk.connect", zkQuorum)
@@ -104,7 +105,7 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
offsets.foreach { case(key, offset) =>
val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic)
val partitionName = key.brokerId + "-" + key.partId
- updatePersistentPath(consumerConnector.zkClient,
+ updatePersistentPath(consumerConnector.zkClient,
topicDirs.consumerOffsetDir + "/" + partitionName, offset.toString)
}
}
@@ -115,10 +116,10 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
logInfo("Starting MessageHandler.")
stream.takeWhile { msgAndMetadata =>
blockGenerator += msgAndMetadata.message
-
// Keep on handling messages
+
true
- }
+ }
}
}
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index ecc75ec913..7385474963 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.ArrayBlockingQueue
/**
* Abstract class for defining any InputDStream that has to start a receiver on worker
* nodes to receive external data. Specific implementations of NetworkInputDStream must
- * define the createReceiver() function that creates the receiver object of type
+ * define the getReceiver() function that gets the receiver object of type
* [[spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive
* data.
* @param ssc_ Streaming context that will execute this input stream
@@ -34,11 +34,11 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
val id = ssc.getNewNetworkStreamId()
/**
- * Creates the receiver object that will be sent to the worker nodes
+ * Gets the receiver object that will be sent to the worker nodes
* to receive data. This method needs to be defined by any specific implementation
* of a NetworkInputDStream.
*/
- def createReceiver(): NetworkReceiver[T]
+ def getReceiver(): NetworkReceiver[T]
// Nothing to start or stop as both taken care of by the NetworkInputTracker.
def start() {}
@@ -48,7 +48,7 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
override def compute(validTime: Time): Option[RDD[T]] = {
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
- // master failure forces
+ // master failure
if (validTime >= graph.startTime) {
val blockIds = ssc.networkInputTracker.getBlockIds(id, validTime)
Some(new BlockRDD[T](ssc.sc, blockIds))
diff --git a/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala
new file mode 100644
index 0000000000..674f1059fe
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala
@@ -0,0 +1,12 @@
+package spark.streaming.dstream
+
+import spark.streaming.StreamingContext
+
+class PluggableInputDStream[T: ClassManifest](
+ @transient ssc_ : StreamingContext,
+ receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) {
+
+ def getReceiver(): NetworkReceiver[T] = {
+ receiver
+ }
+}
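PluggableInputDStream is deliberately thin: it simply hands back the receiver it was given, so all of the work lives in the user's NetworkReceiver subclass. A sketch of such a receiver, using only the NetworkReceiver members visible in this patch (onStart, onStop, and the BlockGenerator pattern used by ActorReceiver below); PollingReceiver and its data source are hypothetical, and BlockGenerator.stop() is assumed as the counterpart of start():

import spark.storage.StorageLevel
import spark.streaming.dstream.NetworkReceiver

// Hypothetical receiver that polls an external source on its own thread
// and feeds records to Spark through a BlockGenerator.
class PollingReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String] {

  lazy val blockGenerator = new BlockGenerator(storageLevel)

  protected def onStart() {
    blockGenerator.start()
    new Thread("polling-receiver") {
      override def run() {
        while (true) {
          // pollOneRecord() stands in for whatever source is being read
          blockGenerator += pollOneRecord()
        }
      }
    }.start()
  }

  protected def onStop() {
    // Assumption: BlockGenerator exposes stop() alongside start()
    blockGenerator.stop()
  }

  private def pollOneRecord(): String =
    throw new UnsupportedOperationException("illustrative placeholder")
}

// Plugged in through the new StreamingContext method:
// val stream = ssc.pluggableNetworkStream(new PollingReceiver(StorageLevel.MEMORY_ONLY_SER_2))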
diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
index 04e6b69b7b..1b2fa56779 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
@@ -25,7 +25,7 @@ class RawInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_ ) with Logging {
- def createReceiver(): NetworkReceiver[T] = {
+ def getReceiver(): NetworkReceiver[T] = {
new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[NetworkReceiver[T]]
}
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index d42027092b..4af839ad7f 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -15,7 +15,7 @@ class SocketInputDStream[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_) {
- def createReceiver(): NetworkReceiver[T] = {
+ def getReceiver(): NetworkReceiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
}
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
index 0e21b7480c..c697498862 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/TwitterInputDStream.scala
@@ -22,7 +22,7 @@ class TwitterInputDStream(
storageLevel: StorageLevel
) extends NetworkInputDStream[Status](ssc_) {
- override def createReceiver(): NetworkReceiver[Status] = {
+ override def getReceiver(): NetworkReceiver[Status] = {
new TwitterReceiver(username, password, filters, storageLevel)
}
}
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
new file mode 100644
index 0000000000..f24c99ad70
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
@@ -0,0 +1,111 @@
+package spark.streaming.receivers
+
+import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy }
+import akka.actor.{ actorRef2Scala, ActorRef }
+import akka.actor.{ PossiblyHarmful, OneForOneStrategy }
+
+import spark.storage.StorageLevel
+import spark.streaming.dstream.NetworkReceiver
+
+import java.util.concurrent.atomic.AtomicInteger
+
+/** A helper with a set of defaults for the supervisor strategy. */
+object ReceiverSupervisorStrategy {
+
+ import akka.util.duration._
+ import akka.actor.SupervisorStrategy._
+
+ val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
+ 15 millis) {
+ case _: RuntimeException ⇒ Restart
+ case _: Exception ⇒ Escalate
+ }
+}
+
+/**
+ * Settings for configuring the actor creation or defining the supervisor strategy
+ */
+case class Settings(props: Props,
+ name: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+ supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy)
+
+/**
+ * Statistics for querying the supervisor about the state of workers
+ */
+case class Statistics(numberOfMsgs: Int,
+ numberOfWorkers: Int,
+ numberOfHiccups: Int,
+ otherInfo: String)
+
+/** Case class to receive data sent by the child actors. */
+case class Data[T: ClassManifest](data: T)
+
+/**
+ * Provides Actors as receivers for a receiving stream.
+ *
+ * Actors can be used to receive data from almost any stream source.
+ * A set of abstractions for actors as receivers is provided for a few
+ * general cases, and the API is exposed so that users can plug in their
+ * own Actor to run as a receiver for a Spark Streaming input source.
+ */
+class ActorReceiver[T: ClassManifest](settings: Settings)
+ extends NetworkReceiver[T] {
+
+ protected lazy val blocksGenerator: BlockGenerator =
+ new BlockGenerator(settings.storageLevel)
+
+ protected lazy val supervisor = env.actorSystem.actorOf(Props(new Supervisor),
+ "Supervisor" + streamId)
+
+ private class Supervisor extends Actor {
+
+ override val supervisorStrategy = settings.supervisorStrategy
+ val worker = context.actorOf(settings.props, settings.name)
+ logInfo("Started receiver worker at:" + worker.path)
+
+ val n: AtomicInteger = new AtomicInteger(0)
+ val hiccups: AtomicInteger = new AtomicInteger(0)
+
+ def receive = {
+
+ case props: Props =>
+ val worker = context.actorOf(props)
+ logInfo("Started receiver worker at:" + worker.path)
+ sender ! worker
+
+ case (props: Props, name: String) =>
+ val worker = context.actorOf(props, name)
+ logInfo("Started receiver worker at:" + worker.path)
+ sender ! worker
+
+ case _: PossiblyHarmful => hiccups.incrementAndGet()
+
+ case _: Statistics =>
+ val workers = context.children
+ sender ! Statistics(n.get, workers.size, hiccups.get, workers.mkString("\n"))
+
+ case Data(iter: Iterator[_]) => push(iter.asInstanceOf[Iterator[T]])
+
+ case Data(msg) =>
+ blocksGenerator += msg.asInstanceOf[T]
+ n.incrementAndGet
+ }
+ }
+
+ protected def push(iter: Iterator[T]) {
+ pushBlock("block-" + streamId + "-" + System.nanoTime(),
+ iter, null, settings.storageLevel)
+ }
+
+ protected def onStart() = {
+ blocksGenerator.start()
+ supervisor
+ logInfo("Supervision tree for receivers initialized at:" + supervisor.path)
+ }
+
+ protected def onStop() = {
+ supervisor ! PoisonPill
+ }
+
+}
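Beyond Data, the Supervisor above understands a small control protocol: sending it a Props (or a (Props, name) pair) spawns an additional worker and replies with its ActorRef, any PossiblyHarmful message is counted as a hiccup, and a Statistics message is answered with the current counters. The default OneForOneStrategy can also be swapped out through Settings. A sketch with illustrative retry values and a trivial worker reusing the forwarding pattern from the earlier example:

import akka.actor.{Actor, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy._
import akka.util.duration._
import spark.streaming.receivers.{ActorReceiver, Data, Settings}

// Trivial worker: forward everything to the supervising ActorReceiver.
class ForwardingActor extends Actor {
  def receive = {
    case msg => context.parent ! Data(msg)
  }
}

// A more forgiving strategy than the 10-retries / 15 ms default declared
// in ReceiverSupervisorStrategy above; the values here are illustrative.
val patientStrategy = OneForOneStrategy(maxNrOfRetries = 25, withinTimeRange = 1 minute) {
  case _: RuntimeException => Restart
  case _: Exception => Escalate
}

val settings = Settings(Props(new ForwardingActor), "forwarder",
  supervisorStrategy = patientStrategy)

// Handed to the context exactly like any other pluggable receiver:
// val stream = ssc.pluggableNetworkStream(new ActorReceiver[String](settings))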