aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java13
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java11
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java20
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala9
5 files changed, 53 insertions, 7 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 8b6c96a4c4..7dc0508784 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -76,6 +76,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
private long pageCursor = -1;
private long peakMemoryUsedBytes = 0;
private long totalSpillBytes = 0L;
+ private long totalSortTimeNanos = 0L;
private volatile SpillableIterator readingIterator = null;
public static UnsafeExternalSorter createWithExistingInMemorySorter(
@@ -248,6 +249,17 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
}
/**
+ * @return nanoseconds spent sorting in memory: the live in-memory sorter's count, else the saved total.
+ */
+ public long getSortTimeNanos() {
+ UnsafeInMemorySorter sorter = inMemSorter; // read the field once so the check and the call agree
+ if (sorter != null) {
+ return sorter.getSortTimeNanos();
+ }
+ return totalSortTimeNanos; // accumulated from the in-memory sorter just before it was freed
+ }
+
+ /**
* Return the total number of bytes that has been spilled into disk so far.
*/
public long getSpillSize() {
@@ -505,6 +517,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// in-memory sorter will not be used after spilling
assert(inMemSorter != null);
released += inMemSorter.getMemoryUsage();
+ totalSortTimeNanos += inMemSorter.getSortTimeNanos();
inMemSorter.free();
inMemSorter = null;
taskContext.taskMetrics().incMemoryBytesSpilled(released);
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 03973f3c12..0cce792f33 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -97,6 +97,8 @@ public final class UnsafeInMemorySorter {
private long initialSize;
+ private long totalSortTimeNanos = 0L;
+
public UnsafeInMemorySorter(
final MemoryConsumer consumer,
final TaskMemoryManager memoryManager,
@@ -160,6 +162,13 @@ public final class UnsafeInMemorySorter {
return pos / 2;
}
+ /**
+ * @return total nanoseconds measured around the sort step across getSortedIterator() calls.
+ */
+ public long getSortTimeNanos() {
+ return totalSortTimeNanos;
+ }
+
public long getMemoryUsage() {
return array.size() * 8;
}
@@ -265,6 +274,7 @@ public final class UnsafeInMemorySorter {
*/
public SortedIterator getSortedIterator() {
int offset = 0;
+ long start = System.nanoTime();
if (sorter != null) {
if (this.radixSortSupport != null) {
// TODO(ekl) we should handle NULL values before radix sort for efficiency, since they
@@ -275,6 +285,7 @@ public final class UnsafeInMemorySorter {
sorter.sort(array, 0, pos / 2, sortComparator);
}
}
+ totalSortTimeNanos += System.nanoTime() - start;
return new SortedIterator(pos / 2, offset);
}
}
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 60a40cc172..2cae4beb4c 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -49,6 +49,7 @@ import org.apache.spark.storage.*;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.Utils;
+import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.*;
import static org.mockito.Answers.RETURNS_SMART_NULLS;
@@ -226,6 +227,25 @@ public class UnsafeExternalSorterSuite {
}
@Test
+ public void testSortTimeMetric() throws Exception {
+ final UnsafeExternalSorter sorter = newSorter();
+ long prevSortTime = sorter.getSortTimeNanos();
+ assertEquals(0L, prevSortTime); // expected value first; nothing has been sorted yet
+
+ sorter.insertRecord(null, 0, 0, 0);
+ sorter.spill(); // the assertion below expects this spill to add sort time
+ assertThat(sorter.getSortTimeNanos(), greaterThan(prevSortTime));
+ prevSortTime = sorter.getSortTimeNanos();
+
+ sorter.spill(); // no sort needed
+ assertEquals(prevSortTime, sorter.getSortTimeNanos());
+
+ sorter.insertRecord(null, 0, 0, 0);
+ sorter.getSortedIterator(); // called only to trigger the sort; the returned iterator is not read
+ assertThat(sorter.getSortTimeNanos(), greaterThan(prevSortTime));
+ }
+
+ @Test
public void spillingOccursInResponseToMemoryPressure() throws Exception {
final UnsafeExternalSorter sorter = newSorter();
// This should be enough records to completely fill up a data page:
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 8d9906da7e..37fbad47c1 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -108,6 +108,13 @@ public final class UnsafeExternalRowSorter {
return sorter.getPeakMemoryUsedBytes();
}
+ /**
+ * @return nanoseconds spent on in-memory sorting, as reported by the wrapped {@code sorter}.
+ */
+ public long getSortTimeNanos() {
+ return sorter.getSortTimeNanos();
+ }
+
private void cleanupResources() {
sorter.cleanupResources();
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index 0e4d6d72c6..66a16ac576 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -97,11 +97,8 @@ case class SortExec(
// Remember spill data size of this task before execute this operator so that we can
// figure out how many bytes we spilled for this operator.
val spillSizeBefore = metrics.memoryBytesSpilled
- val beforeSort = System.nanoTime()
-
val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
-
- sortTime += (System.nanoTime() - beforeSort) / 1000000
+ sortTime += sorter.getSortTimeNanos / 1000000
peakMemory += sorter.getPeakMemoryUsage
spillSize += metrics.memoryBytesSpilled - spillSizeBefore
metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
@@ -151,15 +148,13 @@ case class SortExec(
val peakMemory = metricTerm(ctx, "peakMemory")
val spillSize = metricTerm(ctx, "spillSize")
val spillSizeBefore = ctx.freshName("spillSizeBefore")
- val startTime = ctx.freshName("startTime")
val sortTime = metricTerm(ctx, "sortTime")
s"""
| if ($needToSort) {
| long $spillSizeBefore = $metrics.memoryBytesSpilled();
- | long $startTime = System.nanoTime();
| $addToSorter();
| $sortedIterator = $sorterVariable.sort();
- | $sortTime.add((System.nanoTime() - $startTime) / 1000000);
+ | $sortTime.add($sorterVariable.getSortTimeNanos() / 1000000);
| $peakMemory.add($sorterVariable.getPeakMemoryUsage());
| $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore);
| $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage());