-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala    |  53
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala  |   7
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala        | 154
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala        |   2
4 files changed, 194 insertions(+), 22 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 207d64d941..c8dd6e0681 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -32,7 +32,10 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
/** Trait that represents the metadata related to storage of blocks */
private[streaming] trait ReceivedBlockStoreResult {
- def blockId: StreamBlockId // Any implementation of this trait will store a block id
+ // Any implementation of this trait will store a block id
+ def blockId: StreamBlockId
+  // Any implementation of this trait will have to return the number of records, if known
+ def numRecords: Option[Long]
}
/** Trait that represents a class that handles the storage of blocks received by receiver */
@@ -51,7 +54,8 @@ private[streaming] trait ReceivedBlockHandler {
* that stores the metadata related to storage of blocks using
* [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]]
*/
-private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId)
+private[streaming] case class BlockManagerBasedStoreResult(
+ blockId: StreamBlockId, numRecords: Option[Long])
extends ReceivedBlockStoreResult
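
For illustration, the extended contract every store result now satisfies can be sketched as below; SimpleStoreResult is a hypothetical name used only here, not part of the patch, and None signals that the record count could not be determined (as with ByteBufferBlocks).

import org.apache.spark.storage.StreamBlockId

// Hypothetical sketch of a ReceivedBlockStoreResult implementation; the real
// ones are BlockManagerBasedStoreResult and WriteAheadLogBasedStoreResult.
private[streaming] case class SimpleStoreResult(
    blockId: StreamBlockId,
    numRecords: Option[Long]  // None when the count is unknown, e.g. ByteBufferBlock
  ) extends ReceivedBlockStoreResult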
@@ -64,11 +68,20 @@ private[streaming] class BlockManagerBasedBlockHandler(
extends ReceivedBlockHandler with Logging {
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
+
+    var numRecords: Option[Long] = None
+
val putResult: Seq[(BlockId, BlockStatus)] = block match {
case ArrayBufferBlock(arrayBuffer) =>
- blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
+ numRecords = Some(arrayBuffer.size.toLong)
+ blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
+ tellMaster = true)
case IteratorBlock(iterator) =>
- blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
+ val countIterator = new CountingIterator(iterator)
+ val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
+ tellMaster = true)
+ numRecords = countIterator.count
+ putResult
case ByteBufferBlock(byteBuffer) =>
blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
case o =>
@@ -79,7 +92,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
}
- BlockManagerBasedStoreResult(blockId)
+ BlockManagerBasedStoreResult(blockId, numRecords)
}
def cleanupOldBlocks(threshTime: Long) {
@@ -96,6 +109,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
*/
private[streaming] case class WriteAheadLogBasedStoreResult(
blockId: StreamBlockId,
+ numRecords: Option[Long],
walRecordHandle: WriteAheadLogRecordHandle
) extends ReceivedBlockStoreResult
@@ -151,12 +165,17 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
*/
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
+    var numRecords: Option[Long] = None
// Serialize the block so that it can be inserted into both the block manager and the write ahead log
val serializedBlock = block match {
case ArrayBufferBlock(arrayBuffer) =>
+ numRecords = Some(arrayBuffer.size.toLong)
blockManager.dataSerialize(blockId, arrayBuffer.iterator)
case IteratorBlock(iterator) =>
- blockManager.dataSerialize(blockId, iterator)
+ val countIterator = new CountingIterator(iterator)
+ val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
+ numRecords = countIterator.count
+ serializedBlock
case ByteBufferBlock(byteBuffer) =>
byteBuffer
case _ =>
@@ -181,7 +200,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
// Combine the futures, wait for both to complete, and return the write ahead log record handle
val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
- WriteAheadLogBasedStoreResult(blockId, walRecordHandle)
+ WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)
}
def cleanupOldBlocks(threshTime: Long) {
@@ -199,3 +218,23 @@ private[streaming] object WriteAheadLogBasedBlockHandler {
new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString
}
}
+
+/**
+ * A utility that wraps an iterator to count the records it yields; the count
+ * is defined only once the iterator has been fully consumed.
+ */
+private class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] {
+  private var _count: Long = 0
+
+  private def isFullyConsumed: Boolean = !iterator.hasNext
+
+  def hasNext: Boolean = iterator.hasNext
+
+  def count: Option[Long] = {
+    if (isFullyConsumed) Some(_count) else None
+  }
+
+  def next(): T = {
+    _count += 1
+    iterator.next()
+  }
+}
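
A standalone sketch of the CountingIterator contract (redefined here only because the class above is private to its file): count is None until the wrapped iterator is fully drained, and Some(n) afterwards, which is exactly when the handlers read it.

// Self-contained demo mirroring the private class above.
class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] {
  private var _count: Long = 0
  def hasNext: Boolean = iterator.hasNext
  def count: Option[Long] = if (!iterator.hasNext) Some(_count) else None
  def next(): T = { _count += 1; iterator.next() }
}

object CountingIteratorDemo extends App {
  val it = new CountingIterator(Iterator(1, 2, 3))
  println(it.count)    // None: nothing consumed yet, count is undefined
  it.foreach(_ => ())  // drain, as putIterator/dataSerialize would
  println(it.count)    // Some(3): fully consumed, count is reliable
}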
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 8be732b64e..6078cdf8f8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -137,15 +137,10 @@ private[streaming] class ReceiverSupervisorImpl(
blockIdOption: Option[StreamBlockId]
) {
val blockId = blockIdOption.getOrElse(nextBlockId)
- val numRecords = receivedBlock match {
- case ArrayBufferBlock(arrayBuffer) => Some(arrayBuffer.size.toLong)
- case _ => None
- }
-
val time = System.currentTimeMillis
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
-
+ val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index cca8cedb1d..6c0c926755 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -49,7 +49,6 @@ class ReceivedBlockHandlerSuite
val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1")
val hadoopConf = new Configuration()
- val storageLevel = StorageLevel.MEMORY_ONLY_SER
val streamId = 1
val securityMgr = new SecurityManager(conf)
val mapOutputTracker = new MapOutputTrackerMaster(conf)
@@ -57,10 +56,12 @@ class ReceivedBlockHandlerSuite
val serializer = new KryoSerializer(conf)
val manualClock = new ManualClock
val blockManagerSize = 10000000
+ val blockManagerBuffer = new ArrayBuffer[BlockManager]()
var rpcEnv: RpcEnv = null
var blockManagerMaster: BlockManagerMaster = null
var blockManager: BlockManager = null
+ var storageLevel: StorageLevel = null
var tempDirectory: File = null
before {
@@ -70,20 +71,21 @@ class ReceivedBlockHandlerSuite
blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
- blockManager = new BlockManager("bm", rpcEnv, blockManagerMaster, serializer,
- blockManagerSize, conf, mapOutputTracker, shuffleManager,
- new NioBlockTransferService(conf, securityMgr), securityMgr, 0)
- blockManager.initialize("app-id")
+ storageLevel = StorageLevel.MEMORY_ONLY_SER
+ blockManager = createBlockManager(blockManagerSize, conf)
tempDirectory = Utils.createTempDir()
manualClock.setTime(0)
}
after {
- if (blockManager != null) {
- blockManager.stop()
- blockManager = null
+    for (blockManager <- blockManagerBuffer) {
+ if (blockManager != null) {
+ blockManager.stop()
+ }
}
+ blockManager = null
+ blockManagerBuffer.clear()
if (blockManagerMaster != null) {
blockManagerMaster.stop()
blockManagerMaster = null
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
}
}
+ test("Test Block - count messages") {
+    // Test count with BlockManagerBasedBlockHandler
+ testCountWithBlockManagerBasedBlockHandler(true)
+ // Test count with WriteAheadLogBasedBlockHandler
+ testCountWithBlockManagerBasedBlockHandler(false)
+ }
+
+ test("Test Block - isFullyConsumed") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
+ // spark.storage.unrollFraction set to 0.4 for BlockManager
+ sparkConf.set("spark.storage.unrollFraction", "0.4")
+ // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
+ blockManager = createBlockManager(12000, sparkConf)
+
+    // There is not enough space to store this block in memory, but the
+    // handler can still serialize it to the WAL, so count returns the
+    // correct value.
+ testRecordcount(false, StorageLevel.MEMORY_ONLY,
+ IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block in memory, but the
+    // BlockManager can still serialize it to disk, so count returns the
+    // correct value.
+ testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+ IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+    // There is not enough space to store this block with the MEMORY_ONLY
+    // StorageLevel: the BlockManager cannot unroll the block, so it never
+    // calls tryToPut, resulting in a SparkException.
+ storageLevel = StorageLevel.MEMORY_ONLY
+ withBlockManagerBasedBlockHandler { handler =>
+      intercept[SparkException] {
+ storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
+ }
+ }
+ }
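
A back-of-envelope check of the sizes used in this test (a sketch counting raw payload only; JVM object overhead makes the real footprint larger, which only reinforces the failure):

val maxMem = 12000L                      // BlockManager size used above
val unrollSpace = (maxMem * 0.4).toLong  // unrollFraction 0.4 => 4800 bytes
val blockPayload = 70L * 100L            // 70 records of 100 bytes = 7000 bytes
assert(blockPayload > unrollSpace)       // MEMORY_ONLY cannot unroll => SparkException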
+
+ private def testCountWithBlockManagerBasedBlockHandler(isBlockManagerBasedBlockHandler: Boolean) {
+ // ByteBufferBlock-MEMORY_ONLY
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+ ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+ // ByteBufferBlock-MEMORY_ONLY_SER
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+ ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+ // ArrayBufferBlock-MEMORY_ONLY
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+ ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+ // ArrayBufferBlock-MEMORY_ONLY_SER
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+ ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+ // ArrayBufferBlock-DISK_ONLY
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+ ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+ // ArrayBufferBlock-MEMORY_AND_DISK
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+ ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+ // IteratorBlock-MEMORY_ONLY
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+ IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+ // IteratorBlock-MEMORY_ONLY_SER
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+ IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+ // IteratorBlock-DISK_ONLY
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+ IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+ // IteratorBlock-MEMORY_AND_DISK
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+ IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+ }
+
+ private def createBlockManager(
+ maxMem: Long,
+ conf: SparkConf,
+ name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+ val transfer = new NioBlockTransferService(conf, securityMgr)
+ val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf,
+ mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
+ manager.initialize("app-id")
+ blockManagerBuffer += manager
+ manager
+ }
+
+ /**
+   * Test storing of data using different types of handler, StorageLevel and ReceivedBlock,
+ * and verify the correct record count
+ */
+ private def testRecordcount(isBlockManagedBasedBlockHandler: Boolean,
+ sLevel: StorageLevel,
+ receivedBlock: ReceivedBlock,
+ bManager: BlockManager,
+ expectedNumRecords: Option[Long]
+ ) {
+ blockManager = bManager
+ storageLevel = sLevel
+ var bId: StreamBlockId = null
+ try {
+ if (isBlockManagedBasedBlockHandler) {
+ // test received block with BlockManager based handler
+ withBlockManagerBasedBlockHandler { handler =>
+ val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock)
+ bId = blockId
+ assert(blockStoreResult.numRecords === expectedNumRecords,
+ "Message count not matches for a " +
+ receivedBlock.getClass.getName +
+ " being inserted using BlockManagerBasedBlockHandler with " + sLevel)
+ }
+ } else {
+ // test received block with WAL based handler
+ withWriteAheadLogBasedBlockHandler { handler =>
+ val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock)
+ bId = blockId
+ assert(blockStoreResult.numRecords === expectedNumRecords,
+ "Message count not matches for a " +
+ receivedBlock.getClass.getName +
+ " being inserted using WriteAheadLogBasedBlockHandler with " + sLevel)
+ }
+ }
+ } finally {
+      // Remove the block so the same blockManager can be reused by the next test
+ blockManager.removeBlock(bId, true)
+ }
+ }
+
/**
* Test storing of data using different forms of ReceivedBlocks and verify that they succeeded
* using the given verification function
@@ -251,9 +377,21 @@ class ReceivedBlockHandlerSuite
(blockIds, storeResults)
}
+ /** Store single block using a handler */
+ private def storeSingleBlock(
+ handler: ReceivedBlockHandler,
+ block: ReceivedBlock
+ ): (StreamBlockId, ReceivedBlockStoreResult) = {
+ val blockId = generateBlockId
+ val blockStoreResult = handler.storeBlock(blockId, block)
+ logDebug("Done inserting")
+ (blockId, blockStoreResult)
+ }
+
private def getWriteAheadLogFiles(): Seq[String] = {
getLogFilesInDirectory(checkpointDirToLogDir(tempDirectory.toString, streamId))
}
private def generateBlockId(): StreamBlockId = StreamBlockId(streamId, scala.util.Random.nextLong)
}
+
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index be305b5e0d..f793a12843 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -225,7 +225,7 @@ class ReceivedBlockTrackerSuite
/** Generate blocks infos using random ids */
def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
- BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)))))
+ BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
}
/** Get all the data written in the given write ahead log file. */