diff options
author | Andrew Or <andrew@databricks.com> | 2015-05-12 01:40:55 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-05-12 01:40:55 -0700 |
commit | f3e8e60063ccf0d713d03e671a3231560475f90d (patch) | |
tree | c1612a1a0e075deef5c02cfd87dd10559b51f744 /core/src | |
parent | 82e890fb19d6fbaffa69856eecb4699f2f8a81eb (diff) | |
download | spark-f3e8e60063ccf0d713d03e671a3231560475f90d.tar.gz spark-f3e8e60063ccf0d713d03e671a3231560475f90d.tar.bz2 spark-f3e8e60063ccf0d713d03e671a3231560475f90d.zip |
[SPARK-7467] Dag visualization: treat checkpoint as an RDD operation
Such that a checkpoint RDD does not go into random scopes on the UI, e.g. `take`. We've seen this in streaming.
Author: Andrew Or <andrew@databricks.com>
Closes #6004 from andrewor14/dag-viz-checkpoint and squashes the following commits:
9217439 [Andrew Or] Fix checkpoints
4ae8806 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-checkpoint
19bc07b [Andrew Or] Treat checkpoint as an RDD operation
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/RDD.scala | 16 |
1 files changed, 9 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 7dad30ecbd..02a94baf37 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1523,13 +1523,15 @@ abstract class RDD[T: ClassTag]( * has completed (therefore the RDD has been materialized and potentially stored in memory). * doCheckpoint() is called recursively on the parent RDDs. */ - private[spark] def doCheckpoint() { - if (!doCheckpointCalled) { - doCheckpointCalled = true - if (checkpointData.isDefined) { - checkpointData.get.doCheckpoint() - } else { - dependencies.foreach(_.rdd.doCheckpoint()) + private[spark] def doCheckpoint(): Unit = { + RDDOperationScope.withScope(sc, "checkpoint", false, true) { + if (!doCheckpointCalled) { + doCheckpointCalled = true + if (checkpointData.isDefined) { + checkpointData.get.doCheckpoint() + } else { + dependencies.foreach(_.rdd.doCheckpoint()) + } } } } |