diff options
author | Matei Zaharia <matei@databricks.com> | 2014-01-02 15:54:54 -0500 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-01-02 15:54:54 -0500 |
commit | ca67909cd4b4accea64498b3bc1421c6d75e33a6 (patch) | |
tree | b6e13c3c0be95ea99c857429c97d3687c61050a3 /core/src/main | |
parent | 3713f8129a618a633a7aca8c944960c3e7ac9d3b (diff) | |
parent | fec01664a717c8ecf84eaf7a2523a62cd5d3b4b8 (diff) | |
download | spark-ca67909cd4b4accea64498b3bc1421c6d75e33a6.tar.gz spark-ca67909cd4b4accea64498b3bc1421c6d75e33a6.tar.bz2 spark-ca67909cd4b4accea64498b3bc1421c6d75e33a6.zip |
Merge pull request #311 from tmyklebu/master
SPARK-991: Report information gleaned from a Python stacktrace in the UI
Scala:
- Added setCallSite/clearCallSite to SparkContext and JavaSparkContext.
These functions mutate a LocalProperty called "externalCallSite."
- Add a wrapper, getCallSite, that checks for an externalCallSite and, if
none is found, calls the usual Utils.formatSparkCallSite.
- Change everything that calls Utils.formatSparkCallSite to call
getCallSite instead. Except getCallSite.
- Add wrappers to setCallSite/clearCallSite wrappers to JavaSparkContext.
Python:
- Add a gruesome hack to rdd.py that inspects the traceback and guesses
what you want to see in the UI.
- Add a RAII wrapper around said gruesome hack that calls
setCallSite/clearCallSite as appropriate.
- Wire said RAII wrapper up around three calls into the Scala code.
I'm not sure that I hit all the spots with the RAII wrapper. I'm also
not sure that my gruesome hack does exactly what we want.
One could also approach this change by refactoring
runJob/submitJob/runApproximateJob to take a call site, then threading
that parameter through everything that needs to know it.
One might object to the pointless-looking wrappers in JavaSparkContext.
Unfortunately, I can't directly access the SparkContext from
Python---or, if I can, I don't know how---so I need to wrap everything
that matters in JavaSparkContext.
Conflicts:
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
Diffstat (limited to 'core/src/main')
3 files changed, 38 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 84bd0f7ffd..4d6a97e255 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -729,6 +729,26 @@ class SparkContext( } /** + * Support function for API backtraces. + */ + def setCallSite(site: String) { + setLocalProperty("externalCallSite", site) + } + + /** + * Support function for API backtraces. + */ + def clearCallSite() { + setLocalProperty("externalCallSite", null) + } + + private[spark] def getCallSite(): String = { + val callSite = getLocalProperty("externalCallSite") + if (callSite == null) return Utils.formatSparkCallSite + callSite + } + + /** * Run a function on a given set of partitions in an RDD and pass the results to the given * handler function. This is the main entry point for all actions in Spark. The allowLocal * flag specifies whether the scheduler can run the computation on the driver rather than @@ -740,7 +760,7 @@ class SparkContext( partitions: Seq[Int], allowLocal: Boolean, resultHandler: (Int, U) => Unit) { - val callSite = Utils.formatSparkCallSite + val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite) val start = System.nanoTime @@ -824,7 +844,7 @@ class SparkContext( func: (TaskContext, Iterator[T]) => U, evaluator: ApproximateEvaluator[U, R], timeout: Long): PartialResult[R] = { - val callSite = Utils.formatSparkCallSite + val callSite = getCallSite logInfo("Starting job: " + callSite) val start = System.nanoTime val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, @@ -844,7 +864,7 @@ class SparkContext( resultFunc: => R): SimpleFutureAction[R] = { val cleanF = clean(processPartition) - val callSite = Utils.formatSparkCallSite + val callSite = getCallSite val waiter = dagScheduler.submitJob( rdd, (context: TaskContext, iter: Iterator[T]) => cleanF(iter), diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 0680a065a3..5be5317f40 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -411,6 +411,20 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork * changed at runtime. */ def getConf: SparkConf = sc.getConf + + /** + * Pass-through to SparkContext.setCallSite. For API support only. + */ + def setCallSite(site: String) { + sc.setCallSite(site) + } + + /** + * Pass-through to SparkContext.setCallSite. For API support only. + */ + def clearCallSite() { + sc.clearCallSite() + } } object JavaSparkContext { diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 6a7b0f8a86..3f41b66279 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -953,7 +953,7 @@ abstract class RDD[T: ClassTag]( private var storageLevel: StorageLevel = StorageLevel.NONE /** Record user function generating this RDD. */ - @transient private[spark] val origin = Utils.formatSparkCallSite + @transient private[spark] val origin = sc.getCallSite private[spark] def elementClassTag: ClassTag[T] = classTag[T] |