Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala           8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala     5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala     9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala     10
4 files changed, 24 insertions, 8 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index cc187f5cb4..928739a416 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming
import java.io.{InputStream, NotSerializableException}
+import java.util.Properties
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import scala.collection.Map
@@ -25,6 +26,7 @@ import scala.collection.mutable.Queue
import scala.reflect.ClassTag
import scala.util.control.NonFatal
+import org.apache.commons.lang.SerializationUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
@@ -198,6 +200,10 @@ class StreamingContext private[streaming] (
private val startSite = new AtomicReference[CallSite](null)
+ // Copy of thread-local properties from SparkContext. These properties will be set in all tasks
+ // submitted by this StreamingContext after start.
+ private[streaming] val savedProperties = new AtomicReference[Properties](new Properties)
+
private[streaming] def getStartSite(): CallSite = startSite.get()
private var shutdownHookRef: AnyRef = _
@@ -573,6 +579,8 @@ class StreamingContext private[streaming] (
sparkContext.setCallSite(startSite.get)
sparkContext.clearJobGroup()
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
+ savedProperties.set(SerializationUtils.clone(
+ sparkContext.localProperties.get()).asInstanceOf[Properties])
scheduler.start()
}
state = StreamingContextState.ACTIVE
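
The snapshot-on-start idiom above relies on java.util.Properties being Serializable, so SerializationUtils.clone produces a deep copy that later mutations on the caller's thread cannot reach. A minimal standalone sketch of that idiom follows; the property key is made up for illustration and is not part of the patch:

import java.util.Properties
import org.apache.commons.lang.SerializationUtils

val props = new Properties()
props.setProperty("customPropKey", "value1")

// A serialization round-trip yields an independent snapshot.
val snapshot = SerializationUtils.clone(props).asInstanceOf[Properties]

// Mutating the original no longer affects the snapshot.
props.setProperty("customPropKey", "value2")
assert(snapshot.getProperty("customPropKey") == "value1")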
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 86f069b0bd..307ff1f7ec 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -241,11 +241,6 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
- // Set the SparkEnv in this thread, so that job generation code can access the environment
- // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
- // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
- SparkEnv.set(ssc.env)
-
// Checkpoint all RDDs marked for checkpointing to ensure their lineages are
// truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 303c325274..ac18f73ea8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -17,11 +17,14 @@
package org.apache.spark.streaming.scheduler
+import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import scala.collection.JavaConverters._
import scala.util.Failure
+import org.apache.commons.lang.SerializationUtils
+
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{PairRDDFunctions, RDD}
import org.apache.spark.streaming._
@@ -214,7 +217,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
import JobScheduler._
def run() {
+ val oldProps = ssc.sparkContext.getLocalProperties
try {
+ ssc.sparkContext.setLocalProperties(
+ SerializationUtils.clone(ssc.savedProperties.get()).asInstanceOf[Properties])
val formattedTime = UIUtils.formatBatchTime(
job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
@@ -248,8 +254,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
// JobScheduler has been stopped.
}
} finally {
- ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
- ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
+ ssc.sparkContext.setLocalProperties(oldProps)
}
}
}
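
The change to run() above follows a save/restore discipline: capture the caller thread's properties, install a cloned copy of the saved snapshot for the duration of the job, and put the originals back in a finally block so nothing leaks across jobs even on failure. A hedged, self-contained sketch of that discipline, assuming a plain ThreadLocal; withProperties is a hypothetical helper, not part of this patch, and Spark itself inlines the equivalent logic in JobHandler.run():

import java.util.Properties
import org.apache.commons.lang.SerializationUtils

def withProperties[T](threadLocal: ThreadLocal[Properties],
                      snapshot: Properties)(body: => T): T = {
  val saved = threadLocal.get() // save the caller's properties
  try {
    // Install an independent copy so the job cannot mutate the snapshot.
    threadLocal.set(SerializationUtils.clone(snapshot).asInstanceOf[Properties])
    body
  } finally {
    threadLocal.set(saved) // restore, even if body throws
  }
}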
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index a80154e2fc..806e181f61 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -182,7 +182,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
assert(ssc.scheduler.isStarted === false)
}
- test("start should set job group and description of streaming jobs correctly") {
+ test("start should set local properties of streaming jobs correctly") {
ssc = new StreamingContext(conf, batchDuration)
ssc.sc.setJobGroup("non-streaming", "non-streaming", true)
val sc = ssc.sc
@@ -190,16 +190,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
@volatile var jobGroupFound: String = ""
@volatile var jobDescFound: String = ""
@volatile var jobInterruptFound: String = ""
+ @volatile var customPropFound: String = ""
@volatile var allFound: Boolean = false
addInputStream(ssc).foreachRDD { rdd =>
jobGroupFound = sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)
jobDescFound = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
jobInterruptFound = sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
+ customPropFound = sc.getLocalProperty("customPropKey")
allFound = true
}
+ ssc.sc.setLocalProperty("customPropKey", "value1")
ssc.start()
+ // Local props set after start should be ignored
+ ssc.sc.setLocalProperty("customPropKey", "value2")
+
eventually(timeout(10 seconds), interval(10 milliseconds)) {
assert(allFound === true)
}
@@ -208,11 +214,13 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
assert(jobGroupFound === null)
assert(jobDescFound.contains("Streaming job from"))
assert(jobInterruptFound === "false")
+ assert(customPropFound === "value1")
// Verify current thread's thread-local properties have not changed
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "non-streaming")
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) === "non-streaming")
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) === "true")
+ assert(sc.getLocalProperty("customPropKey") === "value2")
}
test("start multiple times") {