author    Patrick Wendell <pwendell@gmail.com>  2013-01-02 14:17:59 -0800
committer Patrick Wendell <pwendell@gmail.com>  2013-01-02 14:19:51 -0800
commit    2ef993d159939e9dedf909991ec5c5789bdd3670 (patch)
tree      d55c979d2d67fded29a302ee47f388b7eb812e0d /streaming/src
parent    96a6ff0b09f276fb38656bb753592b1deeff5dd1 (diff)
BufferingBlockCreator -> NetworkReceiver.BlockGenerator
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/BufferingBlockCreator.scala       80
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala    10
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala    10
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala  75
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala    8
5 files changed, 89 insertions, 94 deletions
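
This commit folds the standalone BufferingBlockCreator helper into NetworkReceiver as an inner class named BlockGenerator. Because an inner class can reach its enclosing instance directly, the receiver no longer needs to be passed in as a constructor argument: calls such as receiver.pushBlock(...) become NetworkReceiver.this.pushBlock(...). A minimal sketch of that outer-instance access pattern, using made-up names rather than Spark classes:

// Illustrative sketch only (Receiver/Generator are not Spark classes).
// An inner class refers to its enclosing instance as Outer.this, which is
// why BlockGenerator can drop the explicit receiver constructor parameter.
abstract class Receiver[T] {
  def push(id: String, data: Iterator[T]): Unit

  class Generator {
    def emit(id: String, data: Iterator[T]): Unit =
      Receiver.this.push(id, data)  // same shape as NetworkReceiver.this.pushBlock(...)
  }
}
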
diff --git a/streaming/src/main/scala/spark/streaming/BufferingBlockCreator.scala b/streaming/src/main/scala/spark/streaming/BufferingBlockCreator.scala
deleted file mode 100644
index efd2e75d40..0000000000
--- a/streaming/src/main/scala/spark/streaming/BufferingBlockCreator.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-package spark.streaming
-
-import java.util.concurrent.ArrayBlockingQueue
-import scala.collection.mutable.ArrayBuffer
-import spark.Logging
-import spark.streaming.util.{RecurringTimer, SystemClock}
-import spark.storage.StorageLevel
-
-/**
- * Batches objects created by a [[spark.streaming.NetworkReceiver]] and puts them into
- * appropriately named blocks at regular intervals. This class starts two threads,
- * one to periodically start a new batch and prepare the previous batch as a block,
- * the other to push the blocks into the block manager.
- */
-class BufferingBlockCreator[T](receiver: NetworkReceiver[T], storageLevel: StorageLevel)
- extends Serializable with Logging {
-
- case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
-
- val clock = new SystemClock()
- val blockInterval = 200L
- val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
- val blockStorageLevel = storageLevel
- val blocksForPushing = new ArrayBlockingQueue[Block](1000)
- val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
-
- var currentBuffer = new ArrayBuffer[T]
-
- def start() {
- blockIntervalTimer.start()
- blockPushingThread.start()
- logInfo("Data handler started")
- }
-
- def stop() {
- blockIntervalTimer.stop()
- blockPushingThread.interrupt()
- logInfo("Data handler stopped")
- }
-
- def += (obj: T) {
- currentBuffer += obj
- }
-
- private def createBlock(blockId: String, iterator: Iterator[T]) : Block = {
- new Block(blockId, iterator)
- }
-
- private def updateCurrentBuffer(time: Long) {
- try {
- val newBlockBuffer = currentBuffer
- currentBuffer = new ArrayBuffer[T]
- if (newBlockBuffer.size > 0) {
- val blockId = "input-" + receiver.streamId + "- " + (time - blockInterval)
- val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
- blocksForPushing.add(newBlock)
- }
- } catch {
- case ie: InterruptedException =>
- logInfo("Block interval timer thread interrupted")
- case e: Exception =>
- receiver.stop()
- }
- }
-
- private def keepPushingBlocks() {
- logInfo("Block pushing thread started")
- try {
- while(true) {
- val block = blocksForPushing.take()
- receiver.pushBlock(block.id, block.iterator, block.metadata, storageLevel)
- }
- } catch {
- case ie: InterruptedException =>
- logInfo("Block pushing thread interrupted")
- case e: Exception =>
- receiver.stop()
- }
- }
-} \ No newline at end of file
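
The deleted class is not lost: its two-thread design (a recurring timer that seals the current buffer every blockInterval, and a pusher thread that drains a bounded queue into the block manager) reappears below as the BlockGenerator inner class of NetworkReceiver. For reference, here is a self-contained sketch of the same micro-batching pattern built only on java.util.concurrent; MicroBatcher and its sink callback are illustrative names, not Spark APIs:

import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Stand-alone illustration of the pattern (not Spark code): a scheduled task
// seals the current buffer once per interval, and a dedicated thread blocks
// on the queue, handing each finished batch to `sink`.
class MicroBatcher[T](intervalMs: Long, sink: (Long, Seq[T]) => Unit) {
  private var currentBuffer = new ArrayBuffer[T]
  private val readyBatches = new ArrayBlockingQueue[(Long, Seq[T])](1000)
  private val timer = Executors.newSingleThreadScheduledExecutor()
  private val pusher = new Thread() {
    override def run() {
      try {
        while (true) {
          val (time, batch) = readyBatches.take()
          sink(time, batch)
        }
      } catch {
        case _: InterruptedException => // stop() was called
      }
    }
  }

  def start() {
    timer.scheduleAtFixedRate(new Runnable { def run() { sealBatch() } },
      intervalMs, intervalMs, TimeUnit.MILLISECONDS)
    pusher.start()
  }

  def stop() {
    timer.shutdownNow()
    pusher.interrupt()
  }

  def +=(obj: T): Unit = synchronized { currentBuffer += obj }

  private def sealBatch(): Unit = synchronized {
    if (currentBuffer.nonEmpty) {
      readyBatches.add((System.currentTimeMillis(), currentBuffer.toSeq))
      currentBuffer = new ArrayBuffer[T]
    }
  }
}
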
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
index a6fa378d6e..ca70e72e56 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
@@ -97,13 +97,13 @@ private[streaming] object SparkFlumeEvent {
private[streaming]
class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
override def append(event : AvroFlumeEvent) : Status = {
- receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event)
+ receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event)
Status.OK
}
override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = {
events.foreach (event =>
- receiver.dataHandler += SparkFlumeEvent.fromAvroFlumeEvent(event))
+ receiver.blockGenerator += SparkFlumeEvent.fromAvroFlumeEvent(event))
Status.OK
}
}
@@ -118,19 +118,19 @@ class FlumeReceiver(
storageLevel: StorageLevel
) extends NetworkReceiver[SparkFlumeEvent](streamId) {
- lazy val dataHandler = new BufferingBlockCreator(this, storageLevel)
+ lazy val blockGenerator = new BlockGenerator(storageLevel)
protected override def onStart() {
val responder = new SpecificResponder(
classOf[AvroSourceProtocol], new FlumeEventServer(this));
val server = new NettyServer(responder, new InetSocketAddress(host, port));
- dataHandler.start()
+ blockGenerator.start()
server.start()
logInfo("Flume receiver started")
}
protected override def onStop() {
- dataHandler.stop()
+ blockGenerator.stop()
logInfo("Flume receiver stopped")
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index b1941fb427..25988a2ce7 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -110,19 +110,19 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
val ZK_TIMEOUT = 10000
// Handles pushing data into the BlockManager
- lazy protected val dataHandler = new BufferingBlockCreator(this, storageLevel)
+ lazy protected val blockGenerator = new BlockGenerator(storageLevel)
// Keeps track of the current offsets. Maps from (broker, topic, group, part) -> Offset
lazy val offsets = HashMap[KafkaPartitionKey, Long]()
// Connection to Kafka
var consumerConnector : ZookeeperConsumerConnector = null
def onStop() {
- dataHandler.stop()
+ blockGenerator.stop()
}
def onStart() {
- dataHandler.start()
+ blockGenerator.start()
// In case we are using multiple Threads to handle Kafka Messages
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
@@ -170,8 +170,8 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
private class MessageHandler(stream: KafkaStream[String]) extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
- stream.takeWhile { msgAndMetadata =>
- dataHandler += msgAndMetadata.message
+ stream.takeWhile { msgAndMetadata =>
+ blockGenerator += msgAndMetadata.message
// Updating the offset. The key is (broker, topic, group, partition).
val key = KafkaPartitionKey(msgAndMetadata.topicInfo.brokerId, msgAndMetadata.topic,
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 41276da8bb..18e62a0e33 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -14,6 +14,8 @@ import akka.actor.{Props, Actor}
import akka.pattern.ask
import akka.dispatch.Await
import akka.util.duration._
+import spark.streaming.util.{RecurringTimer, SystemClock}
+import java.util.concurrent.ArrayBlockingQueue
abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {
@@ -154,4 +156,77 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
tracker ! DeregisterReceiver(streamId, msg)
}
}
+
+ /**
+ * Batches objects created by a [[spark.streaming.NetworkReceiver]] and puts them into
+ * appropriately named blocks at regular intervals. This class starts two threads,
+ * one to periodically start a new batch and prepare the previous batch as a block,
+ * the other to push the blocks into the block manager.
+ */
+ class BlockGenerator(storageLevel: StorageLevel)
+ extends Serializable with Logging {
+
+ case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
+
+ val clock = new SystemClock()
+ val blockInterval = 200L
+ val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
+ val blockStorageLevel = storageLevel
+ val blocksForPushing = new ArrayBlockingQueue[Block](1000)
+ val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
+
+ var currentBuffer = new ArrayBuffer[T]
+
+ def start() {
+ blockIntervalTimer.start()
+ blockPushingThread.start()
+ logInfo("Data handler started")
+ }
+
+ def stop() {
+ blockIntervalTimer.stop()
+ blockPushingThread.interrupt()
+ logInfo("Data handler stopped")
+ }
+
+ def += (obj: T) {
+ currentBuffer += obj
+ }
+
+ private def createBlock(blockId: String, iterator: Iterator[T]) : Block = {
+ new Block(blockId, iterator)
+ }
+
+ private def updateCurrentBuffer(time: Long) {
+ try {
+ val newBlockBuffer = currentBuffer
+ currentBuffer = new ArrayBuffer[T]
+ if (newBlockBuffer.size > 0) {
+ val blockId = "input-" + NetworkReceiver.this.streamId + "- " + (time - blockInterval)
+ val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
+ blocksForPushing.add(newBlock)
+ }
+ } catch {
+ case ie: InterruptedException =>
+ logInfo("Block interval timer thread interrupted")
+ case e: Exception =>
+ NetworkReceiver.this.stop()
+ }
+ }
+
+ private def keepPushingBlocks() {
+ logInfo("Block pushing thread started")
+ try {
+ while(true) {
+ val block = blocksForPushing.take()
+ NetworkReceiver.this.pushBlock(block.id, block.iterator, block.metadata, storageLevel)
+ }
+ } catch {
+ case ie: InterruptedException =>
+ logInfo("Block pushing thread interrupted")
+ case e: Exception =>
+ NetworkReceiver.this.stop()
+ }
+ }
+ }
}
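
From a receiver author's point of view the contract is unchanged by the move: instantiate the generator lazily, start it in onStart, feed it with +=, and stop it in onStop, exactly as the Flume, Kafka, and socket receivers in this commit do. A hedged sketch of a hypothetical custom receiver (CustomReceiver and nextRecord are illustrative, not part of this commit; import paths are assumed from the file locations shown in the diff):

import spark.storage.StorageLevel
import spark.streaming.dstream.NetworkReceiver

// Hypothetical receiver, shown only to illustrate the BlockGenerator lifecycle.
class CustomReceiver(streamId: Int, storageLevel: StorageLevel)
  extends NetworkReceiver[String](streamId) {

  lazy val blockGenerator = new BlockGenerator(storageLevel)

  override def getLocationPreference = None

  protected override def onStart() {
    blockGenerator.start()
    while (true) {
      blockGenerator += nextRecord()  // each object is buffered, then batched into blocks
    }
  }

  protected override def onStop() {
    blockGenerator.stop()
  }

  private def nextRecord(): String = "record"  // stand-in for a real data source
}
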
diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index 8374f131d6..8e4b20ea4c 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -29,7 +29,7 @@ class SocketReceiver[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkReceiver[T](streamId) {
- lazy protected val dataHandler = new BufferingBlockCreator(this, storageLevel)
+ lazy protected val blockGenerator = new BlockGenerator(storageLevel)
override def getLocationPreference = None
@@ -37,16 +37,16 @@ class SocketReceiver[T: ClassManifest](
logInfo("Connecting to " + host + ":" + port)
val socket = new Socket(host, port)
logInfo("Connected to " + host + ":" + port)
- dataHandler.start()
+ blockGenerator.start()
val iterator = bytesToObjects(socket.getInputStream())
while(iterator.hasNext) {
val obj = iterator.next
- dataHandler += obj
+ blockGenerator += obj
}
}
protected def onStop() {
- dataHandler.stop()
+ blockGenerator.stop()
}
}
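
SocketReceiver's bytesToObjects (not shown in this hunk) is simply a function from the socket's InputStream to an Iterator[T]. An illustrative converter of that shape, assuming line-oriented text input; this is not the converter SocketInputDStream actually ships:

import java.io.{BufferedReader, InputStream, InputStreamReader}

// Illustrative InputStream => Iterator[String] converter matching the shape
// that SocketReceiver expects (not Spark's own implementation).
def linesFromStream(in: InputStream): Iterator[String] = {
  val reader = new BufferedReader(new InputStreamReader(in, "UTF-8"))
  Iterator.continually(reader.readLine()).takeWhile(_ != null)
}
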