aboutsummaryrefslogtreecommitdiff
path: root/unsafe
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-08-14 12:32:35 -0700
committerDavies Liu <davies.liu@gmail.com>2015-08-14 12:32:35 -0700
commit3bc55287220b1248e935bf817d880ff176ad4d3b (patch)
tree1a1d25adf68e2d844cf073d881d0faab471949e7 /unsafe
parent57c2d08800a506614b461b5a505a1dd1a28a8908 (diff)
downloadspark-3bc55287220b1248e935bf817d880ff176ad4d3b.tar.gz
spark-3bc55287220b1248e935bf817d880ff176ad4d3b.tar.bz2
spark-3bc55287220b1248e935bf817d880ff176ad4d3b.zip
[SPARK-9946] [SPARK-9589] [SQL] fix NPE and thread-safety in TaskMemoryManager
Currently, we access the `page.pageNumer` after it's freed, that could be modified by other thread, cause NPE. The same TaskMemoryManager could be used by multiple threads (for example, Python UDF and TransportScript), so it should be thread safe to allocate/free memory/page. The underlying Bitset and HashSet are not thread safe, we should put them inside a synchronized block. cc JoshRosen Author: Davies Liu <davies@databricks.com> Closes #8177 from davies/memory_manager.
Diffstat (limited to 'unsafe')
-rw-r--r--unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java44
1 files changed, 28 insertions, 16 deletions
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
index 358bb37250..ca70d7f4a4 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java
@@ -144,14 +144,16 @@ public class TaskMemoryManager {
public void freePage(MemoryBlock page) {
assert (page.pageNumber != -1) :
"Called freePage() on memory that wasn't allocated with allocatePage()";
- executorMemoryManager.free(page);
+ assert(allocatedPages.get(page.pageNumber));
+ pageTable[page.pageNumber] = null;
synchronized (this) {
allocatedPages.clear(page.pageNumber);
}
- pageTable[page.pageNumber] = null;
if (logger.isTraceEnabled()) {
logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size());
}
+ // Cannot access a page once it's freed.
+ executorMemoryManager.free(page);
}
/**
@@ -166,7 +168,9 @@ public class TaskMemoryManager {
public MemoryBlock allocate(long size) throws OutOfMemoryError {
assert(size > 0) : "Size must be positive, but got " + size;
final MemoryBlock memory = executorMemoryManager.allocate(size);
- allocatedNonPageMemory.add(memory);
+ synchronized(allocatedNonPageMemory) {
+ allocatedNonPageMemory.add(memory);
+ }
return memory;
}
@@ -176,8 +180,10 @@ public class TaskMemoryManager {
public void free(MemoryBlock memory) {
assert (memory.pageNumber == -1) : "Should call freePage() for pages, not free()";
executorMemoryManager.free(memory);
- final boolean wasAlreadyRemoved = !allocatedNonPageMemory.remove(memory);
- assert (!wasAlreadyRemoved) : "Called free() on memory that was already freed!";
+ synchronized(allocatedNonPageMemory) {
+ final boolean wasAlreadyRemoved = !allocatedNonPageMemory.remove(memory);
+ assert (!wasAlreadyRemoved) : "Called free() on memory that was already freed!";
+ }
}
/**
@@ -223,9 +229,10 @@ public class TaskMemoryManager {
if (inHeap) {
final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
- final Object page = pageTable[pageNumber].getBaseObject();
+ final MemoryBlock page = pageTable[pageNumber];
assert (page != null);
- return page;
+ assert (page.getBaseObject() != null);
+ return page.getBaseObject();
} else {
return null;
}
@@ -244,7 +251,9 @@ public class TaskMemoryManager {
// converted the absolute address into a relative address. Here, we invert that operation:
final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
- return pageTable[pageNumber].getBaseOffset() + offsetInPage;
+ final MemoryBlock page = pageTable[pageNumber];
+ assert (page != null);
+ return page.getBaseOffset() + offsetInPage;
}
}
@@ -260,14 +269,17 @@ public class TaskMemoryManager {
freePage(page);
}
}
- final Iterator<MemoryBlock> iter = allocatedNonPageMemory.iterator();
- while (iter.hasNext()) {
- final MemoryBlock memory = iter.next();
- freedBytes += memory.size();
- // We don't call free() here because that calls Set.remove, which would lead to a
- // ConcurrentModificationException here.
- executorMemoryManager.free(memory);
- iter.remove();
+
+ synchronized (allocatedNonPageMemory) {
+ final Iterator<MemoryBlock> iter = allocatedNonPageMemory.iterator();
+ while (iter.hasNext()) {
+ final MemoryBlock memory = iter.next();
+ freedBytes += memory.size();
+ // We don't call free() here because that calls Set.remove, which would lead to a
+ // ConcurrentModificationException here.
+ executorMemoryManager.free(memory);
+ iter.remove();
+ }
}
return freedBytes;
}