path: root/streaming/src
author    Patrick Wendell <pwendell@gmail.com>    2013-01-02 13:35:26 -0800
committer Patrick Wendell <pwendell@gmail.com>    2013-01-02 13:39:18 -0800
commit    493d65ce651dffc79adcdada0eeeed6452b3cc47 (patch)
tree      70849fb842dbe4443bbc6c5a7a2e85ce48f60ee4 /streaming/src
parent    0bc0a60d3001dd231e13057a838d4b6550e5a2b9 (diff)
Several code-quality improvements to DataHandler.
- Changed to more accurate name: BufferingBlockCreator
- Docstring now correctly reflects the abstraction offered by the class
- Made internal methods private
- Fixed indentation problems
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/BufferingBlockCreator.scala   80
-rw-r--r--  streaming/src/main/scala/spark/streaming/DataHandler.scala              83
-rw-r--r--  streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala         2
-rw-r--r--  streaming/src/main/scala/spark/streaming/SocketInputDStream.scala        2
-rw-r--r--  streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala   5
5 files changed, 84 insertions, 88 deletions
diff --git a/streaming/src/main/scala/spark/streaming/BufferingBlockCreator.scala b/streaming/src/main/scala/spark/streaming/BufferingBlockCreator.scala
new file mode 100644
index 0000000000..efd2e75d40
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/BufferingBlockCreator.scala
@@ -0,0 +1,80 @@
+package spark.streaming
+
+import java.util.concurrent.ArrayBlockingQueue
+import scala.collection.mutable.ArrayBuffer
+import spark.Logging
+import spark.streaming.util.{RecurringTimer, SystemClock}
+import spark.storage.StorageLevel
+
+/**
+ * Batches objects created by a [[spark.streaming.NetworkReceiver]] and puts them into
+ * appropriately named blocks at regular intervals. This class starts two threads,
+ * one to periodically start a new batch and prepare the previous batch as a block,
+ * the other to push the blocks into the block manager.
+ */
+class BufferingBlockCreator[T](receiver: NetworkReceiver[T], storageLevel: StorageLevel)
+ extends Serializable with Logging {
+
+ case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
+
+ val clock = new SystemClock()
+ val blockInterval = 200L
+ val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
+ val blockStorageLevel = storageLevel
+ val blocksForPushing = new ArrayBlockingQueue[Block](1000)
+ val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
+
+ var currentBuffer = new ArrayBuffer[T]
+
+ def start() {
+ blockIntervalTimer.start()
+ blockPushingThread.start()
+ logInfo("Data handler started")
+ }
+
+ def stop() {
+ blockIntervalTimer.stop()
+ blockPushingThread.interrupt()
+ logInfo("Data handler stopped")
+ }
+
+ def += (obj: T) {
+ currentBuffer += obj
+ }
+
+ private def createBlock(blockId: String, iterator: Iterator[T]) : Block = {
+ new Block(blockId, iterator)
+ }
+
+ private def updateCurrentBuffer(time: Long) {
+ try {
+ val newBlockBuffer = currentBuffer
+ currentBuffer = new ArrayBuffer[T]
+ if (newBlockBuffer.size > 0) {
+        val blockId = "input-" + receiver.streamId + "-" + (time - blockInterval)
+ val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
+ blocksForPushing.add(newBlock)
+ }
+ } catch {
+ case ie: InterruptedException =>
+ logInfo("Block interval timer thread interrupted")
+ case e: Exception =>
+ receiver.stop()
+ }
+ }
+
+ private def keepPushingBlocks() {
+ logInfo("Block pushing thread started")
+ try {
+ while(true) {
+ val block = blocksForPushing.take()
+ receiver.pushBlock(block.id, block.iterator, block.metadata, storageLevel)
+ }
+ } catch {
+ case ie: InterruptedException =>
+ logInfo("Block pushing thread interrupted")
+ case e: Exception =>
+ receiver.stop()
+ }
+ }
+}
\ No newline at end of file
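
To make the new abstraction concrete, here is a minimal usage sketch built only from the start()/stop()/+= surface defined in the file above; the ExampleReceiver class, its String payload, and the onStop hook are hypothetical stand-ins for the real receivers updated in the hunks that follow.

package spark.streaming

import spark.storage.StorageLevel

// Hypothetical receiver illustrating the BufferingBlockCreator lifecycle:
// objects appended with += are batched into a block every blockInterval
// and pushed to the block manager by the creator's background thread.
class ExampleReceiver(streamId: Int, storageLevel: StorageLevel)
  extends NetworkReceiver[String](streamId) {

  // One creator per receiver, in the same way FlumeReceiver and SocketReceiver use it below
  lazy val blockCreator = new BufferingBlockCreator(this, storageLevel)

  protected override def onStart() {
    blockCreator.start()            // starts the interval timer and the block-pushing thread
    blockCreator += "some record"   // append objects as they arrive from the source
  }

  protected override def onStop() {
    blockCreator.stop()             // stops the timer and interrupts the pushing thread
  }
}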
diff --git a/streaming/src/main/scala/spark/streaming/DataHandler.scala b/streaming/src/main/scala/spark/streaming/DataHandler.scala
deleted file mode 100644
index 05f307a8d1..0000000000
--- a/streaming/src/main/scala/spark/streaming/DataHandler.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-package spark.streaming
-
-import java.util.concurrent.ArrayBlockingQueue
-import scala.collection.mutable.ArrayBuffer
-import spark.Logging
-import spark.streaming.util.{RecurringTimer, SystemClock}
-import spark.storage.StorageLevel
-
-
-/**
- * This is a helper object that manages the data received from the socket. It divides
- * the object received into small batches of 100s of milliseconds, pushes them as
- * blocks into the block manager and reports the block IDs to the network input
- * tracker. It starts two threads, one to periodically start a new batch and prepare
- * the previous batch of as a block, the other to push the blocks into the block
- * manager.
- */
- class DataHandler[T](receiver: NetworkReceiver[T], storageLevel: StorageLevel)
- extends Serializable with Logging {
-
- case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
-
- val clock = new SystemClock()
- val blockInterval = 200L
- val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
- val blockStorageLevel = storageLevel
- val blocksForPushing = new ArrayBlockingQueue[Block](1000)
- val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
-
- var currentBuffer = new ArrayBuffer[T]
-
- def createBlock(blockId: String, iterator: Iterator[T]) : Block = {
- new Block(blockId, iterator)
- }
-
- def start() {
- blockIntervalTimer.start()
- blockPushingThread.start()
- logInfo("Data handler started")
- }
-
- def stop() {
- blockIntervalTimer.stop()
- blockPushingThread.interrupt()
- logInfo("Data handler stopped")
- }
-
- def += (obj: T) {
- currentBuffer += obj
- }
-
- def updateCurrentBuffer(time: Long) {
- try {
- val newBlockBuffer = currentBuffer
- currentBuffer = new ArrayBuffer[T]
- if (newBlockBuffer.size > 0) {
- val blockId = "input-" + receiver.streamId + "- " + (time - blockInterval)
- val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
- blocksForPushing.add(newBlock)
- }
- } catch {
- case ie: InterruptedException =>
- logInfo("Block interval timer thread interrupted")
- case e: Exception =>
- receiver.stop()
- }
- }
-
- def keepPushingBlocks() {
- logInfo("Block pushing thread started")
- try {
- while(true) {
- val block = blocksForPushing.take()
- receiver.pushBlock(block.id, block.iterator, block.metadata, storageLevel)
- }
- } catch {
- case ie: InterruptedException =>
- logInfo("Block pushing thread interrupted")
- case e: Exception =>
- receiver.stop()
- }
- }
- }
\ No newline at end of file
diff --git a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala
index 2959ce4540..02d9811669 100644
--- a/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/FlumeInputDStream.scala
@@ -110,7 +110,7 @@ class FlumeReceiver(
storageLevel: StorageLevel
) extends NetworkReceiver[SparkFlumeEvent](streamId) {
- lazy val dataHandler = new DataHandler(this, storageLevel)
+ lazy val dataHandler = new BufferingBlockCreator(this, storageLevel)
protected override def onStart() {
val responder = new SpecificResponder(
diff --git a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
index a9e37c0ff0..f7a34d2515 100644
--- a/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/SocketInputDStream.scala
@@ -32,7 +32,7 @@ class SocketReceiver[T: ClassManifest](
storageLevel: StorageLevel
) extends NetworkReceiver[T](streamId) {
- lazy protected val dataHandler = new DataHandler(this, storageLevel)
+ lazy protected val dataHandler = new BufferingBlockCreator(this, storageLevel)
override def getLocationPreference = None
diff --git a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
index 7c642d4802..66f60519bc 100644
--- a/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/input/KafkaInputDStream.scala
@@ -102,7 +102,7 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
val ZK_TIMEOUT = 10000
// Handles pushing data into the BlockManager
- lazy protected val dataHandler = new DataHandler(this, storageLevel)
+ lazy protected val dataHandler = new BufferingBlockCreator(this, storageLevel)
// Keeps track of the current offsets. Maps from (broker, topic, group, part) -> Offset
lazy val offsets = HashMap[KafkaPartitionKey, Long]()
// Connection to Kafka
@@ -114,7 +114,6 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
def onStart() {
- // Starting the DataHandler that buffers blocks and pushes them into them BlockManager
dataHandler.start()
// In case we are using multiple Threads to handle Kafka Messages
@@ -181,7 +180,7 @@ class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
// NOT USED - Originally intended for fault-tolerance
// class KafkaDataHandler(receiver: KafkaReceiver, storageLevel: StorageLevel)
- // extends DataHandler[Any](receiver, storageLevel) {
+ // extends BufferingBlockCreator[Any](receiver, storageLevel) {
// override def createBlock(blockId: String, iterator: Iterator[Any]) : Block = {
// // Creates a new Block with Kafka-specific Metadata