commit 2a5161708f4d2f743c7bd69ed3d98bb7bff46460
tree a3c2a2531354e421480e8264b56e5e562e9d3dca
parent a59419c27e45f06be5143c58d48affb0a5158bdf
author Patrick Wendell <pwendell@gmail.com> 2014-03-10 16:28:41 -0700
committer Patrick Wendell <pwendell@gmail.com> 2014-03-10 16:28:41 -0700
SPARK-1205: Clean up callSite/origin/generator.
This patch removes the `generator` field and simplifies and documents the tracking of call sites. There are two places where we care about call sites: when a job is run and when an RDD is created. This patch retains both of those features but does a slight refactoring and renaming to make things less confusing.

There was another feature of an RDD called the `generator`, which defaulted to the user class in which the RDD was created. It is used exclusively by the JobLogger and has been subsumed by the ability to name a job group. The job logger can later be refactored to read the job group directly (this will require some work), but for now this patch just preserves the default logged value of the user class. I'm not sure any users ever used the ability to override this.

Author: Patrick Wendell <pwendell@gmail.com>

Closes #106 from pwendell/callsite and squashes the following commits:

fc1d009 [Patrick Wendell] Compile fix
e17fb76 [Patrick Wendell] Review feedback: callSite -> creationSite
62e77ef [Patrick Wendell] Review feedback
576e60b [Patrick Wendell] SPARK-1205: Clean up callSite/origin/generator.
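For readers following the migration: where code previously tagged an RDD via setGenerator, the replacement path after this patch is to name a job group on the SparkContext, which the JobLogger can read. A minimal sketch, assuming the existing setJobGroup(groupId, description) API; the group id and description here are invented:

    import org.apache.spark.SparkContext

    object JobGroupExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "JobGroupExample")
        // Previously: rdd.setGenerator("nightly-etl"); now tag the work instead:
        sc.setJobGroup("nightly-etl", "Nightly ETL aggregation job")
        val counts = sc.parallelize(1 to 100).map(_ % 10).countByValue()
        println(counts)
        sc.stop()
      }
    }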
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkContext.scala           | 11
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala       |  2
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala   |  5
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/RDD.scala                | 18
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala |  2
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala    | 10
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/Stage.scala        |  2
-rw-r--r-- core/src/main/scala/org/apache/spark/util/Utils.scala             |  4
8 files changed, 16 insertions(+), 38 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cdc0e5a342..745e3fa4e8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -832,13 +832,12 @@ class SparkContext(
setLocalProperty("externalCallSite", null)
}
+ /**
+ * Capture the current user callsite and return a formatted version for printing. If the user
+ * has overridden the call site, this will return the user's version.
+ */
private[spark] def getCallSite(): String = {
- val callSite = getLocalProperty("externalCallSite")
- if (callSite == null) {
- Utils.formatSparkCallSite
- } else {
- callSite
- }
+ Option(getLocalProperty("externalCallSite")).getOrElse(Utils.formatCallSiteInfo())
}
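For reference, the externalCallSite override that the new getCallSite consults is set through the public setLocalProperty API. A minimal sketch, assuming a local SparkContext; the overridden site string is invented:

    import org.apache.spark.SparkContext

    object CallSiteOverrideSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "CallSiteOverrideSketch")
        // "externalCallSite" is the local-property key read by getCallSite.
        sc.setLocalProperty("externalCallSite", "loadTable at MyApp.scala:42")
        sc.parallelize(1 to 10).count() // reported with the overridden call site
        sc.setLocalProperty("externalCallSite", null) // clear the override
        sc.stop()
      }
    }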
/**
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 91bf404631..01d9357a25 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -135,8 +135,6 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
wrapRDD(rdd.subtract(other, p))
- def generator: String = rdd.generator
-
override def toString = rdd.toString
/** Assign a name to this RDD */
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index af0114bee3..a89419bbd1 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -19,7 +19,6 @@ package org.apache.spark.api.java
import java.util.{Comparator, List => JList}
-import scala.Tuple2
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -500,8 +499,4 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def name(): String = rdd.name
- /** Reset generator */
- def setGenerator(_generator: String) = {
- rdd.setGenerator(_generator)
- }
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3fe56963e0..4afa7523dd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -126,14 +126,6 @@ abstract class RDD[T: ClassTag](
this
}
- /** User-defined generator of this RDD*/
- @transient var generator = Utils.getCallSiteInfo.firstUserClass
-
- /** Reset generator*/
- def setGenerator(_generator: String) = {
- generator = _generator
- }
-
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
@@ -1031,8 +1023,9 @@ abstract class RDD[T: ClassTag](
private var storageLevel: StorageLevel = StorageLevel.NONE
- /** Record user function generating this RDD. */
- @transient private[spark] val origin = sc.getCallSite()
+ /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
+ @transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo
+ private[spark] def getCreationSite = Utils.formatCallSiteInfo(creationSiteInfo)
private[spark] def elementClassTag: ClassTag[T] = classTag[T]
@@ -1095,10 +1088,7 @@ abstract class RDD[T: ClassTag](
}
override def toString: String = "%s%s[%d] at %s".format(
- Option(name).map(_ + " ").getOrElse(""),
- getClass.getSimpleName,
- id,
- origin)
+ Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCreationSite)
def toJavaRDD() : JavaRDD[T] = {
new JavaRDD(this)(elementClassTag)
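To make the rename concrete: an RDD's toString now embeds the creation site via getCreationSite rather than the removed origin field. A minimal sketch; the file name and resulting line number in the comment are illustrative:

    import org.apache.spark.SparkContext

    object CreationSiteSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "CreationSiteSketch")
        val rdd = sc.textFile("README.md")
        // Formats as "<name ><class>[<id>] at <creationSite>", e.g.
        // "MappedRDD[1] at textFile at CreationSiteSketch.scala:7"
        println(rdd.toString)
        sc.stop()
      }
    }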
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dc5b25d845..d83d0341c6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -279,7 +279,7 @@ class DAGScheduler(
} else {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
- logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
+ logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
}
stage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 80f9ec7d03..01cbcc390c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -213,14 +213,10 @@ class JobLogger(val user: String, val logDirName: String)
* @param indent Indent number before info
*/
protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
+ val cacheStr = if (rdd.getStorageLevel != StorageLevel.NONE) "CACHED" else "NONE"
val rddInfo =
- if (rdd.getStorageLevel != StorageLevel.NONE) {
- "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " +
- rdd.origin + " " + rdd.generator
- } else {
- "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " +
- rdd.origin + " " + rdd.generator
- }
+ s"RDD_ID=$rdd.id ${getRddName(rdd)} $cacheStr " +
+ s"${rdd.getCreationSite} ${rdd.creationSiteInfo.firstUserClass}"
jobLogInfo(jobID, indentString(indent) + rddInfo, false)
rdd.dependencies.foreach {
case shufDep: ShuffleDependency[_, _] =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index a78b0186b9..5c1fc30e4a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -100,7 +100,7 @@ private[spark] class Stage(
id
}
- val name = callSite.getOrElse(rdd.origin)
+ val name = callSite.getOrElse(rdd.getCreationSite)
override def toString = "Stage " + id
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ac376fc403..38a275d438 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -719,8 +719,8 @@ private[spark] object Utils extends Logging {
new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
}
- def formatSparkCallSite = {
- val callSiteInfo = getCallSiteInfo
+ /** Returns a printable version of the call site info suitable for logs. */
+ def formatCallSiteInfo(callSiteInfo: CallSiteInfo = Utils.getCallSiteInfo) = {
"%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile,
callSiteInfo.firstUserLine)
}
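For reference, the "%s at %s:%s" format above, reproduced as a standalone sketch with a local stand-in for Utils.CallSiteInfo; all field values are invented:

    object FormatCallSiteSketch {
      // Local stand-in mirroring the fields of Utils.CallSiteInfo.
      case class CallSiteInfo(lastSparkMethod: String, firstUserFile: String,
          firstUserLine: Int, firstUserClass: String)

      def main(args: Array[String]) {
        val info = CallSiteInfo("reduceByKey", "MyApp.scala", 42, "MyApp")
        val formatted = "%s at %s:%s".format(
          info.lastSparkMethod, info.firstUserFile, info.firstUserLine)
        println(formatted) // prints: reduceByKey at MyApp.scala:42
      }
    }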