aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2012-09-28 15:17:25 -0700
committerPatrick Wendell <pwendell@gmail.com>2012-09-28 15:51:46 -0700
commitc387e40fb1d9ecfccba1ea5869bffe0b2934c80b (patch)
treebe1142a974fa1681e5bf50a6788f11f189edf659 /core/src/main
parent0850d641afa1f7181c7dc611a08e2b9530540adc (diff)
downloadspark-c387e40fb1d9ecfccba1ea5869bffe0b2934c80b.tar.gz
spark-c387e40fb1d9ecfccba1ea5869bffe0b2934c80b.tar.bz2
spark-c387e40fb1d9ecfccba1ea5869bffe0b2934c80b.zip
Log message which records RDD origin
This adds tracking to determine the "origin" of an RDD. Origin is defined by the boundary between the user's code and the spark code, during an RDD's instantiation. It is meant to help users understand where a Spark RDD is coming from in their code. This patch also logs origin data when stages are submitted to the scheduler. Finally, it adds a new log message to fix an inconsistency in the way that dependent stages (those missing parents) and independent stages (those without) are logged during submission.
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/RDD.scala35
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala5
2 files changed, 39 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index cce0ea2183..8e50ea5853 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -12,6 +12,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions.mapAsScalaMap
+import scala.util.control.Breaks._
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable
@@ -61,6 +62,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
def compute(split: Split): Iterator[T]
@transient val dependencies: List[Dependency[_]]
+ // Record user function generating this RDD
+ val origin = getOriginDescription
+
// Optionally overridden by subclasses to specify how they are partitioned
val partitioner: Option[Partitioner] = None
@@ -124,6 +128,37 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
}
}
+ // Describe which spark and user functions generated this RDD. Only works if called from
+ // constructor.
+ def getOriginDescription : String = {
+ val trace = Thread.currentThread().getStackTrace().filter( el =>
+ (!el.getMethodName().contains("getStackTrace")))
+
+ // Keep crawling up the stack trace until we find the first function not inside of the spark
+ // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
+ // transformation, a SparkContext function (such as parallelize), or anything else that leads
+ // to instantiation of an RDD. We also track the first (deepest) user method, file, and line.
+ var lastSparkMethod = "<not_found>"
+ var firstUserMethod = "<not_found>"
+ var firstUserFile = "<not_found>"
+ var firstUserLine = -1
+
+ breakable {
+ for (el <- trace) {
+ if (el.getClassName().contains("spark") && !el.getClassName().contains("spark.examples")) {
+ lastSparkMethod = el.getMethodName()
+ }
+ else {
+ firstUserMethod = el.getMethodName()
+ firstUserLine = el.getLineNumber()
+ firstUserFile = el.getFileName()
+ break
+ }
+ }
+ }
+ "%s called in %s (%s:%s)".format(lastSparkMethod, firstUserMethod, firstUserFile, firstUserLine)
+ }
+
// Transformations (return a new RDD)
def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 2e2dc295b6..4944f41e3a 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -337,7 +337,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing == Nil) {
- logInfo("Submitting " + stage + ", which has no missing parents")
+ logInfo("Submitting " + stage + " from " + stage.rdd.origin +
+ ", which has no missing parents")
submitMissingTasks(stage)
running += stage
} else {
@@ -452,6 +453,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
waiting --= newlyRunnable
running ++= newlyRunnable
for (stage <- newlyRunnable.sortBy(_.id)) {
+ logInfo("Submitting " + stage + " from " + stage.rdd.origin +
+ " which is now runnable")
submitMissingTasks(stage)
}
}