aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala1
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala6
2 files changed, 6 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index 905bc723f6..1361c30395 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -38,6 +38,7 @@ class ForEachDStream[T: ClassTag] (
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => {
+ ssc.sparkContext.setCallSite(creationSite)
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 4b49c4d251..9f352bdcb0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -336,16 +336,20 @@ package object testPackage extends Assertions {
// Verify creation site of generated RDDs
var rddGenerated = false
- var rddCreationSiteCorrect = true
+ var rddCreationSiteCorrect = false
+ var foreachCallSiteCorrect = false
inputStream.foreachRDD { rdd =>
rddCreationSiteCorrect = rdd.creationSite == creationSite
+ foreachCallSiteCorrect =
+ rdd.sparkContext.getCallSite().shortForm.contains("StreamingContextSuite")
rddGenerated = true
}
ssc.start()
eventually(timeout(10000 millis), interval(10 millis)) {
assert(rddGenerated && rddCreationSiteCorrect, "RDD creation site was not correct")
+ assert(rddGenerated && foreachCallSiteCorrect, "Call site in foreachRDD was not correct")
}
} finally {
ssc.stop()