diff options
Diffstat (limited to 'core/src/main/scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 32 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/RDD.scala | 7 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/Utils.scala | 27 |
3 files changed, 46 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 428f019b02..979d178c35 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1030,28 +1030,40 @@ class SparkContext(config: SparkConf) extends Logging { } /** - * Support function for API backtraces. + * Set the thread-local property for overriding the call sites + * of actions and RDDs. */ - def setCallSite(site: String) { - setLocalProperty("externalCallSite", site) + def setCallSite(shortCallSite: String) { + setLocalProperty(CallSite.SHORT_FORM, shortCallSite) } /** - * Support function for API backtraces. + * Set the thread-local property for overriding the call sites + * of actions and RDDs. + */ + private[spark] def setCallSite(callSite: CallSite) { + setLocalProperty(CallSite.SHORT_FORM, callSite.shortForm) + setLocalProperty(CallSite.LONG_FORM, callSite.longForm) + } + + /** + * Clear the thread-local property for overriding the call sites + * of actions and RDDs. */ def clearCallSite() { - setLocalProperty("externalCallSite", null) + setLocalProperty(CallSite.SHORT_FORM, null) + setLocalProperty(CallSite.LONG_FORM, null) } /** * Capture the current user callsite and return a formatted version for printing. If the user - * has overridden the call site, this will return the user's version. + * has overridden the call site using `setCallSite()`, this will return the user's version. */ private[spark] def getCallSite(): CallSite = { - Option(getLocalProperty("externalCallSite")) match { - case Some(callSite) => CallSite(callSite, longForm = "") - case None => Utils.getCallSite - } + Option(getLocalProperty(CallSite.SHORT_FORM)).map { case shortCallSite => + val longCallSite = Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse("") + CallSite(shortCallSite, longCallSite) + }.getOrElse(Utils.getCallSite()) } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index a9b905b0d1..0e90caa5c9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.rdd -import java.util.Random +import java.util.{Properties, Random} import scala.collection.{mutable, Map} import scala.collection.mutable.ArrayBuffer @@ -41,7 +41,7 @@ import org.apache.spark.partial.CountEvaluator import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{BoundedPriorityQueue, Utils} +import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite} import org.apache.spark.util.collection.OpenHashMap import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils} @@ -1224,7 +1224,8 @@ abstract class RDD[T: ClassTag]( private var storageLevel: StorageLevel = StorageLevel.NONE /** User code that created this RDD (e.g. `textFile`, `parallelize`). */ - @transient private[spark] val creationSite = Utils.getCallSite + @transient private[spark] val creationSite = sc.getCallSite() + private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("") private[spark] def elementClassTag: ClassTag[T] = classTag[T] diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index ed06384432..2755887fee 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -49,6 +49,11 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream, /** CallSite represents a place in user code. It can have a short and a long form. */ private[spark] case class CallSite(shortForm: String, longForm: String) +private[spark] object CallSite { + val SHORT_FORM = "callSite.short" + val LONG_FORM = "callSite.long" +} + /** * Various utility methods used by Spark. */ @@ -859,18 +864,26 @@ private[spark] object Utils extends Logging { } } - /** - * A regular expression to match classes of the "core" Spark API that we want to skip when - * finding the call site of a method. - */ - private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r + /** Default filtering function for finding call sites using `getCallSite`. */ + private def coreExclusionFunction(className: String): Boolean = { + // A regular expression to match classes of the "core" Spark API that we want to skip when + // finding the call site of a method. + val SPARK_CORE_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r + val SCALA_CLASS_REGEX = """^scala""".r + val isSparkCoreClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined + val isScalaClass = SCALA_CLASS_REGEX.findFirstIn(className).isDefined + // If the class is a Spark internal class or a Scala class, then exclude. + isSparkCoreClass || isScalaClass + } /** * When called inside a class in the spark package, returns the name of the user code class * (outside the spark package) that called into Spark, as well as which Spark method they called. * This is used, for example, to tell users where in their code each RDD got created. + * + * @param skipClass Function that is used to exclude non-user-code classes. */ - def getCallSite: CallSite = { + def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = { val trace = Thread.currentThread.getStackTrace() .filterNot { ste:StackTraceElement => // When running under some profilers, the current stack trace might contain some bogus @@ -891,7 +904,7 @@ private[spark] object Utils extends Logging { for (el <- trace) { if (insideSpark) { - if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) { + if (skipClass(el.getClassName)) { lastSparkMethod = if (el.getMethodName == "<init>") { // Spark method is a constructor; get its class name el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1) |