aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-09-28 16:28:07 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-09-28 16:28:07 -0700
commit9f6efbf06a65953c4fcabd439124d71d50c5df6e (patch)
treec6a2a44878e15c2fed9b869ffad45a4987400d40 /core/src/main
parent0121a26bd150e5f76d950e08cf4d536fad635a40 (diff)
parent9fc78f8f2907828c448fd53155ec01203212321d (diff)
downloadspark-9f6efbf06a65953c4fcabd439124d71d50c5df6e.tar.gz
spark-9f6efbf06a65953c4fcabd439124d71d50c5df6e.tar.bz2
spark-9f6efbf06a65953c4fcabd439124d71d50c5df6e.zip
Merge pull request #225 from pwendell/dev
Log message which records RDD origin
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/RDD.scala35
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala5
2 files changed, 39 insertions, 1 deletion
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index b0d86ebbae..6883fb70f9 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -61,6 +61,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
def compute(split: Split): Iterator[T]
@transient val dependencies: List[Dependency[_]]
+ // Record user function generating this RDD
+ val origin = getOriginDescription
+
// Optionally overridden by subclasses to specify how they are partitioned
val partitioner: Option[Partitioner] = None
@@ -124,6 +127,38 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
}
}
+ // Describe which spark and user functions generated this RDD. Only works if called from
+ // constructor.
+ def getOriginDescription : String = {
+ val trace = Thread.currentThread().getStackTrace().filter( el =>
+ (!el.getMethodName().contains("getStackTrace")))
+
+ // Keep crawling up the stack trace until we find the first function not inside of the spark
+ // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
+ // transformation, a SparkContext function (such as parallelize), or anything else that leads
+ // to instantiation of an RDD. We also track the first (deepest) user method, file, and line.
+ var lastSparkMethod = "<not_found>"
+ var firstUserMethod = "<not_found>"
+ var firstUserFile = "<not_found>"
+ var firstUserLine = -1
+ var finished = false
+
+ for (el <- trace) {
+ if (!finished) {
+ if (el.getClassName().contains("spark") && !el.getClassName().startsWith("spark.examples")) {
+ lastSparkMethod = el.getMethodName()
+ }
+ else {
+ firstUserMethod = el.getMethodName()
+ firstUserLine = el.getLineNumber()
+ firstUserFile = el.getFileName()
+ finished = true
+ }
+ }
+ }
+ "%s at: %s (%s:%s)".format(lastSparkMethod, firstUserMethod, firstUserFile, firstUserLine)
+ }
+
// Transformations (return a new RDD)
def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 2e2dc295b6..4944f41e3a 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -337,7 +337,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing == Nil) {
- logInfo("Submitting " + stage + ", which has no missing parents")
+ logInfo("Submitting " + stage + " from " + stage.rdd.origin +
+ ", which has no missing parents")
submitMissingTasks(stage)
running += stage
} else {
@@ -452,6 +453,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
waiting --= newlyRunnable
running ++= newlyRunnable
for (stage <- newlyRunnable.sortBy(_.id)) {
+ logInfo("Submitting " + stage + " from " + stage.rdd.origin +
+ " which is now runnable")
submitMissingTasks(stage)
}
}