From 6f27027d96ada29d8bb1d626f2cc7c856df3d597 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Mon, 11 Apr 2016 18:33:54 -0700 Subject: [SPARK-14475] Propagate user-defined context from driver to executors ## What changes were proposed in this pull request? This adds a new API call `TaskContext.getLocalProperty` for getting properties set in the driver from executors. These local properties are automatically propagated from the driver to executors. For streaming, the context for streaming tasks will be the initial driver context when ssc.start() is called. ## How was this patch tested? Unit tests. cc JoshRosen Author: Eric Liang Closes #12248 from ericl/sc-2813. --- .../scala/org/apache/spark/streaming/StreamingContext.scala | 8 ++++++++ .../org/apache/spark/streaming/scheduler/JobGenerator.scala | 5 ----- .../org/apache/spark/streaming/scheduler/JobScheduler.scala | 9 +++++++-- .../org/apache/spark/streaming/StreamingContextSuite.scala | 10 +++++++++- 4 files changed, 24 insertions(+), 8 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index cc187f5cb4..928739a416 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming import java.io.{InputStream, NotSerializableException} +import java.util.Properties import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import scala.collection.Map @@ -25,6 +26,7 @@ import scala.collection.mutable.Queue import scala.reflect.ClassTag import scala.util.control.NonFatal +import org.apache.commons.lang.SerializationUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{BytesWritable, LongWritable, Text} @@ -198,6 +200,10 @@ class StreamingContext private[streaming] ( private val startSite = new AtomicReference[CallSite](null) + // Copy of thread-local properties from SparkContext. These properties will be set in all tasks + // submitted by this StreamingContext after start. + private[streaming] val savedProperties = new AtomicReference[Properties](new Properties) + private[streaming] def getStartSite(): CallSite = startSite.get() private var shutdownHookRef: AnyRef = _ @@ -573,6 +579,8 @@ class StreamingContext private[streaming] ( sparkContext.setCallSite(startSite.get) sparkContext.clearJobGroup() sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false") + savedProperties.set(SerializationUtils.clone( + sparkContext.localProperties.get()).asInstanceOf[Properties]) scheduler.start() } state = StreamingContextState.ACTIVE diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 86f069b0bd..307ff1f7ec 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -241,11 +241,6 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { /** Generate jobs and perform checkpoint for the given `time`. */ private def generateJobs(time: Time) { - // Set the SparkEnv in this thread, so that job generation code can access the environment - // Example: BlockRDDs are created in this thread, and it needs to access BlockManager - // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed. - SparkEnv.set(ssc.env) - // Checkpoint all RDDs marked for checkpointing to ensure their lineages are // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847). ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 303c325274..ac18f73ea8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -17,11 +17,14 @@ package org.apache.spark.streaming.scheduler +import java.util.Properties import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import scala.collection.JavaConverters._ import scala.util.Failure +import org.apache.commons.lang.SerializationUtils + import org.apache.spark.internal.Logging import org.apache.spark.rdd.{PairRDDFunctions, RDD} import org.apache.spark.streaming._ @@ -214,7 +217,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { import JobScheduler._ def run() { + val oldProps = ssc.sparkContext.getLocalProperties try { + ssc.sparkContext.setLocalProperties( + SerializationUtils.clone(ssc.savedProperties.get()).asInstanceOf[Properties]) val formattedTime = UIUtils.formatBatchTime( job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false) val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}" @@ -248,8 +254,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { // JobScheduler has been stopped. } } finally { - ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null) - ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null) + ssc.sparkContext.setLocalProperties(oldProps) } } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index a80154e2fc..806e181f61 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -182,7 +182,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo assert(ssc.scheduler.isStarted === false) } - test("start should set job group and description of streaming jobs correctly") { + test("start should set local properties of streaming jobs correctly") { ssc = new StreamingContext(conf, batchDuration) ssc.sc.setJobGroup("non-streaming", "non-streaming", true) val sc = ssc.sc @@ -190,16 +190,22 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo @volatile var jobGroupFound: String = "" @volatile var jobDescFound: String = "" @volatile var jobInterruptFound: String = "" + @volatile var customPropFound: String = "" @volatile var allFound: Boolean = false addInputStream(ssc).foreachRDD { rdd => jobGroupFound = sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) jobDescFound = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) jobInterruptFound = sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) + customPropFound = sc.getLocalProperty("customPropKey") allFound = true } + ssc.sc.setLocalProperty("customPropKey", "value1") ssc.start() + // Local props set after start should be ignored + ssc.sc.setLocalProperty("customPropKey", "value2") + eventually(timeout(10 seconds), interval(10 milliseconds)) { assert(allFound === true) } @@ -208,11 +214,13 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo assert(jobGroupFound === null) assert(jobDescFound.contains("Streaming job from")) assert(jobInterruptFound === "false") + assert(customPropFound === "value1") // Verify current thread's thread-local properties have not changed assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "non-streaming") assert(sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) === "non-streaming") assert(sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL) === "true") + assert(sc.getLocalProperty("customPropKey") === "value2") } test("start multiple times") { -- cgit v1.2.3