Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java       27
-rw-r--r--  core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java                38
-rw-r--r--  core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java                         8
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java  29
-rw-r--r--  core/src/main/resources/org/apache/spark/ui/static/webui.css                                2
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulators.scala                                    60
-rw-r--r--  core/src/main/scala/org/apache/spark/Aggregator.scala                                      24
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContext.scala                                     13
-rw-r--r--  core/src/main/scala/org/apache/spark/TaskContextImpl.scala                                  8
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala                                 9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala                        9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala                          28
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala                             6
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala                        10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Stage.scala                                 16
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Task.scala                                  18
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala                   8
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/ToolTips.scala                                      7
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala                              140
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala                    1
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala           13
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala                  20
22 files changed, 383 insertions(+), 111 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
index 1aa6ba4201..bf4eaa59ff 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
@@ -20,6 +20,7 @@ package org.apache.spark.shuffle.unsafe;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
+import javax.annotation.Nullable;
import scala.Tuple2;
@@ -86,9 +87,12 @@ final class UnsafeShuffleExternalSorter {
private final LinkedList<SpillInfo> spills = new LinkedList<SpillInfo>();
+ /** Peak memory used by this sorter so far, in bytes. **/
+ private long peakMemoryUsedBytes;
+
// These variables are reset after spilling:
- private UnsafeShuffleInMemorySorter sorter;
- private MemoryBlock currentPage = null;
+ @Nullable private UnsafeShuffleInMemorySorter sorter;
+ @Nullable private MemoryBlock currentPage = null;
private long currentPagePosition = -1;
private long freeSpaceInCurrentPage = 0;
@@ -106,6 +110,7 @@ final class UnsafeShuffleExternalSorter {
this.blockManager = blockManager;
this.taskContext = taskContext;
this.initialSize = initialSize;
+ this.peakMemoryUsedBytes = initialSize;
this.numPartitions = numPartitions;
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
@@ -279,10 +284,26 @@ final class UnsafeShuffleExternalSorter {
for (MemoryBlock page : allocatedPages) {
totalPageSize += page.size();
}
- return sorter.getMemoryUsage() + totalPageSize;
+ return ((sorter == null) ? 0 : sorter.getMemoryUsage()) + totalPageSize;
+ }
+
+ private void updatePeakMemoryUsed() {
+ long mem = getMemoryUsage();
+ if (mem > peakMemoryUsedBytes) {
+ peakMemoryUsedBytes = mem;
+ }
+ }
+
+ /**
+ * Return the peak memory used so far, in bytes.
+ */
+ long getPeakMemoryUsedBytes() {
+ updatePeakMemoryUsed();
+ return peakMemoryUsedBytes;
}
private long freeMemory() {
+ updatePeakMemoryUsed();
long memoryFreed = 0;
for (MemoryBlock block : allocatedPages) {
memoryManager.freePage(block);
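
The pattern added in this file -- recompute current usage and fold it into a running maximum, taking one last sample in freeMemory() just before the pages are released -- is reused in several files below. A minimal, self-contained Scala sketch of the same idea (class and method names here are illustrative, not Spark's API):

import scala.collection.mutable.ArrayBuffer

// Illustrative peak tracker: usage can only shrink after free(), so the peak is
// captured by updating the running maximum right before the pages are released.
class PeakTrackingPages {
  private val pages = ArrayBuffer[Array[Byte]]()
  private var peakBytes = 0L

  private def currentBytes: Long = pages.iterator.map(_.length.toLong).sum

  private def updatePeak(): Unit = {
    val now = currentBytes
    if (now > peakBytes) peakBytes = now
  }

  def allocatePage(size: Int): Unit = pages += new Array[Byte](size)

  /** Free all pages and return the number of bytes freed. */
  def free(): Long = {
    updatePeak()                     // snapshot before the memory goes away
    val freed = currentBytes
    pages.clear()
    freed
  }

  /** Peak memory observed so far, in bytes. */
  def peakMemoryUsedBytes: Long = { updatePeak(); peakBytes }
}
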
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
index d47d6fc9c2..6e2eeb37c8 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
@@ -27,6 +27,7 @@ import scala.Product2;
import scala.collection.JavaConversions;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
+import scala.collection.immutable.Map;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteStreams;
@@ -78,8 +79,9 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final SparkConf sparkConf;
private final boolean transferToEnabled;
- private MapStatus mapStatus = null;
- private UnsafeShuffleExternalSorter sorter = null;
+ @Nullable private MapStatus mapStatus;
+ @Nullable private UnsafeShuffleExternalSorter sorter;
+ private long peakMemoryUsedBytes = 0;
/** Subclass of ByteArrayOutputStream that exposes `buf` directly. */
private static final class MyByteArrayOutputStream extends ByteArrayOutputStream {
@@ -131,9 +133,28 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@VisibleForTesting
public int maxRecordSizeBytes() {
+ assert(sorter != null);
return sorter.maxRecordSizeBytes;
}
+ private void updatePeakMemoryUsed() {
+ // sorter can be null if this writer is closed
+ if (sorter != null) {
+ long mem = sorter.getPeakMemoryUsedBytes();
+ if (mem > peakMemoryUsedBytes) {
+ peakMemoryUsedBytes = mem;
+ }
+ }
+ }
+
+ /**
+ * Return the peak memory used so far, in bytes.
+ */
+ public long getPeakMemoryUsedBytes() {
+ updatePeakMemoryUsed();
+ return peakMemoryUsedBytes;
+ }
+
/**
* This convenience method should only be called in test code.
*/
@@ -144,7 +165,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
- // Keep track of success so we know if we ecountered an exception
+ // Keep track of success so we know if we encountered an exception
// We do this rather than a standard try/catch/re-throw to handle
// generic throwables.
boolean success = false;
@@ -189,6 +210,8 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@VisibleForTesting
void closeAndWriteOutput() throws IOException {
+ assert(sorter != null);
+ updatePeakMemoryUsed();
serBuffer = null;
serOutputStream = null;
final SpillInfo[] spills = sorter.closeAndGetSpills();
@@ -209,6 +232,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@VisibleForTesting
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
+ assert(sorter != null);
final K key = record._1();
final int partitionId = partitioner.getPartition(key);
serBuffer.reset();
@@ -431,6 +455,14 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@Override
public Option<MapStatus> stop(boolean success) {
try {
+ // Update task metrics from accumulators (null in UnsafeShuffleWriterSuite)
+ Map<String, Accumulator<Object>> internalAccumulators =
+ taskContext.internalMetricsToAccumulators();
+ if (internalAccumulators != null) {
+ internalAccumulators.apply(InternalAccumulator.PEAK_EXECUTION_MEMORY())
+ .add(getPeakMemoryUsedBytes());
+ }
+
if (stopping) {
return Option.apply(null);
} else {
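
A hedged sketch of the guard added to stop() above: the internal-accumulator map is null when the writer is constructed outside a real task (as happens in UnsafeShuffleWriterSuite), so the peak is only reported when the map is present. The object and key below are illustrative stand-ins; only Accumulator.add is Spark API:

import org.apache.spark.Accumulator

// Illustrative helper (not Spark code): only report the peak when the task
// context actually supplied the internal accumulators; they are null when a
// writer is constructed directly in a unit test.
object PeakReporting {
  // Mirrors InternalAccumulator.PEAK_EXECUTION_MEMORY, defined later in this diff.
  val PeakKey = "peakExecutionMemory"

  def reportPeakIfPossible(
      internalAccumulators: Map[String, Accumulator[Long]],
      peakBytes: Long): Unit = {
    if (internalAccumulators != null) {
      internalAccumulators.get(PeakKey).foreach(_.add(peakBytes))
    }
  }
}
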
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 01a66084e9..20347433e1 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -505,7 +505,7 @@ public final class BytesToBytesMap {
// Here, we'll copy the data into our data pages. Because we only store a relative offset from
// the key address instead of storing the absolute address of the value, the key and value
// must be stored in the same memory page.
- // (8 byte key length) (key) (8 byte value length) (value)
+ // (8 byte key length) (key) (value)
final long requiredSize = 8 + keyLengthBytes + valueLengthBytes;
// --- Figure out where to insert the new record ---------------------------------------------
@@ -655,7 +655,10 @@ public final class BytesToBytesMap {
return pageSizeBytes;
}
- /** Returns the total amount of memory, in bytes, consumed by this map's managed structures. */
+ /**
+ * Returns the total amount of memory, in bytes, consumed by this map's managed structures.
+ * Note that this is also the peak memory used by this map, since the map is append-only.
+ */
public long getTotalMemoryConsumption() {
long totalDataPagesSize = 0L;
for (MemoryBlock dataPage : dataPages) {
@@ -674,7 +677,6 @@ public final class BytesToBytesMap {
return timeSpentResizingNs;
}
-
/**
* Returns the average number of probes per key lookup.
*/
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index b984301cbb..bf5f965a9d 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -70,13 +70,14 @@ public final class UnsafeExternalSorter {
private final LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>();
// These variables are reset after spilling:
- private UnsafeInMemorySorter inMemSorter;
+ @Nullable private UnsafeInMemorySorter inMemSorter;
// Whether the in-mem sorter is created internally, or passed in from outside.
// If it is passed in from outside, we shouldn't release the in-mem sorter's memory.
private boolean isInMemSorterExternal = false;
private MemoryBlock currentPage = null;
private long currentPagePosition = -1;
private long freeSpaceInCurrentPage = 0;
+ private long peakMemoryUsedBytes = 0;
public static UnsafeExternalSorter createWithExistingInMemorySorter(
TaskMemoryManager taskMemoryManager,
@@ -183,6 +184,7 @@ public final class UnsafeExternalSorter {
* Sort and spill the current records in response to memory pressure.
*/
public void spill() throws IOException {
+ assert(inMemSorter != null);
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
Utils.bytesToString(getMemoryUsage()),
@@ -219,7 +221,22 @@ public final class UnsafeExternalSorter {
for (MemoryBlock page : allocatedPages) {
totalPageSize += page.size();
}
- return inMemSorter.getMemoryUsage() + totalPageSize;
+ return ((inMemSorter == null) ? 0 : inMemSorter.getMemoryUsage()) + totalPageSize;
+ }
+
+ private void updatePeakMemoryUsed() {
+ long mem = getMemoryUsage();
+ if (mem > peakMemoryUsedBytes) {
+ peakMemoryUsedBytes = mem;
+ }
+ }
+
+ /**
+ * Return the peak memory used so far, in bytes.
+ */
+ public long getPeakMemoryUsedBytes() {
+ updatePeakMemoryUsed();
+ return peakMemoryUsedBytes;
}
@VisibleForTesting
@@ -233,6 +250,7 @@ public final class UnsafeExternalSorter {
* @return the number of bytes freed.
*/
public long freeMemory() {
+ updatePeakMemoryUsed();
long memoryFreed = 0;
for (MemoryBlock block : allocatedPages) {
taskMemoryManager.freePage(block);
@@ -277,7 +295,8 @@ public final class UnsafeExternalSorter {
* @return true if the record can be inserted without requiring more allocations, false otherwise.
*/
private boolean haveSpaceForRecord(int requiredSpace) {
- assert (requiredSpace > 0);
+ assert(requiredSpace > 0);
+ assert(inMemSorter != null);
return (inMemSorter.hasSpaceForAnotherRecord() && (requiredSpace <= freeSpaceInCurrentPage));
}
@@ -290,6 +309,7 @@ public final class UnsafeExternalSorter {
* the record size.
*/
private void allocateSpaceForRecord(int requiredSpace) throws IOException {
+ assert(inMemSorter != null);
// TODO: merge these steps to first calculate total memory requirements for this insert,
// then try to acquire; no point in acquiring sort buffer only to spill due to no space in the
// data page.
@@ -350,6 +370,7 @@ public final class UnsafeExternalSorter {
if (!haveSpaceForRecord(totalSpaceRequired)) {
allocateSpaceForRecord(totalSpaceRequired);
}
+ assert(inMemSorter != null);
final long recordAddress =
taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
@@ -382,6 +403,7 @@ public final class UnsafeExternalSorter {
if (!haveSpaceForRecord(totalSpaceRequired)) {
allocateSpaceForRecord(totalSpaceRequired);
}
+ assert(inMemSorter != null);
final long recordAddress =
taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
@@ -405,6 +427,7 @@ public final class UnsafeExternalSorter {
}
public UnsafeSorterIterator getSortedIterator() throws IOException {
+ assert(inMemSorter != null);
final UnsafeSorterIterator inMemoryIterator = inMemSorter.getSortedIterator();
int numIteratorsToMerge = spillWriters.size() + (inMemoryIterator.hasNext() ? 1 : 0);
if (spillWriters.isEmpty()) {
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index b1cef47042..648cd1b104 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -207,7 +207,7 @@ span.additional-metric-title {
/* Hide all additional metrics by default. This is done here rather than using JavaScript to
* avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
.scheduler_delay, .deserialization_time, .fetch_wait_time, .shuffle_read_remote,
-.serialization_time, .getting_result_time {
+.serialization_time, .getting_result_time, .peak_execution_memory {
display: none;
}
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index eb75f26718..b6a0119c69 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -152,8 +152,14 @@ class Accumulable[R, T] private[spark] (
in.defaultReadObject()
value_ = zero
deserialized = true
- val taskContext = TaskContext.get()
- taskContext.registerAccumulator(this)
+ // Automatically register the accumulator when it is deserialized with the task closure.
+ // Note that internal accumulators are deserialized before the TaskContext is created and
+ // are registered in the TaskContext constructor.
+ if (!isInternal) {
+ val taskContext = TaskContext.get()
+ assume(taskContext != null, "Task context was null when deserializing user accumulators")
+ taskContext.registerAccumulator(this)
+ }
}
override def toString: String = if (value_ == null) "null" else value_.toString
@@ -248,10 +254,20 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
* @param param helper object defining how to add elements of type `T`
* @tparam T result type
*/
-class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
- extends Accumulable[T, T](initialValue, param, name) {
+class Accumulator[T] private[spark] (
+ @transient initialValue: T,
+ param: AccumulatorParam[T],
+ name: Option[String],
+ internal: Boolean)
+ extends Accumulable[T, T](initialValue, param, name, internal) {
+
+ def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
+ this(initialValue, param, name, false)
+ }
- def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
+ def this(initialValue: T, param: AccumulatorParam[T]) = {
+ this(initialValue, param, None, false)
+ }
}
/**
@@ -342,3 +358,37 @@ private[spark] object Accumulators extends Logging {
}
}
+
+private[spark] object InternalAccumulator {
+ val PEAK_EXECUTION_MEMORY = "peakExecutionMemory"
+ val TEST_ACCUMULATOR = "testAccumulator"
+
+ // For testing only.
+ // This needs to be a def since we don't want to reuse the same accumulator across stages.
+ private def maybeTestAccumulator: Option[Accumulator[Long]] = {
+ if (sys.props.contains("spark.testing")) {
+ Some(new Accumulator(
+ 0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true))
+ } else {
+ None
+ }
+ }
+
+ /**
+ * Accumulators for tracking internal metrics.
+ *
+ * These accumulators are created with the stage such that all tasks in the stage will
+ * add to the same set of accumulators. We do this to report the distribution of accumulator
+ * values across all tasks within each stage.
+ */
+ def create(): Seq[Accumulator[Long]] = {
+ Seq(
+ // Execution memory refers to the memory used by internal data structures created
+ // during shuffles, aggregations and joins. The value of this accumulator should be
+ // approximately the sum of the peak sizes across all such data structures created
+ // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
+ new Accumulator(
+ 0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
+ ) ++ maybeTestAccumulator.toSeq
+ }
+}
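
A usage sketch for the new InternalAccumulator object, assuming code that lives inside the org.apache.spark package (these helpers are private[spark]): the scheduler creates one fresh set of accumulators per stage, and each task looks its accumulator up by name and adds its own peak, so the UI can later show a per-stage distribution. The 64 MB figure is a made-up example value:

import org.apache.spark.{Accumulator, InternalAccumulator}

// Scheduler side (sketch): one fresh set of internal accumulators per stage.
val stageAccumulators: Seq[Accumulator[Long]] = InternalAccumulator.create()

// Task side (sketch): index the accumulators by name, then record this task's
// peak execution memory.
val byName: Map[String, Accumulator[Long]] =
  stageAccumulators.map(a => a.name.get -> a).toMap
byName(InternalAccumulator.PEAK_EXECUTION_MEMORY).add(64L * 1024 * 1024)
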
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index ceeb58075d..289aab9bd9 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -58,12 +58,7 @@ case class Aggregator[K, V, C] (
} else {
val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
combiners.insertAll(iter)
- // Update task metrics if context is not null
- // TODO: Make context non optional in a future release
- Option(context).foreach { c =>
- c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
- c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
- }
+ updateMetrics(context, combiners)
combiners.iterator
}
}
@@ -89,13 +84,18 @@ case class Aggregator[K, V, C] (
} else {
val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
combiners.insertAll(iter)
- // Update task metrics if context is not null
- // TODO: Make context non-optional in a future release
- Option(context).foreach { c =>
- c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
- c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
- }
+ updateMetrics(context, combiners)
combiners.iterator
}
}
+
+ /** Update task metrics after populating the external map. */
+ private def updateMetrics(context: TaskContext, map: ExternalAppendOnlyMap[_, _, _]): Unit = {
+ Option(context).foreach { c =>
+ c.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
+ c.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
+ c.internalMetricsToAccumulators(
+ InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
+ }
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 5d2c551d58..63cca80b2d 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -61,12 +61,12 @@ object TaskContext {
protected[spark] def unset(): Unit = taskContext.remove()
/**
- * Return an empty task context that is not actually used.
- * Internal use only.
+ * An empty task context that does not represent an actual task.
*/
- private[spark] def empty(): TaskContext = {
- new TaskContextImpl(0, 0, 0, 0, null, null)
+ private[spark] def empty(): TaskContextImpl = {
+ new TaskContextImpl(0, 0, 0, 0, null, null, Seq.empty)
}
+
}
@@ -187,4 +187,9 @@ abstract class TaskContext extends Serializable {
* accumulator id and the value of the Map is the latest accumulator local value.
*/
private[spark] def collectAccumulators(): Map[Long, Any]
+
+ /**
+ * Accumulators for tracking internal metrics indexed by the name.
+ */
+ private[spark] val internalMetricsToAccumulators: Map[String, Accumulator[Long]]
}
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 9ee168ae01..5df94c6d3a 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -32,6 +32,7 @@ private[spark] class TaskContextImpl(
override val attemptNumber: Int,
override val taskMemoryManager: TaskMemoryManager,
@transient private val metricsSystem: MetricsSystem,
+ internalAccumulators: Seq[Accumulator[Long]],
val runningLocally: Boolean = false,
val taskMetrics: TaskMetrics = TaskMetrics.empty)
extends TaskContext
@@ -114,4 +115,11 @@ private[spark] class TaskContextImpl(
private[spark] override def collectAccumulators(): Map[Long, Any] = synchronized {
accumulators.mapValues(_.localValue).toMap
}
+
+ private[spark] override val internalMetricsToAccumulators: Map[String, Accumulator[Long]] = {
+ // Explicitly register internal accumulators here because these are
+ // not captured in the task closure and are already deserialized
+ internalAccumulators.foreach(registerAccumulator)
+ internalAccumulators.map { a => (a.name.get, a) }.toMap
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 130b58882d..9c617fc719 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -23,8 +23,7 @@ import java.io.{IOException, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
-import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
+import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
import org.apache.spark.util.Utils
@@ -169,8 +168,10 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
for ((it, depNum) <- rddIterators) {
map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
}
- context.taskMetrics.incMemoryBytesSpilled(map.memoryBytesSpilled)
- context.taskMetrics.incDiskBytesSpilled(map.diskBytesSpilled)
+ context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
+ context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
+ context.internalMetricsToAccumulators(
+ InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
new InterruptibleIterator(context,
map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
index e0edd7d4ae..11d123eec4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
@@ -24,11 +24,12 @@ import org.apache.spark.annotation.DeveloperApi
* Information about an [[org.apache.spark.Accumulable]] modified during a task or stage.
*/
@DeveloperApi
-class AccumulableInfo (
+class AccumulableInfo private[spark] (
val id: Long,
val name: String,
val update: Option[String], // represents a partial update within a task
- val value: String) {
+ val value: String,
+ val internal: Boolean) {
override def equals(other: Any): Boolean = other match {
case acc: AccumulableInfo =>
@@ -40,10 +41,10 @@ class AccumulableInfo (
object AccumulableInfo {
def apply(id: Long, name: String, update: Option[String], value: String): AccumulableInfo = {
- new AccumulableInfo(id, name, update, value)
+ new AccumulableInfo(id, name, update, value, internal = false)
}
def apply(id: Long, name: String, value: String): AccumulableInfo = {
- new AccumulableInfo(id, name, None, value)
+ new AccumulableInfo(id, name, None, value, internal = false)
}
}
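
With the new internal flag, consumers can separate user-defined accumulators from internal metrics, which is exactly what StagePage does further down. A one-line hedged sketch:

import org.apache.spark.scheduler.AccumulableInfo

// Sketch: split a stage's accumulables into internal metrics and user-visible ones.
def splitAccumulables(all: Iterable[AccumulableInfo])
  : (Iterable[AccumulableInfo], Iterable[AccumulableInfo]) = all.partition(_.internal)
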
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c4fa277c21..bb489c6b6e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -773,16 +773,26 @@ class DAGScheduler(
stage.pendingTasks.clear()
// First figure out the indexes of partition ids to compute.
- val partitionsToCompute: Seq[Int] = {
+ val (allPartitions: Seq[Int], partitionsToCompute: Seq[Int]) = {
stage match {
case stage: ShuffleMapStage =>
- (0 until stage.numPartitions).filter(id => stage.outputLocs(id).isEmpty)
+ val allPartitions = 0 until stage.numPartitions
+ val filteredPartitions = allPartitions.filter { id => stage.outputLocs(id).isEmpty }
+ (allPartitions, filteredPartitions)
case stage: ResultStage =>
val job = stage.resultOfJob.get
- (0 until job.numPartitions).filter(id => !job.finished(id))
+ val allPartitions = 0 until job.numPartitions
+ val filteredPartitions = allPartitions.filter { id => !job.finished(id) }
+ (allPartitions, filteredPartitions)
}
}
+ // Reset internal accumulators only if this stage is not partially submitted
+ // Otherwise, we may override existing accumulator values from some tasks
+ if (allPartitions == partitionsToCompute) {
+ stage.resetInternalAccumulators()
+ }
+
val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
runningStages += stage
@@ -852,7 +862,8 @@ class DAGScheduler(
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
- new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs)
+ new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
+ taskBinary, part, locs, stage.internalAccumulators)
}
case stage: ResultStage =>
@@ -861,7 +872,8 @@ class DAGScheduler(
val p: Int = job.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
- new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, id)
+ new ResultTask(stage.id, stage.latestInfo.attemptId,
+ taskBinary, part, locs, id, stage.internalAccumulators)
}
}
} catch {
@@ -916,9 +928,11 @@ class DAGScheduler(
// To avoid UI cruft, ignore cases where value wasn't updated
if (acc.name.isDefined && partialValue != acc.zero) {
val name = acc.name.get
- stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, s"${acc.value}")
+ val value = s"${acc.value}"
+ stage.latestInfo.accumulables(id) =
+ new AccumulableInfo(id, name, None, value, acc.isInternal)
event.taskInfo.accumulables +=
- AccumulableInfo(id, name, Some(s"$partialValue"), s"${acc.value}")
+ new AccumulableInfo(id, name, Some(s"$partialValue"), value, acc.isInternal)
}
}
} catch {
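
The guard above deserves a concrete illustration: internal accumulators are reset only when every partition is being computed; on a partial resubmission (for example, recomputing just the partitions lost to a fetch failure) the accumulators already carry values from finished tasks and must be preserved. A small sketch of that decision, with illustrative names:

// Illustrative decision (not the actual DAGScheduler): reset the stage's
// internal accumulators only on a full submission, never on a partial resubmit.
def shouldResetInternalAccumulators(
    allPartitions: Seq[Int],
    partitionsToCompute: Seq[Int]): Boolean = allPartitions == partitionsToCompute

// Full submission of a 4-partition stage resets; recomputing only the failed
// partitions 1 and 3 does not.
assert(shouldResetInternalAccumulators(0 until 4, 0 until 4))
assert(!shouldResetInternalAccumulators(0 until 4, Seq(1, 3)))
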
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 9c2606e278..c4dc080e2b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -45,8 +45,10 @@ private[spark] class ResultTask[T, U](
taskBinary: Broadcast[Array[Byte]],
partition: Partition,
@transient locs: Seq[TaskLocation],
- val outputId: Int)
- extends Task[U](stageId, stageAttemptId, partition.index) with Serializable {
+ val outputId: Int,
+ internalAccumulators: Seq[Accumulator[Long]])
+ extends Task[U](stageId, stageAttemptId, partition.index, internalAccumulators)
+ with Serializable {
@transient private[this] val preferredLocs: Seq[TaskLocation] = {
if (locs == null) Nil else locs.toSet.toSeq
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 14c8c00961..f478f9982a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -43,12 +43,14 @@ private[spark] class ShuffleMapTask(
stageAttemptId: Int,
taskBinary: Broadcast[Array[Byte]],
partition: Partition,
- @transient private var locs: Seq[TaskLocation])
- extends Task[MapStatus](stageId, stageAttemptId, partition.index) with Logging {
+ @transient private var locs: Seq[TaskLocation],
+ internalAccumulators: Seq[Accumulator[Long]])
+ extends Task[MapStatus](stageId, stageAttemptId, partition.index, internalAccumulators)
+ with Logging {
/** A constructor used only in test suites. This does not require passing in an RDD. */
def this(partitionId: Int) {
- this(0, 0, null, new Partition { override def index: Int = 0 }, null)
+ this(0, 0, null, new Partition { override def index: Int = 0 }, null, null)
}
@transient private val preferredLocs: Seq[TaskLocation] = {
@@ -69,7 +71,7 @@ private[spark] class ShuffleMapTask(
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
- return writer.stop(success = true).get
+ writer.stop(success = true).get
} catch {
case e: Exception =>
try {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 40a333a3e0..de05ee256d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -68,6 +68,22 @@ private[spark] abstract class Stage(
val name = callSite.shortForm
val details = callSite.longForm
+ private var _internalAccumulators: Seq[Accumulator[Long]] = Seq.empty
+
+ /** Internal accumulators shared across all tasks in this stage. */
+ def internalAccumulators: Seq[Accumulator[Long]] = _internalAccumulators
+
+ /**
+ * Re-initialize the internal accumulators associated with this stage.
+ *
+ * This is called every time the stage is submitted, *except* when a subset of tasks
+ * belonging to this stage has already finished. Otherwise, reinitializing the internal
+ * accumulators here again will override partial values from the finished tasks.
+ */
+ def resetInternalAccumulators(): Unit = {
+ _internalAccumulators = InternalAccumulator.create()
+ }
+
/**
* Pointer to the [StageInfo] object for the most recent attempt. This needs to be initialized
* here, before any attempts have actually been created, because the DAGScheduler uses this
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 1978305cfe..9edf9f048f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
import scala.collection.mutable.HashMap
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.{SparkEnv, TaskContextImpl, TaskContext}
+import org.apache.spark.{Accumulator, SparkEnv, TaskContextImpl, TaskContext}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.unsafe.memory.TaskMemoryManager
@@ -47,7 +47,8 @@ import org.apache.spark.util.Utils
private[spark] abstract class Task[T](
val stageId: Int,
val stageAttemptId: Int,
- var partitionId: Int) extends Serializable {
+ val partitionId: Int,
+ internalAccumulators: Seq[Accumulator[Long]]) extends Serializable {
/**
* The key of the Map is the accumulator id and the value of the Map is the latest accumulator
@@ -68,12 +69,13 @@ private[spark] abstract class Task[T](
metricsSystem: MetricsSystem)
: (T, AccumulatorUpdates) = {
context = new TaskContextImpl(
- stageId = stageId,
- partitionId = partitionId,
- taskAttemptId = taskAttemptId,
- attemptNumber = attemptNumber,
- taskMemoryManager = taskMemoryManager,
- metricsSystem = metricsSystem,
+ stageId,
+ partitionId,
+ taskAttemptId,
+ attemptNumber,
+ taskMemoryManager,
+ metricsSystem,
+ internalAccumulators,
runningLocally = false)
TaskContext.setTaskContext(context)
context.taskMetrics.setHostname(Utils.localHostName())
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
index de79fa56f0..0c8f08f0f3 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
@@ -17,7 +17,7 @@
package org.apache.spark.shuffle.hash
-import org.apache.spark.{InterruptibleIterator, Logging, MapOutputTracker, SparkEnv, TaskContext}
+import org.apache.spark._
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleReader}
import org.apache.spark.storage.{BlockManager, ShuffleBlockFetcherIterator}
@@ -100,8 +100,10 @@ private[spark] class HashShuffleReader[K, C](
// the ExternalSorter won't spill to disk.
val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
sorter.insertAll(aggregatedIter)
- context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
- context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
+ context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
+ context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
+ context.internalMetricsToAccumulators(
+ InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes)
sorter.iterator
case None =>
aggregatedIter
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
index e2d25e3636..cb122eaed8 100644
--- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -62,6 +62,13 @@ private[spark] object ToolTips {
"""Time that the executor spent paused for Java garbage collection while the task was
running."""
+ val PEAK_EXECUTION_MEMORY =
+ """Execution memory refers to the memory used by internal data structures created during
+ shuffles, aggregations and joins when Tungsten is enabled. The value of this accumulator
+ should be approximately the sum of the peak sizes across all such data structures created
+ in this task. For SQL jobs, this only tracks all unsafe operators, broadcast joins, and
+ external sort."""
+
val JOB_TIMELINE =
"""Shows when jobs started and ended and when executors joined or left. Drag to scroll.
Click Enable Zooming and use mouse wheel to zoom in/out."""
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index cf04b5e592..3954c3d1ef 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -26,6 +26,7 @@ import scala.xml.{Elem, Node, Unparsed}
import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.spark.{InternalAccumulator, SparkConf}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
import org.apache.spark.ui._
@@ -67,6 +68,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
// if we find that it's okay.
private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000)
+ private val displayPeakExecutionMemory =
+ parent.conf.getOption("spark.sql.unsafe.enabled").exists(_.toBoolean)
def render(request: HttpServletRequest): Seq[Node] = {
progressListener.synchronized {
@@ -114,10 +117,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val stageData = stageDataOption.get
val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
-
val numCompleted = tasks.count(_.taskInfo.finished)
- val accumulables = progressListener.stageIdToData((stageId, stageAttemptId)).accumulables
- val hasAccumulators = accumulables.size > 0
+
+ val allAccumulables = progressListener.stageIdToData((stageId, stageAttemptId)).accumulables
+ val externalAccumulables = allAccumulables.values.filter { acc => !acc.internal }
+ val hasAccumulators = externalAccumulables.size > 0
val summary =
<div>
@@ -221,6 +225,15 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<span class="additional-metric-title">Getting Result Time</span>
</span>
</li>
+ {if (displayPeakExecutionMemory) {
+ <li>
+ <span data-toggle="tooltip"
+ title={ToolTips.PEAK_EXECUTION_MEMORY} data-placement="right">
+ <input type="checkbox" name={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}/>
+ <span class="additional-metric-title">Peak Execution Memory</span>
+ </span>
+ </li>
+ }}
</ul>
</div>
</div>
@@ -241,11 +254,12 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val accumulableTable = UIUtils.listingTable(
accumulableHeaders,
accumulableRow,
- accumulables.values.toSeq)
+ externalAccumulables.toSeq)
val currentTime = System.currentTimeMillis()
val (taskTable, taskTableHTML) = try {
val _taskTable = new TaskPagedTable(
+ parent.conf,
UIUtils.prependBaseUri(parent.basePath) +
s"/stages/stage?id=${stageId}&attempt=${stageAttemptId}",
tasks,
@@ -294,12 +308,14 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
else {
def getDistributionQuantiles(data: Seq[Double]): IndexedSeq[Double] =
Distribution(data).get.getQuantiles()
-
def getFormattedTimeQuantiles(times: Seq[Double]): Seq[Node] = {
getDistributionQuantiles(times).map { millis =>
<td>{UIUtils.formatDuration(millis.toLong)}</td>
}
}
+ def getFormattedSizeQuantiles(data: Seq[Double]): Seq[Elem] = {
+ getDistributionQuantiles(data).map(d => <td>{Utils.bytesToString(d.toLong)}</td>)
+ }
val deserializationTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.executorDeserializeTime.toDouble
@@ -349,6 +365,23 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</span>
</td> +:
getFormattedTimeQuantiles(gettingResultTimes)
+
+ val peakExecutionMemory = validTasks.map { case TaskUIData(info, _, _) =>
+ info.accumulables
+ .find { acc => acc.name == InternalAccumulator.PEAK_EXECUTION_MEMORY }
+ .map { acc => acc.value.toLong }
+ .getOrElse(0L)
+ .toDouble
+ }
+ val peakExecutionMemoryQuantiles = {
+ <td>
+ <span data-toggle="tooltip"
+ title={ToolTips.PEAK_EXECUTION_MEMORY} data-placement="right">
+ Peak Execution Memory
+ </span>
+ </td> +: getFormattedSizeQuantiles(peakExecutionMemory)
+ }
+
// The scheduler delay includes the network delay to send the task to the worker
// machine and to send back the result (but not the time to fetch the task result,
// if it needed to be fetched from the block manager on the worker).
@@ -359,10 +392,6 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
title={ToolTips.SCHEDULER_DELAY} data-placement="right">Scheduler Delay</span></td>
val schedulerDelayQuantiles = schedulerDelayTitle +:
getFormattedTimeQuantiles(schedulerDelays)
-
- def getFormattedSizeQuantiles(data: Seq[Double]): Seq[Elem] =
- getDistributionQuantiles(data).map(d => <td>{Utils.bytesToString(d.toLong)}</td>)
-
def getFormattedSizeQuantilesWithRecords(data: Seq[Double], records: Seq[Double])
: Seq[Elem] = {
val recordDist = getDistributionQuantiles(records).iterator
@@ -466,6 +495,13 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
{serializationQuantiles}
</tr>,
<tr class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>,
+ if (displayPeakExecutionMemory) {
+ <tr class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
+ {peakExecutionMemoryQuantiles}
+ </tr>
+ } else {
+ Nil
+ },
if (stageData.hasInput) <tr>{inputQuantiles}</tr> else Nil,
if (stageData.hasOutput) <tr>{outputQuantiles}</tr> else Nil,
if (stageData.hasShuffleRead) {
@@ -499,7 +535,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val executorTable = new ExecutorTable(stageId, stageAttemptId, parent)
val maybeAccumulableTable: Seq[Node] =
- if (accumulables.size > 0) { <h4>Accumulators</h4> ++ accumulableTable } else Seq()
+ if (hasAccumulators) { <h4>Accumulators</h4> ++ accumulableTable } else Seq()
val content =
summary ++
@@ -750,29 +786,30 @@ private[ui] case class TaskTableRowBytesSpilledData(
* Contains all data that needs for sorting and generating HTML. Using this one rather than
* TaskUIData to avoid creating duplicate contents during sorting the data.
*/
-private[ui] case class TaskTableRowData(
- index: Int,
- taskId: Long,
- attempt: Int,
- speculative: Boolean,
- status: String,
- taskLocality: String,
- executorIdAndHost: String,
- launchTime: Long,
- duration: Long,
- formatDuration: String,
- schedulerDelay: Long,
- taskDeserializationTime: Long,
- gcTime: Long,
- serializationTime: Long,
- gettingResultTime: Long,
- accumulators: Option[String], // HTML
- input: Option[TaskTableRowInputData],
- output: Option[TaskTableRowOutputData],
- shuffleRead: Option[TaskTableRowShuffleReadData],
- shuffleWrite: Option[TaskTableRowShuffleWriteData],
- bytesSpilled: Option[TaskTableRowBytesSpilledData],
- error: String)
+private[ui] class TaskTableRowData(
+ val index: Int,
+ val taskId: Long,
+ val attempt: Int,
+ val speculative: Boolean,
+ val status: String,
+ val taskLocality: String,
+ val executorIdAndHost: String,
+ val launchTime: Long,
+ val duration: Long,
+ val formatDuration: String,
+ val schedulerDelay: Long,
+ val taskDeserializationTime: Long,
+ val gcTime: Long,
+ val serializationTime: Long,
+ val gettingResultTime: Long,
+ val peakExecutionMemoryUsed: Long,
+ val accumulators: Option[String], // HTML
+ val input: Option[TaskTableRowInputData],
+ val output: Option[TaskTableRowOutputData],
+ val shuffleRead: Option[TaskTableRowShuffleReadData],
+ val shuffleWrite: Option[TaskTableRowShuffleWriteData],
+ val bytesSpilled: Option[TaskTableRowBytesSpilledData],
+ val error: String)
private[ui] class TaskDataSource(
tasks: Seq[TaskUIData],
@@ -816,10 +853,15 @@ private[ui] class TaskDataSource(
val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
val gettingResultTime = getGettingResultTime(info, currentTime)
- val maybeAccumulators = info.accumulables
- val accumulatorsReadable = maybeAccumulators.map { acc =>
+ val (taskInternalAccumulables, taskExternalAccumulables) =
+ info.accumulables.partition(_.internal)
+ val externalAccumulableReadable = taskExternalAccumulables.map { acc =>
StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}")
}
+ val peakExecutionMemoryUsed = taskInternalAccumulables
+ .find { acc => acc.name == InternalAccumulator.PEAK_EXECUTION_MEMORY }
+ .map { acc => acc.value.toLong }
+ .getOrElse(0L)
val maybeInput = metrics.flatMap(_.inputMetrics)
val inputSortable = maybeInput.map(_.bytesRead).getOrElse(0L)
@@ -923,7 +965,7 @@ private[ui] class TaskDataSource(
None
}
- TaskTableRowData(
+ new TaskTableRowData(
info.index,
info.taskId,
info.attempt,
@@ -939,7 +981,8 @@ private[ui] class TaskDataSource(
gcTime,
serializationTime,
gettingResultTime,
- if (hasAccumulators) Some(accumulatorsReadable.mkString("<br/>")) else None,
+ peakExecutionMemoryUsed,
+ if (hasAccumulators) Some(externalAccumulableReadable.mkString("<br/>")) else None,
input,
output,
shuffleRead,
@@ -1006,6 +1049,10 @@ private[ui] class TaskDataSource(
override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
Ordering.Long.compare(x.gettingResultTime, y.gettingResultTime)
}
+ case "Peak Execution Memory" => new Ordering[TaskTableRowData] {
+ override def compare(x: TaskTableRowData, y: TaskTableRowData): Int =
+ Ordering.Long.compare(x.peakExecutionMemoryUsed, y.peakExecutionMemoryUsed)
+ }
case "Accumulators" =>
if (hasAccumulators) {
new Ordering[TaskTableRowData] {
@@ -1132,6 +1179,7 @@ private[ui] class TaskDataSource(
}
private[ui] class TaskPagedTable(
+ conf: SparkConf,
basePath: String,
data: Seq[TaskUIData],
hasAccumulators: Boolean,
@@ -1143,7 +1191,11 @@ private[ui] class TaskPagedTable(
currentTime: Long,
pageSize: Int,
sortColumn: String,
- desc: Boolean) extends PagedTable[TaskTableRowData]{
+ desc: Boolean) extends PagedTable[TaskTableRowData] {
+
+ // We only track peak memory used for unsafe operators
+ private val displayPeakExecutionMemory =
+ conf.getOption("spark.sql.unsafe.enabled").exists(_.toBoolean)
override def tableId: String = ""
@@ -1195,6 +1247,13 @@ private[ui] class TaskPagedTable(
("GC Time", ""),
("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
+ {
+ if (displayPeakExecutionMemory) {
+ Seq(("Peak Execution Memory", TaskDetailsClassNames.PEAK_EXECUTION_MEMORY))
+ } else {
+ Nil
+ }
+ } ++
{if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
{if (hasInput) Seq(("Input Size / Records", "")) else Nil} ++
{if (hasOutput) Seq(("Output Size / Records", "")) else Nil} ++
@@ -1271,6 +1330,11 @@ private[ui] class TaskPagedTable(
<td class={TaskDetailsClassNames.GETTING_RESULT_TIME}>
{UIUtils.formatDuration(task.gettingResultTime)}
</td>
+ {if (displayPeakExecutionMemory) {
+ <td class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
+ {Utils.bytesToString(task.peakExecutionMemoryUsed)}
+ </td>
+ }}
{if (task.accumulators.nonEmpty) {
<td>{Unparsed(task.accumulators.get)}</td>
}}
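
A hedged sketch of how the summary table above turns per-task peaks into quantiles, reusing Spark's private[spark] Distribution helper exactly as the surrounding code does; the sample values are invented:

import org.apache.spark.util.Distribution

// Per-task peaks, 0 for tasks that reported nothing.
val taskPeaks: Seq[Double] = Seq(32e6, 48e6, 0.0, 64e6, 48e6)
// getQuantiles() with no arguments uses Distribution's default probability
// points, which is what getFormattedSizeQuantiles relies on above.
val quantiles: IndexedSeq[Double] = Distribution(taskPeaks).get.getQuantiles()
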
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
index 9bf67db8ac..d2dfc5a329 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/TaskDetailsClassNames.scala
@@ -31,4 +31,5 @@ private[spark] object TaskDetailsClassNames {
val SHUFFLE_READ_REMOTE_SIZE = "shuffle_read_remote"
val RESULT_SERIALIZATION_TIME = "serialization_time"
val GETTING_RESULT_TIME = "getting_result_time"
+ val PEAK_EXECUTION_MEMORY = "peak_execution_memory"
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index d166037351..f929b12606 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -89,6 +89,7 @@ class ExternalAppendOnlyMap[K, V, C](
// Number of bytes spilled in total
private var _diskBytesSpilled = 0L
+ def diskBytesSpilled: Long = _diskBytesSpilled
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
private val fileBufferSize =
@@ -97,6 +98,10 @@ class ExternalAppendOnlyMap[K, V, C](
// Write metrics for current spill
private var curWriteMetrics: ShuffleWriteMetrics = _
+ // Peak size of the in-memory map observed so far, in bytes
+ private var _peakMemoryUsedBytes: Long = 0L
+ def peakMemoryUsedBytes: Long = _peakMemoryUsedBytes
+
private val keyComparator = new HashComparator[K]
private val ser = serializer.newInstance()
@@ -126,7 +131,11 @@ class ExternalAppendOnlyMap[K, V, C](
while (entries.hasNext) {
curEntry = entries.next()
- if (maybeSpill(currentMap, currentMap.estimateSize())) {
+ val estimatedSize = currentMap.estimateSize()
+ if (estimatedSize > _peakMemoryUsedBytes) {
+ _peakMemoryUsedBytes = estimatedSize
+ }
+ if (maybeSpill(currentMap, estimatedSize)) {
currentMap = new SizeTrackingAppendOnlyMap[K, C]
}
currentMap.changeValue(curEntry._1, update)
@@ -207,8 +216,6 @@ class ExternalAppendOnlyMap[K, V, C](
spilledMaps.append(new DiskMapIterator(file, blockId, batchSizes))
}
- def diskBytesSpilled: Long = _diskBytesSpilled
-
/**
* Return an iterator that merges the in-memory map with the spilled maps.
* If no spill has occurred, simply return the in-memory map's iterator.
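
The reason insertAll now samples estimateSize() into a local before calling maybeSpill(): once the map spills it is replaced by a fresh, empty one, so the size must be captured -- and folded into the peak -- before that decision. A compressed, illustrative sketch of the loop (all parameters are stand-ins for the map's own methods):

// Compressed sketch of the insertAll loop: sample the size once, fold it into
// the running peak, then reuse the same sample for the spill decision.
var peakMemoryUsedBytes = 0L
def insertAll[K, V](
    entries: Iterator[(K, V)],
    estimateSize: () => Long,
    maybeSpill: Long => Boolean,
    insert: ((K, V)) => Unit,
    resetMap: () => Unit): Unit = {
  while (entries.hasNext) {
    val entry = entries.next()
    val estimatedSize = estimateSize()
    if (estimatedSize > peakMemoryUsedBytes) {
      peakMemoryUsedBytes = estimatedSize
    }
    if (maybeSpill(estimatedSize)) {
      resetMap() // the old map is discarded, but its size is already in the peak
    }
    insert(entry)
  }
}
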
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index ba7ec834d6..19287edbaf 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -152,6 +152,9 @@ private[spark] class ExternalSorter[K, V, C](
private var _diskBytesSpilled = 0L
def diskBytesSpilled: Long = _diskBytesSpilled
+ // Peak size of the in-memory data structure observed so far, in bytes
+ private var _peakMemoryUsedBytes: Long = 0L
+ def peakMemoryUsedBytes: Long = _peakMemoryUsedBytes
// A comparator for keys K that orders them within a partition to allow aggregation or sorting.
// Can be a partial ordering by hash code if a total ordering is not provided through by the
@@ -224,15 +227,22 @@ private[spark] class ExternalSorter[K, V, C](
return
}
+ var estimatedSize = 0L
if (usingMap) {
- if (maybeSpill(map, map.estimateSize())) {
+ estimatedSize = map.estimateSize()
+ if (maybeSpill(map, estimatedSize)) {
map = new PartitionedAppendOnlyMap[K, C]
}
} else {
- if (maybeSpill(buffer, buffer.estimateSize())) {
+ estimatedSize = buffer.estimateSize()
+ if (maybeSpill(buffer, estimatedSize)) {
buffer = newBuffer()
}
}
+
+ if (estimatedSize > _peakMemoryUsedBytes) {
+ _peakMemoryUsedBytes = estimatedSize
+ }
}
/**
@@ -684,8 +694,10 @@ private[spark] class ExternalSorter[K, V, C](
}
}
- context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled)
- context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled)
+ context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
+ context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
+ context.internalMetricsToAccumulators(
+ InternalAccumulator.PEAK_EXECUTION_MEMORY).add(peakMemoryUsedBytes)
lengths
}