-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                               9
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala       2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala            63
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala     14
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala 13
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala          75
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala               4
7 files changed, 147 insertions, 33 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7421821e26..67270c38fa 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1787,10 +1787,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* has overridden the call site using `setCallSite()`, this will return the user's version.
*/
private[spark] def getCallSite(): CallSite = {
- Option(getLocalProperty(CallSite.SHORT_FORM)).map { case shortCallSite =>
- val longCallSite = Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse("")
- CallSite(shortCallSite, longCallSite)
- }.getOrElse(Utils.getCallSite())
+ val callSite = Utils.getCallSite()
+ CallSite(
+ Option(getLocalProperty(CallSite.SHORT_FORM)).getOrElse(callSite.shortForm),
+ Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse(callSite.longForm)
+ )
}
/**
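The rewrite above makes the fallback per field: a caller that overrides only the short form no longer loses the long form, which previously defaulted to an empty string. A minimal sketch of the intended behaviour, written as if it lived inside the org.apache.spark package because CallSite and getCallSite() are private[spark] (the demo object and call-site string are hypothetical):

    package org.apache.spark

    import org.apache.spark.util.CallSite

    object CallSiteFallbackDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("callsite-demo"))
        // Override only the short form; leave the long form property unset.
        sc.setLocalProperty(CallSite.SHORT_FORM, "myOp at MyApp.scala:42")
        val cs = sc.getCallSite()
        assert(cs.shortForm == "myOp at MyApp.scala:42")
        // Before this change the long form fell back to ""; now it comes from Utils.getCallSite().
        assert(cs.longForm.nonEmpty)
        sc.stop()
      }
    }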
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
index 1a900007b6..79077e4a49 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
@@ -37,7 +37,7 @@ class TestOutputStream[T: ClassTag](parent: DStream[T],
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
- }) {
+ }, false) {
// This is to clear the output buffer every time it is read from a checkpoint
@throws(classOf[IOException])
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 1da0b0a54d..1a6edf9473 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -341,7 +341,7 @@ abstract class DStream[T: ClassTag] (
// of RDD generation, else generate nothing.
if (isTimeValid(time)) {
- val rddOption = createRDDWithLocalProperties(time) {
+ val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details. We need to have this call here because
@@ -373,27 +373,52 @@ abstract class DStream[T: ClassTag] (
/**
* Wrap a body of code such that the call site and operation scope
* information are passed to the RDDs created in this body properly.
- */
- protected def createRDDWithLocalProperties[U](time: Time)(body: => U): U = {
+ * @param time Current batch time that should be embedded in the scope names
+ * @param displayInnerRDDOps Whether the detailed callsites and scopes of the inner RDDs generated
+ * by `body` will be displayed in the UI; if `false`, only the scope and
+ * callsite of the DStream operation that generated `this` will be displayed
+ * @param body RDD creation code to execute with certain local properties.
+ */
+ protected[streaming] def createRDDWithLocalProperties[U](
+ time: Time,
+ displayInnerRDDOps: Boolean)(body: => U): U = {
val scopeKey = SparkContext.RDD_SCOPE_KEY
val scopeNoOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
// Pass this DStream's operation scope and creation site information to RDDs through
// thread-local properties in our SparkContext. Since this method may be called from another
// DStream, we need to temporarily store any old scope and creation site information to
// restore them later after setting our own.
- val prevCallSite = ssc.sparkContext.getCallSite()
+ val prevCallSite = CallSite(
+ ssc.sparkContext.getLocalProperty(CallSite.SHORT_FORM),
+ ssc.sparkContext.getLocalProperty(CallSite.LONG_FORM)
+ )
val prevScope = ssc.sparkContext.getLocalProperty(scopeKey)
val prevScopeNoOverride = ssc.sparkContext.getLocalProperty(scopeNoOverrideKey)
try {
- ssc.sparkContext.setCallSite(creationSite)
+ if (displayInnerRDDOps) {
+ // Unset the callsite (both short and long forms), so that generated RDDs get their own
+ ssc.sparkContext.setLocalProperty(CallSite.SHORT_FORM, null)
+ ssc.sparkContext.setLocalProperty(CallSite.LONG_FORM, null)
+ } else {
+ // Set the callsite, so that the generated RDDs get the DStream's call site and
+ // the internal RDD call sites do not get displayed
+ ssc.sparkContext.setCallSite(creationSite)
+ }
+
// Use the DStream's base scope for this RDD so we can (1) preserve the higher level
// DStream operation name, and (2) share this scope with other DStreams created in the
// same operation. Disallow nesting so that low-level Spark primitives do not show up.
// TODO: merge callsites with scopes so we can just reuse the code there
makeScope(time).foreach { s =>
ssc.sparkContext.setLocalProperty(scopeKey, s.toJson)
- ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, "true")
+ if (displayInnerRDDOps) {
+ // Allow inner RDDs to add inner scopes
+ ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, null)
+ } else {
+ // Do not allow inner RDDs to override the scope set by DStream
+ ssc.sparkContext.setLocalProperty(scopeNoOverrideKey, "true")
+ }
}
body
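The flag boils down to which thread-local properties are visible while `body` runs. A rough sketch of the observable difference (hypothetical snippet, assumed to run inside the org.apache.spark.streaming package since the method is protected[streaming] and CallSite is private[spark]; `stream` is an existing DStream):

    import org.apache.spark.util.CallSite

    val sc = stream.context.sparkContext

    stream.createRDDWithLocalProperties(Time(1000), displayInnerRDDOps = false) {
      // The DStream's creation site is installed and scope overriding is disallowed,
      // so RDDs built here show up under the DStream operation's callsite and scope.
      assert(sc.getLocalProperty(CallSite.SHORT_FORM) != null)
    }

    stream.createRDDWithLocalProperties(Time(1000), displayInnerRDDOps = true) {
      // The callsite properties are cleared and nested scopes are allowed, so RDDs
      // built here keep their own map/reduceByKey callsites in the UI.
      assert(sc.getLocalProperty(CallSite.SHORT_FORM) == null)
    }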
@@ -628,7 +653,7 @@ abstract class DStream[T: ClassTag] (
*/
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
val cleanedF = context.sparkContext.clean(foreachFunc, false)
- this.foreachRDD((r: RDD[T], t: Time) => cleanedF(r))
+ foreachRDD((r: RDD[T], t: Time) => cleanedF(r), displayInnerRDDOps = true)
}
/**
@@ -639,7 +664,23 @@ abstract class DStream[T: ClassTag] (
// because the DStream is reachable from the outer object here, and because
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
- new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
+ foreachRDD(foreachFunc, displayInnerRDDOps = true)
+ }
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * 'this' DStream will be registered as an output stream and therefore materialized.
+ * @param foreachFunc Function to apply on each RDD generated by this DStream
+ * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
+ * in `foreachFunc` will be displayed in the UI. If `false`, only the
+ * scope and callsite of `foreachRDD` will be displayed, overriding
+ * those of the RDDs generated inside it.
+ */
+ private def foreachRDD(
+ foreachFunc: (RDD[T], Time) => Unit,
+ displayInnerRDDOps: Boolean): Unit = {
+ new ForEachDStream(this,
+ context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
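In practice the split is: the public foreachRDD overloads pass displayInnerRDDOps = true, because the RDD operations in the user's function are the interesting part, while built-in outputs such as print() and the saveAs*Files() methods pass false, because their internal collect()/save calls are plumbing. A sketch of the visible difference (assumes an existing stream: DStream[String] and a running StreamingContext, e.g. in a spark-shell session):

    stream.foreachRDD { rdd =>
      // Shown in the UI under their own callsites, nested in the "foreachRDD" scope,
      // because the public overload uses displayInnerRDDOps = true.
      rdd.map(word => (word, 1)).reduceByKey(_ + _).collect()
    }

    // The collect() that print() performs internally is hidden: print() goes through
    // the private overload with displayInnerRDDOps = false, so only a "print" scope shows.
    stream.print()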
/**
@@ -730,7 +771,7 @@ abstract class DStream[T: ClassTag] (
// scalastyle:on println
}
}
- new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
+ foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
/**
@@ -900,7 +941,7 @@ abstract class DStream[T: ClassTag] (
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsObjectFile(file)
}
- this.foreachRDD(saveFunc)
+ this.foreachRDD(saveFunc, displayInnerRDDOps = false)
}
/**
@@ -913,7 +954,7 @@ abstract class DStream[T: ClassTag] (
val file = rddToFileName(prefix, suffix, time)
rdd.saveAsTextFile(file)
}
- this.foreachRDD(saveFunc)
+ this.foreachRDD(saveFunc, displayInnerRDDOps = false)
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index c109ceccc6..4410a9977c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -22,10 +22,19 @@ import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.streaming.scheduler.Job
import scala.reflect.ClassTag
+/**
+ * An internal DStream used to represent output operations like DStream.foreachRDD.
+ * @param parent Parent DStream
+ * @param foreachFunc Function to apply on each RDD generated by the parent DStream
+ * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
+ * by `foreachFunc` will be displayed in the UI; if `false`, only the scope
+ * and callsite of `DStream.foreachRDD` will be displayed.
+ */
private[streaming]
class ForEachDStream[T: ClassTag] (
parent: DStream[T],
- foreachFunc: (RDD[T], Time) => Unit
+ foreachFunc: (RDD[T], Time) => Unit,
+ displayInnerRDDOps: Boolean
) extends DStream[Unit](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
@@ -37,8 +46,7 @@ class ForEachDStream[T: ClassTag] (
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
- val jobFunc = () => createRDDWithLocalProperties(time) {
- ssc.sparkContext.setCallSite(creationSite)
+ val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
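With the flag threaded through the constructor, the explicit setCallSite(creationSite) that used to live in the job function is no longer needed; createRDDWithLocalProperties(time, displayInnerRDDOps) now decides whether to install the DStream's callsite or clear it. Callers that construct ForEachDStream directly must now choose, as the TestOutputStream classes in this patch do. A sketch of that pattern (hypothetical helper, placed in the org.apache.spark.streaming package because ForEachDStream and register() are private[streaming]):

    package org.apache.spark.streaming

    import scala.collection.mutable.ArrayBuffer
    import scala.reflect.ClassTag

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.{DStream, ForEachDStream}

    object CollectingOutput {
      def collectTo[T: ClassTag](parent: DStream[T], output: ArrayBuffer[Array[T]]): Unit = {
        // `false`: the rdd.collect() below is internal bookkeeping, so its callsite
        // should not replace the output operation's scope in the UI.
        new ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
          output += rdd.collect()
        }, false).register()
      }
    }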
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
index 5eabdf63dc..080bc873fa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/TransformedDStream.scala
@@ -51,4 +51,17 @@ class TransformedDStream[U: ClassTag] (
}
Some(transformedRDD)
}
+
+ /**
+ * Wrap a body of code such that the call site and operation scope
+ * information are passed to the RDDs created in this body properly.
+ * This has been overridden to make sure that `displayInnerRDDOps` is always `true`, that is,
+ * the inner scopes and callsites of RDDs generated in `DStream.transform` are always
+ * displayed in the UI.
+ */
+ override protected[streaming] def createRDDWithLocalProperties[U](
+ time: Time,
+ displayInnerRDDOps: Boolean)(body: => U): U = {
+ super.createRDDWithLocalProperties(time, displayInnerRDDOps = true)(body)
+ }
}
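Forcing the flag to true here means transform() always exposes the user's RDD-level operations, which is the point of the API. For example (sketch only; assumes an existing lines: DStream[String]), each of the flatMap/map/reduceByKey/sortByKey steps below keeps its own callsite and nested scope in the UI under the per-batch "transform" scope:

    val topWords = lines.transform { rdd =>
      rdd.flatMap(_.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .map(_.swap)
        .sortByKey(ascending = false)
    }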
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
index 8844c9d74b..bc223e648a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/DStreamScopeSuite.scala
@@ -17,12 +17,15 @@
package org.apache.spark.streaming
+import scala.collection.mutable.ArrayBuffer
+
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.apache.spark.{SparkContext, SparkFunSuite}
-import org.apache.spark.rdd.RDDOperationScope
+import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.ui.UIUtils
+import org.apache.spark.util.ManualClock
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
/**
* Tests whether scope information is passed from DStream operations to RDDs correctly.
@@ -32,7 +35,9 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
private val batchDuration: Duration = Seconds(1)
override def beforeAll(): Unit = {
- ssc = new StreamingContext(new SparkContext("local", "test"), batchDuration)
+ val conf = new SparkConf().setMaster("local").setAppName("test")
+ conf.set("spark.streaming.clock", classOf[ManualClock].getName())
+ ssc = new StreamingContext(new SparkContext(conf), batchDuration)
}
override def afterAll(): Unit = {
@@ -103,6 +108,8 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
test("scoping nested operations") {
val inputStream = new DummyInputDStream(ssc)
+ // countByWindow internally uses reduceByWindow, but only countByWindow
+ // should appear in scope
val countStream = inputStream.countByWindow(Seconds(10), Seconds(1))
countStream.initialize(Time(0))
@@ -137,6 +144,57 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
testStream(countStream)
}
+ test("transform should allow RDD operations to be captured in scopes") {
+ val inputStream = new DummyInputDStream(ssc)
+ val transformedStream = inputStream.transform { _.map { _ -> 1}.reduceByKey(_ + _) }
+ transformedStream.initialize(Time(0))
+
+ val transformScopeBase = transformedStream.baseScope.map(RDDOperationScope.fromJson)
+ val transformScope1 = transformedStream.getOrCompute(Time(1000)).get.scope
+ val transformScope2 = transformedStream.getOrCompute(Time(2000)).get.scope
+ val transformScope3 = transformedStream.getOrCompute(Time(3000)).get.scope
+
+ // Assert that all children RDDs inherit the DStream operation name correctly
+ assertDefined(transformScopeBase, transformScope1, transformScope2, transformScope3)
+ assert(transformScopeBase.get.name === "transform")
+ assertNestedScopeCorrect(transformScope1.get, 1000)
+ assertNestedScopeCorrect(transformScope2.get, 2000)
+ assertNestedScopeCorrect(transformScope3.get, 3000)
+
+ def assertNestedScopeCorrect(rddScope: RDDOperationScope, batchTime: Long): Unit = {
+ assert(rddScope.name === "reduceByKey")
+ assert(rddScope.parent.isDefined)
+ assertScopeCorrect(transformScopeBase.get, rddScope.parent.get, batchTime)
+ }
+ }
+
+ test("foreachRDD should allow RDD operations to be captured in scope") {
+ val inputStream = new DummyInputDStream(ssc)
+ val generatedRDDs = new ArrayBuffer[RDD[(Int, Int)]]
+ inputStream.foreachRDD { rdd =>
+ generatedRDDs += rdd.map { _ -> 1}.reduceByKey(_ + _)
+ }
+ val batchCounter = new BatchCounter(ssc)
+ ssc.start()
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ clock.advance(3000)
+ batchCounter.waitUntilBatchesCompleted(3, 10000)
+ assert(generatedRDDs.size === 3)
+
+ val foreachBaseScope =
+ ssc.graph.getOutputStreams().head.baseScope.map(RDDOperationScope.fromJson)
+ assertDefined(foreachBaseScope)
+ assert(foreachBaseScope.get.name === "foreachRDD")
+
+ val rddScopes = generatedRDDs.map { _.scope }
+ assertDefined(rddScopes: _*)
+ rddScopes.zipWithIndex.foreach { case (rddScope, idx) =>
+ assert(rddScope.get.name === "reduceByKey")
+ assert(rddScope.get.parent.isDefined)
+ assertScopeCorrect(foreachBaseScope.get, rddScope.get.parent.get, (idx + 1) * 1000)
+ }
+ }
+
/** Assert that the RDD operation scope properties are not set in our SparkContext. */
private def assertPropertiesNotSet(): Unit = {
assert(ssc != null)
@@ -149,19 +207,12 @@ class DStreamScopeSuite extends SparkFunSuite with BeforeAndAfter with BeforeAnd
baseScope: RDDOperationScope,
rddScope: RDDOperationScope,
batchTime: Long): Unit = {
- assertScopeCorrect(baseScope.id, baseScope.name, rddScope, batchTime)
- }
-
- /** Assert that the given RDD scope inherits the base name and ID correctly. */
- private def assertScopeCorrect(
- baseScopeId: String,
- baseScopeName: String,
- rddScope: RDDOperationScope,
- batchTime: Long): Unit = {
+ val (baseScopeId, baseScopeName) = (baseScope.id, baseScope.name)
val formattedBatchTime = UIUtils.formatBatchTime(
batchTime, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
assert(rddScope.id === s"${baseScopeId}_$batchTime")
assert(rddScope.name.replaceAll("\\n", " ") === s"$baseScopeName @ $formattedBatchTime")
+ assert(rddScope.parent.isEmpty) // There should not be any higher scope
}
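The consolidated helper also pins down the naming convention for per-batch scopes: the derived scope's id is the base scope id suffixed with the batch time, its name is the base name followed by the formatted batch time, and it has no parent. Roughly, for a hypothetical base scope named "foreachRDD" with id "2" and a 1000 ms batch duration (UIUtils here is the private[streaming] streaming UIUtils the suite imports):

    val batchTime = 3000L
    val formattedBatchTime = UIUtils.formatBatchTime(batchTime, 1000, showYYYYMMSS = false)
    val expectedId = s"2_$batchTime"                        // "2_3000"
    val expectedName = s"foreachRDD @ $formattedBatchTime"  // base name @ formatted batch time
    // and rddScope.parent is expected to be None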
/** Assert that all the specified options are defined. */
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 0d58a7b544..a45c92d9c7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -98,7 +98,7 @@ class TestOutputStream[T: ClassTag](
) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
- }) {
+ }, false) {
// This is to clear the output buffer every time it is read from a checkpoint
@throws(classOf[IOException])
@@ -122,7 +122,7 @@ class TestOutputStreamWithPartitions[T: ClassTag](
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.glom().collect().map(_.toSeq)
output += collected
- }) {
+ }, false) {
// This is to clear the output buffer every time it is read from a checkpoint
@throws(classOf[IOException])