aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/spark/Utils.scala21
-rw-r--r--core/src/main/scala/spark/executor/Executor.scala17
-rw-r--r--core/src/main/scala/spark/rdd/PartitionPruningRDD.scala12
-rw-r--r--core/src/main/scala/spark/storage/DiskStore.scala6
-rw-r--r--docs/_config.yml1
-rw-r--r--docs/contributing-to-spark.md2
6 files changed, 51 insertions, 8 deletions
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 28d643abca..81daacf958 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -454,4 +454,25 @@ private object Utils extends Logging {
def clone[T](value: T, serializer: SerializerInstance): T = {
serializer.deserialize[T](serializer.serialize(value))
}
+
+ /**
+ * Detect whether this thread might be executing a shutdown hook. Will always return true if
+ * the current thread is a running a shutdown hook but may spuriously return true otherwise (e.g.
+ * if System.exit was just called by a concurrent thread).
+ *
+ * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
+ * an IllegalStateException.
+ */
+ def inShutdown(): Boolean = {
+ try {
+ val hook = new Thread {
+ override def run() {}
+ }
+ Runtime.getRuntime.addShutdownHook(hook)
+ Runtime.getRuntime.removeShutdownHook(hook)
+ } catch {
+ case ise: IllegalStateException => return true
+ }
+ return false
+ }
}
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index bd21ba719a..5de09030aa 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -50,14 +50,19 @@ private[spark] class Executor extends Logging {
override def uncaughtException(thread: Thread, exception: Throwable) {
try {
logError("Uncaught exception in thread " + thread, exception)
- if (exception.isInstanceOf[OutOfMemoryError]) {
- System.exit(ExecutorExitCode.OOM)
- } else {
- System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+
+ // We may have been called from a shutdown hook. If so, we must not call System.exit().
+ // (If we do, we will deadlock.)
+ if (!Utils.inShutdown()) {
+ if (exception.isInstanceOf[OutOfMemoryError]) {
+ System.exit(ExecutorExitCode.OOM)
+ } else {
+ System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+ }
}
} catch {
- case oom: OutOfMemoryError => System.exit(ExecutorExitCode.OOM)
- case t: Throwable => System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
+ case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+ case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
}
}
}
diff --git a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
index f2f4fd56d1..41ff62dd22 100644
--- a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala
@@ -40,3 +40,15 @@ class PartitionPruningRDD[T: ClassManifest](
override protected def getPartitions: Array[Partition] =
getDependencies.head.asInstanceOf[PruneDependency[T]].partitions
}
+
+
+object PartitionPruningRDD {
+
+ /**
+ * Create a PartitionPruningRDD. This function can be used to create the PartitionPruningRDD
+ * when its type T is not known at compile time.
+ */
+ def create[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean) = {
+ new PartitionPruningRDD[T](rdd, partitionFilterFunc)(rdd.elementClassManifest)
+ }
+}
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 7e5b820cbb..ddbf8821ad 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -178,7 +178,11 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
override def run() {
logDebug("Shutdown hook called")
- localDirs.foreach(localDir => Utils.deleteRecursively(localDir))
+ try {
+ localDirs.foreach(localDir => Utils.deleteRecursively(localDir))
+ } catch {
+ case t: Throwable => logError("Exception while deleting local spark dirs", t)
+ }
}
})
}
diff --git a/docs/_config.yml b/docs/_config.yml
index 2bd2eecc86..09617e4a1e 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -7,3 +7,4 @@ SPARK_VERSION: 0.7.0-SNAPSHOT
SPARK_VERSION_SHORT: 0.7.0
SCALA_VERSION: 2.9.2
MESOS_VERSION: 0.9.0-incubating
+SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net
diff --git a/docs/contributing-to-spark.md b/docs/contributing-to-spark.md
index 14d0dc856b..50feeb2d6c 100644
--- a/docs/contributing-to-spark.md
+++ b/docs/contributing-to-spark.md
@@ -15,7 +15,7 @@ The Spark team welcomes contributions in the form of GitHub pull requests. Here
But first, make sure that you have [configured a spark-env.sh](configuration.html) with at least
`SCALA_HOME`, as some of the tests try to spawn subprocesses using this.
- Add new unit tests for your code. We use [ScalaTest](http://www.scalatest.org/) for testing. Just add a new Suite in `core/src/test`, or methods to an existing Suite.
-- If you'd like to report a bug but don't have time to fix it, you can still post it to our [issue tracker](https://spark-project.atlassian.net), or email the [mailing list](http://www.spark-project.org/mailing-lists.html).
+- If you'd like to report a bug but don't have time to fix it, you can still post it to our [issue tracker]({{site.SPARK_ISSUE_TRACKER_URL}}), or email the [mailing list](http://www.spark-project.org/mailing-lists.html).
# Licensing of Contributions