aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/java
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-11-05 19:02:18 -0800
committerJosh Rosen <joshrosen@databricks.com>2015-11-05 19:02:18 -0800
commiteec74ba8bde7f9446cc38e687bda103e85669d35 (patch)
tree5e656d6333afde0255e96d930b245df28994bf9b /core/src/test/java
parent3cc2c053b5d68c747a30bd58cf388b87b1922f13 (diff)
downloadspark-eec74ba8bde7f9446cc38e687bda103e85669d35.tar.gz
spark-eec74ba8bde7f9446cc38e687bda103e85669d35.tar.bz2
spark-eec74ba8bde7f9446cc38e687bda103e85669d35.zip
[SPARK-7542][SQL] Support off-heap index/sort buffer
This brings the support of off-heap memory for array inside BytesToBytesMap and InMemorySorter, then we could allocate all the memory from off-heap for execution. Closes #8068 Author: Davies Liu <davies@databricks.com> Closes #9477 from davies/unsafe_timsort.
Diffstat (limited to 'core/src/test/java')
-rw-r--r--core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java23
-rw-r--r--core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java45
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java16
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java1
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java12
5 files changed, 64 insertions, 33 deletions
diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index dab7b0592c..c731317395 100644
--- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
+++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -17,8 +17,6 @@
package org.apache.spark.memory;
-import java.io.IOException;
-
import org.junit.Assert;
import org.junit.Test;
@@ -27,27 +25,6 @@ import org.apache.spark.unsafe.memory.MemoryBlock;
public class TaskMemoryManagerSuite {
- class TestMemoryConsumer extends MemoryConsumer {
- TestMemoryConsumer(TaskMemoryManager memoryManager) {
- super(memoryManager);
- }
-
- @Override
- public long spill(long size, MemoryConsumer trigger) throws IOException {
- long used = getUsed();
- releaseMemory(used);
- return used;
- }
-
- void use(long size) {
- acquireMemory(size);
- }
-
- void free(long size) {
- releaseMemory(size);
- }
- }
-
@Test
public void leakedPageMemoryIsDetected() {
final TaskMemoryManager manager = new TaskMemoryManager(
diff --git a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
new file mode 100644
index 0000000000..8ae3642738
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory;
+
+import java.io.IOException;
+
+public class TestMemoryConsumer extends MemoryConsumer {
+ public TestMemoryConsumer(TaskMemoryManager memoryManager) {
+ super(memoryManager);
+ }
+
+ @Override
+ public long spill(long size, MemoryConsumer trigger) throws IOException {
+ long used = getUsed();
+ free(used);
+ return used;
+ }
+
+ void use(long size) {
+ long got = taskMemoryManager.acquireExecutionMemory(size, this);
+ used += got;
+ }
+
+ void free(long size) {
+ used -= size;
+ taskMemoryManager.releaseExecutionMemory(size, this);
+ }
+}
+
+
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
index 2293b1bbc1..faa5a863ee 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
@@ -25,13 +25,19 @@ import org.junit.Test;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
-import org.apache.spark.unsafe.Platform;
+import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.memory.TestMemoryConsumer;
import org.apache.spark.memory.TestMemoryManager;
+import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.memory.TaskMemoryManager;
public class ShuffleInMemorySorterSuite {
+ final TestMemoryManager memoryManager =
+ new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"));
+ final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
+ final TestMemoryConsumer consumer = new TestMemoryConsumer(taskMemoryManager);
+
private static String getStringFromDataPage(Object baseObject, long baseOffset, int strLength) {
final byte[] strBytes = new byte[strLength];
Platform.copyMemory(baseObject, baseOffset, strBytes, Platform.BYTE_ARRAY_OFFSET, strLength);
@@ -40,7 +46,7 @@ public class ShuffleInMemorySorterSuite {
@Test
public void testSortingEmptyInput() {
- final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(100);
+ final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(consumer, 100);
final ShuffleInMemorySorter.ShuffleSorterIterator iter = sorter.getSortedIterator();
assert(!iter.hasNext());
}
@@ -63,7 +69,7 @@ public class ShuffleInMemorySorterSuite {
new TaskMemoryManager(new TestMemoryManager(conf), 0);
final MemoryBlock dataPage = memoryManager.allocatePage(2048, null);
final Object baseObject = dataPage.getBaseObject();
- final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(4);
+ final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(consumer, 4);
final HashPartitioner hashPartitioner = new HashPartitioner(4);
// Write the records into the data page and store pointers into the sorter
@@ -104,7 +110,7 @@ public class ShuffleInMemorySorterSuite {
@Test
public void testSortingManyNumbers() throws Exception {
- ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(4);
+ ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(consumer, 4);
int[] numbersToSort = new int[128000];
Random random = new Random(16);
for (int i = 0; i < numbersToSort.length; i++) {
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index cfead0e592..11c3a7be38 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -390,7 +390,6 @@ public class UnsafeExternalSorterSuite {
for (int i = 0; i < numRecordsPerPage * 10; i++) {
insertNumber(sorter, i);
newPeakMemory = sorter.getPeakMemoryUsedBytes();
- // The first page is pre-allocated on instantiation
if (i % numRecordsPerPage == 0) {
// We allocated a new page for this record, so peak memory should change
assertEquals(previousPeakMemory + pageSizeBytes, newPeakMemory);
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index 642f6585f8..a203a09648 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -23,6 +23,7 @@ import org.junit.Test;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
+import org.apache.spark.memory.TestMemoryConsumer;
import org.apache.spark.memory.TestMemoryManager;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.unsafe.Platform;
@@ -44,9 +45,11 @@ public class UnsafeInMemorySorterSuite {
@Test
public void testSortingEmptyInput() {
- final UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(
- new TaskMemoryManager(
- new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0),
+ final TaskMemoryManager memoryManager = new TaskMemoryManager(
+ new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0);
+ final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
+ final UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer,
+ memoryManager,
mock(RecordComparator.class),
mock(PrefixComparator.class),
100);
@@ -69,6 +72,7 @@ public class UnsafeInMemorySorterSuite {
};
final TaskMemoryManager memoryManager = new TaskMemoryManager(
new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0);
+ final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
final MemoryBlock dataPage = memoryManager.allocatePage(2048, null);
final Object baseObject = dataPage.getBaseObject();
// Write the records into the data page:
@@ -102,7 +106,7 @@ public class UnsafeInMemorySorterSuite {
return (int) prefix1 - (int) prefix2;
}
};
- UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(memoryManager, recordComparator,
+ UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager, recordComparator,
prefixComparator, dataToSort.length);
// Given a page of records, insert those records into the sorter one-by-one:
position = dataPage.getBaseOffset();