diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-07-05 21:38:21 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-07-05 21:38:21 -0700 |
commit | 280418ac452b029b15a83d8e2fe05a96417294d1 (patch) | |
tree | fd1f80a3e1d29822e194a1094687d69161659986 /streaming | |
parent | 6ad85d0918460188172ffc1b23b3a2035f13dbcb (diff) | |
download | spark-280418ac452b029b15a83d8e2fe05a96417294d1.tar.gz spark-280418ac452b029b15a83d8e2fe05a96417294d1.tar.bz2 spark-280418ac452b029b15a83d8e2fe05a96417294d1.zip |
Reduced the number of Iterator to ArrayBuffer copies in NetworkReceiver.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala | 18 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala | 7 |
2 files changed, 11 insertions, 14 deletions
```diff
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 26805e9621..122a529bb7 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -140,12 +140,10 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
 
 
   /**
-   * Pushes a block (as iterator of values) into the block manager.
+   * Pushes a block (as an ArrayBuffer filled with data) into the block manager.
    */
-  def pushBlock(blockId: String, iterator: Iterator[T], metadata: Any, level: StorageLevel) {
-    val buffer = new ArrayBuffer[T] ++ iterator
-    env.blockManager.put(blockId, buffer.asInstanceOf[ArrayBuffer[Any]], level)
-
+  def pushBlock(blockId: String, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
+    env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
     actor ! ReportBlock(blockId, metadata)
   }
 
@@ -195,7 +193,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
   class BlockGenerator(storageLevel: StorageLevel)
     extends Serializable with Logging {
 
-    case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
+    case class Block(id: String, buffer: ArrayBuffer[T], metadata: Any = null)
 
     val clock = new SystemClock()
     val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong
@@ -222,17 +220,13 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
       currentBuffer += obj
     }
 
-    private def createBlock(blockId: String, iterator: Iterator[T]) : Block = {
-      new Block(blockId, iterator)
-    }
-
     private def updateCurrentBuffer(time: Long) {
       try {
         val newBlockBuffer = currentBuffer
         currentBuffer = new ArrayBuffer[T]
         if (newBlockBuffer.size > 0) {
           val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time - blockInterval)
-          val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
+          val newBlock = new Block(blockId, newBlockBuffer)
           blocksForPushing.add(newBlock)
         }
       } catch {
@@ -248,7 +242,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
       try {
         while(true) {
           val block = blocksForPushing.take()
-          NetworkReceiver.this.pushBlock(block.id, block.iterator, block.metadata, storageLevel)
+          NetworkReceiver.this.pushBlock(block.id, block.buffer, block.metadata, storageLevel)
         }
       } catch {
         case ie: InterruptedException =>
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
index b3201d0b28..036c95a860 100644
--- a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
@@ -9,6 +9,8 @@ import spark.streaming.dstream.NetworkReceiver
 
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.mutable.ArrayBuffer
+
 /** A helper with set of defaults for supervisor strategy **/
 object ReceiverSupervisorStrategy {
 
@@ -136,8 +138,9 @@ private[streaming] class ActorReceiver[T: ClassManifest](
   }
 
   protected def pushBlock(iter: Iterator[T]) {
-    pushBlock("block-" + streamId + "-" + System.nanoTime(),
-      iter, null, storageLevel)
+    val buffer = new ArrayBuffer[T]
+    buffer ++= iter
+    pushBlock("block-" + streamId + "-" + System.nanoTime(), buffer, null, storageLevel)
   }
 
   protected def onStart() = {
```