-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala           11
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala        2
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala    5
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala                18
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala    10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Stage.scala         2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala              4
8 files changed, 16 insertions(+), 38 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cdc0e5a342..745e3fa4e8 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -832,13 +832,12 @@ class SparkContext(
setLocalProperty("externalCallSite", null)
}
+ /**
+ * Capture the current user callsite and return a formatted version for printing. If the user
+ * has overridden the call site, this will return the user's version.
+ */
private[spark] def getCallSite(): String = {
- val callSite = getLocalProperty("externalCallSite")
- if (callSite == null) {
- Utils.formatSparkCallSite
- } else {
- callSite
- }
+ Option(getLocalProperty("externalCallSite")).getOrElse(Utils.formatCallSiteInfo())
}
/**
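Note on the new one-liner: `Option(...)` wraps a possibly-null value, so the explicit null check collapses into `getOrElse`. A minimal standalone sketch of the idiom (the property map here is a stand-in, not Spark's local-properties machinery):

    // Sketch: Option(null) is None, so the default branch wins.
    val props = new java.util.Properties()
    val site = Option(props.getProperty("externalCallSite"))
      .getOrElse("formatted default site")
    println(site)  // prints "formatted default site" when the property is unset
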
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 91bf404631..01d9357a25 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -135,8 +135,6 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
wrapRDD(rdd.subtract(other, p))
- def generator: String = rdd.generator
-
override def toString = rdd.toString
/** Assign a name to this RDD */
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index af0114bee3..a89419bbd1 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -19,7 +19,6 @@ package org.apache.spark.api.java
import java.util.{Comparator, List => JList}
-import scala.Tuple2
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -500,8 +499,4 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def name(): String = rdd.name
- /** Reset generator */
- def setGenerator(_generator: String) = {
- rdd.setGenerator(_generator)
- }
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3fe56963e0..4afa7523dd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -126,14 +126,6 @@ abstract class RDD[T: ClassTag](
this
}
- /** User-defined generator of this RDD*/
- @transient var generator = Utils.getCallSiteInfo.firstUserClass
-
- /** Reset generator*/
- def setGenerator(_generator: String) = {
- generator = _generator
- }
-
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
@@ -1031,8 +1023,9 @@ abstract class RDD[T: ClassTag](
private var storageLevel: StorageLevel = StorageLevel.NONE
- /** Record user function generating this RDD. */
- @transient private[spark] val origin = sc.getCallSite()
+ /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
+ @transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo
+ private[spark] def getCreationSite = Utils.formatCallSiteInfo(creationSiteInfo)
private[spark] def elementClassTag: ClassTag[T] = classTag[T]
@@ -1095,10 +1088,7 @@ abstract class RDD[T: ClassTag](
}
override def toString: String = "%s%s[%d] at %s".format(
- Option(name).map(_ + " ").getOrElse(""),
- getClass.getSimpleName,
- id,
- origin)
+ Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, getCreationSite)
def toJavaRDD() : JavaRDD[T] = {
new JavaRDD(this)(elementClassTag)
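With `origin` replaced by `getCreationSite`, `toString` still renders as optional name, class, id, and call site. A hedged sketch of the format string in isolation (all values below are made up for illustration):

    // Standalone rendering of RDD.toString's format; values are illustrative.
    val name: String = null  // unnamed unless setName was called
    val rendered = "%s%s[%d] at %s".format(
      Option(name).map(_ + " ").getOrElse(""),
      "MappedRDD", 3, "map at MyApp.scala:42")
    println(rendered)  // MappedRDD[3] at map at MyApp.scala:42
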
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dc5b25d845..d83d0341c6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -279,7 +279,7 @@ class DAGScheduler(
} else {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
- logInfo("Registering RDD " + rdd.id + " (" + rdd.origin + ")")
+ logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
}
stage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 80f9ec7d03..01cbcc390c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -213,14 +213,10 @@ class JobLogger(val user: String, val logDirName: String)
* @param indent Indent number before info
*/
protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
+ val cacheStr = if (rdd.getStorageLevel != StorageLevel.NONE) "CACHED" else "NONE"
val rddInfo =
- if (rdd.getStorageLevel != StorageLevel.NONE) {
- "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " +
- rdd.origin + " " + rdd.generator
- } else {
- "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " +
- rdd.origin + " " + rdd.generator
- }
+ s"RDD_ID=$rdd.id ${getRddName(rdd)} $cacheStr " +
+ s"${rdd.getCreationSite} ${rdd.creationSiteInfo.firstUserClass}"
jobLogInfo(jobID, indentString(indent) + rddInfo, false)
rdd.dependencies.foreach {
case shufDep: ShuffleDependency[_, _] =>
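Note the braces in `${rdd.id}` above: without them the interpolator resolves only the identifier `rdd` and leaves `.id` as literal text. A minimal demonstration with a stand-in class:

    // Sketch of the interpolation pitfall; Rdd is a stand-in, not Spark's RDD.
    case class Rdd(id: Int) { override def toString = "Rdd" }
    val rdd = Rdd(3)
    println(s"RDD_ID=$rdd.id")    // RDD_ID=Rdd.id  (rdd.toString + literal ".id")
    println(s"RDD_ID=${rdd.id}")  // RDD_ID=3       (the intended field access)
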
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index a78b0186b9..5c1fc30e4a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -100,7 +100,7 @@ private[spark] class Stage(
id
}
- val name = callSite.getOrElse(rdd.origin)
+ val name = callSite.getOrElse(rdd.getCreationSite)
override def toString = "Stage " + id
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ac376fc403..38a275d438 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -719,8 +719,8 @@ private[spark] object Utils extends Logging {
new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
}
- def formatSparkCallSite = {
- val callSiteInfo = getCallSiteInfo
+ /** Returns a printable version of the call site info suitable for logs. */
+ def formatCallSiteInfo(callSiteInfo: CallSiteInfo = Utils.getCallSiteInfo) = {
"%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile,
callSiteInfo.firstUserLine)
}
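
The default argument keeps the old zero-argument call sites working while letting callers pass a previously captured `CallSiteInfo` (as `RDD.getCreationSite` does above). A hedged sketch of the pattern with stand-in types, not the real Spark classes:

    // Stand-ins mirroring the shape of the diff; illustrative only.
    case class CallSiteInfo(lastSparkMethod: String, firstUserFile: String,
        firstUserLine: Int, firstUserClass: String)

    def capture(): CallSiteInfo =
      CallSiteInfo("textFile", "MyApp.scala", 42, "MyApp")

    def formatCallSiteInfo(info: CallSiteInfo = capture()): String =
      "%s at %s:%s".format(info.lastSparkMethod, info.firstUserFile, info.firstUserLine)

    println(formatCallSiteInfo())  // freshly captured: textFile at MyApp.scala:42
    println(formatCallSiteInfo(CallSiteInfo("map", "Job.scala", 7, "Job")))  // map at Job.scala:7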