aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
blob: bda2a91d9d2cab3fd12282a0b3a56403eae64235 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.executor

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.AccumulableInfo
import org.apache.spark.storage.{BlockId, BlockStatus}


/**
 * :: DeveloperApi ::
 * Metrics tracked during the execution of a task.
 *
 * This class is wrapper around a collection of internal accumulators that represent metrics
 * associated with a task. The local values of these accumulators are sent from the executor
 * to the driver when the task completes. These values are then merged into the corresponding
 * accumulator previously registered on the driver.
 *
 * The accumulator updates are also sent to the driver periodically (on executor heartbeat)
 * and when the task failed with an exception. The [[TaskMetrics]] object itself should never
 * be sent to the driver.
 *
 * @param initialAccums the initial set of accumulators that this [[TaskMetrics]] depends on.
 *                      Each accumulator in this initial set must be uniquely named and marked
 *                      as internal. Additional accumulators registered later need not satisfy
 *                      these requirements.
 */
@DeveloperApi
class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Serializable {
  import InternalAccumulator._

  // Needed for Java tests
  def this() {
    this(InternalAccumulator.createAll())
  }

  /**
   * All accumulators registered with this task.
   */
  private val accums = new ArrayBuffer[Accumulable[_, _]]
  accums ++= initialAccums

  /**
   * A map for quickly accessing the initial set of accumulators by name.
   */
  private val initialAccumsMap: Map[String, Accumulator[_]] = {
    val map = new mutable.HashMap[String, Accumulator[_]]
    initialAccums.foreach { a =>
      val name = a.name.getOrElse {
        throw new IllegalArgumentException(
          "initial accumulators passed to TaskMetrics must be named")
      }
      require(a.isInternal,
        s"initial accumulator '$name' passed to TaskMetrics must be marked as internal")
      require(!map.contains(name),
        s"detected duplicate accumulator name '$name' when constructing TaskMetrics")
      map(name) = a
    }
    map.toMap
  }

  // Each metric is internally represented as an accumulator
  private val _executorDeserializeTime = getAccum(EXECUTOR_DESERIALIZE_TIME)
  private val _executorRunTime = getAccum(EXECUTOR_RUN_TIME)
  private val _resultSize = getAccum(RESULT_SIZE)
  private val _jvmGCTime = getAccum(JVM_GC_TIME)
  private val _resultSerializationTime = getAccum(RESULT_SERIALIZATION_TIME)
  private val _memoryBytesSpilled = getAccum(MEMORY_BYTES_SPILLED)
  private val _diskBytesSpilled = getAccum(DISK_BYTES_SPILLED)
  private val _peakExecutionMemory = getAccum(PEAK_EXECUTION_MEMORY)
  private val _updatedBlockStatuses =
    TaskMetrics.getAccum[Seq[(BlockId, BlockStatus)]](initialAccumsMap, UPDATED_BLOCK_STATUSES)

  /**
   * Time taken on the executor to deserialize this task.
   */
  def executorDeserializeTime: Long = _executorDeserializeTime.localValue

  /**
   * Time the executor spends actually running the task (including fetching shuffle data).
   */
  def executorRunTime: Long = _executorRunTime.localValue

  /**
   * The number of bytes this task transmitted back to the driver as the TaskResult.
   */
  def resultSize: Long = _resultSize.localValue

  /**
   * Amount of time the JVM spent in garbage collection while executing this task.
   */
  def jvmGCTime: Long = _jvmGCTime.localValue

  /**
   * Amount of time spent serializing the task result.
   */
  def resultSerializationTime: Long = _resultSerializationTime.localValue

  /**
   * The number of in-memory bytes spilled by this task.
   */
  def memoryBytesSpilled: Long = _memoryBytesSpilled.localValue

  /**
   * The number of on-disk bytes spilled by this task.
   */
  def diskBytesSpilled: Long = _diskBytesSpilled.localValue

  /**
   * Peak memory used by internal data structures created during shuffles, aggregations and
   * joins. The value of this accumulator should be approximately the sum of the peak sizes
   * across all such data structures created in this task. For SQL jobs, this only tracks all
   * unsafe operators and ExternalSort.
   */
  def peakExecutionMemory: Long = _peakExecutionMemory.localValue

  /**
   * Storage statuses of any blocks that have been updated as a result of this task.
   */
  def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses.localValue

  // Setters and increment-ers
  private[spark] def setExecutorDeserializeTime(v: Long): Unit =
    _executorDeserializeTime.setValue(v)
  private[spark] def setExecutorRunTime(v: Long): Unit = _executorRunTime.setValue(v)
  private[spark] def setResultSize(v: Long): Unit = _resultSize.setValue(v)
  private[spark] def setJvmGCTime(v: Long): Unit = _jvmGCTime.setValue(v)
  private[spark] def setResultSerializationTime(v: Long): Unit =
    _resultSerializationTime.setValue(v)
  private[spark] def incMemoryBytesSpilled(v: Long): Unit = _memoryBytesSpilled.add(v)
  private[spark] def incDiskBytesSpilled(v: Long): Unit = _diskBytesSpilled.add(v)
  private[spark] def incPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.add(v)
  private[spark] def incUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit =
    _updatedBlockStatuses.add(v)
  private[spark] def setUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit =
    _updatedBlockStatuses.setValue(v)

