aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMubarak Seyed <mubarak.seyed@gmail.com>2014-09-23 15:09:12 -0700
committerAndrew Or <andrewor14@gmail.com>2014-09-23 15:09:12 -0700
commit729952a5efce755387c76cdf29280ee6f49fdb72 (patch)
tree034ef81a4f69c3329a93b213ebc2092e6046f035 /core
parentb3fef50e22fb3fe499f627179d17836a92dcb33a (diff)
downloadspark-729952a5efce755387c76cdf29280ee6f49fdb72.tar.gz
spark-729952a5efce755387c76cdf29280ee6f49fdb72.tar.bz2
spark-729952a5efce755387c76cdf29280ee6f49fdb72.zip
[SPARK-1853] Show Streaming application code context (file, line number) in Spark Stages UI
[SPARK-1853] Show Streaming application code context (file, line number) in Spark Stages UI This is a refactored version of the original PR https://github.com/apache/spark/pull/1723 by mubarak Please take a look andrewor14, mubarak Author: Mubarak Seyed <mubarak.seyed@gmail.com> Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #2464 from tdas/streaming-callsite and squashes the following commits: dc54c71 [Tathagata Das] Made changes based on PR comments. 390b45d [Tathagata Das] Fixed minor bugs. 904cd92 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-callsite 7baa427 [Tathagata Das] Refactored getCallSite and setCallSite to make it simpler. Also added unit test for DStream creation site. b9ed945 [Mubarak Seyed] Adding streaming utils c461cf4 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master' ceb43da [Mubarak Seyed] Changing default regex function name 8c5d443 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master' 196121b [Mubarak Seyed] Merge remote-tracking branch 'upstream/master' 491a1eb [Mubarak Seyed] Removing streaming visibility from getRDDCreationCallSite in DStream 33a7295 [Mubarak Seyed] Fixing review comments: Merging both setCallSite methods c26d933 [Mubarak Seyed] Merge remote-tracking branch 'upstream/master' f51fd9f [Mubarak Seyed] Fixing scalastyle, Regex for Utils.getCallSite, and changing method names in DStream 5051c58 [Mubarak Seyed] Getting return value of compute() into variable and call setCallSite(prevCallSite) only once. 
Adding return for other code paths (for None) a207eb7 [Mubarak Seyed] Fixing code review comments ccde038 [Mubarak Seyed] Removing Utils import from MappedDStream 2a09ad6 [Mubarak Seyed] Changes in Utils.scala for SPARK-1853 1d90cc3 [Mubarak Seyed] Changes for SPARK-1853 5f3105a [Mubarak Seyed] Merge remote-tracking branch 'upstream/master' 70f494f [Mubarak Seyed] Changes for SPARK-1853 1500deb [Mubarak Seyed] Changes in Spark Streaming UI 9d38d3c [Mubarak Seyed] [SPARK-1853] Show Streaming application code context (file, line number) in Spark Stages UI d466d75 [Mubarak Seyed] Changes for spark streaming UI
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala32
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala27
3 files changed, 46 insertions, 20 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 428f019b02..979d178c35 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1030,28 +1030,40 @@ class SparkContext(config: SparkConf) extends Logging {
}
/**
- * Support function for API backtraces.
+ * Set the thread-local property for overriding the call sites
+ * of actions and RDDs.
*/
- def setCallSite(site: String) {
- setLocalProperty("externalCallSite", site)
+ def setCallSite(shortCallSite: String) {
+ setLocalProperty(CallSite.SHORT_FORM, shortCallSite)
}
/**
- * Support function for API backtraces.
+ * Set the thread-local property for overriding the call sites
+ * of actions and RDDs.
+ */
+ private[spark] def setCallSite(callSite: CallSite) {
+ setLocalProperty(CallSite.SHORT_FORM, callSite.shortForm)
+ setLocalProperty(CallSite.LONG_FORM, callSite.longForm)
+ }
+
+ /**
+ * Clear the thread-local property for overriding the call sites
+ * of actions and RDDs.
*/
def clearCallSite() {
- setLocalProperty("externalCallSite", null)
+ setLocalProperty(CallSite.SHORT_FORM, null)
+ setLocalProperty(CallSite.LONG_FORM, null)
}
/**
* Capture the current user callsite and return a formatted version for printing. If the user
- * has overridden the call site, this will return the user's version.
+ * has overridden the call site using `setCallSite()`, this will return the user's version.
*/
private[spark] def getCallSite(): CallSite = {
- Option(getLocalProperty("externalCallSite")) match {
- case Some(callSite) => CallSite(callSite, longForm = "")
- case None => Utils.getCallSite
- }
+ Option(getLocalProperty(CallSite.SHORT_FORM)).map { case shortCallSite =>
+ val longCallSite = Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse("")
+ CallSite(shortCallSite, longCallSite)
+ }.getOrElse(Utils.getCallSite())
}
/**
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a9b905b0d1..0e90caa5c9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import java.util.Random
+import java.util.{Properties, Random}
import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
@@ -41,7 +41,7 @@ import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}
@@ -1224,7 +1224,8 @@ abstract class RDD[T: ClassTag](
private var storageLevel: StorageLevel = StorageLevel.NONE
/** User code that created this RDD (e.g. `textFile`, `parallelize`). */
- @transient private[spark] val creationSite = Utils.getCallSite
+ @transient private[spark] val creationSite = sc.getCallSite()
+
private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("")
private[spark] def elementClassTag: ClassTag[T] = classTag[T]
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ed06384432..2755887fee 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -49,6 +49,11 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream,
/** CallSite represents a place in user code. It can have a short and a long form. */
private[spark] case class CallSite(shortForm: String, longForm: String)
+private[spark] object CallSite {
+ val SHORT_FORM = "callSite.short"
+ val LONG_FORM = "callSite.long"
+}
+
/**
* Various utility methods used by Spark.
*/
@@ -859,18 +864,26 @@ private[spark] object Utils extends Logging {
}
}
- /**
- * A regular expression to match classes of the "core" Spark API that we want to skip when
- * finding the call site of a method.
- */
- private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
+ /** Default filtering function for finding call sites using `getCallSite`. */
+ private def coreExclusionFunction(className: String): Boolean = {
+ // A regular expression to match classes of the "core" Spark API that we want to skip when
+ // finding the call site of a method.
+ val SPARK_CORE_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
+ val SCALA_CLASS_REGEX = """^scala""".r
+ val isSparkCoreClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined
+ val isScalaClass = SCALA_CLASS_REGEX.findFirstIn(className).isDefined
+ // If the class is a Spark internal class or a Scala class, then exclude.
+ isSparkCoreClass || isScalaClass
+ }
/**
* When called inside a class in the spark package, returns the name of the user code class
* (outside the spark package) that called into Spark, as well as which Spark method they called.
* This is used, for example, to tell users where in their code each RDD got created.
+ *
+ * @param skipClass Function that is used to exclude non-user-code classes.
*/
- def getCallSite: CallSite = {
+ def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = {
val trace = Thread.currentThread.getStackTrace()
.filterNot { ste:StackTraceElement =>
// When running under some profilers, the current stack trace might contain some bogus
@@ -891,7 +904,7 @@ private[spark] object Utils extends Logging {
for (el <- trace) {
if (insideSpark) {
- if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
+ if (skipClass(el.getClassName)) {
lastSparkMethod = if (el.getMethodName == "<init>") {
// Spark method is a constructor; get its class name
el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)