From 493d65ce651dffc79adcdada0eeeed6452b3cc47 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Wed, 2 Jan 2013 13:35:26 -0800
Subject: Several code-quality improvements to DataHandler.

- Changed to more accurate name: BufferingBlockCreator
- Docstring now correctly reflects the abstraction offered by the class
- Made internal methods private
- Fixed indentation problems
---
 .../spark/streaming/BufferingBlockCreator.scala    | 80 +++++++++++++++++++++
 .../main/scala/spark/streaming/DataHandler.scala   | 83 ----------------------
 .../scala/spark/streaming/FlumeInputDStream.scala  |  2 +-
 .../scala/spark/streaming/SocketInputDStream.scala |  2 +-
 .../spark/streaming/input/KafkaInputDStream.scala  |  5 +-
 5 files changed, 84 insertions(+), 88 deletions(-)
 create mode 100644 streaming/src/main/scala/spark/streaming/BufferingBlockCreator.scala
 delete mode 100644 streaming/src/main/scala/spark/streaming/DataHandler.scala

(limited to 'streaming/src')

diff --git a/streaming/src/main/scala/spark/streaming/BufferingBlockCreator.scala b/streaming/src/main/scala/spark/streaming/BufferingBlockCreator.scala
new file mode 100644
index 0000000000..efd2e75d40
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/BufferingBlockCreator.scala
@@ -0,0 +1,80 @@
+package spark.streaming
+
+import java.util.concurrent.ArrayBlockingQueue
+import scala.collection.mutable.ArrayBuffer
+import spark.Logging
+import spark.streaming.util.{RecurringTimer, SystemClock}
+import spark.storage.StorageLevel
+
+/**
+ * Batches objects created by a [[spark.streaming.NetworkReceiver]] and puts them into
+ * appropriately named blocks at regular intervals. This class starts two threads,
+ * one to periodically start a new batch and prepare the previous batch as a block,
+ * the other to push the blocks into the block manager.
+ */
+class BufferingBlockCreator[T](receiver: NetworkReceiver[T], storageLevel: StorageLevel)
+  extends Serializable with Logging {
+
+  case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
+
+  val clock = new SystemClock()
+  val blockInterval = 200L
+  val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
+  val blockStorageLevel = storageLevel
+  val blocksForPushing = new ArrayBlockingQueue[Block](1000)
+  val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
+
+  var currentBuffer = new ArrayBuffer[T]
+
+  def start() {
+    blockIntervalTimer.start()
+    blockPushingThread.start()
+    logInfo("Data handler started")
+  }
+
+  def stop() {
+    blockIntervalTimer.stop()
+    blockPushingThread.interrupt()
+    logInfo("Data handler stopped")
+  }
+
+  def += (obj: T) {
+    currentBuffer += obj
+  }
+
+  private def createBlock(blockId: String, iterator: Iterator[T]) : Block = {
+    new Block(blockId, iterator)
+  }
+
+  private def updateCurrentBuffer(time: Long) {
+    try {
+      val newBlockBuffer = currentBuffer
+      currentBuffer = new ArrayBuffer[T]
+      if (newBlockBuffer.size > 0) {
+        val blockId = "input-" + receiver.streamId + "- " + (time - blockInterval)
+        val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
+        blocksForPushing.add(newBlock)
+      }
+    } catch {
+      case ie: InterruptedException =>
+        logInfo("Block interval timer thread interrupted")
+      case e: Exception =>
+        receiver.stop()
+    }
+  }
+
+  private def keepPushingBlocks() {
+    logInfo("Block pushing thread started")
+    try {
+      while(true) {
+        val block = blocksForPushing.take()
+        receiver.pushBlock(block.id, block.iterator, block.metadata, storageLevel)
+      }
+    } catch {
+      case ie: InterruptedException =>
+        logInfo("Block pushing thread interrupted")
+      case e: Exception =>
+        receiver.stop()
+    }
+  }
+}
\ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/DataHandler.scala b/streaming/src/main/scala/spark/streaming/DataHandler.scala
deleted file mode 100644
index 05f307a8d1..0000000000
--- a/streaming/src/main/scala/spark/streaming/DataHandler.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-package spark.streaming
-
-import java.util.concurrent.ArrayBlockingQueue
-import scala.collection.mutable.ArrayBuffer
-import spark.Logging
-import spark.streaming.util.{RecurringTimer, SystemClock}
-import spark.storage.StorageLevel
-
-
-/**
- * This is a helper object that manages the data received from the socket. It divides
- * the object received into small batches of 100s of milliseconds, pushes them as
- * blocks into the block manager and reports the block IDs to the network input
- * tracker. It starts two threads, one to periodically start a new batch and prepare
- * the previous batch of as a block, the other to push the blocks into the block
- * manager.
- */
-  class DataHandler[T](receiver: NetworkReceiver[T], storageLevel: StorageLevel)
-    extends Serializable with Logging {
-
-    case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
-
-    val clock = new SystemClock()
-    val blockInterval = 200L
-    val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
-    val blockStorageLevel = storageLevel
-    val blocksForPushing = new ArrayBlockingQueue[Block](1000)
-    val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
-
-    var currentBuffer = new ArrayBuffer[T]
-
-    def createBlock(blockId: String, iterator: Iterator[T]) : Block = {
-      new Block(blockId, iterator)
-    }
-
-    def start() {
-      blockIntervalTimer.start()
-      blockPushingThread.start()
-      logInfo("Data handler started")
-    }
-
-    def stop() {
-      blockIntervalTimer.stop()
-      blockPushingThread.interrupt()
-      logInfo("Data handler stopped")
-    }
-
-    def += (obj: T) {
-      currentBuffer += obj
-    }
-
-    def updateCurrentBuffer(time: Long) {
-      try {
-        val newBlockBuffer = currentBuffer
-        currentBuffer = new ArrayBuffer[T]
-        if (newBlockBuffer.size > 0) {
-          val blockId = "input-" + receiver.streamId + "- " + (time - blockInterval)
-          val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
-          blocksForPushing.add(newBlock)
-        }
-      } catch {
-        case ie: InterruptedException =>
-          logInfo("Block interval timer thread interrupted")
-        case e: Exception =>
-          receiver.stop()
-      }
-    }
-
-    def keepPushingBlocks() {
-      logInfo("Block pushing thread started")
-      try {
-        while(true) {
-          val block = blocksForPushing.take()
-          receiver.pushBlock(block.id, block.iterator, block.metadata, storageLevel)
-        }
-      } catch {
-        case ie: InterruptedException =>
-          logInfo("Block pushing thread interrupted")
-        case e: Exception =>
-          receiver.stop()
-      }
-    }
-  }
\ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala
index 2959ce4540..02d9811669 100644
--- a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala
@@ -110,7 +110,7 @@ class FlumeReceiver(
   storageLevel: StorageLevel
 ) extends NetworkReceiver[SparkFlumeEvent](streamId) {
 
-  lazy val dataHandler = new DataHandler(this, storageLevel)
+  lazy val dataHandler = new BufferingBlockCreator(this, storageLevel)
 
   protected override def onStart() {
     val responder = new SpecificResponder(
diff --git a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
index a9e37c0ff0..f7a34d2515 100644
--- a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
@@ -32,7 +32,7 @@ class SocketReceiver[T: ClassManifest](
     storageLevel: StorageLevel
   ) extends NetworkReceiver[T](streamId) {
 
-  lazy protected val dataHandler = new DataHandler(this, storageLevel)
+  lazy protected val dataHandler = new BufferingBlockCreator(this, storageLevel)
 
   override def getLocationPreference = None
 
diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
index 7c642d4802..66f60519bc 100644
--- a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
@@ -102,7 +102,7 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
   val ZK_TIMEOUT = 10000
 
   // Handles pushing data into the BlockManager
-  lazy protected val dataHandler = new DataHandler(this, storageLevel)
+  lazy protected val dataHandler = new BufferingBlockCreator(this, storageLevel)
   // Keeps track of the current offsets. Maps from (broker, topic, group, part) -> Offset
   lazy val offsets = HashMap[KafkaPartitionKey, Long]()
   // Connection to Kafka
@@ -114,7 +114,6 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
 
   def onStart() {
 
-    // Starting the DataHandler that buffers blocks and pushes them into them BlockManager
     dataHandler.start()
 
     // In case we are using multiple Threads to handle Kafka Messages
@@ -181,7 +180,7 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
 
   // NOT USED - Originally intended for fault-tolerance
   // class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel)
-  //   extends DataHandler[Any](receiver, storageLevel) {
+  //   extends BufferingBlockCreator[Any](receiver, storageLevel) {
 
   //   override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = {
   //     // Creates a new Block with Kafka-specific Metadata
--
cgit v1.2.3
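
Editor's note: for reference, the sketch below shows how a receiver is expected to drive the renamed BufferingBlockCreator, mirroring the FlumeReceiver, SocketReceiver, and KafkaReceiver call sites touched by this patch. It is a minimal, hypothetical example and not part of the commit: the DummyReceiver class, its records parameter, and the package line are invented for illustration, and the onStart/onStop/getLocationPreference hooks are assumed to match the NetworkReceiver API visible in the hunks above.

package spark.streaming.examples   // hypothetical package, not part of this patch

import spark.storage.StorageLevel
import spark.streaming.{BufferingBlockCreator, NetworkReceiver}

// Illustrative receiver that batches an in-memory sequence of records the same
// way the Flume/Socket/Kafka receivers batch their network input.
class DummyReceiver[T: ClassManifest](
    streamId: Int,
    records: Seq[T],                    // stand-in for a real network data source
    storageLevel: StorageLevel
  ) extends NetworkReceiver[T](streamId) {

  // One block creator per receiver: every 200 ms it seals the current buffer into
  // a named block, and a background thread pushes finished blocks to the block manager.
  lazy protected val dataHandler = new BufferingBlockCreator(this, storageLevel)

  override def getLocationPreference = None   // mirrors SocketReceiver above

  protected override def onStart() {
    dataHandler.start()                  // starts the interval timer and the pushing thread
    records.foreach(dataHandler += _)    // each received object joins the current batch
  }

  protected override def onStop() {
    dataHandler.stop()                   // stops the timer and interrupts the pusher
  }
}

Note that this wiring is unchanged by the patch itself; only the class name moves from DataHandler to BufferingBlockCreator, and the batching internals (createBlock, updateCurrentBuffer, keepPushingBlocks) become private.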