aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-11-10 16:54:06 -0800
committerAndrew Or <andrew@databricks.com>2015-11-10 16:54:06 -0800
commit6600786dddc89cb16779ee56b9173f63a3af3f27 (patch)
tree8c8d44219d672619d7abbfd17829e5fd3bf14c36 /streaming
parent900917541651abe7125f0d205085d2ab6a00d92c (diff)
downloadspark-6600786dddc89cb16779ee56b9173f63a3af3f27.tar.gz
spark-6600786dddc89cb16779ee56b9173f63a3af3f27.tar.bz2
spark-6600786dddc89cb16779ee56b9173f63a3af3f27.zip
[SPARK-11361][STREAMING] Show scopes of RDD operations inside DStream.foreachRDD and DStream.transform in DAG viz
Currently, when a DStream sets the scope for RDD generated by it, that scope is not allowed to be overridden by the RDD operations. So in case of `DStream.foreachRDD`, all the RDDs generated inside the foreachRDD get the same scope - `foreachRDD <time>`, as set by the `ForeachDStream`. So it is hard to debug generated RDDs in the RDD DAG viz in the Spark UI. This patch allows the RDD operations inside `DStream.transform` and `DStream.foreachRDD` to append their own scopes to the earlier DStream scope. I have also slightly tweaked how callsites are set such that the short callsite reflects the RDD operation name and line number. This tweak is necessary as callsites are not managed through scopes (which support nesting and overriding) and I didnt want to add another local property to control nesting and overriding of callsites. ## Before: ![image](https://cloud.githubusercontent.com/assets/663212/10808548/fa71c0c4-7da9-11e5-9af0-5737793a146f.png) ## After: ![image](https://cloud.githubusercontent.com/assets/663212/10808659/37bc45b6-7dab-11e5-8041-c20be6a9bc26.png) The code that was used to generate this is: ``` val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.foreachRDD { rdd => val temp = rdd.map { _ -> 1 }.reduceByKey( _ + _) val temp2 = temp.map { _ -> 1}.reduceByKey(_ + _) val count = temp2.count println(count) } ``` Note - The inner scopes of the RDD operations map/reduceByKey inside foreachRDD is visible - The short callsites of stages refers to the line number of the RDD ops rather than the same line number of foreachRDD in all three cases. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #9315 from tdas/SPARK-11361.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala63
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala14
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala13
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala75
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala4
5 files changed, 141 insertions, 28 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 1da0b0a54d..1a6edf9473 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -341,7 +341,7 @@ abstract class DStream[T: ClassTag] (
// of RDD generation, else generate nothing.
if (isTimeValid(time)) {
- val rddOption = createRDDWithLocalProperties(time) {
+ val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details. We need to have this call here because
@@ -373,27 +373,52 @@ abstract class DStream[T: ClassTag] (
/**
* Wrap a body of code such that the call site and operation scope
* information are passed to the RDDs created in this body properly.
- */
- protected def createRDDWithLocalProperties[U](time: Time)(body: => U): U = {
+ * @param body RDD creation code to execute with certain local properties.
+ * @param time Current batch time that should be embedded in the scope names
+ * @param displayInnerRDDOps Whether the detailed callsites and scopes of the inner RDDs generated
+ * by `body` will be displayed in the UI; only the scope and callsite
+ * of the DStream operation that generated `this` will be displayed.
+ */
+ protected[streaming] def createRDDWithLocalProperties[U](
+ time: Time,
+ displayInnerRDDOps: Boolean)(body: => U): U = {
val scopeKey = SparkContext.RDD_SCOPE_KEY
val scopeNoOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
// Pass this DStream's operation scope and creation site information to RDDs through
// thread-local properties in our SparkContext. Since this method may be called from another
// DStream, we need to temporarily store any old scope and creation site information to
// restore them later after setting our own.
- val prevCallSite = ssc.sparkContext.getCallSite()
+ val prevCallSite = CallSite(
+ ssc.sparkContext.getLocalProperty(CallSite.SHORT_FORM),
+ ssc.sparkContext.getLocalProperty(CallSite.LONG_FORM)
+ )
val prevScope = ssc.sparkContext.getLocalProperty(scopeKey)
val prevScopeNoOverride = ssc.sparkContext.getLocalProperty(scopeNoOverrideKey)
try {
- ssc.sparkContext.setCallSite(creationSite)
+ if (displayInnerRDDOps) {
+ // Unset the short form call site, so that generated RDDs get their own
+ ssc.sparkContext.setLocalProperty(CallSite.SHORT_FORM, null)
+ ssc.sparkContext.setLocalProperty(CallSite.LONG_FORM, null)
+ } else {
+ // Set the callsite, so that the generated RDDs get the DStream's call site and
+ // the internal RDD call sites do not get displayed
+ ssc.sparkContext.setCallSite(creationSite)
+ }
+
// Use the DStream's base scope for this RDD so we can (1) preserve the higher level
// DStream operation name, and (2) share this scope with other DStreams created in the
// same operation. Disallow nesting so that low-level Spark primitives do not show up.
// TODO: merge callsites with scopes so we can just reuse the code there
makeScope(time).foreach { s =>
ssc.sparkContext.setLocalProperty(scopeKey, s.toJson)
- ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, "true")
+ if (displayInnerRDDOps) {
+ // Allow inner RDDs to add inner scopes
+ ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, null)
+ } else {
+ // Do not allow inner RDDs to override the scope set by DStream
+ ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, "true")
+ }
}
body
@@ -628,7 +653,7 @@ abstract class DStream[T: ClassTag] (
*/
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
val cleanedF = context.sparkContext.clean(foreachFunc, false)
- this.foreachRDD((r: RDD[T], t: Time) => cleanedF(r))
+ foreachRDD((r: RDD[T], t: Time) => cleanedF(r), displayInnerRDDOps = true)
}
/**
@@ -639,7 +664,23 @@ abstract class DStream[T: ClassTag] (
// because the DStream is reachable from the outer object here, and because
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
- new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
+ foreachRDD(foreachFunc, displayInnerRDDOps = true)
+ }
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * 'this' DStream will be registered as an output stream and therefore materialized.
+ * @param foreachFunc foreachRDD function
+ * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
+ * in the `foreachFunc` to be displayed in the UI. If `false`, then
+ * only the scopes and callsites of `foreachRDD` will override those
+ * of the RDDs on the display.
+ */
+ private def foreachRDD(
+ foreachFunc: (RDD[T], Time) => Unit,
+ displayInnerRDDOps: Boolean): Unit = {
+ new ForEachDStream(this,
+ context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
/**
@@ -730,7 +771,7 @@ abstract class DStream[T: ClassTag] (
// scalastyle:on println
}
}
- new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
+ foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
/**
@@ -900,7 +941,7 @@ abstract class DStream[T: ClassTag] (
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsObjectFile(file)
}
- this.foreachRDD(saveFunc)
+ this.foreachRDD(saveFunc, displayInnerRDDOps = false)
}
/**
@@ -913,7 +954,7 @@ abstract class DStream[T: ClassTag] (
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsTextFile(file)
}
- this.foreachRDD(saveFunc)
+ this.foreachRDD(saveFunc, displayInnerRDDOps = false)
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index c109ceccc6..4410a9977c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -22,10 +22,19 @@ import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.streaming.scheduler.Job
import scala.reflect.ClassTag
+/**
+ * An internal DStream used to represent output operations like DStream.foreachRDD.
+ * @param parent Parent DStream
+ * @param foreachFunc Function to apply on each RDD generated by the parent DStream
+ * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
+ * by `foreachFunc` will be displayed in the UI; only the scope and
+ * callsite of `DStream.foreachRDD` will be displayed.
+ */
private[streaming]
class ForEachDStream[T: ClassTag] (
parent: DStream[T],
- foreachFunc: (RDD[T], Time) => Unit
+ foreachFunc: (RDD[T], Time) => Unit,
+ displayInnerRDDOps: Boolean
) extends DStream[Unit](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
@@ -37,8 +46,7 @@ class ForEachDStream[T: ClassTag] (
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
- val jobFunc = () => createRDDWithLocalProperties(time) {
- ssc.sparkContext.setCallSite(creationSite)
+ val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 5eabdf63dc..080bc873fa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -51,4 +51,17 @@ class TransformedDStream[U: ClassTag] (
}
Some(transformedRDD)
}
+
+ /**
+ * Wrap a body of code such that the call site and operation scope
+ * information are passed to the RDDs created in this body properly.
+ * This has been overriden to make sure that `displayInnerRDDOps` is always `true`, that is,
+ * the inner scopes and callsites of RDDs generated in `DStream.transform` are always
+ * displayed in the UI.
+ */
+ override protected[streaming] def createRDDWithLocalProperties[U](
+ time: Time,
+ displayInnerRDDOps: Boolean)(body: => U): U = {
+ super.createRDDWithLocalProperties(time, displayInnerRDDOps = true)(body)
+ }
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
index 8844c9d74b..bc223e648a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
@@ -17,12 +17,15 @@
package org.apache.spark.streaming
+import scala.collection.mutable.ArrayBuffer
+
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.apache.spark.{SparkContext, SparkFunSuite}
-import org.apache.spark.rdd.RDDOperationScope
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.ui.UIUtils
+import org.apache.spark.util.ManualClock
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
/**
* Tests whether scope information is passed from DStream operations to RDDs correctly.
@@ -32,7 +35,9 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
private val batchDuration: Duration = Seconds(1)
override def beforeAll(): Unit = {
- ssc = new StreamingContext(new SparkContext("local", "test"), batchDuration)
+ val conf = new SparkConf().setMaster("local").setAppName("test")
+ conf.set("spark.streaming.clock", classOf[ManualClock].getName())
+ ssc = new StreamingContext(new SparkContext(conf), batchDuration)
}
override def afterAll(): Unit = {
@@ -103,6 +108,8 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
test("scoping nested operations") {
val inputStream = new DummyInputDStream(ssc)
+ // countByKeyAndWindow internally uses reduceByKeyAndWindow, but only countByKeyAndWindow
+ // should appear in scope
val countStream = inputStream.countByWindow(Seconds(10), Seconds(1))
countStream.initialize(Time(0))
@@ -137,6 +144,57 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
testStream(countStream)
}
+ test("transform should allow RDD operations to be captured in scopes") {
+ val inputStream = new DummyInputDStream(ssc)
+ val transformedStream = inputStream.transform { _.map { _ -> 1}.reduceByKey(_ + _) }
+ transformedStream.initialize(Time(0))
+
+ val transformScopeBase = transformedStream.baseScope.map(RDDOperationScope.fromJson)
+ val transformScope1 = transformedStream.getOrCompute(Time(1000)).get.scope
+ val transformScope2 = transformedStream.getOrCompute(Time(2000)).get.scope
+ val transformScope3 = transformedStream.getOrCompute(Time(3000)).get.scope
+
+ // Assert that all children RDDs inherit the DStream operation name correctly
+ assertDefined(transformScopeBase, transformScope1, transformScope2, transformScope3)
+ assert(transformScopeBase.get.name === "transform")
+ assertNestedScopeCorrect(transformScope1.get, 1000)
+ assertNestedScopeCorrect(transformScope2.get, 2000)
+ assertNestedScopeCorrect(transformScope3.get, 3000)
+
+ def assertNestedScopeCorrect(rddScope: RDDOperationScope, batchTime: Long): Unit = {
+ assert(rddScope.name === "reduceByKey")
+ assert(rddScope.parent.isDefined)
+ assertScopeCorrect(transformScopeBase.get, rddScope.parent.get, batchTime)
+ }
+ }
+
+ test("foreachRDD should allow RDD operations to be captured in scope") {
+ val inputStream = new DummyInputDStream(ssc)
+ val generatedRDDs = new ArrayBuffer[RDD[(Int, Int)]]
+ inputStream.foreachRDD { rdd =>
+ generatedRDDs += rdd.map { _ -> 1}.reduceByKey(_ + _)
+ }
+ val batchCounter = new BatchCounter(ssc)
+ ssc.start()
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ clock.advance(3000)
+ batchCounter.waitUntilBatchesCompleted(3, 10000)
+ assert(generatedRDDs.size === 3)
+
+ val foreachBaseScope =
+ ssc.graph.getOutputStreams().head.baseScope.map(RDDOperationScope.fromJson)
+ assertDefined(foreachBaseScope)
+ assert(foreachBaseScope.get.name === "foreachRDD")
+
+ val rddScopes = generatedRDDs.map { _.scope }
+ assertDefined(rddScopes: _*)
+ rddScopes.zipWithIndex.foreach { case (rddScope, idx) =>
+ assert(rddScope.get.name === "reduceByKey")
+ assert(rddScope.get.parent.isDefined)
+ assertScopeCorrect(foreachBaseScope.get, rddScope.get.parent.get, (idx + 1) * 1000)
+ }
+ }
+
/** Assert that the RDD operation scope properties are not set in our SparkContext. */
private def assertPropertiesNotSet(): Unit = {
assert(ssc != null)
@@ -149,19 +207,12 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
baseScope: RDDOperationScope,
rddScope: RDDOperationScope,
batchTime: Long): Unit = {
- assertScopeCorrect(baseScope.id, baseScope.name, rddScope, batchTime)
- }
-
- /** Assert that the given RDD scope inherits the base name and ID correctly. */
- private def assertScopeCorrect(
- baseScopeId: String,
- baseScopeName: String,
- rddScope: RDDOperationScope,
- batchTime: Long): Unit = {
+ val (baseScopeId, baseScopeName) = (baseScope.id, baseScope.name)
val formattedBatchTime = UIUtils.formatBatchTime(
batchTime, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
assert(rddScope.id === s"${baseScopeId}_$batchTime")
assert(rddScope.name.replaceAll("\\n", " ") === s"$baseScopeName @ $formattedBatchTime")
+ assert(rddScope.parent.isEmpty) // There should not be any higher scope
}
/** Assert that all the specified options are defined. */
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 0d58a7b544..a45c92d9c7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -98,7 +98,7 @@ class TestOutputStream[T: ClassTag](
) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
- }) {
+ }, false) {
// This is to clear the output buffer every it is read from a checkpoint
@throws(classOf[IOException])
@@ -122,7 +122,7 @@ class TestOutputStreamWithPartitions[T: ClassTag](
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.glom().collect().map(_.toSeq)
output += collected
- }) {
+ }, false) {
// This is to clear the output buffer every it is read from a checkpoint
@throws(classOf[IOException])