aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala28
-rw-r--r--core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala77
2 files changed, 95 insertions, 10 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
index 58c8560a3d..86bbaa20f6 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -17,7 +17,7 @@
package org.apache.spark.util.logging
-import java.io.{File, FileOutputStream, InputStream}
+import java.io.{File, FileOutputStream, InputStream, IOException}
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.util.{IntParam, Utils}
@@ -58,20 +58,28 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
protected def appendStreamToFile() {
try {
logDebug("Started appending thread")
- openFile()
- val buf = new Array[Byte](bufferSize)
- var n = 0
- while (!markedForStop && n != -1) {
- n = inputStream.read(buf)
- if (n != -1) {
- appendToFile(buf, n)
+ Utils.tryWithSafeFinally {
+ openFile()
+ val buf = new Array[Byte](bufferSize)
+ var n = 0
+ while (!markedForStop && n != -1) {
+ try {
+ n = inputStream.read(buf)
+ } catch {
+ // An InputStream can throw IOException during read if the stream is closed
+ // asynchronously, so once appender has been flagged to stop these will be ignored
+ case _: IOException if markedForStop => // do nothing and proceed to stop appending
+ }
+ if (n > 0) {
+ appendToFile(buf, n)
+ }
}
+ } {
+ closeFile()
}
} catch {
case e: Exception =>
logError(s"Error writing stream to file $file", e)
- } finally {
- closeFile()
}
}
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 98d1b28d5a..b367cc8358 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -18,12 +18,17 @@
package org.apache.spark.util
import java.io._
+import java.util.concurrent.CountDownLatch
import scala.collection.mutable.HashSet
import scala.reflect._
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
+import org.apache.log4j.{Appender, Level, Logger}
+import org.apache.log4j.spi.LoggingEvent
+import org.mockito.ArgumentCaptor
+import org.mockito.Mockito.{atLeast, mock, verify}
import org.scalatest.BeforeAndAfter
import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
@@ -188,6 +193,67 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
testAppenderSelection[FileAppender, Any](rollingStrategy("xyz"))
}
+ test("file appender async close stream abruptly") {
+ // Test FileAppender reaction to closing InputStream using a mock logging appender
+ val mockAppender = mock(classOf[Appender])
+ val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+
+ // Make sure only logging errors
+ val logger = Logger.getRootLogger
+ logger.setLevel(Level.ERROR)
+ logger.addAppender(mockAppender)
+
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream)
+
+ // Close the stream before appender tries to read will cause an IOException
+ testInputStream.close()
+ testOutputStream.close()
+ val appender = FileAppender(testInputStream, testFile, new SparkConf)
+
+ appender.awaitTermination()
+
+ // If InputStream was closed without first stopping the appender, an exception will be logged
+ verify(mockAppender, atLeast(1)).doAppend(loggingEventCaptor.capture)
+ val loggingEvent = loggingEventCaptor.getValue
+ assert(loggingEvent.getThrowableInformation !== null)
+ assert(loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+ }
+
+ test("file appender async close stream gracefully") {
+ // Test FileAppender reaction to closing InputStream using a mock logging appender
+ val mockAppender = mock(classOf[Appender])
+ val loggingEventCaptor = new ArgumentCaptor[LoggingEvent]
+
+ // Make sure only logging errors
+ val logger = Logger.getRootLogger
+ logger.setLevel(Level.ERROR)
+ logger.addAppender(mockAppender)
+
+ val testOutputStream = new PipedOutputStream()
+ val testInputStream = new PipedInputStream(testOutputStream) with LatchedInputStream
+
+ // Close the stream before appender tries to read will cause an IOException
+ testInputStream.close()
+ testOutputStream.close()
+ val appender = FileAppender(testInputStream, testFile, new SparkConf)
+
+ // Stop the appender before an IOException is called during read
+ testInputStream.latchReadStarted.await()
+ appender.stop()
+ testInputStream.latchReadProceed.countDown()
+
+ appender.awaitTermination()
+
+ // Make sure no IOException errors have been logged as a result of appender closing gracefully
+ verify(mockAppender, atLeast(0)).doAppend(loggingEventCaptor.capture)
+ import scala.collection.JavaConverters._
+ loggingEventCaptor.getAllValues.asScala.foreach { loggingEvent =>
+ assert(loggingEvent.getThrowableInformation === null
+ || !loggingEvent.getThrowableInformation.getThrowable.isInstanceOf[IOException])
+ }
+ }
+
/**
* Run the rolling file appender with data and see whether all the data was written correctly
* across rolled over files.
@@ -228,4 +294,15 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
file.getName.startsWith(testFile.getName)
}.foreach { _.delete() }
}
+
+ /** Used to synchronize when read is called on a stream */
+ private trait LatchedInputStream extends PipedInputStream {
+ val latchReadStarted = new CountDownLatch(1)
+ val latchReadProceed = new CountDownLatch(1)
+ abstract override def read(): Int = {
+ latchReadStarted.countDown()
+ latchReadProceed.await()
+ super.read()
+ }
+ }
}