author    Josh Rosen <joshrosen@databricks.com>    2016-04-01 14:34:59 -0700
committer Josh Rosen <joshrosen@databricks.com>    2016-04-01 14:34:59 -0700
commit    e41acb757327e3226ffe312766ec759c16616588 (patch)
tree      94d57d0c6c4d3177be7ecb541535c620de2c054c /common
parent    bd7b91cefb0d192d808778e6182dcdd2c143e132 (diff)
download  spark-e41acb757327e3226ffe312766ec759c16616588.tar.gz
          spark-e41acb757327e3226ffe312766ec759c16616588.tar.bz2
          spark-e41acb757327e3226ffe312766ec759c16616588.zip
[SPARK-13992] Add support for off-heap caching
This patch adds support for caching blocks in the executor processes using direct / off-heap memory.

## User-facing changes

**Updated semantics of `OFF_HEAP` storage level**: In Spark 1.x, the `OFF_HEAP` storage level indicated that an RDD should be cached in Tachyon. Spark 2.x removed the external block store API that Tachyon caching was based on (see #10752 / SPARK-12667), so `OFF_HEAP` became an alias for `MEMORY_ONLY_SER`. As of this patch, `OFF_HEAP` means "serialized and cached in off-heap memory or on disk". Via the `StorageLevel` constructor, `useOffHeap` can be set if `serialized == true` and can be used to construct custom storage levels which support replication (a usage sketch follows the change list below).

**Storage UI reporting**: the storage UI will now report whether in-memory blocks are stored on- or off-heap.

**Only supported by UnifiedMemoryManager**: for simplicity, this feature is only supported when the default UnifiedMemoryManager is used; applications which use the legacy memory manager (`spark.memory.useLegacyMode=true`) are not currently able to allocate off-heap storage memory, so using off-heap caching will fail with an error when legacy memory management is enabled. Given that we plan to eventually remove the legacy memory manager, this is not a significant restriction.

**Memory management policies**: the policies for dividing available memory between execution and storage are the same for both on- and off-heap memory. For off-heap memory, the total amount of memory available for use by Spark is controlled by `spark.memory.offHeap.size`, which is an absolute size. Off-heap storage memory obeys `spark.memory.storageFraction` in order to control the amount of unevictable storage memory. For example, if `spark.memory.offHeap.size` is 1 gigabyte and Spark uses the default `storageFraction` of 0.5, then up to 500 megabytes of off-heap cached blocks will be protected from eviction due to execution memory pressure. If necessary, we can split `spark.memory.storageFraction` into separate on- and off-heap configurations, but this doesn't seem necessary now and can be done later without any breaking changes.

**Use of off-heap memory does not imply use of off-heap execution (or vice-versa)**: for now, the settings controlling the use of off-heap execution memory (`spark.memory.offHeap.enabled`) and off-heap caching are completely independent, so Spark SQL can be configured to use off-heap memory for execution while continuing to cache blocks on-heap. If desired, we can change this in a followup patch so that `spark.memory.offHeap.enabled` also affects the default storage level for cached SQL tables.

## Internal changes

- Rename `ByteArrayChunkOutputStream` to `ChunkedByteBufferOutputStream`
  - It now returns a `ChunkedByteBuffer` instead of an array of byte arrays.
  - Its constructor now accepts an `allocator` function which is called to allocate `ByteBuffer`s. This allows us to control whether it allocates regular ByteBuffers or off-heap DirectByteBuffers.
  - Because block serialization is now performed during the unroll process, a `ChunkedByteBufferOutputStream` which is configured with a `DirectByteBuffer` allocator will use off-heap memory for both unroll and storage memory.
- The `MemoryStore`'s `MemoryEntry`s now track whether blocks are stored on- or off-heap.
- `evictBlocksToFreeSpace()` now accepts a `MemoryMode` parameter so that we don't try to evict off-heap blocks in response to on-heap memory pressure (or vice-versa).
- Make sure that off-heap buffers are properly de-allocated during MemoryStore eviction.
- The JVM limits the total size of allocated direct byte buffers using the `-XX:MaxDirectMemorySize` flag and the default tends to be fairly low (< 512 megabytes in some JVMs). To work around this limitation, this patch adds a custom DirectByteBuffer allocator which ignores this memory limit.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11805 from JoshRosen/off-heap-caching.
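For illustration (not part of the original commit message), here is a minimal Java sketch of the updated `OFF_HEAP` semantics; the app name, master, 1g pool size, and class name are assumptions, and per this patch off-heap caching does not depend on `spark.memory.offHeap.enabled`:

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class OffHeapCacheSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("off-heap-cache-sketch")
        .setMaster("local[*]")
        // Absolute size of the off-heap pool shared by execution and storage.
        .set("spark.memory.offHeap.size", "1g");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
    // OFF_HEAP now means "serialized, cached in off-heap memory or on disk".
    rdd.persist(StorageLevel.OFF_HEAP());
    System.out.println(rdd.count());

    // A hypothetical custom off-heap level with 2x replication; useOffHeap
    // requires serialized storage, i.e. deserialized = false.
    StorageLevel offHeapReplicated = StorageLevel.apply(true, true, true, false, 2);
    System.out.println(offHeapReplicated.description());

    sc.stop();
  }
}
```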
Diffstat (limited to 'common')
-rw-r--r--  common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java  32
1 file changed, 32 insertions(+), 0 deletions(-)
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 672552cc65..bdf52f32c6 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -17,9 +17,12 @@
package org.apache.spark.unsafe;
+import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import sun.misc.Cleaner;
import sun.misc.Unsafe;
public final class Platform {
@@ -144,6 +147,35 @@ public final class Platform {
return newMemory;
}
+ /**
+ * Uses internal JDK APIs to allocate a DirectByteBuffer while ignoring the JVM's
+ * MaxDirectMemorySize limit (the default limit is too low and we do not want to require users
+ * to increase it).
+ */
+ @SuppressWarnings("unchecked")
+ public static ByteBuffer allocateDirectBuffer(int size) {
+ try {
+ Class cls = Class.forName("java.nio.DirectByteBuffer");
+ Constructor constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
+ constructor.setAccessible(true);
+ Field cleanerField = cls.getDeclaredField("cleaner");
+ cleanerField.setAccessible(true);
+ final long memory = allocateMemory(size);
+ ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size);
+ Cleaner cleaner = Cleaner.create(buffer, new Runnable() {
+ @Override
+ public void run() {
+ freeMemory(memory);
+ }
+ });
+ cleanerField.set(buffer, cleaner);
+ return buffer;
+ } catch (Exception e) {
+ throwException(e);
+ }
+ throw new IllegalStateException("unreachable");
+ }
+
public static void setMemory(long address, byte value, long size) {
_UNSAFE.setMemory(address, size, value);
}
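For context (not part of the commit), a minimal sketch of how caller code might exercise the new helper; the class name and buffer size are illustrative assumptions:

```java
import java.nio.ByteBuffer;

import org.apache.spark.unsafe.Platform;

public class DirectBufferSketch {
  public static void main(String[] args) {
    // Allocated via Unsafe, so this buffer is not counted against the
    // JVM's -XX:MaxDirectMemorySize limit.
    ByteBuffer buffer = Platform.allocateDirectBuffer(1 << 20); // 1 MiB
    buffer.putLong(0, 42L);
    System.out.println(buffer.getLong(0));
    // No explicit free is needed: the Cleaner registered in
    // allocateDirectBuffer calls freeMemory(memory) once the buffer
    // becomes unreachable.
  }
}
```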