aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-09-28 17:42:00 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-09-28 17:42:00 -0700
commit3d7267999dac59c901afcfd52ce3f40d007015ae (patch)
treebd9a92272dcd41dd18130d6a86c6cb5762730cf1 /core/src/main
parent9f6efbf06a65953c4fcabd439124d71d50c5df6e (diff)
downloadspark-3d7267999dac59c901afcfd52ce3f40d007015ae.tar.gz
spark-3d7267999dac59c901afcfd52ce3f40d007015ae.tar.bz2
spark-3d7267999dac59c901afcfd52ce3f40d007015ae.zip
Print and track user call sites in more places in Spark
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/RDD.scala34
-rw-r--r--core/src/main/scala/spark/Utils.scala42
-rw-r--r--core/src/main/scala/spark/scheduler/ActiveJob.scala1
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala37
-rw-r--r--core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala1
-rw-r--r--core/src/main/scala/spark/scheduler/Stage.scala2
6 files changed, 63 insertions, 54 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 6883fb70f9..b5f67d1253 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -62,7 +62,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
@transient val dependencies: List[Dependency[_]]
// Record user function generating this RDD
- val origin = getOriginDescription
+ val origin = Utils.getSparkCallSite
// Optionally overridden by subclasses to specify how they are partitioned
val partitioner: Option[Partitioner] = None
@@ -127,38 +127,6 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
}
}
- // Describe which spark and user functions generated this RDD. Only works if called from
- // constructor.
- def getOriginDescription : String = {
- val trace = Thread.currentThread().getStackTrace().filter( el =>
- (!el.getMethodName().contains("getStackTrace")))
-
- // Keep crawling up the stack trace until we find the first function not inside of the spark
- // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
- // transformation, a SparkContext function (such as parallelize), or anything else that leads
- // to instantiation of an RDD. We also track the first (deepest) user method, file, and line.
- var lastSparkMethod = "<not_found>"
- var firstUserMethod = "<not_found>"
- var firstUserFile = "<not_found>"
- var firstUserLine = -1
- var finished = false
-
- for (el <- trace) {
- if (!finished) {
- if (el.getClassName().contains("spark") && !el.getClassName().startsWith("spark.examples")) {
- lastSparkMethod = el.getMethodName()
- }
- else {
- firstUserMethod = el.getMethodName()
- firstUserLine = el.getLineNumber()
- firstUserFile = el.getFileName()
- finished = true
- }
- }
- }
- "%s at: %s (%s:%s)".format(lastSparkMethod, firstUserMethod, firstUserFile, firstUserLine)
- }
-
// Transformations (return a new RDD)
def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index eb7d69e816..802277a251 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -12,7 +12,7 @@ import scala.io.Source
/**
* Various utility methods used by Spark.
*/
-object Utils extends Logging {
+private object Utils extends Logging {
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -115,10 +115,8 @@ object Utils extends Logging {
val out = new FileOutputStream(dest)
copyStream(in, out, true)
}
-
-
-
- /* Download a file from a given URL to the local filesystem */
+
+ /** Download a file from a given URL to the local filesystem */
def downloadFile(url: URL, localPath: String) {
val in = url.openStream()
val out = new FileOutputStream(localPath)
@@ -349,4 +347,38 @@ object Utils extends Logging {
def execute(command: Seq[String]) {
execute(command, new File("."))
}
+
+
+ /**
+ * When called inside a class in the spark package, returns the name of the user code class
+ * (outside the spark package) that called into Spark, as well as which Spark method they called.
+ * This is used, for example, to tell users where in their code each RDD got created.
+ */
+ def getSparkCallSite: String = {
+ val trace = Thread.currentThread().getStackTrace().filter( el =>
+ (!el.getMethodName().contains("getStackTrace")))
+
+ // Keep crawling up the stack trace until we find the first function not inside of the spark
+ // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
+ // transformation, a SparkContext function (such as parallelize), or anything else that leads
+ // to instantiation of an RDD. We also track the first (deepest) user method, file, and line.
+ var lastSparkMethod = "<unknown>"
+ var firstUserFile = "<unknown>"
+ var firstUserLine = 0
+ var finished = false
+
+ for (el <- trace) {
+ if (!finished) {
+ if (el.getClassName().contains("spark") && !el.getClassName().startsWith("spark.examples")) {
+ lastSparkMethod = el.getMethodName()
+ }
+ else {
+ firstUserLine = el.getLineNumber()
+ firstUserFile = el.getFileName()
+ finished = true
+ }
+ }
+ }
+ "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
+ }
}
diff --git a/core/src/main/scala/spark/scheduler/ActiveJob.scala b/core/src/main/scala/spark/scheduler/ActiveJob.scala
index 0ecff9ce77..e09b92d667 100644
--- a/core/src/main/scala/spark/scheduler/ActiveJob.scala
+++ b/core/src/main/scala/spark/scheduler/ActiveJob.scala
@@ -10,6 +10,7 @@ class ActiveJob(
val finalStage: Stage,
val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int],
+ val callSite: String,
val listener: JobListener) {
val numPartitions = partitions.length
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 4944f41e3a..0d9922766a 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -121,7 +121,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
def newStage(rdd: RDD[_], shuffleDep: Option[ShuffleDependency[_,_,_]], priority: Int): Stage = {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of splits is unknown
- logInfo("Registering RDD " + rdd.id + ": " + rdd)
+ logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
cacheTracker.registerRDD(rdd.id, rdd.splits.size)
if (shuffleDep != None) {
mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.splits.size)
@@ -144,7 +144,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
visited += r
// Kind of ugly: need to register RDDs with the cache here since
// we can't do it in its constructor because # of splits is unknown
- logInfo("Registering parent RDD " + r.id + ": " + r)
+ logInfo("Registering parent RDD " + r.id + " (" + r.origin + ")")
cacheTracker.registerRDD(r.id, r.splits.size)
for (dep <- r.dependencies) {
dep match {
@@ -200,11 +200,14 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
}
val waiter = new JobWaiter(partitions.size)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
- eventQueue.put(JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, waiter))
+ val callSite = Utils.getSparkCallSite
+ eventQueue.put(JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter))
waiter.getResult() match {
case JobSucceeded(results: Seq[_]) =>
+ logInfo("Finished " + callSite)
return results.asInstanceOf[Seq[U]].toArray
case JobFailed(exception: Exception) =>
+ logInfo("Failed to run " + callSite)
throw exception
}
}
@@ -219,7 +222,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.splits.size).toArray
- eventQueue.put(JobSubmitted(rdd, func2, partitions, false, listener))
+ eventQueue.put(JobSubmitted(rdd, func2, partitions, false, Utils.getSparkCallSite, listener))
return listener.getResult() // Will throw an exception if the job fails
}
@@ -239,13 +242,14 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
}
event match {
- case JobSubmitted(finalRDD, func, partitions, allowLocal, listener) =>
+ case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener) =>
val runId = nextRunId.getAndIncrement()
val finalStage = newStage(finalRDD, None, runId)
- val job = new ActiveJob(runId, finalStage, func, partitions, listener)
+ val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener)
updateCacheLocs()
- logInfo("Got job " + job.runId + " with " + partitions.length + " output partitions")
- logInfo("Final stage: " + finalStage)
+ logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length +
+ " output partitions")
+ logInfo("Final stage: " + finalStage + " (" + finalStage.origin + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
@@ -337,8 +341,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing == Nil) {
- logInfo("Submitting " + stage + " from " + stage.rdd.origin +
- ", which has no missing parents")
+ logInfo("Submitting " + stage + " (" + stage.origin + "), which has no missing parents")
submitMissingTasks(stage)
running += stage
} else {
@@ -425,7 +428,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
stage.addOutputLoc(smt.partition, bmAddress)
}
if (running.contains(stage) && pendingTasks(stage).isEmpty) {
- logInfo(stage + " finished; looking for newly runnable stages")
+ logInfo(stage + " (" + stage.origin + ") finished; looking for newly runnable stages")
running -= stage
logInfo("running: " + running)
logInfo("waiting: " + waiting)
@@ -439,7 +442,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
if (stage.outputLocs.count(_ == Nil) != 0) {
// Some tasks had failed; let's resubmit this stage
// TODO: Lower-level scheduler should also deal with this
- logInfo("Resubmitting " + stage + " because some of its tasks had failed: " +
+ logInfo("Resubmitting " + stage + " (" + stage.origin +
+ ") because some of its tasks had failed: " +
stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", "))
submitStage(stage)
} else {
@@ -453,8 +457,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
waiting --= newlyRunnable
running ++= newlyRunnable
for (stage <- newlyRunnable.sortBy(_.id)) {
- logInfo("Submitting " + stage + " from " + stage.rdd.origin +
- " which is now runnable")
+ logInfo("Submitting " + stage + " (" + stage.origin + "), which is now runnable")
submitMissingTasks(stage)
}
}
@@ -471,12 +474,14 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
running -= failedStage
failed += failedStage
// TODO: Cancel running tasks in the stage
- logInfo("Marking " + failedStage + " for resubmision due to a fetch failure")
+ logInfo("Marking " + failedStage + " (" + failedStage.origin +
+ ") for resubmision due to a fetch failure")
// Mark the map whose fetch failed as broken in the map stage
val mapStage = shuffleToMapStage(shuffleId)
mapStage.removeOutputLoc(mapId, bmAddress)
mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
- logInfo("The failed fetch was from " + mapStage + "; marking it for resubmission")
+ logInfo("The failed fetch was from " + mapStage + " (" + mapStage.origin +
+ "); marking it for resubmission")
failed += mapStage
// Remember that a fetch failed now; this is used to resubmit the broken
// stages later, after a small wait (to give other tasks the chance to fail)
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
index 1322aae3a3..11f0ef6245 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
@@ -17,6 +17,7 @@ case class JobSubmitted(
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
allowLocal: Boolean,
+ callSite: String,
listener: JobListener)
extends DAGSchedulerEvent
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index cd660c9085..b3ef8ac565 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -80,6 +80,8 @@ class Stage(
return id
}
+ def origin: String = rdd.origin
+
override def toString = "Stage " + id // + ": [RDD = " + rdd.id + ", isShuffle = " + isShuffleMap + "]"
override def hashCode(): Int = id