From 84b6ecdef63e6f5710a3f7f95f698b1d1ea44855 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 28 Jan 2015 12:52:31 -0800 Subject: [SPARK-5437] Fix DriverSuite and SparkSubmitSuite timeout issues In DriverSuite, we currently set a timeout of 60 seconds. If after this time the process has not terminated, we leak the process because we never destroy it. In SparkSubmitSuite, we currently do not have a timeout so the test can hang indefinitely. Author: Andrew Or Closes #4230 from andrewor14/fix-driver-suite and squashes the following commits: f5c80fd [Andrew Or] Fix timeout behaviors in both suites 8092c36 [Andrew Or] Stop SparkContext after every individual test --- .../main/scala/org/apache/spark/util/Utils.scala | 87 +++++++++++----------- .../test/scala/org/apache/spark/DriverSuite.scala | 26 +++---- .../org/apache/spark/deploy/SparkSubmitSuite.scala | 26 ++++--- 3 files changed, 71 insertions(+), 68 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 2c04e4ddfb..86ac307fc8 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -410,10 +410,10 @@ private[spark] object Utils extends Logging { // Decompress the file if it's a .tar or .tar.gz if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) { logInfo("Untarring " + fileName) - Utils.execute(Seq("tar", "-xzf", fileName), targetDir) + executeAndGetOutput(Seq("tar", "-xzf", fileName), targetDir) } else if (fileName.endsWith(".tar")) { logInfo("Untarring " + fileName) - Utils.execute(Seq("tar", "-xf", fileName), targetDir) + executeAndGetOutput(Seq("tar", "-xf", fileName), targetDir) } // Make the file executable - That's necessary for scripts FileUtil.chmod(targetFile.getAbsolutePath, "a+x") @@ -956,25 +956,25 @@ private[spark] object Utils extends Logging { } /** - * Execute a command in the given working directory, throwing an exception if it completes - * with an exit code other than 0. + * Execute a command and return the process running the command. */ - def execute(command: Seq[String], workingDir: File) { - val process = new ProcessBuilder(command: _*) - .directory(workingDir) - .redirectErrorStream(true) - .start() - new Thread("read stdout for " + command(0)) { - override def run() { - for (line <- Source.fromInputStream(process.getInputStream).getLines()) { - System.err.println(line) - } - } - }.start() - val exitCode = process.waitFor() - if (exitCode != 0) { - throw new SparkException("Process " + command + " exited with code " + exitCode) + def executeCommand( + command: Seq[String], + workingDir: File = new File("."), + extraEnvironment: Map[String, String] = Map.empty, + redirectStderr: Boolean = true): Process = { + val builder = new ProcessBuilder(command: _*).directory(workingDir) + val environment = builder.environment() + for ((key, value) <- extraEnvironment) { + environment.put(key, value) + } + val process = builder.start() + if (redirectStderr) { + val threadName = "redirect stderr for command " + command(0) + def log(s: String): Unit = logInfo(s) + processStreamByLine(threadName, process.getErrorStream, log) } + process } /** @@ -983,31 +983,13 @@ private[spark] object Utils extends Logging { def executeAndGetOutput( command: Seq[String], workingDir: File = new File("."), - extraEnvironment: Map[String, String] = Map.empty): String = { - val builder = new ProcessBuilder(command: _*) - .directory(workingDir) - val environment = builder.environment() - for ((key, value) <- extraEnvironment) { - environment.put(key, value) - } - - val process = builder.start() - new Thread("read stderr for " + command(0)) { - override def run() { - for (line <- Source.fromInputStream(process.getErrorStream).getLines()) { - logInfo(line) - } - } - }.start() + extraEnvironment: Map[String, String] = Map.empty, + redirectStderr: Boolean = true): String = { + val process = executeCommand(command, workingDir, extraEnvironment, redirectStderr) val output = new StringBuffer - val stdoutThread = new Thread("read stdout for " + command(0)) { - override def run() { - for (line <- Source.fromInputStream(process.getInputStream).getLines()) { - output.append(line) - } - } - } - stdoutThread.start() + val threadName = "read stdout for " + command(0) + def appendToOutput(s: String): Unit = output.append(s) + val stdoutThread = processStreamByLine(threadName, process.getInputStream, appendToOutput) val exitCode = process.waitFor() stdoutThread.join() // Wait for it to finish reading output if (exitCode != 0) { @@ -1017,6 +999,25 @@ private[spark] object Utils extends Logging { output.toString } + /** + * Return and start a daemon thread that processes the content of the input stream line by line. + */ + def processStreamByLine( + threadName: String, + inputStream: InputStream, + processLine: String => Unit): Thread = { + val t = new Thread(threadName) { + override def run() { + for (line <- Source.fromInputStream(inputStream).getLines()) { + processLine(line) + } + } + } + t.setDaemon(true) + t.start() + t + } + /** * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the * default UncaughtExceptionHandler diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala index 8a54360e81..9bd5dfec87 100644 --- a/core/src/test/scala/org/apache/spark/DriverSuite.scala +++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala @@ -28,31 +28,29 @@ import org.apache.spark.util.Utils class DriverSuite extends FunSuite with Timeouts { - test("driver should exit after finishing") { + test("driver should exit after finishing without cleanup (SPARK-530)") { val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) - // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing" - val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]")) + val masters = Table("master", "local", "local-cluster[2,1,512]") forAll(masters) { (master: String) => - failAfter(60 seconds) { - Utils.executeAndGetOutput( - Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master), - new File(sparkHome), - Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome)) - } + val process = Utils.executeCommand( + Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master), + new File(sparkHome), + Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome)) + failAfter(60 seconds) { process.waitFor() } + // Ensure we still kill the process in case it timed out + process.destroy() } } } /** - * Program that creates a Spark driver but doesn't call SparkContext.stop() or - * Sys.exit() after finishing. + * Program that creates a Spark driver but doesn't call SparkContext#stop() or + * sys.exit() after finishing. */ object DriverWithoutCleanup { def main(args: Array[String]) { Utils.configTestLog4j("INFO") - // Bind the web UI to an ephemeral port in order to avoid conflicts with other tests running on - // the same machine (we shouldn't just disable the UI here, since that might mask bugs): - val conf = new SparkConf().set("spark.ui.port", "0") + val conf = new SparkConf val sc = new SparkContext(args(0), "DriverWithoutCleanup", conf) sc.parallelize(1 to 100, 4).count() } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 065b7534ce..82628ad3ab 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -21,25 +21,28 @@ import java.io._ import scala.collection.mutable.ArrayBuffer +import org.scalatest.FunSuite +import org.scalatest.Matchers +import org.scalatest.concurrent.Timeouts +import org.scalatest.time.SpanSugar._ + import org.apache.spark._ import org.apache.spark.deploy.SparkSubmit._ import org.apache.spark.util.{ResetSystemProperties, Utils} -import org.scalatest.FunSuite -import org.scalatest.Matchers // Note: this suite mixes in ResetSystemProperties because SparkSubmit.main() sets a bunch // of properties that neeed to be cleared after tests. -class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties { +class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties with Timeouts { def beforeAll() { System.setProperty("spark.testing", "true") } - val noOpOutputStream = new OutputStream { + private val noOpOutputStream = new OutputStream { def write(b: Int) = {} } /** Simple PrintStream that reads data into a buffer */ - class BufferPrintStream extends PrintStream(noOpOutputStream) { + private class BufferPrintStream extends PrintStream(noOpOutputStream) { var lineBuffer = ArrayBuffer[String]() override def println(line: String) { lineBuffer += line @@ -47,7 +50,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties } /** Returns true if the script exits and the given search string is printed. */ - def testPrematureExit(input: Array[String], searchString: String) = { + private def testPrematureExit(input: Array[String], searchString: String) = { val printStream = new BufferPrintStream() SparkSubmit.printStream = printStream @@ -290,7 +293,6 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"), "--name", "testApp", "--master", "local", - "--conf", "spark.ui.enabled=false", unusedJar.toString) runSparkSubmit(args) } @@ -305,7 +307,6 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties "--name", "testApp", "--master", "local-cluster[2,1,512]", "--jars", jarsString, - "--conf", "spark.ui.enabled=false", unusedJar.toString) runSparkSubmit(args) } @@ -430,15 +431,18 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties } // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. - def runSparkSubmit(args: Seq[String]): String = { + private def runSparkSubmit(args: Seq[String]): Unit = { val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) - Utils.executeAndGetOutput( + val process = Utils.executeCommand( Seq("./bin/spark-submit") ++ args, new File(sparkHome), Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome)) + failAfter(60 seconds) { process.waitFor() } + // Ensure we still kill the process in case it timed out + process.destroy() } - def forConfDir(defaults: Map[String, String]) (f: String => Unit) = { + private def forConfDir(defaults: Map[String, String]) (f: String => Unit) = { val tmpDir = Utils.createTempDir() val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf") -- cgit v1.2.3