From d557a5e01e8f819d3bd9e6e43d2df733f390d764 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 3 May 2016 19:45:12 +0800 Subject: [SPARK-15081] Move AccumulatorV2 and subclasses into util package ## What changes were proposed in this pull request? This patch moves AccumulatorV2 and subclasses into util package. ## How was this patch tested? Updated relevant tests. Author: Reynold Xin Closes #12863 from rxin/SPARK-15081. --- .../main/scala/org/apache/spark/Accumulable.scala | 1 + .../scala/org/apache/spark/AccumulatorV2.scala | 439 --------------------- .../scala/org/apache/spark/ContextCleaner.scala | 2 +- .../scala/org/apache/spark/HeartbeatReceiver.scala | 2 +- .../main/scala/org/apache/spark/TaskContext.scala | 2 +- .../scala/org/apache/spark/TaskEndReason.scala | 2 +- .../org/apache/spark/executor/InputMetrics.scala | 2 +- .../org/apache/spark/executor/OutputMetrics.scala | 2 +- .../apache/spark/executor/ShuffleReadMetrics.scala | 2 +- .../spark/executor/ShuffleWriteMetrics.scala | 2 +- .../org/apache/spark/executor/TaskMetrics.scala | 1 + .../apache/spark/scheduler/DAGSchedulerEvent.scala | 2 +- .../scala/org/apache/spark/scheduler/Task.scala | 2 +- .../org/apache/spark/scheduler/TaskResult.scala | 4 +- .../apache/spark/scheduler/TaskResultGetter.scala | 2 +- .../org/apache/spark/scheduler/TaskScheduler.scala | 2 +- .../apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../apache/spark/scheduler/TaskSetManager.scala | 2 +- .../org/apache/spark/util/AccumulatorV2.scala | 439 +++++++++++++++++++++ .../scala/org/apache/spark/AccumulatorSuite.scala | 1 + .../apache/spark/InternalAccumulatorSuite.scala | 1 + .../scala/org/apache/spark/SparkFunSuite.scala | 1 + .../apache/spark/executor/TaskMetricsSuite.scala | 1 + .../apache/spark/scheduler/DAGSchedulerSuite.scala | 3 +- .../scheduler/ExternalClusterManagerSuite.scala | 3 +- .../spark/scheduler/TaskSetManagerSuite.scala | 2 +- .../org/apache/spark/util/AccumulatorV2Suite.scala | 2 +- 27 files changed, 466 insertions(+), 460 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/AccumulatorV2.scala create mode 100644 core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala index 799c7e4fd5..5c6761eb76 100644 --- a/core/src/main/scala/org/apache/spark/Accumulable.scala +++ b/core/src/main/scala/org/apache/spark/Accumulable.scala @@ -24,6 +24,7 @@ import scala.reflect.ClassTag import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, LegacyAccumulatorWrapper} /** diff --git a/core/src/main/scala/org/apache/spark/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/AccumulatorV2.scala deleted file mode 100644 index a6c64fd680..0000000000 --- a/core/src/main/scala/org/apache/spark/AccumulatorV2.scala +++ /dev/null @@ -1,439 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark - -import java.{lang => jl} -import java.io.ObjectInputStream -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicLong - -import org.apache.spark.scheduler.AccumulableInfo -import org.apache.spark.util.Utils - - -private[spark] case class AccumulatorMetadata( - id: Long, - name: Option[String], - countFailedValues: Boolean) extends Serializable - - -/** - * The base class for accumulators, that can accumulate inputs of type `IN`, and produce output of - * type `OUT`. - */ -abstract class AccumulatorV2[IN, OUT] extends Serializable { - private[spark] var metadata: AccumulatorMetadata = _ - private[this] var atDriverSide = true - - private[spark] def register( - sc: SparkContext, - name: Option[String] = None, - countFailedValues: Boolean = false): Unit = { - if (this.metadata != null) { - throw new IllegalStateException("Cannot register an Accumulator twice.") - } - this.metadata = AccumulatorMetadata(AccumulatorContext.newId(), name, countFailedValues) - AccumulatorContext.register(this) - sc.cleaner.foreach(_.registerAccumulatorForCleanup(this)) - } - - /** - * Returns true if this accumulator has been registered. Note that all accumulators must be - * registered before ues, or it will throw exception. - */ - final def isRegistered: Boolean = - metadata != null && AccumulatorContext.get(metadata.id).isDefined - - private def assertMetadataNotNull(): Unit = { - if (metadata == null) { - throw new IllegalAccessError("The metadata of this accumulator has not been assigned yet.") - } - } - - /** - * Returns the id of this accumulator, can only be called after registration. - */ - final def id: Long = { - assertMetadataNotNull() - metadata.id - } - - /** - * Returns the name of this accumulator, can only be called after registration. - */ - final def name: Option[String] = { - assertMetadataNotNull() - metadata.name - } - - /** - * Whether to accumulate values from failed tasks. This is set to true for system and time - * metrics like serialization time or bytes spilled, and false for things with absolute values - * like number of input rows. This should be used for internal metrics only. - */ - private[spark] final def countFailedValues: Boolean = { - assertMetadataNotNull() - metadata.countFailedValues - } - - /** - * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] with the provided - * values. - */ - private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = { - val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) - new AccumulableInfo(id, name, update, value, isInternal, countFailedValues) - } - - final private[spark] def isAtDriverSide: Boolean = atDriverSide - - /** - * Returns if this accumulator is zero value or not. e.g. for a counter accumulator, 0 is zero - * value; for a list accumulator, Nil is zero value. - */ - def isZero: Boolean - - /** - * Creates a new copy of this accumulator, which is zero value. i.e. call `isZero` on the copy - * must return true. 
- */ - def copyAndReset(): AccumulatorV2[IN, OUT] - - /** - * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator. - */ - def add(v: IN): Unit - - /** - * Merges another same-type accumulator into this one and update its state, i.e. this should be - * merge-in-place. - */ - def merge(other: AccumulatorV2[IN, OUT]): Unit - - /** - * Access this accumulator's current value; only allowed on driver. - */ - final def value: OUT = { - if (atDriverSide) { - localValue - } else { - throw new UnsupportedOperationException("Can't read accumulator value in task") - } - } - - /** - * Defines the current value of this accumulator. - * - * This is NOT the global value of the accumulator. To get the global value after a - * completed operation on the dataset, call `value`. - */ - def localValue: OUT - - // Called by Java when serializing an object - final protected def writeReplace(): Any = { - if (atDriverSide) { - if (!isRegistered) { - throw new UnsupportedOperationException( - "Accumulator must be registered before send to executor") - } - val copy = copyAndReset() - assert(copy.isZero, "copyAndReset must return a zero value copy") - copy.metadata = metadata - copy - } else { - this - } - } - - // Called by Java when deserializing an object - private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { - in.defaultReadObject() - if (atDriverSide) { - atDriverSide = false - - // Automatically register the accumulator when it is deserialized with the task closure. - // This is for external accumulators and internal ones that do not represent task level - // metrics, e.g. internal SQL metrics, which are per-operator. - val taskContext = TaskContext.get() - if (taskContext != null) { - taskContext.registerAccumulator(this) - } - } else { - atDriverSide = true - } - } - - override def toString: String = { - if (metadata == null) { - "Un-registered Accumulator: " + getClass.getSimpleName - } else { - getClass.getSimpleName + s"(id: $id, name: $name, value: $localValue)" - } - } -} - - -/** - * An internal class used to track accumulators by Spark itself. - */ -private[spark] object AccumulatorContext { - - /** - * This global map holds the original accumulator objects that are created on the driver. - * It keeps weak references to these objects so that accumulators can be garbage-collected - * once the RDDs and user-code that reference them are cleaned up. - * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051). - */ - private val originals = new ConcurrentHashMap[Long, jl.ref.WeakReference[AccumulatorV2[_, _]]] - - private[this] val nextId = new AtomicLong(0L) - - /** - * Returns a globally unique ID for a new [[Accumulator]]. - * Note: Once you copy the [[Accumulator]] the ID is no longer unique. - */ - def newId(): Long = nextId.getAndIncrement - - /** Returns the number of accumulators registered. Used in testing. */ - def numAccums: Int = originals.size - - /** - * Registers an [[Accumulator]] created on the driver such that it can be used on the executors. - * - * All accumulators registered here can later be used as a container for accumulating partial - * values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does. - * Note: if an accumulator is registered here, it should also be registered with the active - * context cleaner for cleanup so as to avoid memory leaks. - * - * If an [[Accumulator]] with the same ID was already registered, this does nothing instead - * of overwriting it. 
We will never register same accumulator twice, this is just a sanity check. - */ - def register(a: AccumulatorV2[_, _]): Unit = { - originals.putIfAbsent(a.id, new jl.ref.WeakReference[AccumulatorV2[_, _]](a)) - } - - /** - * Unregisters the [[Accumulator]] with the given ID, if any. - */ - def remove(id: Long): Unit = { - originals.remove(id) - } - - /** - * Returns the [[Accumulator]] registered with the given ID, if any. - */ - def get(id: Long): Option[AccumulatorV2[_, _]] = { - Option(originals.get(id)).map { ref => - // Since we are storing weak references, we must check whether the underlying data is valid. - val acc = ref.get - if (acc eq null) { - throw new IllegalAccessError(s"Attempted to access garbage collected accumulator $id") - } - acc - } - } - - /** - * Clears all registered [[Accumulator]]s. For testing only. - */ - def clear(): Unit = { - originals.clear() - } -} - - -/** - * An [[AccumulatorV2 accumulator]] for computing sum, count, and averages for 64-bit integers. - * - * @since 2.0.0 - */ -class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] { - private[this] var _sum = 0L - private[this] var _count = 0L - - /** - * Adds v to the accumulator, i.e. increment sum by v and count by 1. - * @since 2.0.0 - */ - override def isZero: Boolean = _count == 0L - - override def copyAndReset(): LongAccumulator = new LongAccumulator - - /** - * Adds v to the accumulator, i.e. increment sum by v and count by 1. - * @since 2.0.0 - */ - override def add(v: jl.Long): Unit = { - _sum += v - _count += 1 - } - - /** - * Adds v to the accumulator, i.e. increment sum by v and count by 1. - * @since 2.0.0 - */ - def add(v: Long): Unit = { - _sum += v - _count += 1 - } - - /** - * Returns the number of elements added to the accumulator. - * @since 2.0.0 - */ - def count: Long = _count - - /** - * Returns the sum of elements added to the accumulator. - * @since 2.0.0 - */ - def sum: Long = _sum - - /** - * Returns the average of elements added to the accumulator. - * @since 2.0.0 - */ - def avg: Double = _sum.toDouble / _count - - override def merge(other: AccumulatorV2[jl.Long, jl.Long]): Unit = other match { - case o: LongAccumulator => - _sum += o.sum - _count += o.count - case _ => - throw new UnsupportedOperationException( - s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") - } - - private[spark] def setValue(newValue: Long): Unit = _sum = newValue - - override def localValue: jl.Long = _sum -} - - -/** - * An [[AccumulatorV2 accumulator]] for computing sum, count, and averages for double precision - * floating numbers. - * - * @since 2.0.0 - */ -class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] { - private[this] var _sum = 0.0 - private[this] var _count = 0L - - override def isZero: Boolean = _count == 0L - - override def copyAndReset(): DoubleAccumulator = new DoubleAccumulator - - /** - * Adds v to the accumulator, i.e. increment sum by v and count by 1. - * @since 2.0.0 - */ - override def add(v: jl.Double): Unit = { - _sum += v - _count += 1 - } - - /** - * Adds v to the accumulator, i.e. increment sum by v and count by 1. - * @since 2.0.0 - */ - def add(v: Double): Unit = { - _sum += v - _count += 1 - } - - /** - * Returns the number of elements added to the accumulator. - * @since 2.0.0 - */ - def count: Long = _count - - /** - * Returns the sum of elements added to the accumulator. - * @since 2.0.0 - */ - def sum: Double = _sum - - /** - * Returns the average of elements added to the accumulator. 
- * @since 2.0.0 - */ - def avg: Double = _sum / _count - - override def merge(other: AccumulatorV2[jl.Double, jl.Double]): Unit = other match { - case o: DoubleAccumulator => - _sum += o.sum - _count += o.count - case _ => - throw new UnsupportedOperationException( - s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") - } - - private[spark] def setValue(newValue: Double): Unit = _sum = newValue - - override def localValue: jl.Double = _sum -} - - -class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { - private[this] val _list: java.util.List[T] = new java.util.ArrayList[T] - - override def isZero: Boolean = _list.isEmpty - - override def copyAndReset(): ListAccumulator[T] = new ListAccumulator - - override def add(v: T): Unit = _list.add(v) - - override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match { - case o: ListAccumulator[T] => _list.addAll(o.localValue) - case _ => throw new UnsupportedOperationException( - s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") - } - - override def localValue: java.util.List[T] = java.util.Collections.unmodifiableList(_list) - - private[spark] def setValue(newValue: java.util.List[T]): Unit = { - _list.clear() - _list.addAll(newValue) - } -} - - -class LegacyAccumulatorWrapper[R, T]( - initialValue: R, - param: org.apache.spark.AccumulableParam[R, T]) extends AccumulatorV2[T, R] { - private[spark] var _value = initialValue // Current value on driver - - override def isZero: Boolean = _value == param.zero(initialValue) - - override def copyAndReset(): LegacyAccumulatorWrapper[R, T] = { - val acc = new LegacyAccumulatorWrapper(initialValue, param) - acc._value = param.zero(initialValue) - acc - } - - override def add(v: T): Unit = _value = param.addAccumulator(_value, v) - - override def merge(other: AccumulatorV2[T, R]): Unit = other match { - case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.localValue) - case _ => throw new UnsupportedOperationException( - s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") - } - - override def localValue: R = _value -} diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index c895fb3206..5678d790e9 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -25,7 +25,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData} -import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, ThreadUtils, Utils} /** * Classes that represent cleaning tasks. diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 29018c75b9..73495a8d7d 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.scheduler._ import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} +import org.apache.spark.util._ /** * A heartbeat from executors to the driver. 
This is a shared message used by several internal diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index 1a8f8cf11c..27abccf5ac 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -24,7 +24,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.TaskMetrics import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.metrics.source.Source -import org.apache.spark.util.{TaskCompletionListener, TaskFailureListener} +import org.apache.spark.util.{AccumulatorV2, TaskCompletionListener, TaskFailureListener} object TaskContext { diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index ef333e397f..42690844f9 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.Utils +import org.apache.spark.util.{AccumulatorV2, Utils} // ============================================================================================== // NOTE: new task end reasons MUST be accompanied with serialization logic in util.JsonProtocol! diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala index 6f7160ac0d..3d15f3a039 100644 --- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala @@ -17,8 +17,8 @@ package org.apache.spark.executor -import org.apache.spark.LongAccumulator import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.util.LongAccumulator /** diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala index db3924cb69..dada9697c1 100644 --- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala @@ -17,8 +17,8 @@ package org.apache.spark.executor -import org.apache.spark.LongAccumulator import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.util.LongAccumulator /** diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala index fa962108c3..f7a991770d 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala @@ -17,8 +17,8 @@ package org.apache.spark.executor -import org.apache.spark.LongAccumulator import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.util.LongAccumulator /** diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala index 0e70a4f522..ada2e1bc08 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala @@ -17,8 +17,8 @@ package org.apache.spark.executor -import org.apache.spark.LongAccumulator import 
org.apache.spark.annotation.DeveloperApi +import org.apache.spark.util.LongAccumulator /** diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 56d034fd03..085aa7fbd6 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -24,6 +24,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.storage.{BlockId, BlockStatus} +import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, AccumulatorV2, LongAccumulator} /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 0a2c2dc039..8c76112482 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -23,7 +23,7 @@ import scala.language.existentials import org.apache.spark._ import org.apache.spark.rdd.RDD -import org.apache.spark.util.CallSite +import org.apache.spark.util.{AccumulatorV2, CallSite} /** * Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 362f8e51ce..95bcc7bc96 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -28,7 +28,7 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.memory.{MemoryMode, TaskMemoryManager} import org.apache.spark.metrics.MetricsSystem import org.apache.spark.serializer.SerializerInstance -import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils} +import org.apache.spark.util.{AccumulatorV2, ByteBufferInputStream, ByteBufferOutputStream, Utils} /** * A unit of execution. We have two kinds of Task's in Spark: diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index 69ce00f30d..80f2bf4122 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -22,9 +22,9 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{AccumulatorV2, SparkEnv} +import org.apache.spark.SparkEnv import org.apache.spark.storage.BlockId -import org.apache.spark.util.Utils +import org.apache.spark.util.{AccumulatorV2, Utils} // Task result. Also contains updates to accumulator variables. 
private[spark] sealed trait TaskResult[T] diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index b438c285fd..685ef55c66 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -27,7 +27,7 @@ import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.internal.Logging import org.apache.spark.serializer.SerializerInstance -import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.{LongAccumulator, ThreadUtils, Utils} /** * Runs a thread pool that deserializes and remotely fetches (if necessary) task results. diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 9881a1018c..cd13eebe74 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -17,9 +17,9 @@ package org.apache.spark.scheduler -import org.apache.spark.AccumulatorV2 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.BlockManagerId +import org.apache.spark.util.AccumulatorV2 /** * Low-level task scheduler interface, currently implemented exclusively by diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 666b636558..393680f4c1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -34,7 +34,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.scheduler.TaskLocality.TaskLocality import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils} /** * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend. diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index b724050f5b..cd634bbf6f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -32,7 +32,7 @@ import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.scheduler.SchedulingMode._ import org.apache.spark.TaskState.TaskState -import org.apache.spark.util.{Clock, SystemClock, Utils} +import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, Utils} /** * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala new file mode 100644 index 0000000000..0e280f6f6a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import java.{lang => jl} +import java.io.ObjectInputStream +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.{InternalAccumulator, SparkContext, TaskContext} +import org.apache.spark.scheduler.AccumulableInfo + + +private[spark] case class AccumulatorMetadata( + id: Long, + name: Option[String], + countFailedValues: Boolean) extends Serializable + + +/** + * The base class for accumulators, that can accumulate inputs of type `IN`, and produce output of + * type `OUT`. + */ +abstract class AccumulatorV2[IN, OUT] extends Serializable { + private[spark] var metadata: AccumulatorMetadata = _ + private[this] var atDriverSide = true + + private[spark] def register( + sc: SparkContext, + name: Option[String] = None, + countFailedValues: Boolean = false): Unit = { + if (this.metadata != null) { + throw new IllegalStateException("Cannot register an Accumulator twice.") + } + this.metadata = AccumulatorMetadata(AccumulatorContext.newId(), name, countFailedValues) + AccumulatorContext.register(this) + sc.cleaner.foreach(_.registerAccumulatorForCleanup(this)) + } + + /** + * Returns true if this accumulator has been registered. Note that all accumulators must be + * registered before ues, or it will throw exception. + */ + final def isRegistered: Boolean = + metadata != null && AccumulatorContext.get(metadata.id).isDefined + + private def assertMetadataNotNull(): Unit = { + if (metadata == null) { + throw new IllegalAccessError("The metadata of this accumulator has not been assigned yet.") + } + } + + /** + * Returns the id of this accumulator, can only be called after registration. + */ + final def id: Long = { + assertMetadataNotNull() + metadata.id + } + + /** + * Returns the name of this accumulator, can only be called after registration. + */ + final def name: Option[String] = { + assertMetadataNotNull() + metadata.name + } + + /** + * Whether to accumulate values from failed tasks. This is set to true for system and time + * metrics like serialization time or bytes spilled, and false for things with absolute values + * like number of input rows. This should be used for internal metrics only. + */ + private[spark] final def countFailedValues: Boolean = { + assertMetadataNotNull() + metadata.countFailedValues + } + + /** + * Creates an [[AccumulableInfo]] representation of this [[AccumulatorV2]] with the provided + * values. + */ + private[spark] def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = { + val isInternal = name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) + new AccumulableInfo(id, name, update, value, isInternal, countFailedValues) + } + + final private[spark] def isAtDriverSide: Boolean = atDriverSide + + /** + * Returns if this accumulator is zero value or not. e.g. for a counter accumulator, 0 is zero + * value; for a list accumulator, Nil is zero value. 
+ */ + def isZero: Boolean + + /** + * Creates a new copy of this accumulator, which is zero value. i.e. call `isZero` on the copy + * must return true. + */ + def copyAndReset(): AccumulatorV2[IN, OUT] + + /** + * Takes the inputs and accumulates. e.g. it can be a simple `+=` for counter accumulator. + */ + def add(v: IN): Unit + + /** + * Merges another same-type accumulator into this one and update its state, i.e. this should be + * merge-in-place. + */ + def merge(other: AccumulatorV2[IN, OUT]): Unit + + /** + * Access this accumulator's current value; only allowed on driver. + */ + final def value: OUT = { + if (atDriverSide) { + localValue + } else { + throw new UnsupportedOperationException("Can't read accumulator value in task") + } + } + + /** + * Defines the current value of this accumulator. + * + * This is NOT the global value of the accumulator. To get the global value after a + * completed operation on the dataset, call `value`. + */ + def localValue: OUT + + // Called by Java when serializing an object + final protected def writeReplace(): Any = { + if (atDriverSide) { + if (!isRegistered) { + throw new UnsupportedOperationException( + "Accumulator must be registered before send to executor") + } + val copy = copyAndReset() + assert(copy.isZero, "copyAndReset must return a zero value copy") + copy.metadata = metadata + copy + } else { + this + } + } + + // Called by Java when deserializing an object + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { + in.defaultReadObject() + if (atDriverSide) { + atDriverSide = false + + // Automatically register the accumulator when it is deserialized with the task closure. + // This is for external accumulators and internal ones that do not represent task level + // metrics, e.g. internal SQL metrics, which are per-operator. + val taskContext = TaskContext.get() + if (taskContext != null) { + taskContext.registerAccumulator(this) + } + } else { + atDriverSide = true + } + } + + override def toString: String = { + if (metadata == null) { + "Un-registered Accumulator: " + getClass.getSimpleName + } else { + getClass.getSimpleName + s"(id: $id, name: $name, value: $localValue)" + } + } +} + + +/** + * An internal class used to track accumulators by Spark itself. + */ +private[spark] object AccumulatorContext { + + /** + * This global map holds the original accumulator objects that are created on the driver. + * It keeps weak references to these objects so that accumulators can be garbage-collected + * once the RDDs and user-code that reference them are cleaned up. + * TODO: Don't use a global map; these should be tied to a SparkContext (SPARK-13051). + */ + private val originals = new ConcurrentHashMap[Long, jl.ref.WeakReference[AccumulatorV2[_, _]]] + + private[this] val nextId = new AtomicLong(0L) + + /** + * Returns a globally unique ID for a new [[AccumulatorV2]]. + * Note: Once you copy the [[AccumulatorV2]] the ID is no longer unique. + */ + def newId(): Long = nextId.getAndIncrement + + /** Returns the number of accumulators registered. Used in testing. */ + def numAccums: Int = originals.size + + /** + * Registers an [[AccumulatorV2]] created on the driver such that it can be used on the executors. + * + * All accumulators registered here can later be used as a container for accumulating partial + * values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does. 
+ * Note: if an accumulator is registered here, it should also be registered with the active + * context cleaner for cleanup so as to avoid memory leaks. + * + * If an [[AccumulatorV2]] with the same ID was already registered, this does nothing instead + * of overwriting it. We will never register same accumulator twice, this is just a sanity check. + */ + def register(a: AccumulatorV2[_, _]): Unit = { + originals.putIfAbsent(a.id, new jl.ref.WeakReference[AccumulatorV2[_, _]](a)) + } + + /** + * Unregisters the [[AccumulatorV2]] with the given ID, if any. + */ + def remove(id: Long): Unit = { + originals.remove(id) + } + + /** + * Returns the [[AccumulatorV2]] registered with the given ID, if any. + */ + def get(id: Long): Option[AccumulatorV2[_, _]] = { + Option(originals.get(id)).map { ref => + // Since we are storing weak references, we must check whether the underlying data is valid. + val acc = ref.get + if (acc eq null) { + throw new IllegalAccessError(s"Attempted to access garbage collected accumulator $id") + } + acc + } + } + + /** + * Clears all registered [[AccumulatorV2]]s. For testing only. + */ + def clear(): Unit = { + originals.clear() + } +} + + +/** + * An [[AccumulatorV2 accumulator]] for computing sum, count, and averages for 64-bit integers. + * + * @since 2.0.0 + */ +class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] { + private[this] var _sum = 0L + private[this] var _count = 0L + + /** + * Adds v to the accumulator, i.e. increment sum by v and count by 1. + * @since 2.0.0 + */ + override def isZero: Boolean = _count == 0L + + override def copyAndReset(): LongAccumulator = new LongAccumulator + + /** + * Adds v to the accumulator, i.e. increment sum by v and count by 1. + * @since 2.0.0 + */ + override def add(v: jl.Long): Unit = { + _sum += v + _count += 1 + } + + /** + * Adds v to the accumulator, i.e. increment sum by v and count by 1. + * @since 2.0.0 + */ + def add(v: Long): Unit = { + _sum += v + _count += 1 + } + + /** + * Returns the number of elements added to the accumulator. + * @since 2.0.0 + */ + def count: Long = _count + + /** + * Returns the sum of elements added to the accumulator. + * @since 2.0.0 + */ + def sum: Long = _sum + + /** + * Returns the average of elements added to the accumulator. + * @since 2.0.0 + */ + def avg: Double = _sum.toDouble / _count + + override def merge(other: AccumulatorV2[jl.Long, jl.Long]): Unit = other match { + case o: LongAccumulator => + _sum += o.sum + _count += o.count + case _ => + throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + private[spark] def setValue(newValue: Long): Unit = _sum = newValue + + override def localValue: jl.Long = _sum +} + + +/** + * An [[AccumulatorV2 accumulator]] for computing sum, count, and averages for double precision + * floating numbers. + * + * @since 2.0.0 + */ +class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] { + private[this] var _sum = 0.0 + private[this] var _count = 0L + + override def isZero: Boolean = _count == 0L + + override def copyAndReset(): DoubleAccumulator = new DoubleAccumulator + + /** + * Adds v to the accumulator, i.e. increment sum by v and count by 1. + * @since 2.0.0 + */ + override def add(v: jl.Double): Unit = { + _sum += v + _count += 1 + } + + /** + * Adds v to the accumulator, i.e. increment sum by v and count by 1. 
+ * @since 2.0.0 + */ + def add(v: Double): Unit = { + _sum += v + _count += 1 + } + + /** + * Returns the number of elements added to the accumulator. + * @since 2.0.0 + */ + def count: Long = _count + + /** + * Returns the sum of elements added to the accumulator. + * @since 2.0.0 + */ + def sum: Double = _sum + + /** + * Returns the average of elements added to the accumulator. + * @since 2.0.0 + */ + def avg: Double = _sum / _count + + override def merge(other: AccumulatorV2[jl.Double, jl.Double]): Unit = other match { + case o: DoubleAccumulator => + _sum += o.sum + _count += o.count + case _ => + throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + private[spark] def setValue(newValue: Double): Unit = _sum = newValue + + override def localValue: jl.Double = _sum +} + + +class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { + private[this] val _list: java.util.List[T] = new java.util.ArrayList[T] + + override def isZero: Boolean = _list.isEmpty + + override def copyAndReset(): ListAccumulator[T] = new ListAccumulator + + override def add(v: T): Unit = _list.add(v) + + override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match { + case o: ListAccumulator[T] => _list.addAll(o.localValue) + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + override def localValue: java.util.List[T] = java.util.Collections.unmodifiableList(_list) + + private[spark] def setValue(newValue: java.util.List[T]): Unit = { + _list.clear() + _list.addAll(newValue) + } +} + + +class LegacyAccumulatorWrapper[R, T]( + initialValue: R, + param: org.apache.spark.AccumulableParam[R, T]) extends AccumulatorV2[T, R] { + private[spark] var _value = initialValue // Current value on driver + + override def isZero: Boolean = _value == param.zero(initialValue) + + override def copyAndReset(): LegacyAccumulatorWrapper[R, T] = { + val acc = new LegacyAccumulatorWrapper(initialValue, param) + acc._value = param.zero(initialValue) + acc + } + + override def add(v: T): Unit = _value = param.addAccumulator(_value, v) + + override def merge(other: AccumulatorV2[T, R]): Unit = other match { + case o: LegacyAccumulatorWrapper[R, T] => _value = param.addInPlace(_value, o.localValue) + case _ => throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + + override def localValue: R = _value +} diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index 0020096254..cade67b1d2 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -31,6 +31,7 @@ import org.scalatest.exceptions.TestFailedException import org.apache.spark.AccumulatorParam.StringAccumulatorParam import org.apache.spark.scheduler._ import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.util.{AccumulatorContext, AccumulatorMetadata, AccumulatorV2, LongAccumulator} class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContext { diff --git a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala index 25977a4660..840f55ce2f 100644 --- a/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala +++ 
b/core/src/test/scala/org/apache/spark/InternalAccumulatorSuite.scala @@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.AccumulableInfo import org.apache.spark.shuffle.FetchFailedException +import org.apache.spark.util.{AccumulatorContext, AccumulatorV2} class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext { diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala index 4aae2c9b4a..0081bca639 100644 --- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala @@ -21,6 +21,7 @@ package org.apache.spark import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome} import org.apache.spark.internal.Logging +import org.apache.spark.util.AccumulatorContext /** * Base abstract class for all unit tests in Spark for handling common functionality. diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala index 27a1e7bb35..eae26fa742 100644 --- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala @@ -21,6 +21,7 @@ import org.scalatest.Assertions import org.apache.spark._ import org.apache.spark.storage.{BlockStatus, StorageLevel, TestBlockId} +import org.apache.spark.util.AccumulatorV2 class TaskMetricsSuite extends SparkFunSuite { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 5a5c3a0cd1..844c780a3f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -28,11 +28,10 @@ import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ import org.apache.spark._ -import org.apache.spark.executor.TaskMetrics import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} -import org.apache.spark.util.{CallSite, Utils} +import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils} class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler) extends DAGSchedulerEventProcessLoop(dagScheduler) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala index 72ac848f12..59c1b359a7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala @@ -17,9 +17,10 @@ package org.apache.spark.scheduler -import org.apache.spark.{AccumulatorV2, LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.BlockManagerId +import org.apache.spark.util.AccumulatorV2 class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext { test("launch of backend and scheduler") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 122a3ecb49..9b7b945bf3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark._ import org.apache.spark.internal.Logging -import org.apache.spark.util.ManualClock +import org.apache.spark.util.{AccumulatorV2, ManualClock} class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler) extends DAGScheduler(sc) { diff --git a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala index 41cdd02492..815b134884 100644 --- a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala +++ b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala @@ -17,7 +17,7 @@ package org.apache.spark.util -import org.apache.spark.{DoubleAccumulator, LongAccumulator, SparkFunSuite} +import org.apache.spark.SparkFunSuite class AccumulatorV2Suite extends SparkFunSuite {
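As a quick orientation for callers affected by the move, below is a minimal driver-side sketch of the relocated API. It is not part of the patch: it uses only the members visible in the new `org.apache.spark.util.AccumulatorV2` file above, `MaxAccumulator` and `AccumulatorV2Sketch` are illustrative names, and registration against a live SparkContext (handled through the internal register/cleaner hooks touched by this patch) is deliberately left out.

```scala
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}

// A custom accumulator only has to implement the abstract members declared
// in AccumulatorV2: isZero, copyAndReset, add, merge and localValue.
class MaxAccumulator extends AccumulatorV2[Long, Long] {
  private var _max = Long.MinValue
  override def isZero: Boolean = _max == Long.MinValue
  override def copyAndReset(): MaxAccumulator = new MaxAccumulator
  override def add(v: Long): Unit = _max = math.max(_max, v)
  override def merge(other: AccumulatorV2[Long, Long]): Unit = add(other.localValue)
  override def localValue: Long = _max
}

object AccumulatorV2Sketch {
  def main(args: Array[String]): Unit = {
    // Built-in LongAccumulator, exercised purely locally: add, sum, count and
    // avg all work on an unregistered driver-side instance.
    val metric = new LongAccumulator
    metric.add(3L)
    metric.add(7L)
    assert(metric.sum == 10L && metric.count == 2L && metric.avg == 5.0)

    // merge folds another accumulator of the same type into this one, which is
    // what the scheduler does with per-task updates on the driver.
    val partial = new LongAccumulator
    partial.add(5L)
    metric.merge(partial)
    assert(metric.sum == 15L)

    // The custom accumulator behaves the same way.
    val max = new MaxAccumulator
    max.add(2L)
    max.add(9L)
    assert(max.localValue == 9L)
  }
}
```

In a real job these instances would be registered before being shipped to executors, which is why the `writeReplace` shown in the patch refuses to serialize an unregistered accumulator.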