aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2012-12-27 15:37:33 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2012-12-27 15:37:33 -0800
commit0bc0a60d3001dd231e13057a838d4b6550e5a2b9 (patch)
tree4caa9a8d434d32b1e17e83921a6dd5fb54db3165 /core
parent7c33f76291eda2144fe16e48e261cb62cdadb7f6 (diff)
downloadspark-0bc0a60d3001dd231e13057a838d4b6550e5a2b9.tar.gz
spark-0bc0a60d3001dd231e13057a838d4b6550e5a2b9.tar.bz2
spark-0bc0a60d3001dd231e13057a838d4b6550e5a2b9.zip
Modifications to make sure LocalScheduler terminate cleanly without errors when SparkContext is shutdown, to minimize spurious exception during master failure tests.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/SparkContext.scala22
-rw-r--r--core/src/main/scala/spark/scheduler/local/LocalScheduler.scala8
-rw-r--r--core/src/test/resources/log4j.properties2
-rw-r--r--core/src/test/scala/spark/ClosureCleanerSuite.scala2
4 files changed, 21 insertions, 13 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index caa9a1794b..0c8b0078a3 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -488,17 +488,19 @@ class SparkContext(
if (dagScheduler != null) {
dagScheduler.stop()
dagScheduler = null
+ taskScheduler = null
+ // TODO: Cache.stop()?
+ env.stop()
+ // Clean up locally linked files
+ clearFiles()
+ clearJars()
+ SparkEnv.set(null)
+ ShuffleMapTask.clearCache()
+ ResultTask.clearCache()
+ logInfo("Successfully stopped SparkContext")
+ } else {
+ logInfo("SparkContext already stopped")
}
- taskScheduler = null
- // TODO: Cache.stop()?
- env.stop()
- // Clean up locally linked files
- clearFiles()
- clearJars()
- SparkEnv.set(null)
- ShuffleMapTask.clearCache()
- ResultTask.clearCache()
- logInfo("Successfully stopped SparkContext")
}
/**
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index eb20fe41b2..17a0a4b103 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -81,7 +81,10 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
ser.serialize(Accumulators.values))
logInfo("Finished task " + idInJob)
- listener.taskEnded(task, Success, resultToReturn, accumUpdates)
+
+ // If the threadpool has not already been shutdown, notify DAGScheduler
+ if (!Thread.currentThread().isInterrupted)
+ listener.taskEnded(task, Success, resultToReturn, accumUpdates)
} catch {
case t: Throwable => {
logError("Exception in task " + idInJob, t)
@@ -91,7 +94,8 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
submitTask(task, idInJob)
} else {
// TODO: Do something nicer here to return all the way to the user
- listener.taskEnded(task, new ExceptionFailure(t), null, null)
+ if (!Thread.currentThread().isInterrupted)
+ listener.taskEnded(task, new ExceptionFailure(t), null, null)
}
}
}
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index 4c99e450bc..5ed388e91b 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -1,4 +1,4 @@
-# Set everything to be logged to the console
+# Set everything to be logged to the file spark-tests.log
log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
diff --git a/core/src/test/scala/spark/ClosureCleanerSuite.scala b/core/src/test/scala/spark/ClosureCleanerSuite.scala
index 7c0334d957..dfa2de80e6 100644
--- a/core/src/test/scala/spark/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/spark/ClosureCleanerSuite.scala
@@ -47,6 +47,8 @@ object TestObject {
val nums = sc.parallelize(Array(1, 2, 3, 4))
val answer = nums.map(_ + x).reduce(_ + _)
sc.stop()
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port")
return answer
}
}