author     Dibyendu Bhattacharya <dibyendu.bhattacharya1@pearson.com>   2015-06-18 19:58:47 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>                  2015-06-18 20:00:05 -0700
commit     3eaed8769c16e887edb9d54f5816b4ee6da23de5 (patch)
tree       4080eeb6aa37665267dc9773835316ef5c424964 /streaming/src
parent     4ce3bab89f6bdf6208fdad2fbfaba0b53d1954e3 (diff)
[SPARK-8080] [STREAMING] Receiver.store with Iterator does not give correct count at Spark UI
tdas zsxwing This is the new PR for SPARK-8080; I have merged https://github.com/apache/spark/pull/6659 into it.

Also worth mentioning: with MEMORY_ONLY settings, when a block cannot be unrolled safely to memory because there is not enough space, the BlockManager will not try to put the block, and ReceivedBlockHandler will throw a SparkException since it cannot find the block id in the PutResult. So the number of records in the block is not counted if the block fails to unroll in memory, which is fine.

With MEMORY_AND_DISK settings, if the BlockManager cannot unroll the block to memory, the block is still serialized to disk; the same holds for the WAL-based store. So in those cases (storage level = memory + disk) the number of records is counted even though the block could not be unrolled to memory. That is why I added isFullyConsumed to the CountingIterator, but it is not actually used, because it can never happen that a block is not fully consumed while ReceivedBlockHandler still gets the block id.

I have also added a few test cases to cover these block-unrolling scenarios.

Author: Dibyendu Bhattacharya <dibyendu.bhattacharya1@pearson.com>
Author: U-PEROOT\UBHATD1 <UBHATD1@PIN-L-PI046.PEROOT.com>

Closes #6707 from dibbhatt/master and squashes the following commits:

f6cb6b5 [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI
f37cfd8 [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI
5a8344a [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI Count ByteBufferBlock as 1 count
fceac72 [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI
0153e7e [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI Fixed comments given by @zsxwing
4c5931d [Dibyendu Bhattacharya] [SPARK-8080][STREAMING] Receiver.store with Iterator does not give correct count at Spark UI
01e6dc8 [U-PEROOT\UBHATD1] A
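For reference, the sketch below is a minimal, self-contained illustration of the counting-iterator idea this patch relies on; the names CountingRecordIterator and CountingIteratorDemo are hypothetical and are not part of the patch. It mirrors the behaviour of the CountingIterator added in ReceivedBlockHandler.scala: records are counted as they are consumed, and a count is reported only once the wrapped iterator has been fully drained.

class CountingRecordIterator[T](underlying: Iterator[T]) extends Iterator[T] {
  private var _count = 0L

  override def hasNext: Boolean = underlying.hasNext

  override def next(): T = {
    _count += 1                        // count each record as it is handed out
    underlying.next()
  }

  // Report a count only after the underlying iterator is fully consumed;
  // a partially consumed iterator would under-report the block's records.
  def count: Option[Long] = if (underlying.hasNext) None else Some(_count)
}

object CountingIteratorDemo {
  def main(args: Array[String]): Unit = {
    val records = new CountingRecordIterator(Iterator.fill(100)(new Array[Byte](10)))
    assert(records.count.isEmpty)      // nothing consumed yet, so no count
    records.foreach(_ => ())           // a store call (e.g. putIterator) drains it
    assert(records.count == Some(100L))
    println(s"records stored = ${records.count.get}")
  }
}

In the patch itself, both BlockManagerBasedBlockHandler and WriteAheadLogBasedBlockHandler wrap IteratorBlock contents this way, and the resulting count is carried as numRecords on the ReceivedBlockStoreResult, so ReceiverSupervisorImpl no longer has to derive the count from the block type.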
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala   |  53
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala |   7
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala       | 154
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala       |   2
4 files changed, 194 insertions, 22 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 207d64d941..c8dd6e0681 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -32,7 +32,10 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
/** Trait that represents the metadata related to storage of blocks */
private[streaming] trait ReceivedBlockStoreResult {
- def blockId: StreamBlockId // Any implementation of this trait will store a block id
+ // Any implementation of this trait will store a block id
+ def blockId: StreamBlockId
+ // Any implementation of this trait will have to return the number of records
+ def numRecords: Option[Long]
}
/** Trait that represents a class that handles the storage of blocks received by receiver */
@@ -51,7 +54,8 @@ private[streaming] trait ReceivedBlockHandler {
* that stores the metadata related to storage of blocks using
* [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]]
*/
-private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId)
+private[streaming] case class BlockManagerBasedStoreResult(
+ blockId: StreamBlockId, numRecords: Option[Long])
extends ReceivedBlockStoreResult
@@ -64,11 +68,20 @@ private[streaming] class BlockManagerBasedBlockHandler(
extends ReceivedBlockHandler with Logging {
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
+
+ var numRecords = None: Option[Long]
+
val putResult: Seq[(BlockId, BlockStatus)] = block match {
case ArrayBufferBlock(arrayBuffer) =>
- blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
+ numRecords = Some(arrayBuffer.size.toLong)
+ blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
+ tellMaster = true)
case IteratorBlock(iterator) =>
- blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
+ val countIterator = new CountingIterator(iterator)
+ val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
+ tellMaster = true)
+ numRecords = countIterator.count
+ putResult
case ByteBufferBlock(byteBuffer) =>
blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
case o =>
@@ -79,7 +92,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
}
- BlockManagerBasedStoreResult(blockId)
+ BlockManagerBasedStoreResult(blockId, numRecords)
}
def cleanupOldBlocks(threshTime: Long) {
@@ -96,6 +109,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
*/
private[streaming] case class WriteAheadLogBasedStoreResult(
blockId: StreamBlockId,
+ numRecords: Option[Long],
walRecordHandle: WriteAheadLogRecordHandle
) extends ReceivedBlockStoreResult
@@ -151,12 +165,17 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
*/
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
+ var numRecords = None: Option[Long]
// Serialize the block so that it can be inserted into both
val serializedBlock = block match {
case ArrayBufferBlock(arrayBuffer) =>
+ numRecords = Some(arrayBuffer.size.toLong)
blockManager.dataSerialize(blockId, arrayBuffer.iterator)
case IteratorBlock(iterator) =>
- blockManager.dataSerialize(blockId, iterator)
+ val countIterator = new CountingIterator(iterator)
+ val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
+ numRecords = countIterator.count
+ serializedBlock
case ByteBufferBlock(byteBuffer) =>
byteBuffer
case _ =>
@@ -181,7 +200,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
// Combine the futures, wait for both to complete, and return the write ahead log record handle
val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
- WriteAheadLogBasedStoreResult(blockId, walRecordHandle)
+ WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)
}
def cleanupOldBlocks(threshTime: Long) {
@@ -199,3 +218,23 @@ private[streaming] object WriteAheadLogBasedBlockHandler {
new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString
}
}
+
+/**
+ * A utility that will wrap the Iterator to get the count
+ */
+private class CountingIterator[T](iterator: Iterator[T]) extends Iterator[T] {
+ private var _count = 0
+
+ private def isFullyConsumed: Boolean = !iterator.hasNext
+
+ def hasNext(): Boolean = iterator.hasNext
+
+ def count(): Option[Long] = {
+ if (isFullyConsumed) Some(_count) else None
+ }
+
+ def next(): T = {
+ _count += 1
+ iterator.next()
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 8be732b64e..6078cdf8f8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -137,15 +137,10 @@ private[streaming] class ReceiverSupervisorImpl(
blockIdOption: Option[StreamBlockId]
) {
val blockId = blockIdOption.getOrElse(nextBlockId)
- val numRecords = receivedBlock match {
- case ArrayBufferBlock(arrayBuffer) => Some(arrayBuffer.size.toLong)
- case _ => None
- }
-
val time = System.currentTimeMillis
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
-
+ val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index cca8cedb1d..6c0c926755 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -49,7 +49,6 @@ class ReceivedBlockHandlerSuite
val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "1")
val hadoopConf = new Configuration()
- val storageLevel = StorageLevel.MEMORY_ONLY_SER
val streamId = 1
val securityMgr = new SecurityManager(conf)
val mapOutputTracker = new MapOutputTrackerMaster(conf)
@@ -57,10 +56,12 @@ class ReceivedBlockHandlerSuite
val serializer = new KryoSerializer(conf)
val manualClock = new ManualClock
val blockManagerSize = 10000000
+ val blockManagerBuffer = new ArrayBuffer[BlockManager]()
var rpcEnv: RpcEnv = null
var blockManagerMaster: BlockManagerMaster = null
var blockManager: BlockManager = null
+ var storageLevel: StorageLevel = null
var tempDirectory: File = null
before {
@@ -70,20 +71,21 @@ class ReceivedBlockHandlerSuite
blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
- blockManager = new BlockManager("bm", rpcEnv, blockManagerMaster, serializer,
- blockManagerSize, conf, mapOutputTracker, shuffleManager,
- new NioBlockTransferService(conf, securityMgr), securityMgr, 0)
- blockManager.initialize("app-id")
+ storageLevel = StorageLevel.MEMORY_ONLY_SER
+ blockManager = createBlockManager(blockManagerSize, conf)
tempDirectory = Utils.createTempDir()
manualClock.setTime(0)
}
after {
- if (blockManager != null) {
- blockManager.stop()
- blockManager = null
+ for ( blockManager <- blockManagerBuffer ) {
+ if (blockManager != null) {
+ blockManager.stop()
+ }
}
+ blockManager = null
+ blockManagerBuffer.clear()
if (blockManagerMaster != null) {
blockManagerMaster.stop()
blockManagerMaster = null
@@ -174,6 +176,130 @@ class ReceivedBlockHandlerSuite
}
}
+ test("Test Block - count messages") {
+ // Test count with BlockManagerBasedBlockHandler
+ testCountWithBlockManagerBasedBlockHandler(true)
+ // Test count with WriteAheadLogBasedBlockHandler
+ testCountWithBlockManagerBasedBlockHandler(false)
+ }
+
+ test("Test Block - isFullyConsumed") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
+ // spark.storage.unrollFraction set to 0.4 for BlockManager
+ sparkConf.set("spark.storage.unrollFraction", "0.4")
+ // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
+ blockManager = createBlockManager(12000, sparkConf)
+
+ // There is not enough space to store this block in MEMORY,
+ // but BlockManager will be able to serialize this block to the WAL
+ // and hence count returns the correct value.
+ testRecordcount(false, StorageLevel.MEMORY_ONLY,
+ IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+ // There is not enough space to store this block in MEMORY,
+ // but BlockManager will be able to serialize this block to DISK
+ // and hence count returns the correct value.
+ testRecordcount(true, StorageLevel.MEMORY_AND_DISK,
+ IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator), blockManager, Some(70))
+
+ // There is not enough space to store this block with the MEMORY_ONLY StorageLevel.
+ // BlockManager will not be able to unroll this block
+ // and hence it will not tryToPut this block, resulting in a SparkException.
+ storageLevel = StorageLevel.MEMORY_ONLY
+ withBlockManagerBasedBlockHandler { handler =>
+ val thrown = intercept[SparkException] {
+ storeSingleBlock(handler, IteratorBlock((List.fill(70)(new Array[Byte](100))).iterator))
+ }
+ }
+ }
+
+ private def testCountWithBlockManagerBasedBlockHandler(isBlockManagerBasedBlockHandler: Boolean) {
+ // ByteBufferBlock-MEMORY_ONLY
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+ ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+ // ByteBufferBlock-MEMORY_ONLY_SER
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+ ByteBufferBlock(ByteBuffer.wrap(Array.tabulate(100)(i => i.toByte))), blockManager, None)
+ // ArrayBufferBlock-MEMORY_ONLY
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+ ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+ // ArrayBufferBlock-MEMORY_ONLY_SER
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+ ArrayBufferBlock(ArrayBuffer.fill(25)(0)), blockManager, Some(25))
+ // ArrayBufferBlock-DISK_ONLY
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+ ArrayBufferBlock(ArrayBuffer.fill(50)(0)), blockManager, Some(50))
+ // ArrayBufferBlock-MEMORY_AND_DISK
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+ ArrayBufferBlock(ArrayBuffer.fill(75)(0)), blockManager, Some(75))
+ // IteratorBlock-MEMORY_ONLY
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY,
+ IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+ // IteratorBlock-MEMORY_ONLY_SER
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_ONLY_SER,
+ IteratorBlock((ArrayBuffer.fill(100)(0)).iterator), blockManager, Some(100))
+ // IteratorBlock-DISK_ONLY
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.DISK_ONLY,
+ IteratorBlock((ArrayBuffer.fill(125)(0)).iterator), blockManager, Some(125))
+ // IteratorBlock-MEMORY_AND_DISK
+ testRecordcount(isBlockManagerBasedBlockHandler, StorageLevel.MEMORY_AND_DISK,
+ IteratorBlock((ArrayBuffer.fill(150)(0)).iterator), blockManager, Some(150))
+ }
+
+ private def createBlockManager(
+ maxMem: Long,
+ conf: SparkConf,
+ name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+ val transfer = new NioBlockTransferService(conf, securityMgr)
+ val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf,
+ mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
+ manager.initialize("app-id")
+ blockManagerBuffer += manager
+ manager
+ }
+
+ /**
+ * Test storing of data using different types of handler, StorageLevel, and ReceivedBlock,
+ * and verify the correct record count.
+ */
+ private def testRecordcount(isBlockManagedBasedBlockHandler: Boolean,
+ sLevel: StorageLevel,
+ receivedBlock: ReceivedBlock,
+ bManager: BlockManager,
+ expectedNumRecords: Option[Long]
+ ) {
+ blockManager = bManager
+ storageLevel = sLevel
+ var bId: StreamBlockId = null
+ try {
+ if (isBlockManagedBasedBlockHandler) {
+ // test received block with BlockManager based handler
+ withBlockManagerBasedBlockHandler { handler =>
+ val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock)
+ bId = blockId
+ assert(blockStoreResult.numRecords === expectedNumRecords,
+ "Message count not matches for a " +
+ receivedBlock.getClass.getName +
+ " being inserted using BlockManagerBasedBlockHandler with " + sLevel)
+ }
+ } else {
+ // test received block with WAL based handler
+ withWriteAheadLogBasedBlockHandler { handler =>
+ val (blockId, blockStoreResult) = storeSingleBlock(handler, receivedBlock)
+ bId = blockId
+ assert(blockStoreResult.numRecords === expectedNumRecords,
+ "Message count not matches for a " +
+ receivedBlock.getClass.getName +
+ " being inserted using WriteAheadLogBasedBlockHandler with " + sLevel)
+ }
+ }
+ } finally {
+ // Remove the block so the same blockManager can be reused by the next test
+ blockManager.removeBlock(bId, true)
+ }
+ }
+
/**
* Test storing of data using different forms of ReceivedBlocks and verify that they succeeded
* using the given verification function
@@ -251,9 +377,21 @@ class ReceivedBlockHandlerSuite
(blockIds, storeResults)
}
+ /** Store single block using a handler */
+ private def storeSingleBlock(
+ handler: ReceivedBlockHandler,
+ block: ReceivedBlock
+ ): (StreamBlockId, ReceivedBlockStoreResult) = {
+ val blockId = generateBlockId
+ val blockStoreResult = handler.storeBlock(blockId, block)
+ logDebug("Done inserting")
+ (blockId, blockStoreResult)
+ }
+
private def getWriteAheadLogFiles(): Seq[String] = {
getLogFilesInDirectory(checkpointDirToLogDir(tempDirectory.toString, streamId))
}
private def generateBlockId(): StreamBlockId = StreamBlockId(streamId, scala.util.Random.nextLong)
}
+
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index be305b5e0d..f793a12843 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -225,7 +225,7 @@ class ReceivedBlockTrackerSuite
/** Generate blocks infos using random ids */
def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
- BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)))))
+ BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)), Some(0L))))
}
/** Get all the data written in the given write ahead log file. */