path: root/core/src/main/scala
author    Andrew Or <andrew@databricks.com>  2015-01-28 12:52:31 -0800
committer Andrew Or <andrew@databricks.com>  2015-01-28 12:53:22 -0800
commit    84b6ecdef63e6f5710a3f7f95f698b1d1ea44855 (patch)
tree      0d164452afd6f8998c119e6c8fd7e188bd612f52 /core/src/main/scala
parent    81f8f3406284c391dfad14fb70147fa8e20692a8 (diff)
download  spark-84b6ecdef63e6f5710a3f7f95f698b1d1ea44855.tar.gz
          spark-84b6ecdef63e6f5710a3f7f95f698b1d1ea44855.tar.bz2
          spark-84b6ecdef63e6f5710a3f7f95f698b1d1ea44855.zip
[SPARK-5437] Fix DriverSuite and SparkSubmitSuite timeout issues
In DriverSuite, we currently set a timeout of 60 seconds. If after this time the process has not terminated, we leak the process because we never destroy it. In SparkSubmitSuite, we currently do not have a timeout, so the test can hang indefinitely.

Author: Andrew Or <andrew@databricks.com>

Closes #4230 from andrewor14/fix-driver-suite and squashes the following commits:

f5c80fd [Andrew Or] Fix timeout behaviors in both suites
8092c36 [Andrew Or] Stop SparkContext after every individual test
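
The failure mode described above comes down to an unbounded, or un-enforced, wait on a child process. A minimal sketch of the timeout-and-destroy pattern, assuming a Java 8+ runtime for the timed waitFor (the command and timeout below are illustrative, not the suites' actual code):

    import java.util.concurrent.TimeUnit

    // Bound the wait on the child process; on timeout, destroy it rather
    // than leaking it (illustrative sketch, not the suites' actual code).
    val process = new ProcessBuilder("sleep", "100").start()
    val finished = process.waitFor(60, TimeUnit.SECONDS)  // false if still running
    if (!finished) {
      process.destroy()
      throw new RuntimeException("Process did not terminate within 60 seconds")
    }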
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 87
1 file changed, 44 insertions(+), 43 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 2c04e4ddfb..86ac307fc8 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -410,10 +410,10 @@ private[spark] object Utils extends Logging {
     // Decompress the file if it's a .tar or .tar.gz
     if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
       logInfo("Untarring " + fileName)
-      Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
+      executeAndGetOutput(Seq("tar", "-xzf", fileName), targetDir)
     } else if (fileName.endsWith(".tar")) {
       logInfo("Untarring " + fileName)
-      Utils.execute(Seq("tar", "-xf", fileName), targetDir)
+      executeAndGetOutput(Seq("tar", "-xf", fileName), targetDir)
     }
     // Make the file executable - That's necessary for scripts
     FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
@@ -956,25 +956,25 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Execute a command in the given working directory, throwing an exception if it completes
-   * with an exit code other than 0.
+   * Execute a command and return the process running the command.
    */
-  def execute(command: Seq[String], workingDir: File) {
-    val process = new ProcessBuilder(command: _*)
-      .directory(workingDir)
-      .redirectErrorStream(true)
-      .start()
-    new Thread("read stdout for " + command(0)) {
-      override def run() {
-        for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
-          System.err.println(line)
-        }
-      }
-    }.start()
-    val exitCode = process.waitFor()
-    if (exitCode != 0) {
-      throw new SparkException("Process " + command + " exited with code " + exitCode)
+  def executeCommand(
+      command: Seq[String],
+      workingDir: File = new File("."),
+      extraEnvironment: Map[String, String] = Map.empty,
+      redirectStderr: Boolean = true): Process = {
+    val builder = new ProcessBuilder(command: _*).directory(workingDir)
+    val environment = builder.environment()
+    for ((key, value) <- extraEnvironment) {
+      environment.put(key, value)
+    }
+    val process = builder.start()
+    if (redirectStderr) {
+      val threadName = "redirect stderr for command " + command(0)
+      def log(s: String): Unit = logInfo(s)
+      processStreamByLine(threadName, process.getErrorStream, log)
     }
+    process
   }
 
   /**
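
A hypothetical call site for the new executeCommand helper (the tar invocation, working directory, and environment variable are illustrative):

    import java.io.File

    // Start a child process with one extra environment variable; stderr is
    // forwarded to the logs because redirectStderr defaults to true.
    val process = Utils.executeCommand(
      Seq("tar", "-xzf", "archive.tar.gz"),
      workingDir = new File("/tmp"),
      extraEnvironment = Map("LC_ALL" -> "C"))
    val exitCode = process.waitFor()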
@@ -983,31 +983,13 @@ private[spark] object Utils extends Logging {
   def executeAndGetOutput(
       command: Seq[String],
       workingDir: File = new File("."),
-      extraEnvironment: Map[String, String] = Map.empty): String = {
-    val builder = new ProcessBuilder(command: _*)
-      .directory(workingDir)
-    val environment = builder.environment()
-    for ((key, value) <- extraEnvironment) {
-      environment.put(key, value)
-    }
-
-    val process = builder.start()
-    new Thread("read stderr for " + command(0)) {
-      override def run() {
-        for (line <- Source.fromInputStream(process.getErrorStream).getLines()) {
-          logInfo(line)
-        }
-      }
-    }.start()
+      extraEnvironment: Map[String, String] = Map.empty,
+      redirectStderr: Boolean = true): String = {
+    val process = executeCommand(command, workingDir, extraEnvironment, redirectStderr)
     val output = new StringBuffer
-    val stdoutThread = new Thread("read stdout for " + command(0)) {
-      override def run() {
-        for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
-          output.append(line)
-        }
-      }
-    }
-    stdoutThread.start()
+    val threadName = "read stdout for " + command(0)
+    def appendToOutput(s: String): Unit = output.append(s)
+    val stdoutThread = processStreamByLine(threadName, process.getInputStream, appendToOutput)
     val exitCode = process.waitFor()
     stdoutThread.join()  // Wait for it to finish reading output
     if (exitCode != 0) {
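
Note that waitFor() can return before the reader thread has drained the stdout pipe, which is why the join() above precedes the exit-code check. A hypothetical call site (the command is illustrative):

    // Capture a command's stdout as a single string; a non-zero exit code
    // raises a SparkException.
    val hostname = Utils.executeAndGetOutput(Seq("hostname"))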
@@ -1018,6 +1000,25 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Return and start a daemon thread that processes the content of the input stream line by line.
+   */
+  def processStreamByLine(
+      threadName: String,
+      inputStream: InputStream,
+      processLine: String => Unit): Thread = {
+    val t = new Thread(threadName) {
+      override def run() {
+        for (line <- Source.fromInputStream(inputStream).getLines()) {
+          processLine(line)
+        }
+      }
+    }
+    t.setDaemon(true)
+    t.start()
+    t
+  }
+
+  /**
    * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
    * default UncaughtExceptionHandler
    */
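
Because processStreamByLine marks its reader thread as a daemon, a stuck child process no longer keeps the JVM alive, which is the hang the commit message describes. An illustrative use against an in-memory stream (the thread name and data are made up):

    import java.io.ByteArrayInputStream

    // Each line of the stream is handed to the callback on the started
    // daemon thread; join() waits until the stream is drained.
    val in = new ByteArrayInputStream("a\nb\nc\n".getBytes("UTF-8"))
    val reader = Utils.processStreamByLine("example reader", in, line => println(line))
    reader.join()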