diff options
5 files changed, 53 insertions, 7 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 8b6c96a4c4..7dc0508784 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -76,6 +76,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { private long pageCursor = -1; private long peakMemoryUsedBytes = 0; private long totalSpillBytes = 0L; + private long totalSortTimeNanos = 0L; private volatile SpillableIterator readingIterator = null; public static UnsafeExternalSorter createWithExistingInMemorySorter( @@ -248,6 +249,17 @@ public final class UnsafeExternalSorter extends MemoryConsumer { } /** + * @return the total amount of time spent sorting data (in-memory only). + */ + public long getSortTimeNanos() { + UnsafeInMemorySorter sorter = inMemSorter; + if (sorter != null) { + return sorter.getSortTimeNanos(); + } + return totalSortTimeNanos; + } + + /** * Return the total number of bytes that has been spilled into disk so far. */ public long getSpillSize() { @@ -505,6 +517,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { // in-memory sorter will not be used after spilling assert(inMemSorter != null); released += inMemSorter.getMemoryUsage(); + totalSortTimeNanos += inMemSorter.getSortTimeNanos(); inMemSorter.free(); inMemSorter = null; taskContext.taskMetrics().incMemoryBytesSpilled(released); diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index 03973f3c12..0cce792f33 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -97,6 +97,8 @@ public final class UnsafeInMemorySorter { private long initialSize; + private long totalSortTimeNanos = 0L; + public UnsafeInMemorySorter( final MemoryConsumer consumer, final TaskMemoryManager memoryManager, @@ -160,6 +162,13 @@ public final class UnsafeInMemorySorter { return pos / 2; } + /** + * @return the total amount of time spent sorting data (in-memory only). + */ + public long getSortTimeNanos() { + return totalSortTimeNanos; + } + public long getMemoryUsage() { return array.size() * 8; } @@ -265,6 +274,7 @@ public final class UnsafeInMemorySorter { */ public SortedIterator getSortedIterator() { int offset = 0; + long start = System.nanoTime(); if (sorter != null) { if (this.radixSortSupport != null) { // TODO(ekl) we should handle NULL values before radix sort for efficiency, since they @@ -275,6 +285,7 @@ public final class UnsafeInMemorySorter { sorter.sort(array, 0, pos / 2, sortComparator); } } + totalSortTimeNanos += System.nanoTime() - start; return new SortedIterator(pos / 2, offset); } } diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index 60a40cc172..2cae4beb4c 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -49,6 +49,7 @@ import org.apache.spark.storage.*; import org.apache.spark.unsafe.Platform; import org.apache.spark.util.Utils; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.*; import static org.mockito.Answers.RETURNS_SMART_NULLS; @@ -226,6 +227,25 @@ public class UnsafeExternalSorterSuite { } @Test + public void testSortTimeMetric() throws Exception { + final UnsafeExternalSorter sorter = newSorter(); + long prevSortTime = sorter.getSortTimeNanos(); + assertEquals(prevSortTime, 0); + + sorter.insertRecord(null, 0, 0, 0); + sorter.spill(); + assertThat(sorter.getSortTimeNanos(), greaterThan(prevSortTime)); + prevSortTime = sorter.getSortTimeNanos(); + + sorter.spill(); // no sort needed + assertEquals(sorter.getSortTimeNanos(), prevSortTime); + + sorter.insertRecord(null, 0, 0, 0); + UnsafeSorterIterator iter = sorter.getSortedIterator(); + assertThat(sorter.getSortTimeNanos(), greaterThan(prevSortTime)); + } + + @Test public void spillingOccursInResponseToMemoryPressure() throws Exception { final UnsafeExternalSorter sorter = newSorter(); // This should be enough records to completely fill up a data page: diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java index 8d9906da7e..37fbad47c1 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java @@ -108,6 +108,13 @@ public final class UnsafeExternalRowSorter { return sorter.getPeakMemoryUsedBytes(); } + /** + * @return the total amount of time spent sorting data (in-memory only). + */ + public long getSortTimeNanos() { + return sorter.getSortTimeNanos(); + } + private void cleanupResources() { sorter.cleanupResources(); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala index 0e4d6d72c6..66a16ac576 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -97,11 +97,8 @@ case class SortExec( // Remember spill data size of this task before execute this operator so that we can // figure out how many bytes we spilled for this operator. val spillSizeBefore = metrics.memoryBytesSpilled - val beforeSort = System.nanoTime() - val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]]) - - sortTime += (System.nanoTime() - beforeSort) / 1000000 + sortTime += sorter.getSortTimeNanos / 1000000 peakMemory += sorter.getPeakMemoryUsage spillSize += metrics.memoryBytesSpilled - spillSizeBefore metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage) @@ -151,15 +148,13 @@ case class SortExec( val peakMemory = metricTerm(ctx, "peakMemory") val spillSize = metricTerm(ctx, "spillSize") val spillSizeBefore = ctx.freshName("spillSizeBefore") - val startTime = ctx.freshName("startTime") val sortTime = metricTerm(ctx, "sortTime") s""" | if ($needToSort) { | long $spillSizeBefore = $metrics.memoryBytesSpilled(); - | long $startTime = System.nanoTime(); | $addToSorter(); | $sortedIterator = $sorterVariable.sort(); - | $sortTime.add((System.nanoTime() - $startTime) / 1000000); + | $sortTime.add($sorterVariable.getSortTimeNanos() / 1000000); | $peakMemory.add($sorterVariable.getPeakMemoryUsage()); | $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore); | $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage()); |