aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala26
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala2
3 files changed, 38 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ad3337d94c..3ab27feffe 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -704,6 +704,26 @@ class SparkContext(
}
/**
+ * Support function for API backtraces.
+ */
+ def setCallSite(site: String) {
+ setLocalProperty("externalCallSite", site)
+ }
+
+ /**
+ * Support function for API backtraces.
+ */
+ def clearCallSite() {
+ setLocalProperty("externalCallSite", null)
+ }
+
+ private[spark] def getCallSite(): String = {
+ val callSite = getLocalProperty("externalCallSite")
+ if (callSite == null) return Utils.formatSparkCallSite
+ callSite
+ }
+
+ /**
* Run a function on a given set of partitions in an RDD and pass the results to the given
* handler function. This is the main entry point for all actions in Spark. The allowLocal
* flag specifies whether the scheduler can run the computation on the driver rather than
@@ -715,7 +735,7 @@ class SparkContext(
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
- val callSite = Utils.formatSparkCallSite
+ val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite)
val start = System.nanoTime
@@ -799,7 +819,7 @@ class SparkContext(
func: (TaskContext, Iterator[T]) => U,
evaluator: ApproximateEvaluator[U, R],
timeout: Long): PartialResult[R] = {
- val callSite = Utils.formatSparkCallSite
+ val callSite = getCallSite
logInfo("Starting job: " + callSite)
val start = System.nanoTime
val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
@@ -819,7 +839,7 @@ class SparkContext(
resultFunc: => R): SimpleFutureAction[R] =
{
val cleanF = clean(processPartition)
- val callSite = Utils.formatSparkCallSite
+ val callSite = getCallSite
val waiter = dagScheduler.submitJob(
rdd,
(context: TaskContext, iter: Iterator[T]) => cleanF(iter),
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index acf328aa6a..3003e7afb5 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -405,6 +405,20 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
new JavaRDD(sc.checkpointFile(path))
}
+
+ /**
+ * Pass-through to SparkContext.setCallSite. For API support only.
+ */
+ def setCallSite(site: String) {
+ sc.setCallSite(site)
+ }
+
+ /**
+ * Pass-through to SparkContext.setCallSite. For API support only.
+ */
+ def clearCallSite() {
+ sc.clearCallSite()
+ }
}
object JavaSparkContext {
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ea45566ad1..311ee4d510 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -938,7 +938,7 @@ abstract class RDD[T: ClassTag](
private var storageLevel: StorageLevel = StorageLevel.NONE
/** Record user function generating this RDD. */
- @transient private[spark] val origin = Utils.formatSparkCallSite
+ @transient private[spark] val origin = sc.getCallSite
private[spark] def elementClassTag: ClassTag[T] = classTag[T]