diff options
author | Davies Liu <davies.liu@gmail.com> | 2014-10-07 12:06:12 -0700 |
---|---|---|
committer | Matei Zaharia <matei@databricks.com> | 2014-10-07 12:06:12 -0700 |
commit | 655032965fc7e2368dff9947fc024ac720ffd19c (patch) | |
tree | bb93392598379e53272b27cd9592d89861e214a0 /streaming | |
parent | 12e2551ea1773ae19559ecdada35d23608e6b0ec (diff) | |
download | spark-655032965fc7e2368dff9947fc024ac720ffd19c.tar.gz spark-655032965fc7e2368dff9947fc024ac720ffd19c.tar.bz2 spark-655032965fc7e2368dff9947fc024ac720ffd19c.zip |
[SPARK-3762] clear reference of SparkEnv after stop
SparkEnv is cached in ThreadLocal object, so after stop and create a new SparkContext, old SparkEnv is still used by some threads, it will trigger many problems, for example, pyspark will have problem after restart SparkContext, because py4j use thread pool for RPC.
This patch will clear all the references after stop a SparkEnv.
cc mateiz tdas pwendell
Author: Davies Liu <davies.liu@gmail.com>
Closes #2624 from davies/env and squashes the following commits:
a69f30c [Davies Liu] deprecate getThreadLocal
ba77ca4 [Davies Liu] remove getThreadLocal(), update docs
ee62bb7 [Davies Liu] cleanup ThreadLocal of SparnENV
4d0ea8b [Davies Liu] clear reference of SparkEnv after stop
Diffstat (limited to 'streaming')
3 files changed, 0 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 374848358e..7d73ada12d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -217,7 +217,6 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { /** Generate jobs and perform checkpoint for the given `time`. */ private def generateJobs(time: Time) { - SparkEnv.set(ssc.env) Try(graph.generateJobs(time)) match { case Success(jobs) => val receivedBlockInfo = graph.getReceiverInputStreams.map { stream => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 1b034b9fb1..cfa3cd8925 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -138,7 +138,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { } jobSet.handleJobStart(job) logInfo("Starting job " + job.id + " from job set of time " + jobSet.time) - SparkEnv.set(ssc.env) } private def handleJobCompletion(job: Job) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 5307fe189d..7149dbc12a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -202,7 +202,6 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging { @transient val thread = new Thread() { override def run() { try { - SparkEnv.set(env) startReceivers() } catch { case ie: InterruptedException => logInfo("ReceiverLauncher interrupted") |