diff options
author | Mubarak Seyed <mubarak.seyed@gmail.com> | 2014-09-23 15:09:12 -0700 |
---|---|---|
committer | Andrew Or <andrewor14@gmail.com> | 2014-09-23 15:09:12 -0700 |
commit | 729952a5efce755387c76cdf29280ee6f49fdb72 (patch) | |
tree | 034ef81a4f69c3329a93b213ebc2092e6046f035 /core | |
parent | b3fef50e22fb3fe499f627179d17836a92dcb33a (diff) | |
download | spark-729952a5efce755387c76cdf29280ee6f49fdb72.tar.gz spark-729952a5efce755387c76cdf29280ee6f49fdb72.tar.bz2 spark-729952a5efce755387c76cdf29280ee6f49fdb72.zip |
[SPARK-1853] Show Streaming application code context (file, line number) in Spark Stages UI
This is a refactored version of the original PR https://github.com/apache/spark/pull/1723 my mubarak
Please take a look andrewor14, mubarak
Author: Mubarak Seyed <mubarak.seyed@gmail.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #2464 from tdas/streaming-callsite and squashes the following commits:
dc54c71 [Tathagata Das] Made changes based on PR comments.
390b45d [Tathagata Das] Fixed minor bugs.
904cd92 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-callsite
7baa427 [Tathagata Das] Refactored getCallSite and setCallSite to make it simpler. Also added unit test for DStream creation site.
b9ed945 [Mubarak Seyed] Adding streaming utils
c461cf4 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
ceb43da [Mubarak Seyed] Changing default regex function name
8c5d443 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
196121b [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
491a1eb [Mubarak Seyed] Removing streaming visibility from getRDDCreationCallSite in DStream
33a7295 [Mubarak Seyed] Fixing review comments: Merging both setCallSite methods
c26d933 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
f51fd9f [Mubarak Seyed] Fixing scalastyle, Regex for Utils.getCallSite, and changing method names in DStream
5051c58 [Mubarak Seyed] Getting return value of compute() into variable and call setCallSite(prevCallSite) only once. Adding return for other code paths (for None)
a207eb7 [Mubarak Seyed] Fixing code review comments
ccde038 [Mubarak Seyed] Removing Utils import from MappedDStream
2a09ad6 [Mubarak Seyed] Changes in Utils.scala for SPARK-1853
1d90cc3 [Mubarak Seyed] Changes for SPARK-1853
5f3105a [Mubarak Seyed] Merge remote-tracking branch 'upstream/master'
70f494f [Mubarak Seyed] Changes for SPARK-1853
1500deb [Mubarak Seyed] Changes in Spark Streaming UI
9d38d3c [Mubarak Seyed] [SPARK-1853] Show Streaming application code context (file, line number) in Spark Stages UI
d466d75 [Mubarak Seyed] Changes for spark streaming UI
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 32 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/RDD.scala | 7 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/Utils.scala | 27 |
3 files changed, 46 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 428f019b02..979d178c35 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1030,28 +1030,40 @@ class SparkContext(config: SparkConf) extends Logging { } /** - * Support function for API backtraces. + * Set the thread-local property for overriding the call sites + * of actions and RDDs. */ - def setCallSite(site: String) { - setLocalProperty("externalCallSite", site) + def setCallSite(shortCallSite: String) { + setLocalProperty(CallSite.SHORT_FORM, shortCallSite) } /** - * Support function for API backtraces. + * Set the thread-local property for overriding the call sites + * of actions and RDDs. + */ + private[spark] def setCallSite(callSite: CallSite) { + setLocalProperty(CallSite.SHORT_FORM, callSite.shortForm) + setLocalProperty(CallSite.LONG_FORM, callSite.longForm) + } + + /** + * Clear the thread-local property for overriding the call sites + * of actions and RDDs. */ def clearCallSite() { - setLocalProperty("externalCallSite", null) + setLocalProperty(CallSite.SHORT_FORM, null) + setLocalProperty(CallSite.LONG_FORM, null) } /** * Capture the current user callsite and return a formatted version for printing. If the user - * has overridden the call site, this will return the user's version. + * has overridden the call site using `setCallSite()`, this will return the user's version. */ private[spark] def getCallSite(): CallSite = { - Option(getLocalProperty("externalCallSite")) match { - case Some(callSite) => CallSite(callSite, longForm = "") - case None => Utils.getCallSite - } + Option(getLocalProperty(CallSite.SHORT_FORM)).map { case shortCallSite => + val longCallSite = Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse("") + CallSite(shortCallSite, longCallSite) + }.getOrElse(Utils.getCallSite()) } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index a9b905b0d1..0e90caa5c9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.rdd -import java.util.Random +import java.util.{Properties, Random} import scala.collection.{mutable, Map} import scala.collection.mutable.ArrayBuffer @@ -41,7 +41,7 @@ import org.apache.spark.partial.CountEvaluator import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{BoundedPriorityQueue, Utils} +import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite} import org.apache.spark.util.collection.OpenHashMap import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils} @@ -1224,7 +1224,8 @@ abstract class RDD[T: ClassTag]( private var storageLevel: StorageLevel = StorageLevel.NONE /** User code that created this RDD (e.g. `textFile`, `parallelize`). */ - @transient private[spark] val creationSite = Utils.getCallSite + @transient private[spark] val creationSite = sc.getCallSite() + private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("") private[spark] def elementClassTag: ClassTag[T] = classTag[T] diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index ed06384432..2755887fee 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -49,6 +49,11 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream, /** CallSite represents a place in user code. It can have a short and a long form. */ private[spark] case class CallSite(shortForm: String, longForm: String) +private[spark] object CallSite { + val SHORT_FORM = "callSite.short" + val LONG_FORM = "callSite.long" +} + /** * Various utility methods used by Spark. */ @@ -859,18 +864,26 @@ private[spark] object Utils extends Logging { } } - /** - * A regular expression to match classes of the "core" Spark API that we want to skip when - * finding the call site of a method. - */ - private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r + /** Default filtering function for finding call sites using `getCallSite`. */ + private def coreExclusionFunction(className: String): Boolean = { + // A regular expression to match classes of the "core" Spark API that we want to skip when + // finding the call site of a method. + val SPARK_CORE_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r + val SCALA_CLASS_REGEX = """^scala""".r + val isSparkCoreClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined + val isScalaClass = SCALA_CLASS_REGEX.findFirstIn(className).isDefined + // If the class is a Spark internal class or a Scala class, then exclude. + isSparkCoreClass || isScalaClass + } /** * When called inside a class in the spark package, returns the name of the user code class * (outside the spark package) that called into Spark, as well as which Spark method they called. * This is used, for example, to tell users where in their code each RDD got created. + * + * @param skipClass Function that is used to exclude non-user-code classes. */ - def getCallSite: CallSite = { + def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = { val trace = Thread.currentThread.getStackTrace() .filterNot { ste:StackTraceElement => // When running under some profilers, the current stack trace might contain some bogus @@ -891,7 +904,7 @@ private[spark] object Utils extends Logging { for (el <- trace) { if (insideSpark) { - if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) { + if (skipClass(el.getClassName)) { lastSparkMethod = if (el.getMethodName == "<init>") { // Spark method is a constructor; get its class name el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1) |