aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@appier.com>2015-08-06 14:33:29 -0700
committerReynold Xin <rxin@databricks.com>2015-08-06 14:33:29 -0700
commit21fdfd7d6f89adbd37066c169e6ba9ccd337683e (patch)
tree6d32717cad457f0037ee332dc51a807834d59f76 /core/src/main/java
parentabfedb9cd70af60c8290bd2f5a5cec1047845ba0 (diff)
downloadspark-21fdfd7d6f89adbd37066c169e6ba9ccd337683e.tar.gz
spark-21fdfd7d6f89adbd37066c169e6ba9ccd337683e.tar.bz2
spark-21fdfd7d6f89adbd37066c169e6ba9ccd337683e.zip
[SPARK-9548][SQL] Add a destructive iterator for BytesToBytesMap
This pull request adds a destructive iterator to BytesToBytesMap. When used, the iterator frees pages as it traverses them. This is part of the effort to avoid starvation when more than one operator can exhaust memory. This is based on #7924, but fixes a bug there (Don't use destructive iterator in UnsafeKVExternalSorter). Closes #7924. Author: Liang-Chi Hsieh <viirya@appier.com> Author: Reynold Xin <rxin@databricks.com> Closes #8003 from rxin/map-destructive-iterator and squashes the following commits: 6b618c3 [Reynold Xin] Don't use destructive iterator in UnsafeKVExternalSorter. a7bd8ec [Reynold Xin] Merge remote-tracking branch 'viirya/destructive_iter' into map-destructive-iterator 7652083 [Liang-Chi Hsieh] For comments: add destructiveIterator(), modify unit test, remove code block. 4a3e9de [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into destructive_iter 581e9e3 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into destructive_iter f0ff783 [Liang-Chi Hsieh] No need to free last page. 9e9d2a3 [Liang-Chi Hsieh] Add a destructive iterator for BytesToBytesMap.
Diffstat (limited to 'core/src/main/java')
-rw-r--r--core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java33
1 files changed, 30 insertions, 3 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 20347433e1..5ac3736ac6 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -227,22 +227,35 @@ public final class BytesToBytesMap {
private final Iterator<MemoryBlock> dataPagesIterator;
private final Location loc;
- private MemoryBlock currentPage;
+ private MemoryBlock currentPage = null;
private int currentRecordNumber = 0;
private Object pageBaseObject;
private long offsetInPage;
+ // Whether this iterator is destructive. When true, it frees each page as it moves on to the
+ // next one.
+ private boolean destructive = false;
+ private BytesToBytesMap bmap;
+
private BytesToBytesMapIterator(
- int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc) {
+ int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc,
+ boolean destructive, BytesToBytesMap bmap) {
this.numRecords = numRecords;
this.dataPagesIterator = dataPagesIterator;
this.loc = loc;
+ this.destructive = destructive;
+ this.bmap = bmap;
if (dataPagesIterator.hasNext()) {
advanceToNextPage();
}
}
private void advanceToNextPage() {
+ if (destructive && currentPage != null) {
+ dataPagesIterator.remove();
+ this.bmap.taskMemoryManager.freePage(currentPage);
+ this.bmap.shuffleMemoryManager.release(currentPage.size());
+ }
currentPage = dataPagesIterator.next();
pageBaseObject = currentPage.getBaseObject();
offsetInPage = currentPage.getBaseOffset();
@@ -281,7 +294,21 @@ public final class BytesToBytesMap {
* `lookup()`, the behavior of the returned iterator is undefined.
*/
public BytesToBytesMapIterator iterator() {
- return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc);
+ return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc, false, this);
+ }
+
+ /**
+ * Returns a destructive iterator for iterating over the entries of this map. It frees each page
+ * as it moves on to the next one. Notice: it is illegal to call any method on the map after
+ * `destructiveIterator()` has been called.
+ *
+ * For efficiency, all calls to `next()` will return the same {@link Location} object.
+ *
+ * If any other lookups or operations are performed on this map while iterating over it, including
+ * `lookup()`, the behavior of the returned iterator is undefined.
+ */
+ public BytesToBytesMapIterator destructiveIterator() {
+ return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc, true, this);
}
/**