From 2a5161708f4d2f743c7bd69ed3d98bb7bff46460 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 10 Mar 2014 16:28:41 -0700 Subject: SPARK-1205: Clean up callSite/origin/generator. This patch removes the `generator` field and simplifies + documents the tracking of callsites. There are two places where we care about call sites, when a job is run and when an RDD is created. This patch retains both of those features but does a slight refactoring and renaming to make things less confusing. There was another feature of an rdd called the `generator` which was by default the user class that in which the RDD was created. This is used exclusively in the JobLogger. It been subsumed by the ability to name a job group. The job logger can later be refectored to read the job group directly (will require some work) but for now this just preserves the default logged value of the user class. I'm not sure any users ever used the ability to override this. Author: Patrick Wendell Closes #106 from pwendell/callsite and squashes the following commits: fc1d009 [Patrick Wendell] Compile fix e17fb76 [Patrick Wendell] Review feedback: callSite -> creationSite 62e77ef [Patrick Wendell] Review feedback 576e60b [Patrick Wendell] SPARK-1205: Clean up callSite/origin/generator. --- .../src/main/scala/org/apache/spark/SparkContext.scala | 11 +++++------ .../main/scala/org/apache/spark/api/java/JavaRDD.scala | 2 -- .../scala/org/apache/spark/api/java/JavaRDDLike.scala | 5 ----- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 18 ++++-------------- .../org/apache/spark/scheduler/DAGScheduler.scala | 2 +- .../scala/org/apache/spark/scheduler/JobLogger.scala | 10 +++------- .../main/scala/org/apache/spark/scheduler/Stage.scala | 2 +- core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ++-- 8 files changed, 16 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index cdc0e5a342..745e3fa4e8 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -832,13 +832,12 @@ class SparkContext( setLocalProperty("externalCallSite", null) } + /** + * Capture the current user callsite and return a formatted version for printing. If the user + * has overridden the call site, this will return the user's version. + */ private[spark] def getCallSite(): String = { - val callSite = getLocalProperty("externalCallSite") - if (callSite == null) { - Utils.formatSparkCallSite - } else { - callSite - } + Option(getLocalProperty("externalCallSite")).getOrElse(Utils.formatCallSiteInfo()) } /** diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 91bf404631..01d9357a25 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -135,8 +135,6 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] = wrapRDD(rdd.subtract(other, p)) - def generator: String = rdd.generator - override def toString = rdd.toString /** Assign a name to this RDD */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index af0114bee3..a89419bbd1 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -19,7 +19,6 @@ package org.apache.spark.api.java import java.util.{Comparator, List => JList} -import scala.Tuple2 import scala.collection.JavaConversions._ import scala.reflect.ClassTag @@ -500,8 +499,4 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def name(): String = rdd.name - /** Reset generator */ - def setGenerator(_generator: String) = { - rdd.setGenerator(_generator) - } } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 3fe56963e0..4afa7523dd 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -126,14 +126,6 @@ abstract class RDD[T: ClassTag]( this } - /** User-defined generator of this RDD*/ - @transient var generator = Utils.getCallSiteInfo.firstUserClass - - /** Reset generator*/ - def setGenerator(_generator: String) = { - generator = _generator - } - /** * Set this RDD's storage level to persist its values across operations after the first time * it is computed. This can only be used to assign a new storage level if the RDD does not @@ -1031,8 +1023,9 @@ abstract class RDD[T: ClassTag]( private var storageLevel: StorageLevel = StorageLevel.NONE - /** Record user function generating this RDD. */ - @transient private[spark] val origin = sc.getCallSite() + /** User code that created this RDD (e.g. `textFile`, `parallelize`). */ + @transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo + private[spark] def getCreationSite = Utils.formatCallSiteInfo(creationSiteInfo) private[spark] def elementClassTag: ClassTag[T] = classTag[T] @@ -1095,10 +1088,7 @@ abstract class RDD[T: ClassTag]( } override def toString: String = "%s%s[%d] at %s".format( - Option(name).map(_ + " ").getOrElse(""), - getClass.getSimpleName, - id, - origin) + Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCreationSite) def toJavaRDD() : JavaRDD[T] = { new JavaRDD(this)(elementClassTag) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index dc5b25d845..d83d0341c6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -279,7 +279,7 @@ class DAGScheduler( } else { // Kind of ugly: need to register RDDs with the cache and map output tracker here // since we can't do it in the RDD constructor because # of partitions is unknown - logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")") + logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")") mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size) } stage diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index 80f9ec7d03..01cbcc390c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -213,14 +213,10 @@ class JobLogger(val user: String, val logDirName: String) * @param indent Indent number before info */ protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) { + val cacheStr = if (rdd.getStorageLevel != StorageLevel.NONE) "CACHED" else "NONE" val rddInfo = - if (rdd.getStorageLevel != StorageLevel.NONE) { - "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " + - rdd.origin + " " + rdd.generator - } else { - "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " + - rdd.origin + " " + rdd.generator - } + s"RDD_ID=$rdd.id ${getRddName(rdd)} $cacheStr " + + s"${rdd.getCreationSite} ${rdd.creationSiteInfo.firstUserClass}" jobLogInfo(jobID, indentString(indent) + rddInfo, false) rdd.dependencies.foreach { case shufDep: ShuffleDependency[_, _] => diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index a78b0186b9..5c1fc30e4a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -100,7 +100,7 @@ private[spark] class Stage( id } - val name = callSite.getOrElse(rdd.origin) + val name = callSite.getOrElse(rdd.getCreationSite) override def toString = "Stage " + id diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index ac376fc403..38a275d438 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -719,8 +719,8 @@ private[spark] object Utils extends Logging { new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass) } - def formatSparkCallSite = { - val callSiteInfo = getCallSiteInfo + /** Returns a printable version of the call site info suitable for logs. */ + def formatCallSiteInfo(callSiteInfo: CallSiteInfo = Utils.getCallSiteInfo) = { "%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile, callSiteInfo.firstUserLine) } -- cgit v1.2.3