diff options
author | zsxwing <zsxwing@gmail.com> | 2015-08-06 14:39:36 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-08-06 14:39:36 -0700 |
commit | 346209097e88fe79015359e40b49c32cc0bdc439 (patch) | |
tree | 4cc014ace8318d6c822e749e5999422ee6556ce1 /streaming | |
parent | 1723e34893f9b087727ea0e5c8b335645f42c295 (diff) | |
download | spark-346209097e88fe79015359e40b49c32cc0bdc439.tar.gz spark-346209097e88fe79015359e40b49c32cc0bdc439.tar.bz2 spark-346209097e88fe79015359e40b49c32cc0bdc439.zip |
[SPARK-9639] [STREAMING] Fix a potential NPE in Streaming JobScheduler
Because `JobScheduler.stop(false)` may set `eventLoop` to null when `JobHandler` is running, then it's possible that when `post` is called, `eventLoop` happens to null.
This PR fixed this bug and also set threads in `jobExecutor` to `daemon`.
Author: zsxwing <zsxwing@gmail.com>
Closes #7960 from zsxwing/fix-npe and squashes the following commits:
b0864c4 [zsxwing] Fix a potential NPE in Streaming JobScheduler
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala | 32 |
1 files changed, 22 insertions, 10 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 7e735562dc..6d4cdc4aa6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.scheduler -import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors} +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import scala.collection.JavaConversions._ import scala.util.{Failure, Success} @@ -25,7 +25,7 @@ import scala.util.{Failure, Success} import org.apache.spark.Logging import org.apache.spark.rdd.PairRDDFunctions import org.apache.spark.streaming._ -import org.apache.spark.util.EventLoop +import org.apache.spark.util.{EventLoop, ThreadUtils} private[scheduler] sealed trait JobSchedulerEvent @@ -44,7 +44,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { // https://gist.github.com/AlainODea/1375759b8720a3f9f094 private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet] private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) - private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs) + private val jobExecutor = + ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor") private val jobGenerator = new JobGenerator(this) val clock = jobGenerator.clock val listenerBus = new StreamingListenerBus() @@ -193,14 +194,25 @@ class JobScheduler(val ssc: StreamingContext) extends Logging { ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString) ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString) try { - eventLoop.post(JobStarted(job)) - // Disable checks for existing output directories in jobs launched by the streaming - // scheduler, since we may need to write output to an existing directory during checkpoint - // recovery; see SPARK-4835 for more details. - PairRDDFunctions.disableOutputSpecValidation.withValue(true) { - job.run() + // We need to assign `eventLoop` to a temp variable. Otherwise, because + // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then + // it's possible that when `post` is called, `eventLoop` happens to null. + var _eventLoop = eventLoop + if (_eventLoop != null) { + _eventLoop.post(JobStarted(job)) + // Disable checks for existing output directories in jobs launched by the streaming + // scheduler, since we may need to write output to an existing directory during checkpoint + // recovery; see SPARK-4835 for more details. + PairRDDFunctions.disableOutputSpecValidation.withValue(true) { + job.run() + } + _eventLoop = eventLoop + if (_eventLoop != null) { + _eventLoop.post(JobCompleted(job)) + } + } else { + // JobScheduler has been stopped. } - eventLoop.post(JobCompleted(job)) } finally { ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null) ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null) |