aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHung Lin <hung.lin@gmail.com>2015-04-02 14:01:43 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-04-02 14:08:16 -0700
commit58e2b3fcdd3bd2119916c55215303fbabbc9533e (patch)
tree38405935173863022d0341690064c8c2ec9efa99
parenta6664dcd88a0bdaa8985844cd485d3c4a71eba1b (diff)
downloadspark-58e2b3fcdd3bd2119916c55215303fbabbc9533e.tar.gz
spark-58e2b3fcdd3bd2119916c55215303fbabbc9533e.tar.bz2
spark-58e2b3fcdd3bd2119916c55215303fbabbc9533e.zip
SPARK-6414: Spark driver failed with NPE on job cancelation
Use Option for ActiveJob.properties to avoid NPE bug Author: Hung Lin <hung.lin@gmail.com> Closes #5124 from hunglin/SPARK-6414 and squashes the following commits: 2290b6b [Hung Lin] [SPARK-6414][core] Fix NPE in SparkContext.cancelJobGroup() (cherry picked from commit e3202aa2e9bd140effbcf2a7a02b90cb077e760b) Signed-off-by: Josh Rosen <joshrosen@databricks.com> Conflicts: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala15
-rw-r--r--core/src/test/scala/org/apache/spark/SparkContextSuite.scala20
3 files changed, 25 insertions, 14 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index dfa1d399a1..7c2de9d260 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -433,6 +433,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Thread Local variable that can be used by users to pass information down the stack
private val localProperties = new InheritableThreadLocal[Properties] {
override protected def childValue(parent: Properties): Properties = new Properties(parent)
+ override protected def initialValue(): Properties = new Properties()
}
/**
@@ -474,9 +475,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Spark fair scheduler pool.
*/
def setLocalProperty(key: String, value: String) {
- if (localProperties.get() == null) {
- localProperties.set(new Properties())
- }
if (value == null) {
localProperties.get.remove(key)
} else {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c58721c2c8..c10873e495 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -474,8 +474,7 @@ class DAGScheduler(
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
- properties: Properties = null): JobWaiter[U] =
- {
+ properties: Properties): JobWaiter[U] = {
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
@@ -504,8 +503,7 @@ class DAGScheduler(
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
- properties: Properties = null)
- {
+ properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
waiter.awaitResult() match {
@@ -526,9 +524,7 @@ class DAGScheduler(
evaluator: ApproximateEvaluator[U, R],
callSite: CallSite,
timeout: Long,
- properties: Properties = null)
- : PartialResult[R] =
- {
+ properties: Properties): PartialResult[R] = {
val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.partitions.size).toArray
@@ -675,7 +671,7 @@ class DAGScheduler(
// Cancel all jobs belonging to this job group.
// First finds all active jobs with this group id, and then kill stages for them.
val activeInGroup = activeJobs.filter(activeJob =>
- groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
+ Option(activeJob.properties).exists(_.get(SparkContext.SPARK_JOB_GROUP_ID) == groupId))
val jobIds = activeInGroup.map(_.jobId)
jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
submitWaitingStages()
@@ -722,8 +718,7 @@ class DAGScheduler(
allowLocal: Boolean,
callSite: CallSite,
listener: JobListener,
- properties: Properties = null)
- {
+ properties: Properties) {
var finalStage: Stage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index b8e3e83b5a..26d10195d4 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark
import java.io.File
+import java.util.concurrent.TimeUnit
import com.google.common.base.Charsets._
import com.google.common.io.Files
@@ -25,9 +26,11 @@ import com.google.common.io.Files
import org.scalatest.FunSuite
import org.apache.hadoop.io.BytesWritable
-
import org.apache.spark.util.Utils
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
class SparkContextSuite extends FunSuite with LocalSparkContext {
test("Only one SparkContext may be active at a time") {
@@ -172,4 +175,19 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
sc.stop()
}
}
+
+ test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
+ try {
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+ val future = sc.parallelize(Seq(0)).foreachAsync(_ => {Thread.sleep(1000L)})
+ sc.cancelJobGroup("nonExistGroupId")
+ Await.ready(future, Duration(2, TimeUnit.SECONDS))
+
+ // In SPARK-6414, sc.cancelJobGroup will cause NullPointerException and cause
+ // SparkContext to shutdown, so the following assertion will fail.
+ assert(sc.parallelize(1 to 10).count() == 10L)
+ } finally {
+ sc.stop()
+ }
+ }
}