  /**
   * Get a Long accumulator from the given map by name, assuming it exists.
   * Note: this only searches the initial set of accumulators passed into the constructor.
   */
  private[spark] def getAccum(name: String): Accumulator[Long] = {
    TaskMetrics.getAccum[Long](initialAccumsMap, name)
  }


  /* ========================== *
   |        INPUT METRICS       |
   * ========================== */

  private var _inputMetrics: Option[InputMetrics] = None

  /**
   * Metrics related to reading data from a [[org.apache.spark.rdd.HadoopRDD]] or from persisted
   * data, defined only in tasks with input.
   */
  def inputMetrics: Option[InputMetrics] = _inputMetrics

  /**
   * Get or create a new [[InputMetrics]] associated with this task.
   */
  private[spark] def registerInputMetrics(readMethod: DataReadMethod.Value): InputMetrics = {
    synchronized {
      val metrics = _inputMetrics.getOrElse {
        val metrics = new InputMetrics(initialAccumsMap)
        metrics.setReadMethod(readMethod)
        _inputMetrics = Some(metrics)
        metrics
      }
      // If there already exists an InputMetric with the same read method, we can just return
      // that one. Otherwise, if the read method is different from the one previously seen by
      // this task, we return a new dummy one to avoid clobbering the values of the old metrics.
      // In the future we should try to store input metrics from all different read methods at
      // the same time (SPARK-5225).
      if (metrics.readMethod == readMethod) {
        metrics
      } else {
        val m = new InputMetrics
        m.setReadMethod(readMethod)
        m
      }
    }
  }


  /* ============================ *
   |        OUTPUT METRICS        |
   * ============================ */

  private var _outputMetrics: Option[OutputMetrics] = None

  /**
   * Metrics related to writing data externally (e.g. to a distributed filesystem),
   * defined only in tasks with output.
   */
  def outputMetrics: Option[OutputMetrics] = _outputMetrics

  /**
   * Get or create a new [[OutputMetrics]] associated with this task.
   */
  private[spark] def registerOutputMetrics(
      writeMethod: DataWriteMethod.Value): OutputMetrics = synchronized {
    _outputMetrics.getOrElse {
      val metrics = new OutputMetrics(initialAccumsMap)
      metrics.setWriteMethod(writeMethod)
      _outputMetrics = Some(metrics)
      metrics
    }
  }


  /* ================================== *
   |        SHUFFLE READ METRICS        |
   * ================================== */

  private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None

  /**
   * Metrics related to shuffle read aggregated across all shuffle dependencies.
   * This is defined only if there are shuffle dependencies in this task.
   */
  def shuffleReadMetrics: Option[ShuffleReadMetrics] = _shuffleReadMetrics

  /**
   * Temporary list of [[ShuffleReadMetrics]], one per shuffle dependency.
   *
   * A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization
   * issues from readers in different threads, in-progress tasks use a [[ShuffleReadMetrics]] for
   * each dependency and merge these metrics before reporting them to the driver.
   */
  @transient private lazy val tempShuffleReadMetrics = new ArrayBuffer[ShuffleReadMetrics]

  /**
   * Create a temporary [[ShuffleReadMetrics]] for a particular shuffle dependency.
   *
   * All usages are expected to be followed by a call to [[mergeShuffleReadMetrics]], which
   * merges the temporary values synchronously. Otherwise, all temporary data collected will
   * be lost.
   */
  private[spark] def registerTempShuffleReadMetrics(): ShuffleReadMetrics = synchronized {
    val readMetrics = new ShuffleReadMetrics
    tempShuffleReadMetrics += readMetrics
    readMetrics
  }

  /**
   * Merge values across all temporary [[ShuffleReadMetrics]] into `_shuffleReadMetrics`.
   * This is expected to be called on executor heartbeat and at the end of a task.
   */
  private[spark] def mergeShuffleReadMetrics(): Unit = synchronized {
    if (tempShuffleReadMetrics.nonEmpty) {
      val metrics = new ShuffleReadMetrics(initialAccumsMap)
      metrics.setMergeValues(tempShuffleReadMetrics)
      _shuffleReadMetrics = Some(metrics)
    }
  }

  /* =================================== *
   |        SHUFFLE WRITE METRICS        |
   * =================================== */

  private var _shuffleWriteMetrics: Option[ShuffleWriteMetrics] = None

  /**
   * Metrics related to shuffle write, defined only in shuffle map stages.
   */
  def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics

