author     Davies Liu <davies.liu@gmail.com>      2014-10-07 12:06:12 -0700
committer  Matei Zaharia <matei@databricks.com>   2014-10-07 12:06:12 -0700
commit     655032965fc7e2368dff9947fc024ac720ffd19c (patch)
tree       bb93392598379e53272b27cd9592d89861e214a0 /streaming/src
parent     12e2551ea1773ae19559ecdada35d23608e6b0ec (diff)
[SPARK-3762] clear reference of SparkEnv after stop
SparkEnv is cached in a ThreadLocal, so after stopping a SparkContext and creating a new one, some threads keep using the old SparkEnv, which triggers many problems. For example, PySpark breaks after a SparkContext restart because Py4J uses a thread pool for RPC, and pooled threads retain the stale env. This patch clears all such references after a SparkEnv is stopped.

cc mateiz tdas pwendell

Author: Davies Liu <davies.liu@gmail.com>

Closes #2624 from davies/env and squashes the following commits:

a69f30c [Davies Liu] deprecate getThreadLocal
ba77ca4 [Davies Liu] remove getThreadLocal(), update docs
ee62bb7 [Davies Liu] cleanup ThreadLocal of SparnENV
4d0ea8b [Davies Liu] clear reference of SparkEnv after stop
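To make the failure mode concrete: Py4J dispatches RPCs on a pool of long-lived threads, and a ThreadLocal cache written before a restart survives in those threads afterwards. A minimal, self-contained sketch of that bug class (plain JDK threads, made-up names, not Spark code):

import java.util.concurrent.Executors

// Standalone demo: a ThreadLocal cache goes stale when a pooled
// thread outlives a stop/restart of whatever it cached.
object StaleEnvDemo extends App {
  val cachedEnv = new ThreadLocal[String]
  val pool = Executors.newFixedThreadPool(1)   // the single thread is reused

  pool.submit(new Runnable { def run(): Unit = cachedEnv.set("env-1") })
  // Imagine env-1 is stopped and env-2 created here. The pooled thread
  // never re-runs set(), so its ThreadLocal still answers env-1:
  pool.submit(new Runnable {
    def run(): Unit = println("pooled thread sees: " + cachedEnv.get)  // env-1
  })
  pool.shutdown()
}

This is why the fix has to live on the stop path: only the code that stops the env knows that every cached reference has just gone stale.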
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala    | 1 -
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala    | 1 -
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala | 1 -
3 files changed, 0 insertions(+), 3 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 374848358e..7d73ada12d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -217,7 +217,6 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {

   /** Generate jobs and perform checkpoint for the given `time`. */
   private def generateJobs(time: Time) {
-    SparkEnv.set(ssc.env)
     Try(graph.generateJobs(time)) match {
       case Success(jobs) =>
         val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
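The SparkEnv.set(ssc.env) calls deleted in this patch only existed to republish the env into each thread's cache. Once the env is held in one shared reference that is cleared on stop, as the commit message describes, they become dead code. A sketch of that shared-reference shape, with hypothetical names since the real SparkEnv change lives in core/, outside this diffstat:

// Sketch only; models the approach, not the actual Spark source.
object Env {
  @volatile private var env: Env = _           // one reference, visible to all threads

  def set(e: Env): Unit = { env = e }
  def get: Env = env                           // no per-thread lookup to go stale

  @deprecated("Use Env.get instead", "1.2.0")  // mirrors deprecating getThreadLocal
  def getThreadLocal: Env = env
}

class Env {
  def stop(): Unit = {
    // release resources, then drop the reference so a restarted
    // context can never observe this env from any thread
    Env.set(null)
  }
}

With a single volatile field there is nothing per-thread left to go stale, which is what makes the three one-line deletions in this patch safe.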
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 1b034b9fb1..cfa3cd8925 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -138,7 +138,6 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     }
     jobSet.handleJobStart(job)
     logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
-    SparkEnv.set(ssc.env)
   }

   private def handleJobCompletion(job: Job) {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 5307fe189d..7149dbc12a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -202,7 +202,6 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
     @transient val thread = new Thread() {
       override def run() {
         try {
-          SparkEnv.set(env)
           startReceivers()
         } catch {
           case ie: InterruptedException => logInfo("ReceiverLauncher interrupted")