Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala')
 core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala | 171 ++++++++++++++------
 1 file changed, 118 insertions(+), 53 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 3ca41f32c1..99be4de065 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -32,20 +32,25 @@ import org.apache.spark.internal.Logging
import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.serializer.{SerializationStream, SerializerManager}
import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
+import org.apache.spark.unsafe.Platform
import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector
-import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
+import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
private sealed trait MemoryEntry[T] {
def size: Long
+ def memoryMode: MemoryMode
def classTag: ClassTag[T]
}
private case class DeserializedMemoryEntry[T](
value: Array[T],
size: Long,
- classTag: ClassTag[T]) extends MemoryEntry[T]
+ classTag: ClassTag[T]) extends MemoryEntry[T] {
+ val memoryMode: MemoryMode = MemoryMode.ON_HEAP
+}
private case class SerializedMemoryEntry[T](
buffer: ChunkedByteBuffer,
+ memoryMode: MemoryMode,
classTag: ClassTag[T]) extends MemoryEntry[T] {
def size: Long = buffer.size
}
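The hierarchy above bakes in an invariant: deserialized entries always live on-heap, so their memoryMode is fixed, while serialized entries carry their mode as a field. A standalone sketch of the same shape (simplified types, not Spark's actual classes) shows how per-mode accounting falls out of it:

    sealed trait Mode
    case object OnHeap extends Mode
    case object OffHeap extends Mode

    sealed trait Entry { def size: Long; def memoryMode: Mode }
    final case class Deserialized(size: Long) extends Entry { val memoryMode: Mode = OnHeap }
    final case class Serialized(size: Long, memoryMode: Mode) extends Entry

    // Per-mode usage is a filtered sum over the entries:
    def usedBytes(entries: Seq[Entry], mode: Mode): Long =
      entries.collect { case e if e.memoryMode == mode => e.size }.sum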
@@ -86,7 +91,10 @@ private[spark] class MemoryStore(
// A mapping from taskAttemptId to the amount of memory used for unrolling a block (in bytes).
// All accesses of this map are assumed to be manually synchronized on `memoryManager`.
- private val unrollMemoryMap = mutable.HashMap[Long, Long]()
+ private val onHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
+ // Note: off-heap unroll memory is only used in putIteratorAsBytes() because off-heap caching
+ // always stores serialized values.
+ private val offHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
// Initial memory to request before unrolling any block
private val unrollMemoryThreshold: Long =
@@ -131,13 +139,14 @@ private[spark] class MemoryStore(
def putBytes[T: ClassTag](
blockId: BlockId,
size: Long,
+ memoryMode: MemoryMode,
_bytes: () => ChunkedByteBuffer): Boolean = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
- if (memoryManager.acquireStorageMemory(blockId, size, MemoryMode.ON_HEAP)) {
+ if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
// We acquired enough memory for the block, so go ahead and put it
val bytes = _bytes()
assert(bytes.size == size)
- val entry = new SerializedMemoryEntry[T](bytes, implicitly[ClassTag[T]])
+ val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
entries.synchronized {
entries.put(blockId, entry)
}
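Note the `_bytes: () => ChunkedByteBuffer` parameter: the buffer is a thunk, materialized only after `acquireStorageMemory` succeeds, so a failed put allocates nothing. A minimal sketch of that acquire-first shape, with hypothetical names and a plain byte array standing in for ChunkedByteBuffer:

    def putLazily(size: Long, acquire: Long => Boolean)(bytes: () => Array[Byte]): Boolean =
      if (acquire(size)) {    // reserve storage memory before paying for the bytes
        val b = bytes()       // materialize only on success
        assert(b.length == size)
        true
      } else {
        false                 // nothing was allocated, so there is nothing to undo
      }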
@@ -190,7 +199,8 @@ private[spark] class MemoryStore(
var vector = new SizeTrackingVector[T]()(classTag)
// Request enough memory to begin unrolling
- keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
+ keepUnrolling =
+ reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)
if (!keepUnrolling) {
logWarning(s"Failed to reserve initial memory threshold of " +
@@ -207,7 +217,8 @@ private[spark] class MemoryStore(
val currentSize = vector.estimateSize()
if (currentSize >= memoryThreshold) {
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
- keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
+ keepUnrolling =
+ reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
}
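To make the bookkeeping concrete: assuming the default growth factor of 1.5 (MemoryStore's `memoryGrowthFactor`) and illustrative sizes, the request deliberately overshoots the current estimate so reservations stay ahead of the vector's growth:

    val memoryGrowthFactor = 1.5
    val memoryThreshold = 1L << 20    // 1 MiB reserved so far
    val currentSize = 2L << 20        // the vector is now estimated at 2 MiB
    val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
    // 3 MiB - 1 MiB = 2 MiB more, putting the reservation at 1.5x the current estimate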
@@ -228,7 +239,7 @@ private[spark] class MemoryStore(
def transferUnrollToStorage(amount: Long): Unit = {
// Synchronize so that transfer is atomic
memoryManager.synchronized {
- releaseUnrollMemoryForThisTask(amount)
+ releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
assert(success, "transferring unroll memory to storage memory failed")
}
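The `memoryManager.synchronized` wrapper is what makes the transfer atomic: without it, another task could grab the just-released unroll memory before this block's storage acquisition runs, and the assert would fire. The same idea in a standalone sketch, with plain counters standing in for the manager's pools:

    object Pools {
      private val lock = new Object
      private var unrollBytes = 100L
      private var storageBytes = 0L
      def transfer(amount: Long): Unit = lock.synchronized {
        unrollBytes -= amount     // release unroll memory...
        storageBytes += amount    // ...and claim it as storage memory with no gap between
      }
    }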
@@ -247,7 +258,7 @@ private[spark] class MemoryStore(
// If this task attempt already owns more unroll memory than is necessary to store the
// block, then release the extra memory that will not be used.
val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
- releaseUnrollMemoryForThisTask(excessUnrollMemory)
+ releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
transferUnrollToStorage(size)
true
}
@@ -295,10 +306,16 @@ private[spark] class MemoryStore(
private[storage] def putIteratorAsBytes[T](
blockId: BlockId,
values: Iterator[T],
- classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
+ classTag: ClassTag[T],
+ memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
+ val allocator = memoryMode match {
+ case MemoryMode.ON_HEAP => ByteBuffer.allocate _
+ case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
+ }
+
// Whether there is still enough memory for us to continue unrolling this block
var keepUnrolling = true
// Initial per-task memory to request for unrolling blocks (bytes).
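The match above eta-expands a method into an allocator of type `Int => ByteBuffer`, so the rest of the unroll path is mode-agnostic. `Platform.allocateDirectBuffer` is Spark-internal; the sketch below substitutes the standard library's `ByteBuffer.allocateDirect` to show the same shape:

    import java.nio.ByteBuffer

    val onHeap: Int => ByteBuffer = ByteBuffer.allocate _
    val offHeap: Int => ByteBuffer = ByteBuffer.allocateDirect _

    val chunk = offHeap(4096)   // a 4 KiB buffer outside the JVM heap
    assert(chunk.isDirect && chunk.capacity == 4096)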
@@ -307,15 +324,15 @@ private[spark] class MemoryStore(
var unrollMemoryUsedByThisBlock = 0L
// Underlying buffer for unrolling the block
val redirectableStream = new RedirectableOutputStream
- val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
- redirectableStream.setOutputStream(byteArrayChunkOutputStream)
+ val bbos = new ChunkedByteBufferOutputStream(initialMemoryThreshold.toInt, allocator)
+ redirectableStream.setOutputStream(bbos)
val serializationStream: SerializationStream = {
val ser = serializerManager.getSerializer(classTag).newInstance()
ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
}
// Request enough memory to begin unrolling
- keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
+ keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
if (!keepUnrolling) {
logWarning(s"Failed to reserve initial memory threshold of " +
@@ -325,9 +342,9 @@ private[spark] class MemoryStore(
}
def reserveAdditionalMemoryIfNecessary(): Unit = {
- if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
- val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
- keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
+ if (bbos.size > unrollMemoryUsedByThisBlock) {
+ val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
+ keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
}
@@ -349,12 +366,11 @@ private[spark] class MemoryStore(
}
if (keepUnrolling) {
- val entry = SerializedMemoryEntry[T](
- new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)), classTag)
+ val entry = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
// Synchronize so that transfer is atomic
memoryManager.synchronized {
- releaseUnrollMemoryForThisTask(unrollMemoryUsedByThisBlock)
- val success = memoryManager.acquireStorageMemory(blockId, entry.size, MemoryMode.ON_HEAP)
+ releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
+ val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
assert(success, "transferring unroll memory to storage memory failed")
}
entries.synchronized {
@@ -365,7 +381,7 @@ private[spark] class MemoryStore(
Right(entry.size)
} else {
// We ran out of space while unrolling the values for this block
- logUnrollFailureMessage(blockId, byteArrayChunkOutputStream.size)
+ logUnrollFailureMessage(blockId, bbos.size)
Left(
new PartiallySerializedBlock(
this,
@@ -374,7 +390,8 @@ private[spark] class MemoryStore(
serializationStream,
redirectableStream,
unrollMemoryUsedByThisBlock,
- new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)),
+ memoryMode,
+ bbos.toChunkedByteBuffer,
values,
classTag))
}
@@ -386,7 +403,7 @@ private[spark] class MemoryStore(
case null => None
case e: DeserializedMemoryEntry[_] =>
throw new IllegalArgumentException("should only call getBytes on serialized blocks")
- case SerializedMemoryEntry(bytes, _) => Some(bytes)
+ case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
}
}
@@ -407,8 +424,12 @@ private[spark] class MemoryStore(
entries.remove(blockId)
}
if (entry != null) {
- memoryManager.releaseStorageMemory(entry.size, MemoryMode.ON_HEAP)
- logInfo(s"Block $blockId of size ${entry.size} dropped " +
+ entry match {
+ case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+ case _ =>
+ }
+ memoryManager.releaseStorageMemory(entry.size, entry.memoryMode)
+ logDebug(s"Block $blockId of size ${entry.size} dropped " +
s"from memory (free ${maxMemory - blocksMemoryUsed})")
true
} else {
@@ -420,7 +441,8 @@ private[spark] class MemoryStore(
entries.synchronized {
entries.clear()
}
- unrollMemoryMap.clear()
+ onHeapUnrollMemoryMap.clear()
+ offHeapUnrollMemoryMap.clear()
memoryManager.releaseAllStorageMemory()
logInfo("MemoryStore cleared")
}
@@ -433,23 +455,27 @@ private[spark] class MemoryStore(
}
/**
- * Try to evict blocks to free up a given amount of space to store a particular block.
- * Can fail if either the block is bigger than our memory or it would require replacing
- * another block from the same RDD (which leads to a wasteful cyclic replacement pattern for
- * RDDs that don't fit into memory that we want to avoid).
- *
- * @param blockId the ID of the block we are freeing space for, if any
- * @param space the size of this block
- * @return the amount of memory (in bytes) freed by eviction
- */
- private[spark] def evictBlocksToFreeSpace(blockId: Option[BlockId], space: Long): Long = {
+ * Try to evict blocks to free up a given amount of space to store a particular block.
+ * Can fail if either the block is bigger than our memory or it would require replacing
+   * another block from the same RDD (which would lead to a wasteful cyclic replacement
+   * pattern that we want to avoid for RDDs that don't fit into memory).
+ *
+ * @param blockId the ID of the block we are freeing space for, if any
+ * @param space the size of this block
+ * @param memoryMode the type of memory to free (on- or off-heap)
+ * @return the amount of memory (in bytes) freed by eviction
+ */
+ private[spark] def evictBlocksToFreeSpace(
+ blockId: Option[BlockId],
+ space: Long,
+ memoryMode: MemoryMode): Long = {
assert(space > 0)
memoryManager.synchronized {
var freedMemory = 0L
val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
- def blockIsEvictable(blockId: BlockId): Boolean = {
- rddToAdd.isEmpty || rddToAdd != getRddId(blockId)
+ def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
+ entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
}
// This is synchronized to ensure that the set of entries is not changed
// (because of getValue or getBytes) while traversing the iterator, as that
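`blockIsEvictable` now checks two things: the candidate must occupy the same kind of memory as the space being freed, and it must not belong to the RDD being cached, which would otherwise cause the cyclic replacement pattern the scaladoc warns about. The predicate in isolation, over simplified standalone types:

    final case class Candidate(rddId: Option[Int], offHeap: Boolean)

    def isEvictable(c: Candidate, targetRdd: Option[Int], wantOffHeap: Boolean): Boolean =
      c.offHeap == wantOffHeap && (targetRdd.isEmpty || targetRdd != c.rddId)

    // A block of the very RDD we are making room for is never selected:
    assert(!isEvictable(Candidate(Some(3), offHeap = false), Some(3), wantOffHeap = false))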
@@ -459,7 +485,8 @@ private[spark] class MemoryStore(
while (freedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
- if (blockIsEvictable(blockId)) {
+ val entry = pair.getValue
+ if (blockIsEvictable(blockId, entry)) {
// We don't want to evict blocks which are currently being read, so we need to obtain
// an exclusive write lock on blocks which are candidates for eviction. We perform a
// non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
@@ -474,7 +501,7 @@ private[spark] class MemoryStore(
def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
val data = entry match {
case DeserializedMemoryEntry(values, _, _) => Left(values)
- case SerializedMemoryEntry(buffer, _) => Right(buffer)
+ case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
}
val newEffectiveStorageLevel =
blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
@@ -530,11 +557,18 @@ private[spark] class MemoryStore(
*
* @return whether the request is granted.
*/
- def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = {
+ def reserveUnrollMemoryForThisTask(
+ blockId: BlockId,
+ memory: Long,
+ memoryMode: MemoryMode): Boolean = {
memoryManager.synchronized {
- val success = memoryManager.acquireUnrollMemory(blockId, memory, MemoryMode.ON_HEAP)
+ val success = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
if (success) {
val taskAttemptId = currentTaskAttemptId()
+ val unrollMemoryMap = memoryMode match {
+ case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
+ case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
+ }
unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
}
success
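Unroll bookkeeping is now split into two task-indexed maps, selected by mode while holding the memoryManager lock. A standalone sketch of the dispatch, with plain mutable maps standing in for the MemoryStore fields:

    import scala.collection.mutable

    val onHeapUnroll = mutable.HashMap[Long, Long]()
    val offHeapUnroll = mutable.HashMap[Long, Long]()

    def record(taskAttemptId: Long, bytes: Long, offHeap: Boolean): Unit = {
      val map = if (offHeap) offHeapUnroll else onHeapUnroll
      map(taskAttemptId) = map.getOrElse(taskAttemptId, 0L) + bytes
    }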
@@ -545,9 +579,13 @@ private[spark] class MemoryStore(
* Release memory used by this task for unrolling blocks.
* If the amount is not specified, remove the current task's allocation altogether.
*/
- def releaseUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = {
+ def releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue): Unit = {
val taskAttemptId = currentTaskAttemptId()
memoryManager.synchronized {
+ val unrollMemoryMap = memoryMode match {
+ case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
+ case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
+ }
if (unrollMemoryMap.contains(taskAttemptId)) {
val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
if (memoryToRelease > 0) {
@@ -555,7 +593,7 @@ private[spark] class MemoryStore(
if (unrollMemoryMap(taskAttemptId) == 0) {
unrollMemoryMap.remove(taskAttemptId)
}
- memoryManager.releaseUnrollMemory(memoryToRelease, MemoryMode.ON_HEAP)
+ memoryManager.releaseUnrollMemory(memoryToRelease, memoryMode)
}
}
}
@@ -565,20 +603,23 @@ private[spark] class MemoryStore(
* Return the amount of memory currently occupied for unrolling blocks across all tasks.
*/
def currentUnrollMemory: Long = memoryManager.synchronized {
- unrollMemoryMap.values.sum
+ onHeapUnrollMemoryMap.values.sum + offHeapUnrollMemoryMap.values.sum
}
/**
* Return the amount of memory currently occupied for unrolling blocks by this task.
*/
def currentUnrollMemoryForThisTask: Long = memoryManager.synchronized {
- unrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
+ onHeapUnrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L) +
+ offHeapUnrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
}
/**
* Return the number of tasks currently unrolling blocks.
*/
- private def numTasksUnrolling: Int = memoryManager.synchronized { unrollMemoryMap.keys.size }
+ private def numTasksUnrolling: Int = memoryManager.synchronized {
+ (onHeapUnrollMemoryMap.keys ++ offHeapUnrollMemoryMap.keys).toSet.size
+ }
/**
* Log information about current memory usage.
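The `.toSet` in numTasksUnrolling is load-bearing: a single task can hold unroll memory in both maps at once, and adding the two key counts would double-count it. For example:

    val onHeapKeys = Map(1L -> 10L, 2L -> 20L).keys
    val offHeapKeys = Map(2L -> 5L).keys
    assert((onHeapKeys ++ offHeapKeys).toSet.size == 2)   // task 2 appears in both maps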
@@ -627,7 +668,7 @@ private[storage] class PartiallyUnrolledIterator[T](
private[this] var iter: Iterator[T] = {
val completionIterator = CompletionIterator[T, Iterator[T]](unrolled, {
unrolledIteratorIsConsumed = true
- memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
+ memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
})
completionIterator ++ rest
}
@@ -640,7 +681,7 @@ private[storage] class PartiallyUnrolledIterator[T](
*/
def close(): Unit = {
if (!unrolledIteratorIsConsumed) {
- memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
+ memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
unrolledIteratorIsConsumed = true
}
iter = null
@@ -669,6 +710,7 @@ private class RedirectableOutputStream extends OutputStream {
* @param serializationStream a serialization stream which writes to [[redirectableOutputStream]].
* @param redirectableOutputStream an OutputStream which can be redirected to a different sink.
* @param unrollMemory the amount of unroll memory used by the values in `unrolled`.
+ * @param memoryMode whether the unroll memory is on- or off-heap
* @param unrolled a byte buffer containing the partially-serialized values.
* @param rest the rest of the original iterator passed to
* [[MemoryStore.putIteratorAsValues()]].
@@ -681,18 +723,36 @@ private[storage] class PartiallySerializedBlock[T](
serializationStream: SerializationStream,
redirectableOutputStream: RedirectableOutputStream,
unrollMemory: Long,
+ memoryMode: MemoryMode,
unrolled: ChunkedByteBuffer,
rest: Iterator[T],
classTag: ClassTag[T]) {
+ // If the task does not fully consume `valuesIterator`, or otherwise fails to consume or
+ // dispose of this PartiallySerializedBlock, then we risk leaking direct buffers, so we use a
+ // task completion listener here to ensure that `unrolled.dispose()` is called at least once.
+ // The dispose() method is idempotent, so it's safe to call it unconditionally.
+ Option(TaskContext.get()).foreach { taskContext =>
+ taskContext.addTaskCompletionListener { _ =>
+ // When a task completes, its unroll memory will automatically be freed. Thus we do not call
+ // releaseUnrollMemoryForThisTask() here because we want to avoid double-freeing.
+ unrolled.dispose()
+ }
+ }
+
/**
* Called to dispose of this block and free its memory.
*/
def discard(): Unit = {
try {
+ // We want to close the output stream in order to free any resources associated with the
+ // serializer itself (such as Kryo's internal buffers). close() might cause data to be
+ // written, so redirect the output stream to discard that data.
+ redirectableOutputStream.setOutputStream(ByteStreams.nullOutputStream())
serializationStream.close()
} finally {
- memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
+ unrolled.dispose()
+ memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory)
}
}
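The redirect before `close()` matters because serializers may flush buffered bytes on close (Kryo does), and a discarded block must not push those bytes into the real sink. Guava's `ByteStreams.nullOutputStream()` is exactly the swallow-everything sink used above:

    import com.google.common.io.ByteStreams

    val sink = ByteStreams.nullOutputStream()
    sink.write(Array[Byte](1, 2, 3))   // accepted and silently discarded
    sink.close()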
@@ -701,12 +761,14 @@ private[storage] class PartiallySerializedBlock[T](
* and then serializing the values from the original input iterator.
*/
def finishWritingToStream(os: OutputStream): Unit = {
- ByteStreams.copy(unrolled.toInputStream(), os)
+ // `unrolled`'s underlying buffers will be freed once this input stream is fully read:
+ ByteStreams.copy(unrolled.toInputStream(dispose = true), os)
+ memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory)
redirectableOutputStream.setOutputStream(os)
while (rest.hasNext) {
serializationStream.writeObject(rest.next())(classTag)
}
- discard()
+ serializationStream.close()
}
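finishWritingToStream works in two phases: stream the already-serialized prefix out of `unrolled` (disposing its chunks as they are read), then swing the serializer's sink over to the caller's stream for the remaining values. The swing itself needs nothing more than an OutputStream with a swappable target; a minimal sketch mirroring RedirectableOutputStream:

    import java.io.OutputStream

    final class Redirectable(private var target: OutputStream) extends OutputStream {
      def setOutputStream(os: OutputStream): Unit = { target = os }
      override def write(b: Int): Unit = target.write(b)
      override def write(b: Array[Byte], off: Int, len: Int): Unit = target.write(b, off, len)
      override def flush(): Unit = target.flush()
      override def close(): Unit = target.close()
    }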
/**
@@ -717,10 +779,13 @@ private[storage] class PartiallySerializedBlock[T](
* `close()` on it to free its resources.
*/
def valuesIterator: PartiallyUnrolledIterator[T] = {
+ // `unrolled`'s underlying buffers will be freed once this input stream is fully read:
+ val unrolledIter = serializerManager.dataDeserializeStream(
+ blockId, unrolled.toInputStream(dispose = true))(classTag)
new PartiallyUnrolledIterator(
memoryStore,
unrollMemory,
- unrolled = serializerManager.dataDeserialize(blockId, unrolled)(classTag),
+ unrolled = CompletionIterator[T, Iterator[T]](unrolledIter, discard()),
rest = rest)
}
}
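CompletionIterator runs its cleanup argument exactly once, when the wrapped iterator is exhausted; routing `discard()` through it ensures the unroll memory and buffers are released even though the caller only ever sees a plain iterator. A standalone equivalent of that wrapper:

    def onExhaustion[A](it: Iterator[A])(cleanup: => Unit): Iterator[A] = new Iterator[A] {
      private var fired = false
      def hasNext: Boolean = {
        val more = it.hasNext
        if (!more && !fired) { fired = true; cleanup }
        more
      }
      def next(): A = it.next()
    }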