aboutsummaryrefslogtreecommitdiff
path: root/network
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-02-25 17:17:56 -0800
committerAndrew Or <andrew@databricks.com>2016-02-25 17:17:56 -0800
commit633d63a48ad98754dc7c56f9ac150fc2aa4e42c5 (patch)
tree5835d7ac7b492f8f39293b0a3b48ea85c15fca5e /network
parent712995757c22a0bd76e4ccb552446372acf2cc2e (diff)
downloadspark-633d63a48ad98754dc7c56f9ac150fc2aa4e42c5.tar.gz
spark-633d63a48ad98754dc7c56f9ac150fc2aa4e42c5.tar.bz2
spark-633d63a48ad98754dc7c56f9ac150fc2aa4e42c5.zip
[SPARK-12757] Add block-level read/write locks to BlockManager
## Motivation As a pre-requisite to off-heap caching of blocks, we need a mechanism to prevent pages / blocks from being evicted while they are being read. With on-heap objects, evicting a block while it is being read merely leads to memory-accounting problems (because we assume that an evicted block is a candidate for garbage-collection, which will not be true during a read), but with off-heap memory this will lead to either data corruption or segmentation faults. ## Changes ### BlockInfoManager and reader/writer locks This patch adds block-level read/write locks to the BlockManager. It introduces a new `BlockInfoManager` component, which is contained within the `BlockManager`, holds the `BlockInfo` objects that the `BlockManager` uses for tracking block metadata, and exposes APIs for locking blocks in either shared read or exclusive write modes. `BlockManager`'s `get*()` and `put*()` methods now implicitly acquire the necessary locks. After a `get()` call successfully retrieves a block, that block is locked in a shared read mode. A `put()` call will block until it acquires an exclusive write lock. If the write succeeds, the write lock will be downgraded to a shared read lock before returning to the caller. This `put()` locking behavior allows us store a block and then immediately turn around and read it without having to worry about it having been evicted between the write and the read, which will allow us to significantly simplify `CacheManager` in the future (see #10748). See `BlockInfoManagerSuite`'s test cases for a more detailed specification of the locking semantics. ### Auto-release of locks at the end of tasks Our locking APIs support explicit release of locks (by calling `unlock()`), but it's not always possible to guarantee that locks will be released prior to the end of the task. One reason for this is our iterator interface: since our iterators don't support an explicit `close()` operator to signal that no more records will be consumed, operations like `take()` or `limit()` don't have a good means to release locks on their input iterators' blocks. Another example is broadcast variables, whose block locks can only be released at the end of the task. To address this, `BlockInfoManager` uses a pair of maps to track the set of locks acquired by each task. Lock acquisitions automatically record the current task attempt id by obtaining it from `TaskContext`. When a task finishes, code in `Executor` calls `BlockInfoManager.unlockAllLocksForTask(taskAttemptId)` to free locks. ### Locking and the MemoryStore In order to prevent in-memory blocks from being evicted while they are being read, the `MemoryStore`'s `evictBlocksToFreeSpace()` method acquires write locks on blocks which it is considering as candidates for eviction. These lock acquisitions are non-blocking, so a block which is being read will not be evicted. By holding write locks until the eviction is performed or skipped (in case evicting the blocks would not free enough memory), we avoid a race where a new reader starts to read a block after the block has been marked as an eviction candidate but before it has been removed. ### Locking and remote block transfer This patch makes small changes to to block transfer and network layer code so that locks acquired by the BlockTransferService are released as soon as block transfer messages are consumed and released by Netty. This builds on top of #11193, a bug fix related to freeing of network layer ManagedBuffers. ## FAQ - **Why not use Java's built-in [`ReadWriteLock`](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReadWriteLock.html)?** Our locks operate on a per-task rather than per-thread level. Under certain circumstances a task may consist of multiple threads, so using `ReadWriteLock` would mean that we might call `unlock()` from a thread which didn't hold the lock in question, an operation which has undefined semantics. If we could rely on Java 8 classes, we might be able to use [`StampedLock`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/StampedLock.html) to work around this issue. - **Why not detect "leaked" locks in tests?**: See above notes about `take()` and `limit`. Author: Josh Rosen <joshrosen@databricks.com> Closes #10705 from JoshRosen/pin-pages.
Diffstat (limited to 'network')
-rw-r--r--network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java2
1 files changed, 1 insertions, 1 deletions
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
index f55b884bc4..631d767715 100644
--- a/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java
@@ -28,7 +28,7 @@ import io.netty.buffer.Unpooled;
/**
* A {@link ManagedBuffer} backed by {@link ByteBuffer}.
*/
-public final class NioManagedBuffer extends ManagedBuffer {
+public class NioManagedBuffer extends ManagedBuffer {
private final ByteBuffer buf;
public NioManagedBuffer(ByteBuffer buf) {