aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-03-17 20:00:56 -0700
committerJosh Rosen <joshrosen@databricks.com>2016-03-17 20:00:56 -0700
commit6c2d894a2f8f7a29ec6fc8163e41c24bb70c3109 (patch)
tree715e352a7e88818d315c92e8ae5c23e9a26b90ab /streaming/src
parent6037ed0a1d7ecbb77140ddf4d0192a1dc60316bb (diff)
downloadspark-6c2d894a2f8f7a29ec6fc8163e41c24bb70c3109.tar.gz
spark-6c2d894a2f8f7a29ec6fc8163e41c24bb70c3109.tar.bz2
spark-6c2d894a2f8f7a29ec6fc8163e41c24bb70c3109.zip
[SPARK-13921] Store serialized blocks as multiple chunks in MemoryStore
This patch modifies the BlockManager, MemoryStore, and several other storage components so that serialized cached blocks are stored as multiple small chunks rather than as a single contiguous ByteBuffer. This change will help to improve the efficiency of memory allocation and the accuracy of memory accounting when serializing blocks. Our current serialization code uses a ByteBufferOutputStream, which doubles and re-allocates its backing byte array; this increases the peak memory requirements during serialization (since we need to hold extra memory while expanding the array). In addition, we currently don't account for the extra wasted space at the end of the ByteBuffer's backing array, so a 129 megabyte serialized block may actually consume 256 megabytes of memory. After switching to storing blocks in multiple chunks, we'll be able to efficiently trim the backing buffers so that no space is wasted. This change is also a prerequisite to being able to cache blocks which are larger than 2GB (although full support for that depends on several other changes which have not bee implemented yet). Author: Josh Rosen <joshrosen@databricks.com> Closes #11748 from JoshRosen/chunked-block-serialization.
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala15
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala2
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala2
4 files changed, 15 insertions, 9 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index f811784b25..8625882b04 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -28,11 +28,13 @@ import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.util._
import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.io.ChunkedByteBuffer
/**
* Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
* It contains information about the id of the blocks having this partition's data and
* the corresponding record handle in the write ahead log that backs the partition.
+ *
* @param index index of the partition
* @param blockId id of the block having the partition data
* @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
@@ -59,7 +61,6 @@ class WriteAheadLogBackedBlockRDDPartition(
* correctness, and it can be used in situations where it is known that the block
* does not exist in the Spark executors (e.g. after a failed driver is restarted).
*
- *
* @param sc SparkContext
* @param _blockIds Ids of the blocks that contains this RDD's data
* @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
@@ -156,7 +157,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
logInfo(s"Read partition data of $this from write ahead log, record handle " +
partition.walRecordHandle)
if (storeInBlockManager) {
- blockManager.putBytes(blockId, dataRead, storageLevel)
+ blockManager.putBytes(blockId, new ChunkedByteBuffer(dataRead.duplicate()), storageLevel)
logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
dataRead.rewind()
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 4880884b05..6d4f4b99c1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -30,6 +30,7 @@ import org.apache.spark.storage._
import org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler._
import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils}
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
+import org.apache.spark.util.io.ChunkedByteBuffer
/** Trait that represents the metadata related to storage of blocks */
private[streaming] trait ReceivedBlockStoreResult {
@@ -84,7 +85,8 @@ private[streaming] class BlockManagerBasedBlockHandler(
numRecords = countIterator.count
putResult
case ByteBufferBlock(byteBuffer) =>
- blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
+ blockManager.putBytes(
+ blockId, new ChunkedByteBuffer(byteBuffer.duplicate()), storageLevel, tellMaster = true)
case o =>
throw new SparkException(
s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
@@ -178,15 +180,18 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
numRecords = countIterator.count
serializedBlock
case ByteBufferBlock(byteBuffer) =>
- byteBuffer
+ new ChunkedByteBuffer(byteBuffer.duplicate())
case _ =>
throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
}
// Store the block in block manager
val storeInBlockManagerFuture = Future {
- val putSucceeded =
- blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
+ val putSucceeded = blockManager.putBytes(
+ blockId,
+ serializedBlock,
+ effectiveStorageLevel,
+ tellMaster = true)
if (!putSucceeded) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
@@ -195,7 +200,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
// Store the block in write ahead log
val storeInWriteAheadLogFuture = Future {
- writeAheadLog.write(serializedBlock, clock.getTimeMillis())
+ writeAheadLog.write(serializedBlock.toByteBuffer, clock.getTimeMillis())
}
// Combine the futures, wait for both to complete, and return the write ahead log record handle
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 2d509af85a..76f67ed601 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -339,7 +339,7 @@ class ReceivedBlockHandlerSuite
storeAndVerify(blocks.map { b => IteratorBlock(b.toIterator) })
storeAndVerify(blocks.map { b => ArrayBufferBlock(new ArrayBuffer ++= b) })
- storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b)) })
+ storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b).toByteBuffer) })
}
/** Test error handling when blocks that cannot be stored */
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
index 79ac833c18..c4bf42d0f2 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
@@ -223,7 +223,7 @@ class WriteAheadLogBackedBlockRDDSuite
require(blockData.size === blockIds.size)
val writer = new FileBasedWriteAheadLogWriter(new File(dir, "logFile").toString, hadoopConf)
val segments = blockData.zip(blockIds).map { case (data, id) =>
- writer.write(blockManager.dataSerialize(id, data.iterator))
+ writer.write(blockManager.dataSerialize(id, data.iterator).toByteBuffer)
}
writer.close()
segments