aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala72
2 files changed, 43 insertions, 33 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 50bcf85805..c304629bcd 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1121,9 +1121,9 @@ private[spark] object Utils extends Logging {
extraEnvironment: Map[String, String] = Map.empty,
redirectStderr: Boolean = true): String = {
val process = executeCommand(command, workingDir, extraEnvironment, redirectStderr)
- val output = new StringBuffer
+ val output = new StringBuilder
val threadName = "read stdout for " + command(0)
- def appendToOutput(s: String): Unit = output.append(s)
+ def appendToOutput(s: String): Unit = output.append(s).append("\n")
val stdoutThread = processStreamByLine(threadName, process.getInputStream, appendToOutput)
val exitCode = process.waitFor()
stdoutThread.join() // Wait for it to finish reading output
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 280e496498..4fa9f9a8f5 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -201,24 +201,29 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
// Make sure only logging errors
val logger = Logger.getRootLogger
+ val oldLogLevel = logger.getLevel
logger.setLevel(Level.ERROR)
- logger.addAppender(mockAppender)
+ try {
+ logger.addAppender(mockAppender)
- val testOutputStream = new PipedOutputStream()
- val testInputStream = new PipedInputStream(testOutputStream)
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream)
- // Close the stream before appender tries to read will cause an IOException
- testInputStream.close()
- testOutputStream.close()
- val appender = FileAppender(testInputStream, testFile, new SparkConf)
+ // Close the stream before appender tries to read will cause an IOException
+ testInputStream.close()
+ testOutputStream.close()
+ val appender = FileAppender(testInputStream, testFile, new SparkConf)
- appender.awaitTermination()
+ appender.awaitTermination()
- // If InputStream was closed without first stopping the appender, an exception will be logged
- verify(mockAppender, atLeast(1)).doAppend(loggingEventCaptor.capture)
- val loggingEvent = loggingEventCaptor.getValue
- assert(loggingEvent.getThrowableInformation !== null)
- assert(loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+ // If InputStream was closed without first stopping the appender, an exception will be logged
+ verify(mockAppender, atLeast(1)).doAppend(loggingEventCaptor.capture)
+ val loggingEvent = loggingEventCaptor.getValue
+ assert(loggingEvent.getThrowableInformation !== null)
+ assert(loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+ } finally {
+ logger.setLevel(oldLogLevel)
+ }
}
test("file appender async close stream gracefully") {
@@ -228,30 +233,35 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
// Make sure only logging errors
val logger = Logger.getRootLogger
+ val oldLogLevel = logger.getLevel
logger.setLevel(Level.ERROR)
- logger.addAppender(mockAppender)
+ try {
+ logger.addAppender(mockAppender)
- val testOutputStream = new PipedOutputStream()
- val testInputStream = new PipedInputStream(testOutputStream) with LatchedInputStream
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream) with LatchedInputStream
- // Close the stream before appender tries to read will cause an IOException
- testInputStream.close()
- testOutputStream.close()
- val appender = FileAppender(testInputStream, testFile, new SparkConf)
+ // Close the stream before appender tries to read will cause an IOException
+ testInputStream.close()
+ testOutputStream.close()
+ val appender = FileAppender(testInputStream, testFile, new SparkConf)
- // Stop the appender before an IOException is called during read
- testInputStream.latchReadStarted.await()
- appender.stop()
- testInputStream.latchReadProceed.countDown()
+ // Stop the appender before an IOException is called during read
+ testInputStream.latchReadStarted.await()
+ appender.stop()
+ testInputStream.latchReadProceed.countDown()
- appender.awaitTermination()
+ appender.awaitTermination()
- // Make sure no IOException errors have been logged as a result of appender closing gracefully
- verify(mockAppender, atLeast(0)).doAppend(loggingEventCaptor.capture)
- import scala.collection.JavaConverters._
- loggingEventCaptor.getAllValues.asScala.foreach { loggingEvent =>
- assert(loggingEvent.getThrowableInformation === null
- || !loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+ // Make sure no IOException errors have been logged as a result of appender closing gracefully
+ verify(mockAppender, atLeast(0)).doAppend(loggingEventCaptor.capture)
+ import scala.collection.JavaConverters._
+ loggingEventCaptor.getAllValues.asScala.foreach { loggingEvent =>
+ assert(loggingEvent.getThrowableInformation === null
+ || !loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+ }
+ } finally {
+ logger.setLevel(oldLogLevel)
}
}