aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@appier.com>2015-08-06 14:33:29 -0700
committerReynold Xin <rxin@databricks.com>2015-08-06 14:33:29 -0700
commit21fdfd7d6f89adbd37066c169e6ba9ccd337683e (patch)
tree6d32717cad457f0037ee332dc51a807834d59f76
parentabfedb9cd70af60c8290bd2f5a5cec1047845ba0 (diff)
downloadspark-21fdfd7d6f89adbd37066c169e6ba9ccd337683e.tar.gz
spark-21fdfd7d6f89adbd37066c169e6ba9ccd337683e.tar.bz2
spark-21fdfd7d6f89adbd37066c169e6ba9ccd337683e.zip
[SPARK-9548][SQL] Add a destructive iterator for BytesToBytesMap
This pull request adds a destructive iterator to BytesToBytesMap. When used, the iterator frees pages as it traverses them. This is part of the effort to avoid starvation when we have more than one operator that can exhaust memory. This is based on #7924, but fixes a bug there (Don't use destructive iterator in UnsafeKVExternalSorter). Closes #7924. Author: Liang-Chi Hsieh <viirya@appier.com> Author: Reynold Xin <rxin@databricks.com> Closes #8003 from rxin/map-destructive-iterator and squashes the following commits: 6b618c3 [Reynold Xin] Don't use destructive iterator in UnsafeKVExternalSorter. a7bd8ec [Reynold Xin] Merge remote-tracking branch 'viirya/destructive_iter' into map-destructive-iterator 7652083 [Liang-Chi Hsieh] For comments: add destructiveIterator(), modify unit test, remove code block. 4a3e9de [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into destructive_iter 581e9e3 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into destructive_iter f0ff783 [Liang-Chi Hsieh] No need to free last page. 9e9d2a3 [Liang-Chi Hsieh] Add a destructive iterator for BytesToBytesMap.
-rw-r--r--core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java33
-rw-r--r--core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java37
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java7
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java5
4 files changed, 71 insertions, 11 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 20347433e1..5ac3736ac6 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -227,22 +227,35 @@ public final class BytesToBytesMap {
private final Iterator<MemoryBlock> dataPagesIterator;
private final Location loc;
- private MemoryBlock currentPage;
+ private MemoryBlock currentPage = null;
private int currentRecordNumber = 0;
private Object pageBaseObject;
private long offsetInPage;
+ // Whether this iterator is destructive. When true, it frees each page as it moves on to the
+ // next one.
+ private boolean destructive = false;
+ private BytesToBytesMap bmap;
+
private BytesToBytesMapIterator(
- int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc) {
+ int numRecords, Iterator<MemoryBlock> dataPagesIterator, Location loc,
+ boolean destructive, BytesToBytesMap bmap) {
this.numRecords = numRecords;
this.dataPagesIterator = dataPagesIterator;
this.loc = loc;
+ this.destructive = destructive;
+ this.bmap = bmap;
if (dataPagesIterator.hasNext()) {
advanceToNextPage();
}
}
private void advanceToNextPage() {
+ if (destructive && currentPage != null) {
+ dataPagesIterator.remove();
+ this.bmap.taskMemoryManager.freePage(currentPage);
+ this.bmap.shuffleMemoryManager.release(currentPage.size());
+ }
currentPage = dataPagesIterator.next();
pageBaseObject = currentPage.getBaseObject();
offsetInPage = currentPage.getBaseOffset();
@@ -281,7 +294,21 @@ public final class BytesToBytesMap {
* `lookup()`, the behavior of the returned iterator is undefined.
*/
public BytesToBytesMapIterator iterator() {
- return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc);
+ return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc, false, this);
+ }
+
+ /**
+ * Returns a destructive iterator for iterating over the entries of this map. It frees each page
+ * as it moves on to the next one. Note: it is illegal to call any method on the map after
+ * `destructiveIterator()` has been called.
+ *
+ * For efficiency, all calls to `next()` will return the same {@link Location} object.
+ *
+ * If any other lookups or operations are performed on this map while iterating over it, including
+ * `lookup()`, the behavior of the returned iterator is undefined.
+ */
+ public BytesToBytesMapIterator destructiveIterator() {
+ return new BytesToBytesMapIterator(numElements, dataPages.iterator(), loc, true, this);
}
/**
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 0e23a64fb7..3c50033801 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -183,8 +183,7 @@ public abstract class AbstractBytesToBytesMapSuite {
}
}
- @Test
- public void iteratorTest() throws Exception {
+ private void iteratorTestBase(boolean destructive) throws Exception {
final int size = 4096;
BytesToBytesMap map = new BytesToBytesMap(
taskMemoryManager, shuffleMemoryManager, size / 2, PAGE_SIZE_BYTES);
@@ -216,7 +215,14 @@ public abstract class AbstractBytesToBytesMapSuite {
}
}
final java.util.BitSet valuesSeen = new java.util.BitSet(size);
- final Iterator<BytesToBytesMap.Location> iter = map.iterator();
+ final Iterator<BytesToBytesMap.Location> iter;
+ if (destructive) {
+ iter = map.destructiveIterator();
+ } else {
+ iter = map.iterator();
+ }
+ int numPages = map.getNumDataPages();
+ int countFreedPages = 0;
while (iter.hasNext()) {
final BytesToBytesMap.Location loc = iter.next();
Assert.assertTrue(loc.isDefined());
@@ -228,11 +234,22 @@ public abstract class AbstractBytesToBytesMapSuite {
if (keyLength == 0) {
Assert.assertTrue("value " + value + " was not divisible by 5", value % 5 == 0);
} else {
- final long key = PlatformDependent.UNSAFE.getLong(
- keyAddress.getBaseObject(), keyAddress.getBaseOffset());
+ final long key = PlatformDependent.UNSAFE.getLong(
+ keyAddress.getBaseObject(), keyAddress.getBaseOffset());
Assert.assertEquals(value, key);
}
valuesSeen.set((int) value);
+ if (destructive) {
+ // The iterator moves on to the next page and frees the previous page
+ if (map.getNumDataPages() < numPages) {
+ numPages = map.getNumDataPages();
+ countFreedPages++;
+ }
+ }
+ }
+ if (destructive) {
+ // The last page is not freed by the iterator but by the map itself
+ Assert.assertEquals(countFreedPages, numPages - 1);
}
Assert.assertEquals(size, valuesSeen.cardinality());
} finally {
@@ -241,6 +258,16 @@ public abstract class AbstractBytesToBytesMapSuite {
}
@Test
+ public void iteratorTest() throws Exception {
+ iteratorTestBase(false);
+ }
+
+ @Test
+ public void destructiveIteratorTest() throws Exception {
+ iteratorTestBase(true);
+ }
+
+ @Test
public void iteratingOverDataPagesWithWastedSpace() throws Exception {
final int NUM_ENTRIES = 1000 * 1000;
final int KEY_LENGTH = 24;
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 02458030b0..efb33530da 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -154,14 +154,17 @@ public final class UnsafeFixedWidthAggregationMap {
}
/**
- * Returns an iterator over the keys and values in this map.
+ * Returns an iterator over the keys and values in this map. This uses the destructive iterator
+ * of BytesToBytesMap, so it is illegal to call any other method on this map after `iterator()`
+ * has been called.
*
* For efficiency, each call returns the same object.
*/
public KVIterator<UnsafeRow, UnsafeRow> iterator() {
return new KVIterator<UnsafeRow, UnsafeRow>() {
- private final BytesToBytesMap.BytesToBytesMapIterator mapLocationIterator = map.iterator();
+ private final BytesToBytesMap.BytesToBytesMapIterator mapLocationIterator =
+ map.destructiveIterator();
private final UnsafeRow key = new UnsafeRow();
private final UnsafeRow value = new UnsafeRow();
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 6c1cf136d9..9a65c9d3a4 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -88,8 +88,11 @@ public final class UnsafeKVExternalSorter {
final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter(
taskMemoryManager, recordComparator, prefixComparator, Math.max(1, map.numElements()));
- final int numKeyFields = keySchema.size();
+ // We cannot use the destructive iterator here because we are reusing the existing memory
+ // pages in BytesToBytesMap to hold records during sorting.
+ // The only new memory we are allocating is the pointer/prefix array.
BytesToBytesMap.BytesToBytesMapIterator iter = map.iterator();
+ final int numKeyFields = keySchema.size();
UnsafeRow row = new UnsafeRow();
while (iter.hasNext()) {
final BytesToBytesMap.Location loc = iter.next();