aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-04-22 12:59:32 -0700
committerDavies Liu <davies.liu@gmail.com>2016-04-22 12:59:32 -0700
commit0dcf9dbebbd53aaebe17c85ede7ab7847ce83137 (patch)
tree8dd6b2fa65e308bb10294f8ea76cbd0de1ace382 /core
parent0419d63169274ecd60c05c1ef4ce2d4ed3a49605 (diff)
downloadspark-0dcf9dbebbd53aaebe17c85ede7ab7847ce83137.tar.gz
spark-0dcf9dbebbd53aaebe17c85ede7ab7847ce83137.tar.bz2
spark-0dcf9dbebbd53aaebe17c85ede7ab7847ce83137.zip
[SPARK-14669] [SQL] Fix some SQL metrics in codegen and added more
## What changes were proposed in this pull request? 1. Fix the "spill size" of TungstenAggregate and Sort 2. Rename "data size" to "peak memory" to match the actual meaning (also consistent with task metrics) 3. Added "data size" for ShuffleExchange and BroadcastExchange 4. Added some timing for Sort, Aggregate and BroadcastExchange (this requires another patch to work) ## How was this patch tested? Existing tests. ![metrics](https://cloud.githubusercontent.com/assets/40902/14573908/21ad2f00-030d-11e6-9e2c-c544f30039ea.png) Author: Davies Liu <davies@databricks.com> Closes #12425 from davies/fix_metrics.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java12
1 files changed, 11 insertions, 1 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 66a77982ad..3c1cd39dc2 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -75,6 +75,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
private MemoryBlock currentPage = null;
private long pageCursor = -1;
private long peakMemoryUsedBytes = 0;
+ private long totalSpillBytes = 0L;
private volatile SpillableIterator readingIterator = null;
public static UnsafeExternalSorter createWithExistingInMemorySorter(
@@ -215,7 +216,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// we might not be able to get memory for the pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
-
+ totalSpillBytes += spillSize;
return spillSize;
}
@@ -246,6 +247,13 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
return peakMemoryUsedBytes;
}
+ /**
+ * Return the total number of bytes that has been spilled into disk so far.
+ */
+ public long getSpillSize() {
+ return totalSpillBytes;
+ }
+
@VisibleForTesting
public int getNumberOfAllocatedPages() {
return allocatedPages.size();
@@ -499,6 +507,8 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
released += inMemSorter.getMemoryUsage();
inMemSorter.free();
inMemSorter = null;
+ taskContext.taskMetrics().incMemoryBytesSpilled(released);
+ totalSpillBytes += released;
return released;
}
}