aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-07-05 21:38:21 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2013-07-05 21:38:21 -0700
commit280418ac452b029b15a83d8e2fe05a96417294d1 (patch)
treefd1f80a3e1d29822e194a1094687d69161659986 /streaming
parent6ad85d0918460188172ffc1b23b3a2035f13dbcb (diff)
downloadspark-280418ac452b029b15a83d8e2fe05a96417294d1.tar.gz
spark-280418ac452b029b15a83d8e2fe05a96417294d1.tar.bz2
spark-280418ac452b029b15a83d8e2fe05a96417294d1.zip
Reduced the number of Iterator to ArrayBuffer copies in NetworkReceiver.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala18
-rw-r--r--streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala7
2 files changed, 11 insertions, 14 deletions
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 26805e9621..122a529bb7 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -140,12 +140,10 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
/**
- * Pushes a block (as iterator of values) into the block manager.
+ * Pushes a block (as an ArrayBuffer filled with data) into the block manager.
*/
- def pushBlock(blockId: String, iterator: Iterator[T], metadata: Any, level: StorageLevel) {
- val buffer = new ArrayBuffer[T] ++ iterator
- env.blockManager.put(blockId, buffer.asInstanceOf[ArrayBuffer[Any]], level)
-
+ def pushBlock(blockId: String, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
+ env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
actor ! ReportBlock(blockId, metadata)
}
@@ -195,7 +193,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
class BlockGenerator(storageLevel: StorageLevel)
extends Serializable with Logging {
- case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
+ case class Block(id: String, buffer: ArrayBuffer[T], metadata: Any = null)
val clock = new SystemClock()
val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong
@@ -222,17 +220,13 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
currentBuffer += obj
}
- private def createBlock(blockId: String, iterator: Iterator[T]) : Block = {
- new Block(blockId, iterator)
- }
-
private def updateCurrentBuffer(time: Long) {
try {
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[T]
if (newBlockBuffer.size > 0) {
val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time - blockInterval)
- val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
+ val newBlock = new Block(blockId, newBlockBuffer)
blocksForPushing.add(newBlock)
}
} catch {
@@ -248,7 +242,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
try {
while(true) {
val block = blocksForPushing.take()
- NetworkReceiver.this.pushBlock(block.id, block.iterator, block.metadata, storageLevel)
+ NetworkReceiver.this.pushBlock(block.id, block.buffer, block.metadata, storageLevel)
}
} catch {
case ie: InterruptedException =>
diff --git a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
index b3201d0b28..036c95a860 100644
--- a/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala
@@ -9,6 +9,8 @@ import spark.streaming.dstream.NetworkReceiver
import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.mutable.ArrayBuffer
+
/** A helper with set of defaults for supervisor strategy **/
object ReceiverSupervisorStrategy {
@@ -136,8 +138,9 @@ private[streaming] class ActorReceiver[T: ClassManifest](
}
protected def pushBlock(iter: Iterator[T]) {
- pushBlock("block-" + streamId + "-" + System.nanoTime(),
- iter, null, storageLevel)
+ val buffer = new ArrayBuffer[T]
+ buffer ++= iter
+ pushBlock("block-" + streamId + "-" + System.nanoTime(), buffer, null, storageLevel)
}
protected def onStart() = {