aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-05-11 11:25:46 -0700
committerDavies Liu <davies.liu@gmail.com>2016-05-11 11:25:46 -0700
commit6d0368ab8d1043735e5fe89f801aae1c6826876c (patch)
treeeba9c39405dc0d7bd9a4f2cf10af7e494d6a6742
parent29314379729de4082bd2297c9e5289e3e4a0115e (diff)
downloadspark-6d0368ab8d1043735e5fe89f801aae1c6826876c.tar.gz
spark-6d0368ab8d1043735e5fe89f801aae1c6826876c.tar.bz2
spark-6d0368ab8d1043735e5fe89f801aae1c6826876c.zip
[SPARK-15259] Sort time metric should not include spill and record insertion time
## What changes were proposed in this pull request? After SPARK-14669 it seems the sort time metric includes both spill and record insertion time. This makes it not very useful since the metric becomes close to the total execution time of the node. We should track just the time spent for in-memory sort, as before. ## How was this patch tested? Verified metric in the UI, also unit test on UnsafeExternalRowSorter. cc davies Author: Eric Liang <ekl@databricks.com> Author: Eric Liang <ekhliang@gmail.com> Closes #13035 from ericl/fix-metrics.
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java13
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java11
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java20
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala9
5 files changed, 53 insertions, 7 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 8b6c96a4c4..7dc0508784 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -76,6 +76,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
private long pageCursor = -1;
private long peakMemoryUsedBytes = 0;
private long totalSpillBytes = 0L;
+ private long totalSortTimeNanos = 0L;
private volatile SpillableIterator readingIterator = null;
public static UnsafeExternalSorter createWithExistingInMemorySorter(
@@ -248,6 +249,17 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
}
/**
+ * @return the total amount of time spent sorting data (in-memory only).
+ */
+ public long getSortTimeNanos() {
+ UnsafeInMemorySorter sorter = inMemSorter;
+ if (sorter != null) {
+ return sorter.getSortTimeNanos();
+ }
+ return totalSortTimeNanos;
+ }
+
+ /**
* Return the total number of bytes that has been spilled into disk so far.
*/
public long getSpillSize() {
@@ -505,6 +517,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// in-memory sorter will not be used after spilling
assert(inMemSorter != null);
released += inMemSorter.getMemoryUsage();
+ totalSortTimeNanos += inMemSorter.getSortTimeNanos();
inMemSorter.free();
inMemSorter = null;
taskContext.taskMetrics().incMemoryBytesSpilled(released);
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 03973f3c12..0cce792f33 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -97,6 +97,8 @@ public final class UnsafeInMemorySorter {
private long initialSize;
+ private long totalSortTimeNanos = 0L;
+
public UnsafeInMemorySorter(
final MemoryConsumer consumer,
final TaskMemoryManager memoryManager,
@@ -160,6 +162,13 @@ public final class UnsafeInMemorySorter {
return pos / 2;
}
+ /**
+ * @return the total amount of time spent sorting data (in-memory only).
+ */
+ public long getSortTimeNanos() {
+ return totalSortTimeNanos;
+ }
+
public long getMemoryUsage() {
return array.size() * 8;
}
@@ -265,6 +274,7 @@ public final class UnsafeInMemorySorter {
*/
public SortedIterator getSortedIterator() {
int offset = 0;
+ long start = System.nanoTime();
if (sorter != null) {
if (this.radixSortSupport != null) {
// TODO(ekl) we should handle NULL values before radix sort for efficiency, since they
@@ -275,6 +285,7 @@ public final class UnsafeInMemorySorter {
sorter.sort(array, 0, pos / 2, sortComparator);
}
}
+ totalSortTimeNanos += System.nanoTime() - start;
return new SortedIterator(pos / 2, offset);
}
}
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 60a40cc172..2cae4beb4c 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -49,6 +49,7 @@ import org.apache.spark.storage.*;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.Utils;
+import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.*;
import static org.mockito.Answers.RETURNS_SMART_NULLS;
@@ -226,6 +227,25 @@ public class UnsafeExternalSorterSuite {
}
@Test
+ public void testSortTimeMetric() throws Exception {
+ final UnsafeExternalSorter sorter = newSorter();
+ long prevSortTime = sorter.getSortTimeNanos();
+ assertEquals(prevSortTime, 0);
+
+ sorter.insertRecord(null, 0, 0, 0);
+ sorter.spill();
+ assertThat(sorter.getSortTimeNanos(), greaterThan(prevSortTime));
+ prevSortTime = sorter.getSortTimeNanos();
+
+ sorter.spill(); // no sort needed
+ assertEquals(sorter.getSortTimeNanos(), prevSortTime);
+
+ sorter.insertRecord(null, 0, 0, 0);
+ UnsafeSorterIterator iter = sorter.getSortedIterator();
+ assertThat(sorter.getSortTimeNanos(), greaterThan(prevSortTime));
+ }
+
+ @Test
public void spillingOccursInResponseToMemoryPressure() throws Exception {
final UnsafeExternalSorter sorter = newSorter();
// This should be enough records to completely fill up a data page:
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 8d9906da7e..37fbad47c1 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -108,6 +108,13 @@ public final class UnsafeExternalRowSorter {
return sorter.getPeakMemoryUsedBytes();
}
+ /**
+ * @return the total amount of time spent sorting data (in-memory only).
+ */
+ public long getSortTimeNanos() {
+ return sorter.getSortTimeNanos();
+ }
+
private void cleanupResources() {
sorter.cleanupResources();
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index 0e4d6d72c6..66a16ac576 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -97,11 +97,8 @@ case class SortExec(
// Remember spill data size of this task before execute this operator so that we can
// figure out how many bytes we spilled for this operator.
val spillSizeBefore = metrics.memoryBytesSpilled
- val beforeSort = System.nanoTime()
-
val sortedIterator = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]])
-
- sortTime += (System.nanoTime() - beforeSort) / 1000000
+ sortTime += sorter.getSortTimeNanos / 1000000
peakMemory += sorter.getPeakMemoryUsage
spillSize += metrics.memoryBytesSpilled - spillSizeBefore
metrics.incPeakExecutionMemory(sorter.getPeakMemoryUsage)
@@ -151,15 +148,13 @@ case class SortExec(
val peakMemory = metricTerm(ctx, "peakMemory")
val spillSize = metricTerm(ctx, "spillSize")
val spillSizeBefore = ctx.freshName("spillSizeBefore")
- val startTime = ctx.freshName("startTime")
val sortTime = metricTerm(ctx, "sortTime")
s"""
| if ($needToSort) {
| long $spillSizeBefore = $metrics.memoryBytesSpilled();
- | long $startTime = System.nanoTime();
| $addToSorter();
| $sortedIterator = $sorterVariable.sort();
- | $sortTime.add((System.nanoTime() - $startTime) / 1000000);
+ | $sortTime.add($sorterVariable.getSortTimeNanos() / 1000000);
| $peakMemory.add($sorterVariable.getPeakMemoryUsage());
| $spillSize.add($metrics.memoryBytesSpilled() - $spillSizeBefore);
| $metrics.incPeakExecutionMemory($sorterVariable.getPeakMemoryUsage());