aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala76
-rw-r--r--core/src/test/scala/org/apache/spark/util/UtilsSuite.scala2
2 files changed, 47 insertions, 31 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f77cc2f9b7..0c23f3cd35 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1772,50 +1772,66 @@ private[spark] object Utils extends Logging {
}
/**
- * Terminates a process waiting for at most the specified duration. Returns whether
- * the process terminated.
+ * Terminates a process waiting for at most the specified duration.
+ *
+ * @return the process exit value if it was successfully terminated, else None
*/
def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = {
- try {
- // Java8 added a new API which will more forcibly kill the process. Use that if available.
- val destroyMethod = process.getClass().getMethod("destroyForcibly");
- destroyMethod.setAccessible(true)
- destroyMethod.invoke(process)
- } catch {
- case NonFatal(e) =>
- if (!e.isInstanceOf[NoSuchMethodException]) {
- logWarning("Exception when attempting to kill process", e)
- }
- process.destroy()
- }
+ // Politely destroy first
+ process.destroy()
+
if (waitForProcess(process, timeoutMs)) {
+ // Successful exit
Option(process.exitValue())
} else {
- None
+ // Java 8 added a new API which will more forcibly kill the process. Use that if available.
+ try {
+ classOf[Process].getMethod("destroyForcibly").invoke(process)
+ } catch {
+ case _: NoSuchMethodException => return None // Not available; give up
+ case NonFatal(e) => logWarning("Exception when attempting to kill process", e)
+ }
+ // Wait, again, although this really should return almost immediately
+ if (waitForProcess(process, timeoutMs)) {
+ Option(process.exitValue())
+ } else {
+ logWarning("Timed out waiting to forcibly kill process")
+ None
+ }
}
}
/**
* Wait for a process to terminate for at most the specified duration.
- * Return whether the process actually terminated after the given timeout.
+ *
+ * @return whether the process actually terminated before the given timeout.
*/
def waitForProcess(process: Process, timeoutMs: Long): Boolean = {
- var terminated = false
- val startTime = System.currentTimeMillis
- while (!terminated) {
- try {
- process.exitValue()
- terminated = true
- } catch {
- case e: IllegalThreadStateException =>
- // Process not terminated yet
- if (System.currentTimeMillis - startTime > timeoutMs) {
- return false
+ try {
+ // Use Java 8 method if available
+ classOf[Process].getMethod("waitFor", java.lang.Long.TYPE, classOf[TimeUnit])
+ .invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS)
+ .asInstanceOf[Boolean]
+ } catch {
+ case _: NoSuchMethodError =>
+ // Otherwise implement it manually
+ var terminated = false
+ val startTime = System.currentTimeMillis
+ while (!terminated) {
+ try {
+ process.exitValue()
+ terminated = true
+ } catch {
+ case e: IllegalThreadStateException =>
+ // Process not terminated yet
+ if (System.currentTimeMillis - startTime > timeoutMs) {
+ return false
+ }
+ Thread.sleep(100)
}
- Thread.sleep(100)
- }
+ }
+ true
}
- true
}
/**
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index df279b5a37..f5d0fb00b7 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -863,7 +863,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(terminated.isDefined)
Utils.waitForProcess(process, 5000)
val duration = System.currentTimeMillis() - start
- assert(duration < 5000)
+ assert(duration < 6000) // add a little extra time to allow a force kill to finish
assert(!pidExists(pid))
} finally {
signal(pid, "SIGKILL")