  /**
   * Get or create a new [[ShuffleWriteMetrics]] associated with this task.
   */
  private[spark] def registerShuffleWriteMetrics(): ShuffleWriteMetrics = synchronized {
    _shuffleWriteMetrics.getOrElse {
      val metrics = new ShuffleWriteMetrics(initialAccumsMap)
      _shuffleWriteMetrics = Some(metrics)
      metrics
    }
  }


  /* ========================== *
   |        OTHER THINGS        |
   * ========================== */

  private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = {
    accums += a
  }

  /**
   * Return the latest updates of accumulators in this task.
   *
   * The [[AccumulableInfo.update]] field is always defined and the [[AccumulableInfo.value]]
   * field is always empty, since this represents the partial updates recorded in this task,
   * not the aggregated value across multiple tasks.
   */
  def accumulatorUpdates(): Seq[AccumulableInfo] = {
    accums.map { a => a.toInfo(Some(a.localValue), None) }
  }

  // If we are reconstructing this TaskMetrics on the driver, some metrics may already be set.
  // If so, initialize all relevant metrics classes so listeners can access them downstream.
  {
    var (hasShuffleRead, hasShuffleWrite, hasInput, hasOutput) = (false, false, false, false)
    initialAccums
      .filter { a => a.localValue != a.zero }
      .foreach { a =>
        a.name.get match {
          case sr if sr.startsWith(SHUFFLE_READ_METRICS_PREFIX) => hasShuffleRead = true
          case sw if sw.startsWith(SHUFFLE_WRITE_METRICS_PREFIX) => hasShuffleWrite = true
          case in if in.startsWith(INPUT_METRICS_PREFIX) => hasInput = true
          case out if out.startsWith(OUTPUT_METRICS_PREFIX) => hasOutput = true
          case _ =>
        }
      }
    if (hasShuffleRead) { _shuffleReadMetrics = Some(new ShuffleReadMetrics(initialAccumsMap)) }
    if (hasShuffleWrite) { _shuffleWriteMetrics = Some(new ShuffleWriteMetrics(initialAccumsMap)) }
    if (hasInput) { _inputMetrics = Some(new InputMetrics(initialAccumsMap)) }
    if (hasOutput) { _outputMetrics = Some(new OutputMetrics(initialAccumsMap)) }
  }

}

/**
 * Internal subclass of [[TaskMetrics]] which is used only for posting events to listeners.
 * Its purpose is to obviate the need for the driver to reconstruct the original accumulators,
 * which might have been garbage-collected. See SPARK-13407 for more details.
 *
 * Instances of this class should be considered read-only and users should not call `inc*()` or
 * `set*()` methods. While we could override the setter methods to throw
 * UnsupportedOperationException, we choose not to do so because the overrides would quickly become
 * out-of-date when new metrics are added.
 */
private[spark] class ListenerTaskMetrics(
    initialAccums: Seq[Accumulator[_]],
    accumUpdates: Seq[AccumulableInfo]) extends TaskMetrics(initialAccums) {

  override def accumulatorUpdates(): Seq[AccumulableInfo] = accumUpdates

  override private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = {
    throw new UnsupportedOperationException("This TaskMetrics is read-only")
  }
}

private[spark] object TaskMetrics extends Logging {

  def empty: TaskMetrics = new TaskMetrics

  /**
   * Get an accumulator from the given map by name, assuming it exists.
   */
  def getAccum[T](accumMap: Map[String, Accumulator[_]], name: String): Accumulator[T] = {
    require(accumMap.contains(name), s"metric '$name' is missing")
    val accum = accumMap(name)
    try {
      // Note: we can't do pattern matching here because types are erased by compile time
      accum.asInstanceOf[Accumulator[T]]
    } catch {
      case e: ClassCastException =>
        throw new SparkException(s"accumulator $name was of unexpected type", e)
    }
  }

  /**
   * Construct a [[TaskMetrics]] object from a list of accumulator updates, called on driver only.
   *
   * Executors only send accumulator updates back to the driver, not [[TaskMetrics]]. However, we
   * need the latter to post task end events to listeners, so we need to reconstruct the metrics
   * on the driver.
   *
   * This assumes the provided updates contain the initial set of accumulators representing
   * internal task level metrics.
   */
  def fromAccumulatorUpdates(accumUpdates: Seq[AccumulableInfo]): TaskMetrics = {
    // Initial accumulators are passed into the TaskMetrics constructor first because these
    // are required to be uniquely named. The rest of the accumulators from this task are
    // registered later because they need not satisfy this requirement.
    val definedAccumUpdates = accumUpdates.filter { info => info.update.isDefined }
    val initialAccums = definedAccumUpdates
      .filter { info => info.name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) }
      .map { info =>
        val accum = InternalAccumulator.create(info.name.get)
        accum.setValueAny(info.update.get)
        accum
      }
    new ListenerTaskMetrics(initialAccums, definedAccumUpdates)
  }

}