aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-11-25 06:50:36 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-11-25 06:50:36 -0800
commit69cd53eae205eb10d52eaf38466db58a23b6ae81 (patch)
tree2549dc9de8680143280d00bb6ff0310a3c8dcb09 /streaming
parentd240760191f692ee7b88dfc82f06a31a340a88a2 (diff)
downloadspark-69cd53eae205eb10d52eaf38466db58a23b6ae81.tar.gz
spark-69cd53eae205eb10d52eaf38466db58a23b6ae81.tar.bz2
spark-69cd53eae205eb10d52eaf38466db58a23b6ae81.zip
[SPARK-4601][Streaming] Set correct call site for streaming jobs so that it is displayed correctly on the Spark UI
When running the NetworkWordCount, the description of the word count jobs are set as "getCallsite at DStream:xxx" . This should be set to the line number of the streaming application that has the output operation that led to the job being created. This is because the callsite is incorrectly set in the thread launching the jobs. This PR fixes that. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #3455 from tdas/streaming-callsite-fix and squashes the following commits: 69fc26f [Tathagata Das] Set correct call site for streaming jobs so that it is displayed correctly on the Spark UI
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala1
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala6
2 files changed, 6 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
index 905bc723f6..1361c30395 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
@@ -38,6 +38,7 @@ class ForEachDStream[T: ClassTag] (
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => {
+ ssc.sparkContext.setCallSite(creationSite)
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 4b49c4d251..9f352bdcb0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -336,16 +336,20 @@ package object testPackage extends Assertions {
// Verify creation site of generated RDDs
var rddGenerated = false
- var rddCreationSiteCorrect = true
+ var rddCreationSiteCorrect = false
+ var foreachCallSiteCorrect = false
inputStream.foreachRDD { rdd =>
rddCreationSiteCorrect = rdd.creationSite == creationSite
+ foreachCallSiteCorrect =
+ rdd.sparkContext.getCallSite().shortForm.contains("StreamingContextSuite")
rddGenerated = true
}
ssc.start()
eventually(timeout(10000 millis), interval(10 millis)) {
assert(rddGenerated && rddCreationSiteCorrect, "RDD creation site was not correct")
+ assert(rddGenerated && foreachCallSiteCorrect, "Call site in foreachRDD was not correct")
}
} finally {
ssc.stop()