From c387e40fb1d9ecfccba1ea5869bffe0b2934c80b Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Fri, 28 Sep 2012 15:17:25 -0700 Subject: Log message which records RDD origin This adds tracking to determine the "origin" of an RDD. Origin is defined by the boundary between the user's code and the spark code, during an RDD's instantiation. It is meant to help users understand where a Spark RDD is coming from in their code. This patch also logs origin data when stages are submitted to the scheduler. Finally, it adds a new log message to fix an inconsitency in the way that dependent stages (those missing parents) and independent stages (those without) are logged during submission. --- core/src/main/scala/spark/RDD.scala | 35 ++++++++++++++++++++++ .../main/scala/spark/scheduler/DAGScheduler.scala | 5 +++- 2 files changed, 39 insertions(+), 1 deletion(-) (limited to 'core/src/main') diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index cce0ea2183..8e50ea5853 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -12,6 +12,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.Map import scala.collection.mutable.HashMap import scala.collection.JavaConversions.mapAsScalaMap +import scala.util.control.Breaks._ import org.apache.hadoop.io.BytesWritable import org.apache.hadoop.io.NullWritable @@ -61,6 +62,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial def compute(split: Split): Iterator[T] @transient val dependencies: List[Dependency[_]] + // Record user function generating this RDD + val origin = getOriginDescription + // Optionally overridden by subclasses to specify how they are partitioned val partitioner: Option[Partitioner] = None @@ -124,6 +128,37 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial } } + // Describe which spark and user functions generated this RDD. Only works if called from + // constructor. + def getOriginDescription : String = { + val trace = Thread.currentThread().getStackTrace().filter( el => + (!el.getMethodName().contains("getStackTrace"))) + + // Keep crawling up the stack trace until we find the first function not inside of the spark + // package. We track the last (shallowest) contiguous Spark method. This might be an RDD + // transformation, a SparkContext function (such as parallelize), or anything else that leads + // to instantiation of an RDD. We also track the first (deepest) user method, file, and line. + var lastSparkMethod = "" + var firstUserMethod = "" + var firstUserFile = "" + var firstUserLine = -1 + + breakable { + for (el <- trace) { + if (el.getClassName().contains("spark") && !el.getClassName().contains("spark.examples")) { + lastSparkMethod = el.getMethodName() + } + else { + firstUserMethod = el.getMethodName() + firstUserLine = el.getLineNumber() + firstUserFile = el.getFileName() + break + } + } + } + "%s called in %s (%s:%s)".format(lastSparkMethod, firstUserMethod, firstUserFile, firstUserLine) + } + // Transformations (return a new RDD) def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f)) diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 2e2dc295b6..4944f41e3a 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -337,7 +337,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) if (missing == Nil) { - logInfo("Submitting " + stage + ", which has no missing parents") + logInfo("Submitting " + stage + " from " + stage.rdd.origin + + ", which has no missing parents") submitMissingTasks(stage) running += stage } else { @@ -452,6 +453,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with waiting --= newlyRunnable running ++= newlyRunnable for (stage <- newlyRunnable.sortBy(_.id)) { + logInfo("Submitting " + stage + " from " + stage.rdd.origin + + " which is now runnable") submitMissingTasks(stage) } } -- cgit v1.2.3 From bc909c2903f80452b62091c9a32fd7587a41b218 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Fri, 28 Sep 2012 16:04:36 -0700 Subject: Changes based on Matei's comments --- core/src/main/scala/spark/RDD.scala | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) (limited to 'core/src/main') diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 8e50ea5853..8590d487cc 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -12,7 +12,6 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.Map import scala.collection.mutable.HashMap import scala.collection.JavaConversions.mapAsScalaMap -import scala.util.control.Breaks._ import org.apache.hadoop.io.BytesWritable import org.apache.hadoop.io.NullWritable @@ -142,21 +141,22 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial var firstUserMethod = "" var firstUserFile = "" var firstUserLine = -1 - - breakable { - for (el <- trace) { - if (el.getClassName().contains("spark") && !el.getClassName().contains("spark.examples")) { - lastSparkMethod = el.getMethodName() - } - else { - firstUserMethod = el.getMethodName() - firstUserLine = el.getLineNumber() - firstUserFile = el.getFileName() - break - } + var finished = false + + for (el <- trace) { + if (!finished) { + if (el.getClassName().contains("spark") && !el.getClassName().startsWith("spark.examples")) { + lastSparkMethod = el.getMethodName() + } + else { + firstUserMethod = el.getMethodName() + firstUserLine = el.getLineNumber() + firstUserFile = el.getFileName() + finished = true + } } } - "%s called in %s (%s:%s)".format(lastSparkMethod, firstUserMethod, firstUserFile, firstUserLine) + "%s at: %s (%s:%s)".format(lastSparkMethod, firstUserMethod, firstUserFile, firstUserLine) } // Transformations (return a new RDD) -- cgit v1.2.3 From 9fc78f8f2907828c448fd53155ec01203212321d Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Fri, 28 Sep 2012 16:05:50 -0700 Subject: Fixing some whitespace issues --- core/src/main/scala/spark/RDD.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'core/src/main') diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 8590d487cc..cce1acc2ca 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -145,15 +145,15 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial for (el <- trace) { if (!finished) { - if (el.getClassName().contains("spark") && !el.getClassName().startsWith("spark.examples")) { - lastSparkMethod = el.getMethodName() - } - else { - firstUserMethod = el.getMethodName() - firstUserLine = el.getLineNumber() - firstUserFile = el.getFileName() - finished = true - } + if (el.getClassName().contains("spark") && !el.getClassName().startsWith("spark.examples")) { + lastSparkMethod = el.getMethodName() + } + else { + firstUserMethod = el.getMethodName() + firstUserLine = el.getLineNumber() + firstUserFile = el.getFileName() + finished = true + } } } "%s at: %s (%s:%s)".format(lastSparkMethod, firstUserMethod, firstUserFile, firstUserLine) -- cgit v1.2.3