author     Tathagata Das <tathagata.das1565@gmail.com>  2015-11-10 16:54:06 -0800
committer  Andrew Or <andrew@databricks.com>            2015-11-10 16:54:06 -0800
commit     6600786dddc89cb16779ee56b9173f63a3af3f27 (patch)
tree       8c8d44219d672619d7abbfd17829e5fd3bf14c36 /external
parent     900917541651abe7125f0d205085d2ab6a00d92c (diff)
[SPARK-11361][STREAMING] Show scopes of RDD operations inside DStream.foreachRDD and DStream.transform in DAG viz
Currently, when a DStream sets the scope for the RDDs it generates, that scope cannot be overridden by the RDD operations. So in the case of `DStream.foreachRDD`, all the RDDs generated inside the foreachRDD get the same scope, `foreachRDD <time>`, as set by the `ForEachDStream`. This makes it hard to debug the generated RDDs in the RDD DAG viz in the Spark UI. This patch allows the RDD operations inside `DStream.transform` and `DStream.foreachRDD` to append their own scopes to the enclosing DStream scope.

I have also slightly tweaked how callsites are set, such that the short callsite reflects the RDD operation name and line number. This tweak is necessary because callsites are not managed through scopes (which support nesting and overriding), and I didn't want to add another local property to control nesting and overriding of callsites.

## Before:

![image](https://cloud.githubusercontent.com/assets/663212/10808548/fa71c0c4-7da9-11e5-9af0-5737793a146f.png)

## After:

![image](https://cloud.githubusercontent.com/assets/663212/10808659/37bc45b6-7dab-11e5-8041-c20be6a9bc26.png)

The code that was used to generate this is:

```
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts.foreachRDD { rdd =>
  val temp = rdd.map { _ -> 1 }.reduceByKey(_ + _)
  val temp2 = temp.map { _ -> 1 }.reduceByKey(_ + _)
  val count = temp2.count
  println(count)
}
```

Note:
- The inner scopes of the RDD operations map/reduceByKey inside foreachRDD are visible.
- The short callsites of the stages refer to the line numbers of the RDD ops, rather than the same line number of foreachRDD in all three cases.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #9315 from tdas/SPARK-11361.
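For readers who want the mechanics of "appending" a scope, below is a minimal, self-contained toy model of scope nesting. The names here are hypothetical; Spark's real mechanism lives in `RDDOperationScope` and tracks scopes via a SparkContext local property, not a thread-local string.

```
// Toy model of nested operation scopes, NOT Spark's actual implementation.
// Before this patch, everything run inside the outer scope simply inherited
// its name ("foreachRDD <time>"); after it, inner RDD operations append
// their own child scopes, which is what makes them visible in the DAG viz.
object ScopeSketch {
  // Stand-in for the SparkContext local property that tracks the current scope.
  private val currentScope = new ThreadLocal[Option[String]] {
    override def initialValue(): Option[String] = None
  }

  /** Run `body` inside a scope named `name`, nested under any enclosing scope. */
  def withScope[T](name: String)(body: => T): T = {
    val parent = currentScope.get()
    currentScope.set(Some(parent.fold(name)(p => s"$p / $name")))
    try body finally currentScope.set(parent)
  }

  def main(args: Array[String]): Unit = {
    withScope("foreachRDD <time>") {
      withScope("map") {
        println(currentScope.get().get)         // foreachRDD <time> / map
      }
      withScope("reduceByKey") {
        println(currentScope.get().get)         // foreachRDD <time> / reduceByKey
      }
    }
  }
}
```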
Diffstat (limited to 'external')
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala  |  2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
index 1a900007b6..79077e4a49 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
@@ -37,7 +37,7 @@ class TestOutputStream[T: ClassTag](parent: DStream[T],
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
- }) {
+ }, false) {
// This is to clear the output buffer every time it is read from a checkpoint
@throws(classOf[IOException])
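For context on the `false` added above: this patch gives `ForEachDStream` a third constructor parameter (named `displayInnerRDDOps` in the upstream change) that controls whether RDD operations run inside the foreach function get their own scopes in the DAG viz; the test stream opts out so scope names stay stable. Below is a simplified sketch of the new shape of the class, illustrative rather than the full upstream source:

```
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.streaming.scheduler.Job

// Simplified sketch of ForEachDStream after this patch. The new flag is
// threaded into createRDDWithLocalProperties, which decides whether inner
// RDD operations may append their own scopes (true) or inherit the single
// "foreachRDD <time>" scope (false, the pre-patch behavior).
private[streaming]
class ForEachDStream[T: ClassTag](
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  // A ForEachDStream produces no RDDs of its own; it only schedules jobs.
  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time).map { rdd =>
      val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
        foreachFunc(rdd, time)
      }
      new Job(time, jobFunc)
    }
  }
}
```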