author      Tathagata Das <tathagata.das1565@gmail.com>    2015-11-10 16:54:06 -0800
committer   Andrew Or <andrew@databricks.com>    2015-11-10 16:54:06 -0800
commit      6600786dddc89cb16779ee56b9173f63a3af3f27 (patch)
tree        8c8d44219d672619d7abbfd17829e5fd3bf14c36 /streaming/src/test
parent      900917541651abe7125f0d205085d2ab6a00d92c (diff)
[SPARK-11361][STREAMING] Show scopes of RDD operations inside DStream.foreachRDD and DStream.transform in DAG viz
Currently, when a DStream sets the scope for the RDDs it generates, that scope cannot be overridden by the RDD operations. So in the case of `DStream.foreachRDD`, all the RDDs generated inside the foreachRDD get the same scope - `foreachRDD <time>`, as set by the `ForEachDStream`. This makes it hard to debug the generated RDDs in the RDD DAG viz in the Spark UI.

This patch allows the RDD operations inside `DStream.transform` and `DStream.foreachRDD` to append their own scopes to the earlier DStream scope. I have also slightly tweaked how callsites are set, so that the short callsite reflects the RDD operation name and line number. This tweak is necessary because callsites are not managed through scopes (which support nesting and overriding), and I didn't want to add another local property to control nesting and overriding of callsites.

## Before:

![image](https://cloud.githubusercontent.com/assets/663212/10808548/fa71c0c4-7da9-11e5-9af0-5737793a146f.png)

## After:

![image](https://cloud.githubusercontent.com/assets/663212/10808659/37bc45b6-7dab-11e5-8041-c20be6a9bc26.png)

The code that was used to generate this is:

```
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts.foreachRDD { rdd =>
  val temp = rdd.map { _ -> 1 }.reduceByKey(_ + _)
  val temp2 = temp.map { _ -> 1 }.reduceByKey(_ + _)
  val count = temp2.count
  println(count)
}
```

Note:
- The inner scopes of the RDD operations map/reduceByKey inside foreachRDD are visible.
- The short callsites of the stages refer to the line numbers of the RDD ops rather than the same line number of foreachRDD in all three cases.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #9315 from tdas/SPARK-11361.
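For orientation, the new tests in this change verify the fix by walking `RDD.scope` and `RDDOperationScope.parent`. A minimal sketch of that kind of check follows; note that `RDD.scope` and `RDDOperationScope` are `private[spark]`, so this only compiles from code inside an `org.apache.spark` package (as the test suite below is), and the helper name and printed values are illustrative, not part of this patch:

```
import org.apache.spark.rdd.{RDD, RDDOperationScope}

// Walk an RDD's operation-scope chain, printing one scope name per level.
// After this patch, an RDD built via rdd.map(...).reduceByKey(...) inside
// foreachRDD yields a chain like
//   reduceByKey
//   foreachRDD @ 16:54:06
// rather than a single collapsed "foreachRDD <time>" scope.
def printScopeChain(rdd: RDD[_]): Unit = {
  var current: Option[RDDOperationScope] = rdd.scope
  while (current.isDefined) {
    println(current.get.name.replaceAll("\\n", " "))
    current = current.get.parent
  }
}
```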
Diffstat (limited to 'streaming/src/test')
-rw-r--r--    streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala    75
-rw-r--r--    streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala    4
2 files changed, 65 insertions, 14 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
index 8844c9d74b..bc223e648a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
@@ -17,12 +17,15 @@
package org.apache.spark.streaming
+import scala.collection.mutable.ArrayBuffer
+
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.apache.spark.{SparkContext, SparkFunSuite}
-import org.apache.spark.rdd.RDDOperationScope
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.ui.UIUtils
+import org.apache.spark.util.ManualClock
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
/**
* Tests whether scope information is passed from DStream operations to RDDs correctly.
@@ -32,7 +35,9 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
private val batchDuration: Duration = Seconds(1)
override def beforeAll(): Unit = {
- ssc = new StreamingContext(new SparkContext("local", "test"), batchDuration)
+ val conf = new SparkConf().setMaster("local").setAppName("test")
+ conf.set("spark.streaming.clock", classOf[ManualClock].getName())
+ ssc = new StreamingContext(new SparkContext(conf), batchDuration)
}
override def afterAll(): Unit = {
@@ -103,6 +108,8 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
test("scoping nested operations") {
val inputStream = new DummyInputDStream(ssc)
+ // countByWindow internally uses reduceByWindow, but only countByWindow
+ // should appear in scope
val countStream = inputStream.countByWindow(Seconds(10), Seconds(1))
countStream.initialize(Time(0))
@@ -137,6 +144,57 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
testStream(countStream)
}
+ test("transform should allow RDD operations to be captured in scopes") {
+ val inputStream = new DummyInputDStream(ssc)
+ val transformedStream = inputStream.transform { _.map { _ -> 1}.reduceByKey(_ + _) }
+ transformedStream.initialize(Time(0))
+
+ val transformScopeBase = transformedStream.baseScope.map(RDDOperationScope.fromJson)
+ val transformScope1 = transformedStream.getOrCompute(Time(1000)).get.scope
+ val transformScope2 = transformedStream.getOrCompute(Time(2000)).get.scope
+ val transformScope3 = transformedStream.getOrCompute(Time(3000)).get.scope
+
+ // Assert that all child RDDs inherit the DStream operation name correctly
+ assertDefined(transformScopeBase, transformScope1, transformScope2, transformScope3)
+ assert(transformScopeBase.get.name === "transform")
+ assertNestedScopeCorrect(transformScope1.get, 1000)
+ assertNestedScopeCorrect(transformScope2.get, 2000)
+ assertNestedScopeCorrect(transformScope3.get, 3000)
+
+ def assertNestedScopeCorrect(rddScope: RDDOperationScope, batchTime: Long): Unit = {
+ assert(rddScope.name === "reduceByKey")
+ assert(rddScope.parent.isDefined)
+ assertScopeCorrect(transformScopeBase.get, rddScope.parent.get, batchTime)
+ }
+ }
+
+ test("foreachRDD should allow RDD operations to be captured in scope") {
+ val inputStream = new DummyInputDStream(ssc)
+ val generatedRDDs = new ArrayBuffer[RDD[(Int, Int)]]
+ inputStream.foreachRDD { rdd =>
+ generatedRDDs += rdd.map { _ -> 1}.reduceByKey(_ + _)
+ }
+ val batchCounter = new BatchCounter(ssc)
+ ssc.start()
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ clock.advance(3000)
+ batchCounter.waitUntilBatchesCompleted(3, 10000)
+ assert(generatedRDDs.size === 3)
+
+ val foreachBaseScope =
+ ssc.graph.getOutputStreams().head.baseScope.map(RDDOperationScope.fromJson)
+ assertDefined(foreachBaseScope)
+ assert(foreachBaseScope.get.name === "foreachRDD")
+
+ val rddScopes = generatedRDDs.map { _.scope }
+ assertDefined(rddScopes: _*)
+ rddScopes.zipWithIndex.foreach { case (rddScope, idx) =>
+ assert(rddScope.get.name === "reduceByKey")
+ assert(rddScope.get.parent.isDefined)
+ assertScopeCorrect(foreachBaseScope.get, rddScope.get.parent.get, (idx + 1) * 1000)
+ }
+ }
+
/** Assert that the RDD operation scope properties are not set in our SparkContext. */
private def assertPropertiesNotSet(): Unit = {
assert(ssc != null)
@@ -149,19 +207,12 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
baseScope: RDDOperationScope,
rddScope: RDDOperationScope,
batchTime: Long): Unit = {
- assertScopeCorrect(baseScope.id, baseScope.name, rddScope, batchTime)
- }
-
- /** Assert that the given RDD scope inherits the base name and ID correctly. */
- private def assertScopeCorrect(
- baseScopeId: String,
- baseScopeName: String,
- rddScope: RDDOperationScope,
- batchTime: Long): Unit = {
+ val (baseScopeId, baseScopeName) = (baseScope.id, baseScope.name)
val formattedBatchTime = UIUtils.formatBatchTime(
batchTime, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
assert(rddScope.id === s"${baseScopeId}_$batchTime")
assert(rddScope.name.replaceAll("\\n", " ") === s"$baseScopeName @ $formattedBatchTime")
+ assert(rddScope.parent.isEmpty) // There should not be any higher scope
}
/** Assert that all the specified options are defined. */
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 0d58a7b544..a45c92d9c7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -98,7 +98,7 @@ class TestOutputStream[T: ClassTag](
) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
- }) {
+ }, false) {
// This is to clear the output buffer every time it is read from a checkpoint
@throws(classOf[IOException])
@@ -122,7 +122,7 @@ class TestOutputStreamWithPartitions[T: ClassTag](
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.glom().collect().map(_.toSeq)
output += collected
- }) {
+ }, false) {
// This is to clear the output buffer every time it is read from a checkpoint
@throws(classOf[IOException])
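For reference, the extra `false` argument that both test output streams now pass is the new third constructor parameter this patch adds to `ForEachDStream`. The parameter's name is not visible in this diff; we assume it is the flag that decides whether RDD operations performed inside the foreach function appear as their own scopes in the DAG viz, which the test streams keep disabled. A minimal sketch of a subclass in the same style as `TestOutputStream` (the `PrintingOutputStream` name and body are purely illustrative):

```
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}

// Illustrative only: ForEachDStream is private[streaming], so like the test
// classes above this has to live under package org.apache.spark.streaming.
// The trailing `false` is the boolean added by this patch; we assume passing
// false keeps the RDDs created inside the foreach function from showing their
// own nested scopes in the DAG viz, per the commit description.
class PrintingOutputStream[T: ClassTag](parent: DStream[T])
  extends ForEachDStream[T](
    parent,
    (rdd: RDD[T], t: Time) => println(s"batch $t: ${rdd.count()} records"),
    false)
```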