diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala | 171 |
1 files changed, 118 insertions, 53 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 3ca41f32c1..99be4de065 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -32,20 +32,25 @@ import org.apache.spark.internal.Logging import org.apache.spark.memory.{MemoryManager, MemoryMode} import org.apache.spark.serializer.{SerializationStream, SerializerManager} import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel} +import org.apache.spark.unsafe.Platform import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils} import org.apache.spark.util.collection.SizeTrackingVector -import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer} +import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream} private sealed trait MemoryEntry[T] { def size: Long + def memoryMode: MemoryMode def classTag: ClassTag[T] } private case class DeserializedMemoryEntry[T]( value: Array[T], size: Long, - classTag: ClassTag[T]) extends MemoryEntry[T] + classTag: ClassTag[T]) extends MemoryEntry[T] { + val memoryMode: MemoryMode = MemoryMode.ON_HEAP +} private case class SerializedMemoryEntry[T]( buffer: ChunkedByteBuffer, + memoryMode: MemoryMode, classTag: ClassTag[T]) extends MemoryEntry[T] { def size: Long = buffer.size } @@ -86,7 +91,10 @@ private[spark] class MemoryStore( // A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes) // All accesses of this map are assumed to have manually synchronized on `memoryManager` - private val unrollMemoryMap = mutable.HashMap[Long, Long]() + private val onHeapUnrollMemoryMap = mutable.HashMap[Long, Long]() + // Note: off-heap unroll memory is only used in putIteratorAsBytes() because off-heap caching + // always stores serialized values. + private val offHeapUnrollMemoryMap = mutable.HashMap[Long, Long]() // Initial memory to request before unrolling any block private val unrollMemoryThreshold: Long = @@ -131,13 +139,14 @@ private[spark] class MemoryStore( def putBytes[T: ClassTag]( blockId: BlockId, size: Long, + memoryMode: MemoryMode, _bytes: () => ChunkedByteBuffer): Boolean = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") - if (memoryManager.acquireStorageMemory(blockId, size, MemoryMode.ON_HEAP)) { + if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) { // We acquired enough memory for the block, so go ahead and put it val bytes = _bytes() assert(bytes.size == size) - val entry = new SerializedMemoryEntry[T](bytes, implicitly[ClassTag[T]]) + val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]]) entries.synchronized { entries.put(blockId, entry) } @@ -190,7 +199,8 @@ private[spark] class MemoryStore( var vector = new SizeTrackingVector[T]()(classTag) // Request enough memory to begin unrolling - keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold) + keepUnrolling = + reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP) if (!keepUnrolling) { logWarning(s"Failed to reserve initial memory threshold of " + @@ -207,7 +217,8 @@ private[spark] class MemoryStore( val currentSize = vector.estimateSize() if (currentSize >= memoryThreshold) { val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong - keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest) + keepUnrolling = + reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP) if (keepUnrolling) { unrollMemoryUsedByThisBlock += amountToRequest } @@ -228,7 +239,7 @@ private[spark] class MemoryStore( def transferUnrollToStorage(amount: Long): Unit = { // Synchronize so that transfer is atomic memoryManager.synchronized { - releaseUnrollMemoryForThisTask(amount) + releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount) val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP) assert(success, "transferring unroll memory to storage memory failed") } @@ -247,7 +258,7 @@ private[spark] class MemoryStore( // If this task attempt already owns more unroll memory than is necessary to store the // block, then release the extra memory that will not be used. val excessUnrollMemory = unrollMemoryUsedByThisBlock - size - releaseUnrollMemoryForThisTask(excessUnrollMemory) + releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory) transferUnrollToStorage(size) true } @@ -295,10 +306,16 @@ private[spark] class MemoryStore( private[storage] def putIteratorAsBytes[T]( blockId: BlockId, values: Iterator[T], - classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = { + classTag: ClassTag[T], + memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long] = { require(!contains(blockId), s"Block $blockId is already present in the MemoryStore") + val allocator = memoryMode match { + case MemoryMode.ON_HEAP => ByteBuffer.allocate _ + case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _ + } + // Whether there is still enough memory for us to continue unrolling this block var keepUnrolling = true // Initial per-task memory to request for unrolling blocks (bytes). @@ -307,15 +324,15 @@ private[spark] class MemoryStore( var unrollMemoryUsedByThisBlock = 0L // Underlying buffer for unrolling the block val redirectableStream = new RedirectableOutputStream - val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt) - redirectableStream.setOutputStream(byteArrayChunkOutputStream) + val bbos = new ChunkedByteBufferOutputStream(initialMemoryThreshold.toInt, allocator) + redirectableStream.setOutputStream(bbos) val serializationStream: SerializationStream = { val ser = serializerManager.getSerializer(classTag).newInstance() ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream)) } // Request enough memory to begin unrolling - keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold) + keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode) if (!keepUnrolling) { logWarning(s"Failed to reserve initial memory threshold of " + @@ -325,9 +342,9 @@ private[spark] class MemoryStore( } def reserveAdditionalMemoryIfNecessary(): Unit = { - if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) { - val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock - keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest) + if (bbos.size > unrollMemoryUsedByThisBlock) { + val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock + keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode) if (keepUnrolling) { unrollMemoryUsedByThisBlock += amountToRequest } @@ -349,12 +366,11 @@ private[spark] class MemoryStore( } if (keepUnrolling) { - val entry = SerializedMemoryEntry[T]( - new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)), classTag) + val entry = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag) // Synchronize so that transfer is atomic memoryManager.synchronized { - releaseUnrollMemoryForThisTask(unrollMemoryUsedByThisBlock) - val success = memoryManager.acquireStorageMemory(blockId, entry.size, MemoryMode.ON_HEAP) + releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock) + val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode) assert(success, "transferring unroll memory to storage memory failed") } entries.synchronized { @@ -365,7 +381,7 @@ private[spark] class MemoryStore( Right(entry.size) } else { // We ran out of space while unrolling the values for this block - logUnrollFailureMessage(blockId, byteArrayChunkOutputStream.size) + logUnrollFailureMessage(blockId, bbos.size) Left( new PartiallySerializedBlock( this, @@ -374,7 +390,8 @@ private[spark] class MemoryStore( serializationStream, redirectableStream, unrollMemoryUsedByThisBlock, - new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)), + memoryMode, + bbos.toChunkedByteBuffer, values, classTag)) } @@ -386,7 +403,7 @@ private[spark] class MemoryStore( case null => None case e: DeserializedMemoryEntry[_] => throw new IllegalArgumentException("should only call getBytes on serialized blocks") - case SerializedMemoryEntry(bytes, _) => Some(bytes) + case SerializedMemoryEntry(bytes, _, _) => Some(bytes) } } @@ -407,8 +424,12 @@ private[spark] class MemoryStore( entries.remove(blockId) } if (entry != null) { - memoryManager.releaseStorageMemory(entry.size, MemoryMode.ON_HEAP) - logInfo(s"Block $blockId of size ${entry.size} dropped " + + entry match { + case SerializedMemoryEntry(buffer, _, _) => buffer.dispose() + case _ => + } + memoryManager.releaseStorageMemory(entry.size, entry.memoryMode) + logDebug(s"Block $blockId of size ${entry.size} dropped " + s"from memory (free ${maxMemory - blocksMemoryUsed})") true } else { @@ -420,7 +441,8 @@ private[spark] class MemoryStore( entries.synchronized { entries.clear() } - unrollMemoryMap.clear() + onHeapUnrollMemoryMap.clear() + offHeapUnrollMemoryMap.clear() memoryManager.releaseAllStorageMemory() logInfo("MemoryStore cleared") } @@ -433,23 +455,27 @@ private[spark] class MemoryStore( } /** - * Try to evict blocks to free up a given amount of space to store a particular block. - * Can fail if either the block is bigger than our memory or it would require replacing - * another block from the same RDD (which leads to a wasteful cyclic replacement pattern for - * RDDs that don't fit into memory that we want to avoid). - * - * @param blockId the ID of the block we are freeing space for, if any - * @param space the size of this block - * @return the amount of memory (in bytes) freed by eviction - */ - private[spark] def evictBlocksToFreeSpace(blockId: Option[BlockId], space: Long): Long = { + * Try to evict blocks to free up a given amount of space to store a particular block. + * Can fail if either the block is bigger than our memory or it would require replacing + * another block from the same RDD (which leads to a wasteful cyclic replacement pattern for + * RDDs that don't fit into memory that we want to avoid). + * + * @param blockId the ID of the block we are freeing space for, if any + * @param space the size of this block + * @param memoryMode the type of memory to free (on- or off-heap) + * @return the amount of memory (in bytes) freed by eviction + */ + private[spark] def evictBlocksToFreeSpace( + blockId: Option[BlockId], + space: Long, + memoryMode: MemoryMode): Long = { assert(space > 0) memoryManager.synchronized { var freedMemory = 0L val rddToAdd = blockId.flatMap(getRddId) val selectedBlocks = new ArrayBuffer[BlockId] - def blockIsEvictable(blockId: BlockId): Boolean = { - rddToAdd.isEmpty || rddToAdd != getRddId(blockId) + def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = { + entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) } // This is synchronized to ensure that the set of entries is not changed // (because of getValue or getBytes) while traversing the iterator, as that @@ -459,7 +485,8 @@ private[spark] class MemoryStore( while (freedMemory < space && iterator.hasNext) { val pair = iterator.next() val blockId = pair.getKey - if (blockIsEvictable(blockId)) { + val entry = pair.getValue + if (blockIsEvictable(blockId, entry)) { // We don't want to evict blocks which are currently being read, so we need to obtain // an exclusive write lock on blocks which are candidates for eviction. We perform a // non-blocking "tryLock" here in order to ignore blocks which are locked for reading: @@ -474,7 +501,7 @@ private[spark] class MemoryStore( def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = { val data = entry match { case DeserializedMemoryEntry(values, _, _) => Left(values) - case SerializedMemoryEntry(buffer, _) => Right(buffer) + case SerializedMemoryEntry(buffer, _, _) => Right(buffer) } val newEffectiveStorageLevel = blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag) @@ -530,11 +557,18 @@ private[spark] class MemoryStore( * * @return whether the request is granted. */ - def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = { + def reserveUnrollMemoryForThisTask( + blockId: BlockId, + memory: Long, + memoryMode: MemoryMode): Boolean = { memoryManager.synchronized { - val success = memoryManager.acquireUnrollMemory(blockId, memory, MemoryMode.ON_HEAP) + val success = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode) if (success) { val taskAttemptId = currentTaskAttemptId() + val unrollMemoryMap = memoryMode match { + case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap + case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap + } unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory } success @@ -545,9 +579,13 @@ private[spark] class MemoryStore( * Release memory used by this task for unrolling blocks. * If the amount is not specified, remove the current task's allocation altogether. */ - def releaseUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = { + def releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue): Unit = { val taskAttemptId = currentTaskAttemptId() memoryManager.synchronized { + val unrollMemoryMap = memoryMode match { + case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap + case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap + } if (unrollMemoryMap.contains(taskAttemptId)) { val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId)) if (memoryToRelease > 0) { @@ -555,7 +593,7 @@ private[spark] class MemoryStore( if (unrollMemoryMap(taskAttemptId) == 0) { unrollMemoryMap.remove(taskAttemptId) } - memoryManager.releaseUnrollMemory(memoryToRelease, MemoryMode.ON_HEAP) + memoryManager.releaseUnrollMemory(memoryToRelease, memoryMode) } } } @@ -565,20 +603,23 @@ private[spark] class MemoryStore( * Return the amount of memory currently occupied for unrolling blocks across all tasks. */ def currentUnrollMemory: Long = memoryManager.synchronized { - unrollMemoryMap.values.sum + onHeapUnrollMemoryMap.values.sum + offHeapUnrollMemoryMap.values.sum } /** * Return the amount of memory currently occupied for unrolling blocks by this task. */ def currentUnrollMemoryForThisTask: Long = memoryManager.synchronized { - unrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L) + onHeapUnrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L) + + offHeapUnrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L) } /** * Return the number of tasks currently unrolling blocks. */ - private def numTasksUnrolling: Int = memoryManager.synchronized { unrollMemoryMap.keys.size } + private def numTasksUnrolling: Int = memoryManager.synchronized { + (onHeapUnrollMemoryMap.keys ++ offHeapUnrollMemoryMap.keys).toSet.size + } /** * Log information about current memory usage. @@ -627,7 +668,7 @@ private[storage] class PartiallyUnrolledIterator[T]( private[this] var iter: Iterator[T] = { val completionIterator = CompletionIterator[T, Iterator[T]](unrolled, { unrolledIteratorIsConsumed = true - memoryStore.releaseUnrollMemoryForThisTask(unrollMemory) + memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory) }) completionIterator ++ rest } @@ -640,7 +681,7 @@ private[storage] class PartiallyUnrolledIterator[T]( */ def close(): Unit = { if (!unrolledIteratorIsConsumed) { - memoryStore.releaseUnrollMemoryForThisTask(unrollMemory) + memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory) unrolledIteratorIsConsumed = true } iter = null @@ -669,6 +710,7 @@ private class RedirectableOutputStream extends OutputStream { * @param serializationStream a serialization stream which writes to [[redirectableOutputStream]]. * @param redirectableOutputStream an OutputStream which can be redirected to a different sink. * @param unrollMemory the amount of unroll memory used by the values in `unrolled`. + * @param memoryMode whether the unroll memory is on- or off-heap * @param unrolled a byte buffer containing the partially-serialized values. * @param rest the rest of the original iterator passed to * [[MemoryStore.putIteratorAsValues()]]. @@ -681,18 +723,36 @@ private[storage] class PartiallySerializedBlock[T]( serializationStream: SerializationStream, redirectableOutputStream: RedirectableOutputStream, unrollMemory: Long, + memoryMode: MemoryMode, unrolled: ChunkedByteBuffer, rest: Iterator[T], classTag: ClassTag[T]) { + // If the task does not fully consume `valuesIterator` or otherwise fails to consume or dispose of + // this PartiallySerializedBlock then we risk leaking of direct buffers, so we use a task + // completion listener here in order to ensure that `unrolled.dispose()` is called at least once. + // The dispose() method is idempotent, so it's safe to call it unconditionally. + Option(TaskContext.get()).foreach { taskContext => + taskContext.addTaskCompletionListener { _ => + // When a task completes, its unroll memory will automatically be freed. Thus we do not call + // releaseUnrollMemoryForThisTask() here because we want to avoid double-freeing. + unrolled.dispose() + } + } + /** * Called to dispose of this block and free its memory. */ def discard(): Unit = { try { + // We want to close the output stream in order to free any resources associated with the + // serializer itself (such as Kryo's internal buffers). close() might cause data to be + // written, so redirect the output stream to discard that data. + redirectableOutputStream.setOutputStream(ByteStreams.nullOutputStream()) serializationStream.close() } finally { - memoryStore.releaseUnrollMemoryForThisTask(unrollMemory) + unrolled.dispose() + memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory) } } @@ -701,12 +761,14 @@ private[storage] class PartiallySerializedBlock[T]( * and then serializing the values from the original input iterator. */ def finishWritingToStream(os: OutputStream): Unit = { - ByteStreams.copy(unrolled.toInputStream(), os) + // `unrolled`'s underlying buffers will be freed once this input stream is fully read: + ByteStreams.copy(unrolled.toInputStream(dispose = true), os) + memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory) redirectableOutputStream.setOutputStream(os) while (rest.hasNext) { serializationStream.writeObject(rest.next())(classTag) } - discard() + serializationStream.close() } /** @@ -717,10 +779,13 @@ private[storage] class PartiallySerializedBlock[T]( * `close()` on it to free its resources. */ def valuesIterator: PartiallyUnrolledIterator[T] = { + // `unrolled`'s underlying buffers will be freed once this input stream is fully read: + val unrolledIter = serializerManager.dataDeserializeStream( + blockId, unrolled.toInputStream(dispose = true))(classTag) new PartiallyUnrolledIterator( memoryStore, unrollMemory, - unrolled = serializerManager.dataDeserialize(blockId, unrolled)(classTag), + unrolled = CompletionIterator[T, Iterator[T]](unrolledIter, discard()), rest = rest) } } |