diff options
Diffstat (limited to 'core')
3 files changed, 44 insertions, 0 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 8b6c96a4c4..7dc0508784 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -76,6 +76,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { private long pageCursor = -1; private long peakMemoryUsedBytes = 0; private long totalSpillBytes = 0L; + private long totalSortTimeNanos = 0L; private volatile SpillableIterator readingIterator = null; public static UnsafeExternalSorter createWithExistingInMemorySorter( @@ -248,6 +249,17 @@ public final class UnsafeExternalSorter extends MemoryConsumer { } /** + * @return the total amount of time spent sorting data (in-memory only). + */ + public long getSortTimeNanos() { + UnsafeInMemorySorter sorter = inMemSorter; + if (sorter != null) { + return sorter.getSortTimeNanos(); + } + return totalSortTimeNanos; + } + + /** * Return the total number of bytes that has been spilled into disk so far. */ public long getSpillSize() { @@ -505,6 +517,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { // in-memory sorter will not be used after spilling assert(inMemSorter != null); released += inMemSorter.getMemoryUsage(); + totalSortTimeNanos += inMemSorter.getSortTimeNanos(); inMemSorter.free(); inMemSorter = null; taskContext.taskMetrics().incMemoryBytesSpilled(released); diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index 03973f3c12..0cce792f33 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -97,6 +97,8 @@ public final class UnsafeInMemorySorter { private long initialSize; + private long totalSortTimeNanos = 0L; + public UnsafeInMemorySorter( final MemoryConsumer consumer, final TaskMemoryManager memoryManager, @@ -160,6 +162,13 @@ public final class UnsafeInMemorySorter { return pos / 2; } + /** + * @return the total amount of time spent sorting data (in-memory only). + */ + public long getSortTimeNanos() { + return totalSortTimeNanos; + } + public long getMemoryUsage() { return array.size() * 8; } @@ -265,6 +274,7 @@ public final class UnsafeInMemorySorter { */ public SortedIterator getSortedIterator() { int offset = 0; + long start = System.nanoTime(); if (sorter != null) { if (this.radixSortSupport != null) { // TODO(ekl) we should handle NULL values before radix sort for efficiency, since they @@ -275,6 +285,7 @@ public final class UnsafeInMemorySorter { sorter.sort(array, 0, pos / 2, sortComparator); } } + totalSortTimeNanos += System.nanoTime() - start; return new SortedIterator(pos / 2, offset); } } diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index 60a40cc172..2cae4beb4c 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -49,6 +49,7 @@ import org.apache.spark.storage.*; import org.apache.spark.unsafe.Platform; import org.apache.spark.util.Utils; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.*; import static org.mockito.Answers.RETURNS_SMART_NULLS; @@ -226,6 +227,25 @@ public class UnsafeExternalSorterSuite { } @Test + public void testSortTimeMetric() throws Exception { + final UnsafeExternalSorter sorter = newSorter(); + long prevSortTime = sorter.getSortTimeNanos(); + assertEquals(prevSortTime, 0); + + sorter.insertRecord(null, 0, 0, 0); + sorter.spill(); + assertThat(sorter.getSortTimeNanos(), greaterThan(prevSortTime)); + prevSortTime = sorter.getSortTimeNanos(); + + sorter.spill(); // no sort needed + assertEquals(sorter.getSortTimeNanos(), prevSortTime); + + sorter.insertRecord(null, 0, 0, 0); + UnsafeSorterIterator iter = sorter.getSortedIterator(); + assertThat(sorter.getSortTimeNanos(), greaterThan(prevSortTime)); + } + + @Test public void spillingOccursInResponseToMemoryPressure() throws Exception { final UnsafeExternalSorter sorter = newSorter(); // This should be enough records to completely fill up a data page: |