aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java13
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java11
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java20
3 files changed, 44 insertions, 0 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 8b6c96a4c4..7dc0508784 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -76,6 +76,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
private long pageCursor = -1;
private long peakMemoryUsedBytes = 0;
private long totalSpillBytes = 0L;
+ private long totalSortTimeNanos = 0L;
private volatile SpillableIterator readingIterator = null;
public static UnsafeExternalSorter createWithExistingInMemorySorter(
@@ -248,6 +249,17 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
}
/**
+ * @return the total amount of time spent sorting data (in-memory only).
+ */
+ public long getSortTimeNanos() {
+ UnsafeInMemorySorter sorter = inMemSorter;
+ if (sorter != null) {
+ return sorter.getSortTimeNanos();
+ }
+ return totalSortTimeNanos;
+ }
+
+ /**
* Return the total number of bytes that has been spilled into disk so far.
*/
public long getSpillSize() {
@@ -505,6 +517,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// in-memory sorter will not be used after spilling
assert(inMemSorter != null);
released += inMemSorter.getMemoryUsage();
+ totalSortTimeNanos += inMemSorter.getSortTimeNanos();
inMemSorter.free();
inMemSorter = null;
taskContext.taskMetrics().incMemoryBytesSpilled(released);
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 03973f3c12..0cce792f33 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -97,6 +97,8 @@ public final class UnsafeInMemorySorter {
private long initialSize;
+ private long totalSortTimeNanos = 0L;
+
public UnsafeInMemorySorter(
final MemoryConsumer consumer,
final TaskMemoryManager memoryManager,
@@ -160,6 +162,13 @@ public final class UnsafeInMemorySorter {
return pos / 2;
}
+ /**
+ * @return the total amount of time spent sorting data (in-memory only).
+ */
+ public long getSortTimeNanos() {
+ return totalSortTimeNanos;
+ }
+
public long getMemoryUsage() {
return array.size() * 8;
}
@@ -265,6 +274,7 @@ public final class UnsafeInMemorySorter {
*/
public SortedIterator getSortedIterator() {
int offset = 0;
+ long start = System.nanoTime();
if (sorter != null) {
if (this.radixSortSupport != null) {
// TODO(ekl) we should handle NULL values before radix sort for efficiency, since they
@@ -275,6 +285,7 @@ public final class UnsafeInMemorySorter {
sorter.sort(array, 0, pos / 2, sortComparator);
}
}
+ totalSortTimeNanos += System.nanoTime() - start;
return new SortedIterator(pos / 2, offset);
}
}
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 60a40cc172..2cae4beb4c 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -49,6 +49,7 @@ import org.apache.spark.storage.*;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.Utils;
+import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.*;
import static org.mockito.Answers.RETURNS_SMART_NULLS;
@@ -226,6 +227,25 @@ public class UnsafeExternalSorterSuite {
}
@Test
+ public void testSortTimeMetric() throws Exception {
+ final UnsafeExternalSorter sorter = newSorter();
+ long prevSortTime = sorter.getSortTimeNanos();
+ assertEquals(prevSortTime, 0);
+
+ sorter.insertRecord(null, 0, 0, 0);
+ sorter.spill();
+ assertThat(sorter.getSortTimeNanos(), greaterThan(prevSortTime));
+ prevSortTime = sorter.getSortTimeNanos();
+
+ sorter.spill(); // no sort needed
+ assertEquals(sorter.getSortTimeNanos(), prevSortTime);
+
+ sorter.insertRecord(null, 0, 0, 0);
+ UnsafeSorterIterator iter = sorter.getSortedIterator();
+ assertThat(sorter.getSortTimeNanos(), greaterThan(prevSortTime));
+ }
+
+ @Test
public void spillingOccursInResponseToMemoryPressure() throws Exception {
final UnsafeExternalSorter sorter = newSorter();
// This should be enough records to completely fill up a data page: