aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala10
1 files changed, 9 insertions, 1 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index a80154e2fc..806e181f61 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -182,7 +182,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
assert(ssc.scheduler.isStarted === false)
}
- test("start should set job group and description of streaming jobs correctly") {
+ test("start should set local properties of streaming jobs correctly") {
ssc = new StreamingContext(conf, batchDuration)
ssc.sc.setJobGroup("non-streaming", "non-streaming", true)
val sc = ssc.sc
@@ -190,16 +190,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
@volatile var jobGroupFound: String = ""
@volatile var jobDescFound: String = ""
@volatile var jobInterruptFound: String = ""
+ @volatile var customPropFound: String = ""
@volatile var allFound: Boolean = false
addInputStream(ssc).foreachRDD { rdd =>
jobGroupFound = sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)
jobDescFound = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
jobInterruptFound = sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
+ customPropFound = sc.getLocalProperty("customPropKey")
allFound = true
}
+ ssc.sc.setLocalProperty("customPropKey", "value1")
ssc.start()
+ // Local props set after start should be ignored
+ ssc.sc.setLocalProperty("customPropKey", "value2")
+
eventually(timeout(10 seconds), interval(10 milliseconds)) {
assert(allFound === true)
}
@@ -208,11 +214,13 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
assert(jobGroupFound === null)
assert(jobDescFound.contains("Streaming job from"))
assert(jobInterruptFound === "false")
+ assert(customPropFound === "value1")
// Verify current thread's thread-local properties have not changed
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "non-streaming")
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) === "non-streaming")
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) === "true")
+ assert(sc.getLocalProperty("customPropKey") === "value2")
}
test("start multiple times") {