author     Hung Lin <hung.lin@gmail.com>          2015-04-02 14:01:43 -0700
committer  Josh Rosen <joshrosen@databricks.com>  2015-04-02 14:16:45 -0700
commit     8fa09a480848faf4eda263cc1e79e0dd56a52605 (patch)
tree       345a33da40f65d607b79c9082a750d1b130701dc
parent     a73055f7f9104cb5a9ed43a6cd4a82d463702b60 (diff)
SPARK-6414: Spark driver failed with NPE on job cancellation
Use Option for ActiveJob.properties to avoid NPE bug

Author: Hung Lin <hung.lin@gmail.com>

Closes #5124 from hunglin/SPARK-6414 and squashes the following commits:

2290b6b [Hung Lin] [SPARK-6414][core] Fix NPE in SparkContext.cancelJobGroup()

(cherry picked from commit e3202aa2e9bd140effbcf2a7a02b90cb077e760b)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>

Conflicts:
    core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Conflicts:
    core/src/test/scala/org/apache/spark/SparkContextSuite.scala
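Editorial note: the fix leans on the standard Scala guard for possibly-null Java values. Option(x) is None when x is null, so chained calls such as map or exists degrade gracefully instead of throwing. A minimal, self-contained illustration (an editorial sketch, not Spark code):

    import java.util.Properties

    object OptionNullGuard {
      def main(args: Array[String]): Unit = {
        val present: Properties = new Properties()
        val missing: Properties = null

        // Option(...) maps null to None, so downstream calls never dereference null.
        println(Option(present).map(_.size))        // Some(0)
        println(Option(missing).map(_.size))        // None
        println(Option(missing).exists(_.size > 0)) // false, instead of an NPE
      }
    }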
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala            |  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala  | 15
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkContextSuite.scala       | 27
3 files changed, 33 insertions, 13 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 98e2fed799..e9b41158c7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -400,6 +400,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Thread Local variable that can be used by users to pass information down the stack
private val localProperties = new InheritableThreadLocal[Properties] {
override protected def childValue(parent: Properties): Properties = new Properties(parent)
+ override protected def initialValue(): Properties = new Properties()
}
/**
@@ -441,9 +442,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Spark fair scheduler pool.
*/
def setLocalProperty(key: String, value: String) {
- if (localProperties.get() == null) {
- localProperties.set(new Properties())
- }
if (value == null) {
localProperties.get.remove(key)
} else {
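Editorial note on the hunk above: overriding initialValue() means localProperties.get() creates an empty Properties on first access, so the lazy null check deleted from setLocalProperty is no longer needed. A standalone sketch (names are illustrative, not Spark's) of how InheritableThreadLocal behaves with both overrides:

    import java.util.Properties

    object ThreadLocalInitSketch {
      private val localProps = new InheritableThreadLocal[Properties] {
        // Child threads see a Properties whose defaults are the parent's values.
        override protected def childValue(parent: Properties): Properties = new Properties(parent)
        // First call to get() in any thread returns this instead of null.
        override protected def initialValue(): Properties = new Properties()
      }

      def main(args: Array[String]): Unit = {
        // No set() has been called, yet get() is already non-null.
        localProps.get.setProperty("demo.key", "demo-value")

        val child = new Thread(new Runnable {
          override def run(): Unit =
            // Inherited via childValue(): the parent's entries act as defaults.
            println(localProps.get.getProperty("demo.key"))  // demo-value
        })
        child.start()
        child.join()
      }
    }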
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index cb8ccfbdbd..e6f2c79334 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -477,8 +477,7 @@ class DAGScheduler(
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
- properties: Properties = null): JobWaiter[U] =
- {
+ properties: Properties): JobWaiter[U] = {
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
@@ -507,8 +506,7 @@ class DAGScheduler(
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
- properties: Properties = null)
- {
+ properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
waiter.awaitResult() match {
@@ -529,9 +527,7 @@ class DAGScheduler(
evaluator: ApproximateEvaluator[U, R],
callSite: CallSite,
timeout: Long,
- properties: Properties = null)
- : PartialResult[R] =
- {
+ properties: Properties): PartialResult[R] = {
val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.partitions.size).toArray
@@ -678,7 +674,7 @@ class DAGScheduler(
// Cancel all jobs belonging to this job group.
// First finds all active jobs with this group id, and then kill stages for them.
val activeInGroup = activeJobs.filter(activeJob =>
- groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
+ Option(activeJob.properties).exists(_.get(SparkContext.SPARK_JOB_GROUP_ID) == groupId))
val jobIds = activeInGroup.map(_.jobId)
jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
submitWaitingStages()
@@ -725,8 +721,7 @@ class DAGScheduler(
allowLocal: Boolean,
callSite: CallSite,
listener: JobListener,
- properties: Properties = null)
- {
+ properties: Properties) {
var finalStage: Stage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
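Editorial note: the Option(activeJob.properties).exists(...) guard added above is the change that closes SPARK-6414 in the scheduler itself. An ActiveJob whose properties field is null would previously trigger a NullPointerException inside the group filter; wrapping it in Option skips such jobs instead. A hypothetical miniature of that filter (MiniActiveJob and the key constant are stand-ins, not Spark's ActiveJob or SparkContext.SPARK_JOB_GROUP_ID):

    import java.util.Properties

    case class MiniActiveJob(jobId: Int, properties: Properties)

    object CancelJobGroupSketch {
      // Assumed to mirror the value behind SparkContext.SPARK_JOB_GROUP_ID.
      val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"

      // Returns the ids of active jobs tagged with the given group id,
      // silently skipping jobs whose properties are null.
      def jobIdsInGroup(activeJobs: Seq[MiniActiveJob], groupId: String): Seq[Int] =
        activeJobs
          .filter(job => Option(job.properties).exists(_.get(SPARK_JOB_GROUP_ID) == groupId))
          .map(_.jobId)

      def main(args: Array[String]): Unit = {
        val tagged = new Properties()
        tagged.setProperty(SPARK_JOB_GROUP_ID, "etl")
        val jobs = Seq(MiniActiveJob(1, tagged), MiniActiveJob(2, null))
        println(jobIdsInGroup(jobs, "etl"))  // List(1): the null-properties job no longer breaks the filter
      }
    }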
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 58ecb06df4..99807b1042 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -17,10 +17,22 @@
package org.apache.spark
+import java.io.File
+import java.util.concurrent.TimeUnit
+
+import com.google.common.base.Charsets._
+import com.google.common.io.Files
+
import org.scalatest.FunSuite
import org.apache.hadoop.io.BytesWritable
+import org.apache.spark.util.Utils
+import org.apache.spark.SparkContext._
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
class SparkContextSuite extends FunSuite with LocalSparkContext {
test("Only one SparkContext may be active at a time") {
@@ -72,4 +84,19 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
val byteArray2 = converter.convert(bytesWritable)
assert(byteArray2.length === 0)
}
+
+ test("Cancelling job group should not cause SparkContext to shutdown (SPARK-6414)") {
+ try {
+ sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+ val future = sc.parallelize(Seq(0)).foreachAsync(_ => {Thread.sleep(1000L)})
+ sc.cancelJobGroup("nonExistGroupId")
+ Await.ready(future, Duration(2, TimeUnit.SECONDS))
+
+ // In SPARK-6414, sc.cancelJobGroup will cause NullPointerException and cause
+ // SparkContext to shutdown, so the following assertion will fail.
+ assert(sc.parallelize(1 to 10).count() == 10L)
+ } finally {
+ sc.stop()
+ }
+ }
}
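Editorial note on the regression test above: foreachAsync returns a FutureAction, which implements scala.concurrent.Future, so Await.ready simply waits (up to two seconds) for the sleeping job to finish. The final count() is the real assertion: before this patch, cancelJobGroup on a non-existent group could hit the NPE described in SPARK-6414 and shut the SparkContext down, so no further job would run.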