author     zsxwing <zsxwing@gmail.com>    2015-08-06 14:39:36 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>    2015-08-06 14:39:36 -0700
commit     346209097e88fe79015359e40b49c32cc0bdc439 (patch)
tree       4cc014ace8318d6c822e749e5999422ee6556ce1 /streaming
parent     1723e34893f9b087727ea0e5c8b335645f42c295 (diff)
[SPARK-9639] [STREAMING] Fix a potential NPE in Streaming JobScheduler
`JobScheduler.stop(false)` may set `eventLoop` to null while a `JobHandler` is still running, so `eventLoop` can be null by the time `post` is called. This PR fixes that bug and also makes the threads in `jobExecutor` daemon threads.

Author: zsxwing <zsxwing@gmail.com>

Closes #7960 from zsxwing/fix-npe and squashes the following commits:

b0864c4 [zsxwing] Fix a potential NPE in Streaming JobScheduler
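For context, the fix follows the usual pattern for a field that another thread may null out: read the field once into a local variable, then null-check and use only the local, so the concurrent writer cannot invalidate the reference between the check and the use. Below is a minimal, self-contained Scala sketch of that pattern; the `Scheduler`, `stop` and `runJob` names are illustrative only, not Spark's API.

// Minimal sketch (not Spark code): a field that another thread may set to null.
class Scheduler {
  @volatile private var eventLoop: String => Unit = msg => println(s"posted: $msg")

  // Called from another thread; races with runJob() below.
  def stop(): Unit = { eventLoop = null }

  def runJob(): Unit = {
    // Unsafe alternative: calling `eventLoop(...)` directly can NPE if stop() runs
    // between a null check on the field and the call.
    // Safe: copy the field into a local once; the local cannot be nulled by stop().
    val loop = eventLoop
    if (loop != null) {
      loop("JobStarted")
      // ... run the job ...
      val loopAfter = eventLoop  // re-read: the scheduler may have stopped meanwhile
      if (loopAfter != null) {
        loopAfter("JobCompleted")
      }
    } else {
      // Scheduler already stopped; nothing to post.
    }
  }
}

The actual patch applies this idea to `JobScheduler.eventLoop` inside `JobHandler.run()`, re-reading the field before posting `JobCompleted` in case `stop(false)` ran while the job was executing.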
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala  | 32
1 file changed, 22 insertions(+), 10 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 7e735562dc..6d4cdc4aa6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.scheduler
-import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Executors}
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import scala.collection.JavaConversions._
import scala.util.{Failure, Success}
@@ -25,7 +25,7 @@ import scala.util.{Failure, Success}
import org.apache.spark.Logging
import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.streaming._
-import org.apache.spark.util.EventLoop
+import org.apache.spark.util.{EventLoop, ThreadUtils}
private[scheduler] sealed trait JobSchedulerEvent
@@ -44,7 +44,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
// https://gist.github.com/AlainODea/1375759b8720a3f9f094
private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
- private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)
+ private val jobExecutor =
+     ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
private val jobGenerator = new JobGenerator(this)
val clock = jobGenerator.clock
val listenerBus = new StreamingListenerBus()
@@ -193,14 +194,25 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
      try {
-        eventLoop.post(JobStarted(job))
-        // Disable checks for existing output directories in jobs launched by the streaming
-        // scheduler, since we may need to write output to an existing directory during checkpoint
-        // recovery; see SPARK-4835 for more details.
-        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
-          job.run()
+        // We need to assign `eventLoop` to a temp variable: `JobScheduler.stop(false)` may set
+        // `eventLoop` to null while this method is running, so the field could be null by the
+        // time `post` is called.
+        var _eventLoop = eventLoop
+        if (_eventLoop != null) {
+          _eventLoop.post(JobStarted(job))
+          // Disable checks for existing output directories in jobs launched by the streaming
+          // scheduler, since we may need to write output to an existing directory during checkpoint
+          // recovery; see SPARK-4835 for more details.
+          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
+            job.run()
+          }
+          _eventLoop = eventLoop
+          if (_eventLoop != null) {
+            _eventLoop.post(JobCompleted(job))
+          }
+        } else {
+          // JobScheduler has been stopped.
        }
-        eventLoop.post(JobCompleted(job))
      } finally {
        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
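
The other change in the patch replaces `Executors.newFixedThreadPool` with `ThreadUtils.newDaemonFixedThreadPool`, so the job-executor threads are daemon threads with a recognizable name prefix and cannot keep the JVM alive on their own. As a rough sketch of what such a helper typically does (an approximation built only on `java.util.concurrent`, not Spark's actual `ThreadUtils` source):

import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object DaemonPools {
  // Fixed-size pool whose threads are daemon threads named "<prefix>-<n>".
  def newDaemonFixedThreadPool(numThreads: Int, prefix: String): ExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.getAndIncrement()}")
        t.setDaemon(true)  // daemon threads do not block JVM shutdown
        t
      }
    }
    Executors.newFixedThreadPool(numThreads, factory)
  }
}

// Hypothetical usage mirroring the patched line in JobScheduler:
// val jobExecutor = DaemonPools.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")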