aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2015-09-21 16:47:52 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-09-21 16:47:52 -0700
commit72869883f12b6e0a4e5aad79c0ac2cfdb4d83f09 (patch)
tree47371f29556a8342ff3ca052f2b6c812cbf90498 /streaming
parent7c4f852bfc39537840f56cd8121457a0dc1ad7c1 (diff)
downloadspark-72869883f12b6e0a4e5aad79c0ac2cfdb4d83f09.tar.gz
spark-72869883f12b6e0a4e5aad79c0ac2cfdb4d83f09.tar.bz2
spark-72869883f12b6e0a4e5aad79c0ac2cfdb4d83f09.zip
[SPARK-10649] [STREAMING] Prevent inheriting job group and irrelevant job description in streaming jobs
The job group and job description information is passed through thread-local properties and gets inherited by child threads. In the case of Spark Streaming, the streaming jobs inherit these properties from the thread that called streamingContext.start(). This may not make sense. 1. Job group: This is mainly used for cancelling a group of jobs together. It does not make sense to cancel streaming jobs like this, as the effect will be unpredictable. And it's not a valid use case anyway; to cancel a streaming context, call streamingContext.stop(). 2. Job description: This is used to pass on nice text descriptions for jobs to show up in the UI. The job description of the thread that calls streamingContext.start() is not useful for all the streaming jobs, as it does not make sense for all of the streaming jobs to have the same description, and the description may or may not be related to streaming. The solution in this PR is meant for the Spark master branch, where local properties are inherited by cloning the properties. The job group and job description in the thread that starts the streaming scheduler are explicitly removed, so that all the subsequent child threads do not inherit them. Also, the starting is done in a new child thread, so that setting the job group and description for streaming does not change those properties in the thread that called streamingContext.start(). Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #8781 from tdas/SPARK-10649.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala15
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala32
2 files changed, 44 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index b496d1f341..6720ba4f72 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
-import org.apache.spark.util.{CallSite, ShutdownHookManager, Utils}
+import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils}
/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -588,12 +588,20 @@ class StreamingContext private[streaming] (
state match {
case INITIALIZED =>
startSite.set(DStream.getCreationSite())
- sparkContext.setCallSite(startSite.get)
StreamingContext.ACTIVATION_LOCK.synchronized {
StreamingContext.assertNoOtherContextIsActive()
try {
validate()
- scheduler.start()
+
+ // Start the streaming scheduler in a new thread, so that thread local properties
+ // like call sites and job groups can be reset without affecting those of the
+ // current thread.
+ ThreadUtils.runInNewThread("streaming-start") {
+ sparkContext.setCallSite(startSite.get)
+ sparkContext.clearJobGroup()
+ sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
+ scheduler.start()
+ }
state = StreamingContextState.ACTIVE
} catch {
case NonFatal(e) =>
@@ -618,6 +626,7 @@ class StreamingContext private[streaming] (
}
}
+
/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
* will be thrown in this thread.
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index d26894e88f..3b9d0d15ea 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -180,6 +180,38 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
assert(ssc.scheduler.isStarted === false)
}
+ test("start should set job group and description of streaming jobs correctly") {
+ ssc = new StreamingContext(conf, batchDuration)
+ ssc.sc.setJobGroup("non-streaming", "non-streaming", true)
+ val sc = ssc.sc
+
+ @volatile var jobGroupFound: String = ""
+ @volatile var jobDescFound: String = ""
+ @volatile var jobInterruptFound: String = ""
+ @volatile var allFound: Boolean = false
+
+ addInputStream(ssc).foreachRDD { rdd =>
+ jobGroupFound = sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)
+ jobDescFound = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
+ jobInterruptFound = sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
+ allFound = true
+ }
+ ssc.start()
+
+ eventually(timeout(10 seconds), interval(10 milliseconds)) {
+ assert(allFound === true)
+ }
+
+ // Verify streaming jobs have expected thread-local properties
+ assert(jobGroupFound === null)
+ assert(jobDescFound === null)
+ assert(jobInterruptFound === "false")
+
+ // Verify current thread's thread-local properties have not changed
+ assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "non-streaming")
+ assert(sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) === "non-streaming")
+ assert(sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) === "true")
+ }
test("start multiple times") {
ssc = new StreamingContext(master, appName, batchDuration)