Diffstat (limited to 'core/src/main')
 core/src/main/scala/spark/RDD.scala                    | 35 +++++++++++++++++++++++++++++++++++
 core/src/main/scala/spark/scheduler/DAGScheduler.scala |  5 ++++-
 2 files changed, 39 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index b0d86ebbae..6883fb70f9 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -61,6 +61,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
   def compute(split: Split): Iterator[T]
 
   @transient val dependencies: List[Dependency[_]]
 
+  // Record user function generating this RDD
+  val origin = getOriginDescription
+
   // Optionally overridden by subclasses to specify how they are partitioned
   val partitioner: Option[Partitioner] = None
@@ -124,6 +127,38 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
     }
   }
 
+  // Describe which Spark and user functions generated this RDD. Only works if called
+  // from the constructor.
+  def getOriginDescription: String = {
+    val trace = Thread.currentThread().getStackTrace().filter(el =>
+      !el.getMethodName().contains("getStackTrace"))
+
+    // Keep crawling up the stack trace until we find the first function not inside the spark
+    // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
+    // transformation, a SparkContext function (such as parallelize), or anything else that leads
+    // to instantiation of an RDD. We also track the first (deepest) user method, file, and line.
+    var lastSparkMethod = "<not_found>"
+    var firstUserMethod = "<not_found>"
+    var firstUserFile = "<not_found>"
+    var firstUserLine = -1
+    var finished = false
+
+    for (el <- trace) {
+      if (!finished) {
+        if (el.getClassName().contains("spark") && !el.getClassName().startsWith("spark.examples")) {
+          lastSparkMethod = el.getMethodName()
+        }
+        else {
+          firstUserMethod = el.getMethodName()
+          firstUserLine = el.getLineNumber()
+          firstUserFile = el.getFileName()
+          finished = true
+        }
+      }
+    }
+    "%s at: %s (%s:%s)".format(lastSparkMethod, firstUserMethod, firstUserFile, firstUserLine)
+  }
+
   // Transformations (return a new RDD)
   def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
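
The stack-walking trick in getOriginDescription is easy to try outside of Spark. Below is a minimal standalone sketch of the same idea; the names OriginSketch, describeOrigin, transformation, and UserProgram are hypothetical stand-ins and not part of this patch.

object OriginSketch {
  // Walk the current call stack. Frames whose class name mentions
  // OriginSketch stand in for Spark-internal frames; the first frame
  // outside it stands in for user code (hypothetical names throughout).
  def describeOrigin(): String = {
    val trace = Thread.currentThread().getStackTrace()
      .filterNot(el => el.getMethodName().contains("getStackTrace"))
    // span splits the trace at the first non-internal frame, yielding the
    // shallowest internal method and the deepest user frame in one pass,
    // which is what the finished flag accomplishes in the patch above.
    val (internal, user) = trace.span(el => el.getClassName().contains("OriginSketch"))
    val lastInternalMethod = internal.lastOption.map(_.getMethodName()).getOrElse("<not_found>")
    user.headOption match {
      case Some(el) =>
        "%s at: %s (%s:%s)".format(lastInternalMethod, el.getMethodName(), el.getFileName(), el.getLineNumber())
      case None =>
        lastInternalMethod + " at: <not_found>"
    }
  }

  // Stands in for a framework entry point such as an RDD transformation.
  def transformation(): String = describeOrigin()
}

object UserProgram {
  def main(args: Array[String]): Unit = {
    // Prints something like: transformation at: main (UserProgram.scala:31)
    println(OriginSketch.transformation())
  }
}

Note that the patch identifies framework frames with a substring check, getClassName().contains("spark"), and then carves out spark.examples so that the bundled example programs count as user code rather than Spark internals.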
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 2e2dc295b6..4944f41e3a 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -337,7 +337,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
     val missing = getMissingParentStages(stage).sortBy(_.id)
     logDebug("missing: " + missing)
     if (missing == Nil) {
-      logInfo("Submitting " + stage + ", which has no missing parents")
+      logInfo("Submitting " + stage + " from " + stage.rdd.origin +
+        ", which has no missing parents")
       submitMissingTasks(stage)
       running += stage
     } else {
@@ -452,6 +453,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
     waiting --= newlyRunnable
     running ++= newlyRunnable
     for (stage <- newlyRunnable.sortBy(_.id)) {
+      logInfo("Submitting " + stage + " from " + stage.rdd.origin +
+        ", which is now runnable")
       submitMissingTasks(stage)
     }
   }
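
For illustration, after this change a stage-submission log line takes roughly the following shape; the stage id, user method, file name, and line number below are made up, and the exact prefix depends on the log4j layout:

INFO spark.scheduler.DAGScheduler: Submitting Stage 0 from map at: main (WordCount.scala:14), which has no missing parents

The origin suffix is exactly the string built by getOriginDescription: the shallowest Spark method (map here), followed by the deepest user frame (main in WordCount.scala, line 14).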