author     Andrew Or <andrew@databricks.com>    2015-01-28 12:52:31 -0800
committer  Andrew Or <andrew@databricks.com>    2015-01-28 12:53:22 -0800
commit     84b6ecdef63e6f5710a3f7f95f698b1d1ea44855 (patch)
tree       0d164452afd6f8998c119e6c8fd7e188bd612f52 /core
parent     81f8f3406284c391dfad14fb70147fa8e20692a8 (diff)
[SPARK-5437] Fix DriverSuite and SparkSubmitSuite timeout issues
In DriverSuite, we currently set a timeout of 60 seconds. If after this time the process has not terminated, we leak the process because we never destroy it. In SparkSubmitSuite, we currently do not have a timeout, so the test can hang indefinitely.

Author: Andrew Or <andrew@databricks.com>

Closes #4230 from andrewor14/fix-driver-suite and squashes the following commits:

f5c80fd [Andrew Or] Fix timeout behaviors in both suites
8092c36 [Andrew Or] Stop SparkContext after every individual test
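The core of the fix is the pattern both suites now follow: launch the child process, wait for it with a bounded timeout, and then destroy it so a hung child is not leaked. Below is a minimal standalone sketch of that pattern using only JDK process APIs instead of the Utils.executeCommand and ScalaTest failAfter used in the actual suites; the object name and the sleep command are purely illustrative.

    import java.io.File
    import java.util.concurrent.TimeUnit

    object ProcessTimeoutSketch {
      def main(args: Array[String]): Unit = {
        // Launch an external command (the suites launch spark-class / spark-submit instead).
        val process = new ProcessBuilder("sleep", "5")
          .directory(new File("."))
          .redirectErrorStream(true)
          .start()

        // Bounded wait, mirroring failAfter(60 seconds) { process.waitFor() } in the suites.
        val finished = process.waitFor(60, TimeUnit.SECONDS)

        // Destroy the process regardless of the outcome so a timed-out child is not leaked.
        process.destroy()

        if (!finished) {
          sys.error("child process did not terminate within the timeout")
        }
      }
    }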
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala               |  87
-rw-r--r--  core/src/test/scala/org/apache/spark/DriverSuite.scala              |  26
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala  |  26
3 files changed, 71 insertions(+), 68 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 2c04e4ddfb..86ac307fc8 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -410,10 +410,10 @@ private[spark] object Utils extends Logging {
// Decompress the file if it's a .tar or .tar.gz
if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
logInfo("Untarring " + fileName)
- Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
+ executeAndGetOutput(Seq("tar", "-xzf", fileName), targetDir)
} else if (fileName.endsWith(".tar")) {
logInfo("Untarring " + fileName)
- Utils.execute(Seq("tar", "-xf", fileName), targetDir)
+ executeAndGetOutput(Seq("tar", "-xf", fileName), targetDir)
}
// Make the file executable - That's necessary for scripts
FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
@@ -956,25 +956,25 @@ private[spark] object Utils extends Logging {
}
/**
- * Execute a command in the given working directory, throwing an exception if it completes
- * with an exit code other than 0.
+ * Execute a command and return the process running the command.
*/
- def execute(command: Seq[String], workingDir: File) {
- val process = new ProcessBuilder(command: _*)
- .directory(workingDir)
- .redirectErrorStream(true)
- .start()
- new Thread("read stdout for " + command(0)) {
- override def run() {
- for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
- System.err.println(line)
- }
- }
- }.start()
- val exitCode = process.waitFor()
- if (exitCode != 0) {
- throw new SparkException("Process " + command + " exited with code " + exitCode)
+ def executeCommand(
+ command: Seq[String],
+ workingDir: File = new File("."),
+ extraEnvironment: Map[String, String] = Map.empty,
+ redirectStderr: Boolean = true): Process = {
+ val builder = new ProcessBuilder(command: _*).directory(workingDir)
+ val environment = builder.environment()
+ for ((key, value) <- extraEnvironment) {
+ environment.put(key, value)
+ }
+ val process = builder.start()
+ if (redirectStderr) {
+ val threadName = "redirect stderr for command " + command(0)
+ def log(s: String): Unit = logInfo(s)
+ processStreamByLine(threadName, process.getErrorStream, log)
}
+ process
}
/**
@@ -983,31 +983,13 @@ private[spark] object Utils extends Logging {
def executeAndGetOutput(
command: Seq[String],
workingDir: File = new File("."),
- extraEnvironment: Map[String, String] = Map.empty): String = {
- val builder = new ProcessBuilder(command: _*)
- .directory(workingDir)
- val environment = builder.environment()
- for ((key, value) <- extraEnvironment) {
- environment.put(key, value)
- }
-
- val process = builder.start()
- new Thread("read stderr for " + command(0)) {
- override def run() {
- for (line <- Source.fromInputStream(process.getErrorStream).getLines()) {
- logInfo(line)
- }
- }
- }.start()
+ extraEnvironment: Map[String, String] = Map.empty,
+ redirectStderr: Boolean = true): String = {
+ val process = executeCommand(command, workingDir, extraEnvironment, redirectStderr)
val output = new StringBuffer
- val stdoutThread = new Thread("read stdout for " + command(0)) {
- override def run() {
- for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
- output.append(line)
- }
- }
- }
- stdoutThread.start()
+ val threadName = "read stdout for " + command(0)
+ def appendToOutput(s: String): Unit = output.append(s)
+ val stdoutThread = processStreamByLine(threadName, process.getInputStream, appendToOutput)
val exitCode = process.waitFor()
stdoutThread.join() // Wait for it to finish reading output
if (exitCode != 0) {
@@ -1018,6 +1000,25 @@ private[spark] object Utils extends Logging {
}
/**
+ * Return and start a daemon thread that processes the content of the input stream line by line.
+ */
+ def processStreamByLine(
+ threadName: String,
+ inputStream: InputStream,
+ processLine: String => Unit): Thread = {
+ val t = new Thread(threadName) {
+ override def run() {
+ for (line <- Source.fromInputStream(inputStream).getLines()) {
+ processLine(line)
+ }
+ }
+ }
+ t.setDaemon(true)
+ t.start()
+ t
+ }
+
+ /**
* Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
* default UncaughtExceptionHandler
*/
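The processStreamByLine helper added above replaces the two ad-hoc reader threads with one shared routine; because the returned thread is a daemon, it can never keep the JVM alive once a suite finishes. A self-contained sketch of the same pattern, using only the JDK and scala.io (the echo command and thread name are illustrative):

    import java.io.InputStream
    import scala.io.Source

    object StreamReaderSketch {
      // Start and return a daemon thread that feeds each line of the stream to processLine,
      // mirroring the new Utils.processStreamByLine.
      def processStreamByLine(
          threadName: String,
          inputStream: InputStream,
          processLine: String => Unit): Thread = {
        val t = new Thread(threadName) {
          override def run(): Unit = {
            for (line <- Source.fromInputStream(inputStream).getLines()) {
              processLine(line)
            }
          }
        }
        t.setDaemon(true) // a daemon thread cannot keep the JVM alive after the test ends
        t.start()
        t
      }

      def main(args: Array[String]): Unit = {
        val process = new ProcessBuilder("echo", "hello").start()
        val reader = processStreamByLine("read stdout for echo", process.getInputStream, println)
        process.waitFor()
        reader.join() // wait until all output has been consumed, as executeAndGetOutput does
      }
    }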
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index 8a54360e81..9bd5dfec87 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -28,31 +28,29 @@ import org.apache.spark.util.Utils
class DriverSuite extends FunSuite with Timeouts {
- test("driver should exit after finishing") {
+ test("driver should exit after finishing without cleanup (SPARK-530)") {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
- // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
- val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
+ val masters = Table("master", "local", "local-cluster[2,1,512]")
forAll(masters) { (master: String) =>
- failAfter(60 seconds) {
- Utils.executeAndGetOutput(
- Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
- new File(sparkHome),
- Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
- }
+ val process = Utils.executeCommand(
+ Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.DriverWithoutCleanup", master),
+ new File(sparkHome),
+ Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
+ failAfter(60 seconds) { process.waitFor() }
+ // Ensure we still kill the process in case it timed out
+ process.destroy()
}
}
}
/**
- * Program that creates a Spark driver but doesn't call SparkContext.stop() or
- * Sys.exit() after finishing.
+ * Program that creates a Spark driver but doesn't call SparkContext#stop() or
+ * sys.exit() after finishing.
*/
object DriverWithoutCleanup {
def main(args: Array[String]) {
Utils.configTestLog4j("INFO")
- // Bind the web UI to an ephemeral port in order to avoid conflicts with other tests running on
- // the same machine (we shouldn't just disable the UI here, since that might mask bugs):
- val conf = new SparkConf().set("spark.ui.port", "0")
+ val conf = new SparkConf
val sc = new SparkContext(args(0), "DriverWithoutCleanup", conf)
sc.parallelize(1 to 100, 4).count()
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 065b7534ce..82628ad3ab 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -21,25 +21,28 @@ import java.io._
import scala.collection.mutable.ArrayBuffer
+import org.scalatest.FunSuite
+import org.scalatest.Matchers
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.SpanSugar._
+
import org.apache.spark._
import org.apache.spark.deploy.SparkSubmit._
import org.apache.spark.util.{ResetSystemProperties, Utils}
-import org.scalatest.FunSuite
-import org.scalatest.Matchers
// Note: this suite mixes in ResetSystemProperties because SparkSubmit.main() sets a bunch
// of properties that need to be cleared after tests.
-class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties {
+class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties with Timeouts {
def beforeAll() {
System.setProperty("spark.testing", "true")
}
- val noOpOutputStream = new OutputStream {
+ private val noOpOutputStream = new OutputStream {
def write(b: Int) = {}
}
/** Simple PrintStream that reads data into a buffer */
- class BufferPrintStream extends PrintStream(noOpOutputStream) {
+ private class BufferPrintStream extends PrintStream(noOpOutputStream) {
var lineBuffer = ArrayBuffer[String]()
override def println(line: String) {
lineBuffer += line
@@ -47,7 +50,7 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
}
/** Returns true if the script exits and the given search string is printed. */
- def testPrematureExit(input: Array[String], searchString: String) = {
+ private def testPrematureExit(input: Array[String], searchString: String) = {
val printStream = new BufferPrintStream()
SparkSubmit.printStream = printStream
@@ -290,7 +293,6 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
"--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
"--name", "testApp",
"--master", "local",
- "--conf", "spark.ui.enabled=false",
unusedJar.toString)
runSparkSubmit(args)
}
@@ -305,7 +307,6 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
"--name", "testApp",
"--master", "local-cluster[2,1,512]",
"--jars", jarsString,
- "--conf", "spark.ui.enabled=false",
unusedJar.toString)
runSparkSubmit(args)
}
@@ -430,15 +431,18 @@ class SparkSubmitSuite extends FunSuite with Matchers with ResetSystemProperties
}
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
- def runSparkSubmit(args: Seq[String]): String = {
+ private def runSparkSubmit(args: Seq[String]): Unit = {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
- Utils.executeAndGetOutput(
+ val process = Utils.executeCommand(
Seq("./bin/spark-submit") ++ args,
new File(sparkHome),
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
+ failAfter(60 seconds) { process.waitFor() }
+ // Ensure we still kill the process in case it timed out
+ process.destroy()
}
- def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
+ private def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
val tmpDir = Utils.createTempDir()
val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf")