aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorGregory Owen <greowen@gmail.com>2014-07-21 18:55:01 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-07-21 18:55:01 -0700
commitc3462c65684885299cf037d56c88bd53c08c6348 (patch)
treef975d0237f8a6d4edb88e47c3dfa25ccc46ac133 /core/src
parent511a7314037219c23e824ea5363bf7f1df55bab3 (diff)
downloadspark-c3462c65684885299cf037d56c88bd53c08c6348.tar.gz
spark-c3462c65684885299cf037d56c88bd53c08c6348.tar.bz2
spark-c3462c65684885299cf037d56c88bd53c08c6348.zip
[SPARK-2086] Improve output of toDebugString to make shuffle boundaries more clear
Changes RDD.toDebugString() to show hierarchy and shuffle transformations more clearly New output: ``` (3) FlatMappedValuesRDD[325] at apply at Transformer.scala:22 | MappedValuesRDD[324] at apply at Transformer.scala:22 | CoGroupedRDD[323] at apply at Transformer.scala:22 +-(5) MappedRDD[320] at apply at Transformer.scala:22 | | MappedRDD[319] at apply at Transformer.scala:22 | | MappedValuesRDD[318] at apply at Transformer.scala:22 | | MapPartitionsRDD[317] at apply at Transformer.scala:22 | | ShuffledRDD[316] at apply at Transformer.scala:22 | +-(10) MappedRDD[315] at apply at Transformer.scala:22 | | ParallelCollectionRDD[314] at apply at Transformer.scala:22 +-(100) MappedRDD[322] at apply at Transformer.scala:22 | ParallelCollectionRDD[321] at apply at Transformer.scala:22 ``` Author: Gregory Owen <greowen@gmail.com> Closes #1364 from GregOwen/to-debug-string and squashes the following commits: 08f5c78 [Gregory Owen] toDebugString: prettier debug printing to show shuffles and joins more clearly 1603f7b [Gregory Owen] toDebugString: prettier debug printing to show shuffles and joins more clearly
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala52
1 files changed, 48 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 88a918aebf..a1f2827248 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1269,11 +1269,55 @@ abstract class RDD[T: ClassTag](
/** A description of this RDD and its recursive dependencies for debugging. */
def toDebugString: String = {
- def debugString(rdd: RDD[_], prefix: String = ""): Seq[String] = {
- Seq(prefix + rdd + " (" + rdd.partitions.size + " partitions)") ++
- rdd.dependencies.flatMap(d => debugString(d.rdd, prefix + " "))
+ // Apply a different rule to the last child
+ def debugChildren(rdd: RDD[_], prefix: String): Seq[String] = {
+ val len = rdd.dependencies.length
+ len match {
+ case 0 => Seq.empty
+ case 1 =>
+ val d = rdd.dependencies.head
+ debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_,_,_]], true)
+ case _ =>
+ val frontDeps = rdd.dependencies.take(len - 1)
+ val frontDepStrings = frontDeps.flatMap(
+ d => debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_,_,_]]))
+
+ val lastDep = rdd.dependencies.last
+ val lastDepStrings =
+ debugString(lastDep.rdd, prefix, lastDep.isInstanceOf[ShuffleDependency[_,_,_]], true)
+
+ (frontDepStrings ++ lastDepStrings)
+ }
+ }
+ // The first RDD in the dependency stack has no parents, so no need for a +-
+ def firstDebugString(rdd: RDD[_]): Seq[String] = {
+ val partitionStr = "(" + rdd.partitions.size + ")"
+ val leftOffset = (partitionStr.length - 1) / 2
+ val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset))
+ Seq(partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)
+ }
+ def shuffleDebugString(rdd: RDD[_], prefix: String = "", isLastChild: Boolean): Seq[String] = {
+ val partitionStr = "(" + rdd.partitions.size + ")"
+ val leftOffset = (partitionStr.length - 1) / 2
+ val thisPrefix = prefix.replaceAll("\\|\\s+$", "")
+ val nextPrefix = (
+ thisPrefix
+ + (if (isLastChild) " " else "| ")
+ + (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset)))
+ Seq(thisPrefix + "+-" + partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)
+ }
+ def debugString(rdd: RDD[_],
+ prefix: String = "",
+ isShuffle: Boolean = true,
+ isLastChild: Boolean = false): Seq[String] = {
+ if (isShuffle) {
+ shuffleDebugString(rdd, prefix, isLastChild)
+ }
+ else {
+ Seq(prefix + rdd) ++ debugChildren(rdd, prefix)
+ }
}
- debugString(this).mkString("\n")
+ firstDebugString(this).mkString("\n")
}
override def toString: String = "%s%s[%d] at %s".format(