aboutsummaryrefslogtreecommitdiff
path: root/unsafe
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-10-29 23:38:06 -0700
committerDavies Liu <davies.liu@gmail.com>2015-10-29 23:38:06 -0700
commit56419cf11f769c80f391b45dc41b3c7101cc5ff4 (patch)
treec40211de4baa6c9ab9d12160ac3bab977fb17db4 /unsafe
parentd89be0bf81029cd82008a959d191e1c7b6ceaa18 (diff)
downloadspark-56419cf11f769c80f391b45dc41b3c7101cc5ff4.tar.gz
spark-56419cf11f769c80f391b45dc41b3c7101cc5ff4.tar.bz2
spark-56419cf11f769c80f391b45dc41b3c7101cc5ff4.zip
[SPARK-10342] [SPARK-10309] [SPARK-10474] [SPARK-10929] [SQL] Cooperative memory management
This PR introduces a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is no longer needed, so it has been removed. Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but they could benefit from this (by triggering others' spilling). The PrepareRDD may not be needed anymore and could be removed in a follow-up PR. The following script fails with OOM before this PR, and finishes in 150 seconds with a 2G heap after it (it also works in the 1.5 branch, with similar duration). ```python sqlContext.setConf("spark.sql.shuffle.partitions", "1") df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s") df2 = df.select(df.id.alias('id2'), df.s.alias('s2')) j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2") j.explain() print j.count() ``` For thread-safety, here is what I've got: 1) Without calling spill(), the operators should only be used by a single thread, so there are no safety problems. 2) spill() could be triggered in two cases: by the operator itself, or by other operators. We can check trigger == this in spill(); in that case it's still in the same thread, so there are no safety problems. 3) If it's triggered by other operators (right now cache will not trigger spill()), we only spill the data to disk when it's in the scanning stage (building is finished), so the in-memory sorter or memory pages are read-only, and we only need to synchronize the iterator and change it. 4) During scanning, the iterator will only use one record in one page; we can't free this page, because the downstream is currently using it (used by UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page and dump all others to disk. In UnsafeExternalSorter, we keep the page that is used by the current record (having the same baseObject) and free it when loading the next record. 
In ShuffleExternalSorter, spill() will not be triggered during scanning. 5) In order to avoid deadlock, we don't call acquireMemory during spill (so we reuse the pointer array in InMemorySorter). Author: Davies Liu <davies@databricks.com> Closes #9241 from davies/force_spill.
Diffstat (limited to 'unsafe')
-rw-r--r--unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java9
-rw-r--r--unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java3
2 files changed, 4 insertions, 8 deletions
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
index ebe90d9e63..09847cec9c 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
@@ -23,6 +23,8 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
+import org.apache.spark.unsafe.Platform;
+
/**
* A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM long primitive array.
*/
@@ -45,9 +47,6 @@ public class HeapMemoryAllocator implements MemoryAllocator {
@Override
public MemoryBlock allocate(long size) throws OutOfMemoryError {
- if (size % 8 != 0) {
- throw new IllegalArgumentException("Size " + size + " was not a multiple of 8");
- }
if (shouldPool(size)) {
synchronized (this) {
final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
@@ -64,8 +63,8 @@ public class HeapMemoryAllocator implements MemoryAllocator {
}
}
}
- long[] array = new long[(int) (size / 8)];
- return MemoryBlock.fromLongArray(array);
+ long[] array = new long[(int) ((size + 7) / 8)];
+ return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
}
@Override
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
index cda7826c8c..98ce711176 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/UnsafeMemoryAllocator.java
@@ -26,9 +26,6 @@ public class UnsafeMemoryAllocator implements MemoryAllocator {
@Override
public MemoryBlock allocate(long size) throws OutOfMemoryError {
- if (size % 8 != 0) {
- throw new IllegalArgumentException("Size " + size + " was not a multiple of 8");
- }
long address = Platform.allocateMemory(size);
return new MemoryBlock(null, address, size);
}