-rw-r--r--  core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala | 19
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala | 18
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 46
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala | 26
12 files changed, 118 insertions(+), 62 deletions(-)
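
Nearly every hunk below applies the same mechanical change: a bare `try { ... } finally { out.close() }` becomes a call to the new `Utils.tryWithSafeFinally` (added in Utils.scala further down), so that an exception thrown while closing a stream can no longer mask the exception that made the write fail in the first place. A minimal before/after sketch of the pattern, with a hypothetical `writeBytes` helper that is not part of this diff, and assuming the calling code lives inside the `org.apache.spark` package since `Utils` is `private[spark]`:

    import java.io.{File, FileOutputStream}
    import org.apache.spark.util.Utils

    // Before: if write() throws and close() then also throws, the close()
    // exception replaces the more meaningful write() failure.
    def writeBytesBefore(file: File, bytes: Array[Byte]): Unit = {
      val out = new FileOutputStream(file)
      try {
        out.write(bytes)
      } finally {
        out.close()
      }
    }

    // After: if write() throws, a later failure in close() is logged and
    // suppressed, and the original write() exception propagates.
    def writeBytesAfter(file: File, bytes: Array[Byte]): Unit = {
      val out = new FileOutputStream(file)
      Utils.tryWithSafeFinally {
        out.write(bytes)
      } {
        out.close()
      }
    }
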
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index c9426c5de2..5718951451 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -356,11 +356,14 @@ private[spark] object MapOutputTracker extends Logging {
def serializeMapStatuses(statuses: Array[MapStatus]): Array[Byte] = {
val out = new ByteArrayOutputStream
val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
- // Since statuses can be modified in parallel, sync on it
- statuses.synchronized {
- objOut.writeObject(statuses)
+ Utils.tryWithSafeFinally {
+ // Since statuses can be modified in parallel, sync on it
+ statuses.synchronized {
+ objOut.writeObject(statuses)
+ }
+ } {
+ objOut.close()
}
- objOut.close()
out.toByteArray
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 36cf2af085..b1ffba4c54 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -614,9 +614,9 @@ private[spark] object PythonRDD extends Logging {
try {
val sock = serverSocket.accept()
val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
- try {
+ Utils.tryWithSafeFinally {
writeIteratorToStream(items, out)
- } finally {
+ } {
out.close()
}
} catch {
@@ -862,9 +862,9 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
val file = File.createTempFile("broadcast", "", dir)
path = file.getAbsolutePath
val out = new FileOutputStream(file)
- try {
+ Utils.tryWithSafeFinally {
Utils.copyStream(in, out)
- } finally {
+ } {
out.close()
}
}
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 74ccfa6d3c..4457c75e8b 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -165,7 +165,7 @@ private[broadcast] object HttpBroadcast extends Logging {
private def write(id: Long, value: Any) {
val file = getFile(id)
val fileOutputStream = new FileOutputStream(file)
- try {
+ Utils.tryWithSafeFinally {
val out: OutputStream = {
if (compress) {
compressionCodec.compressedOutputStream(fileOutputStream)
@@ -175,10 +175,13 @@ private[broadcast] object HttpBroadcast extends Logging {
}
val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)
- serOut.writeObject(value)
- serOut.close()
+ Utils.tryWithSafeFinally {
+ serOut.writeObject(value)
+ } {
+ serOut.close()
+ }
files += file
- } finally {
+ } {
fileOutputStream.close()
}
}
@@ -212,9 +215,11 @@ private[broadcast] object HttpBroadcast extends Logging {
}
val ser = SparkEnv.get.serializer.newInstance()
val serIn = ser.deserializeStream(in)
- val obj = serIn.readObject[T]()
- serIn.close()
- obj
+ Utils.tryWithSafeFinally {
+ serIn.readObject[T]()
+ } {
+ serIn.close()
+ }
}
/**
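
Because `tryWithSafeFinally[T]` returns the value of its first block, the read path above can return `serIn.readObject[T]()` directly instead of staging it in the temporary `obj`. A small sketch of that expression-style use, with a hypothetical `readOne` helper and again assuming code inside the `org.apache.spark` package:

    import scala.reflect.ClassTag
    import org.apache.spark.serializer.DeserializationStream
    import org.apache.spark.util.Utils

    // The result of the first block becomes the result of the whole call,
    // and the stream is closed whether or not deserialization succeeds.
    def readOne[T: ClassTag](serIn: DeserializationStream): T = {
      Utils.tryWithSafeFinally {
        serIn.readObject[T]()
      } {
        serIn.close()
      }
    }
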
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index 32499b3a78..f459ed5b3a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
import akka.serialization.Serialization
import org.apache.spark.Logging
+import org.apache.spark.util.Utils
/**
@@ -59,9 +60,9 @@ private[master] class FileSystemPersistenceEngine(
val serializer = serialization.findSerializerFor(value)
val serialized = serializer.toBinary(value)
val out = new FileOutputStream(file)
- try {
+ Utils.tryWithSafeFinally {
out.write(serialized)
- } finally {
+ } {
out.close()
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
index 420442f756..a3539e44bd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.core.JsonProcessingException
import com.google.common.base.Charsets
import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
+import org.apache.spark.util.Utils
/**
* A client that submits applications to the standalone Master using a REST protocol.
@@ -148,8 +149,11 @@ private[deploy] class StandaloneRestClient extends Logging {
conn.setRequestProperty("charset", "utf-8")
conn.setDoOutput(true)
val out = new DataOutputStream(conn.getOutputStream)
- out.write(json.getBytes(Charsets.UTF_8))
- out.close()
+ Utils.tryWithSafeFinally {
+ out.write(json.getBytes(Charsets.UTF_8))
+ } {
+ out.close()
+ }
readResponse(conn)
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 1c13e2c372..760c0fa3ac 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.util.Utils
private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
@@ -112,8 +113,11 @@ private[spark] object CheckpointRDD extends Logging {
}
val serializer = env.serializer.newInstance()
val serializeStream = serializer.serializeStream(fileOutputStream)
- serializeStream.writeAll(iterator)
- serializeStream.close()
+ Utils.tryWithSafeFinally {
+ serializeStream.writeAll(iterator)
+ } {
+ serializeStream.close()
+ }
if (!fs.rename(tempOutputPath, finalOutputPath)) {
if (!fs.exists(finalOutputPath)) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 6b4f097ea9..bf1303d395 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -995,7 +995,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
require(writer != null, "Unable to obtain RecordWriter")
var recordsWritten = 0L
- try {
+ Utils.tryWithSafeFinally {
while (iter.hasNext) {
val pair = iter.next()
writer.write(pair._1, pair._2)
@@ -1004,7 +1004,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
recordsWritten += 1
}
- } finally {
+ } {
writer.close(hadoopContext)
}
committer.commitTask(hadoopContext)
@@ -1068,7 +1068,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
writer.setup(context.stageId, context.partitionId, taskAttemptId)
writer.open()
var recordsWritten = 0L
- try {
+
+ Utils.tryWithSafeFinally {
while (iter.hasNext) {
val record = iter.next()
writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
@@ -1077,7 +1078,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
recordsWritten += 1
}
- } finally {
+ } {
writer.close()
}
writer.commit()
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
index 50edb5a34e..a1741e2875 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -26,6 +26,7 @@ import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.storage._
+import org.apache.spark.util.Utils
import IndexShuffleBlockManager.NOOP_REDUCE_ID
@@ -78,16 +79,15 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = {
val indexFile = getIndexFile(shuffleId, mapId)
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
- try {
+ Utils.tryWithSafeFinally {
// We take in lengths of each block, need to convert it to offsets.
var offset = 0L
out.writeLong(offset)
-
for (length <- lengths) {
offset += length
out.writeLong(offset)
}
- } finally {
+ } {
out.close()
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index f703e50b6b..0dfc91dfaf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -23,6 +23,7 @@ import java.nio.channels.FileChannel
import org.apache.spark.Logging
import org.apache.spark.serializer.{SerializationStream, Serializer}
import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.util.Utils
/**
* An interface for writing JVM objects to some underlying storage. This interface allows
@@ -140,14 +141,17 @@ private[spark] class DiskBlockObjectWriter(
override def close() {
if (initialized) {
- if (syncWrites) {
- // Force outstanding writes to disk and track how long it takes
- objOut.flush()
- callWithTiming {
- fos.getFD.sync()
+ Utils.tryWithSafeFinally {
+ if (syncWrites) {
+ // Force outstanding writes to disk and track how long it takes
+ objOut.flush()
+ callWithTiming {
+ fos.getFD.sync()
+ }
}
+ } {
+ objOut.close()
}
- objOut.close()
channel = null
bs = null
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 61ef5ff168..4b232ae7d3 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -46,10 +46,13 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
val channel = new FileOutputStream(file).getChannel
- while (bytes.remaining > 0) {
- channel.write(bytes)
+ Utils.tryWithSafeFinally {
+ while (bytes.remaining > 0) {
+ channel.write(bytes)
+ }
+ } {
+ channel.close()
}
- channel.close()
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
@@ -75,9 +78,9 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
val file = diskManager.getFile(blockId)
val outputStream = new FileOutputStream(file)
try {
- try {
+ Utils.tryWithSafeFinally {
blockManager.dataSerializeStream(blockId, outputStream, values)
- } finally {
+ } {
// Close outputStream here because it should be closed before file is deleted.
outputStream.close()
}
@@ -106,8 +109,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {
val channel = new RandomAccessFile(file, "r").getChannel
-
- try {
+ Utils.tryWithSafeFinally {
// For small files, directly read rather than memory map
if (length < minMemoryMapBytes) {
val buf = ByteBuffer.allocate(length.toInt)
@@ -123,7 +125,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
} else {
Some(channel.map(MapMode.READ_ONLY, offset, length))
}
- } finally {
+ } {
channel.close()
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index bb8bd10156..7c85e28679 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -313,7 +313,7 @@ private[spark] object Utils extends Logging {
transferToEnabled: Boolean = false): Long =
{
var count = 0L
- try {
+ tryWithSafeFinally {
if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
&& transferToEnabled) {
// When both streams are File stream, use transferTo to improve copy performance.
@@ -353,7 +353,7 @@ private[spark] object Utils extends Logging {
}
}
count
- } finally {
+ } {
if (closeStreams) {
try {
in.close()
@@ -1214,6 +1214,44 @@ private[spark] object Utils extends Logging {
}
}
+ /**
+ * Execute a block of code, then a finally block, but if exceptions happen in
+ * the finally block, do not suppress the original exception.
+ *
+ * This is primarily an issue with `finally { out.close() }` blocks, where
+ * close needs to be called to clean up `out`, but if an exception happened
+ * in `out.write`, it's likely `out` may be corrupted and `out.close` will
+ * fail as well. This would then suppress the original/likely more meaningful
+ * exception from the original `out.write` call.
+ */
+ def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
+ // It would be nice to find a method on Try that did this
+ var originalThrowable: Throwable = null
+ try {
+ block
+ } catch {
+ case t: Throwable =>
+ // Purposefully not using NonFatal, because even fatal exceptions
+ // we don't want to have our finallyBlock suppress
+ originalThrowable = t
+ throw originalThrowable
+ } finally {
+ try {
+ finallyBlock
+ } catch {
+ case t: Throwable =>
+ if (originalThrowable != null) {
+ // We could do originalThrowable.addSuppressed(t), but it's
+ // not available in JDK 1.6.
+ logWarning(s"Suppressing exception in finally: " + t.getMessage, t)
+ throw originalThrowable
+ } else {
+ throw t
+ }
+ }
+ }
+ }
+
/** Default filtering function for finding call sites using `getCallSite`. */
private def coreExclusionFunction(className: String): Boolean = {
// A regular expression to match classes of the "core" Spark API that we want to skip when
@@ -2074,7 +2112,7 @@ private[spark] class RedirectThread(
override def run() {
scala.util.control.Exception.ignoring(classOf[IOException]) {
// FIXME: We copy the stream on the level of bytes to avoid encoding problems.
- try {
+ Utils.tryWithSafeFinally {
val buf = new Array[Byte](1024)
var len = in.read(buf)
while (len != -1) {
@@ -2082,7 +2120,7 @@ private[spark] class RedirectThread(
out.flush()
len = in.read(buf)
}
- } finally {
+ } {
if (propagateEof) {
out.close()
}
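
To make the semantics of the helper added above concrete, here is a small self-contained sketch that re-implements the same logic (with `println` standing in for `logWarning`) and shows that the original exception survives a failing finally block. It is an illustration only, not the Spark source:

    object TryWithSafeFinallyDemo {
      // Same shape as the Utils.tryWithSafeFinally added above: run block,
      // then finallyBlock; if both fail, report and drop the finally failure
      // so the original exception is the one that propagates.
      def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
        var originalThrowable: Throwable = null
        try {
          block
        } catch {
          case t: Throwable =>
            originalThrowable = t
            throw originalThrowable
        } finally {
          try {
            finallyBlock
          } catch {
            case t: Throwable if originalThrowable != null =>
              println(s"Suppressing exception in finally: ${t.getMessage}")
            // if the main block succeeded, a finally failure propagates as usual
          }
        }
      }

      def main(args: Array[String]): Unit = {
        try {
          tryWithSafeFinally {
            throw new RuntimeException("write failed")   // the original error
          } {
            throw new RuntimeException("close failed")   // would mask it under plain try/finally
          }
        } catch {
          case e: Exception => println(e.getMessage)     // prints "write failed"
        }
      }
    }
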
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 7bd3c7852a..035f3767ff 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -728,25 +728,19 @@ private[spark] class ExternalSorter[K, V, C](
// this simple we spill out the current in-memory collection so that everything is in files.
spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
partitionWriters.foreach(_.commitAndClose())
- var out: FileOutputStream = null
- var in: FileInputStream = null
+ val out = new FileOutputStream(outputFile, true)
val writeStartTime = System.nanoTime
- try {
- out = new FileOutputStream(outputFile, true)
+ util.Utils.tryWithSafeFinally {
for (i <- 0 until numPartitions) {
- in = new FileInputStream(partitionWriters(i).fileSegment().file)
- val size = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
- in.close()
- in = null
- lengths(i) = size
- }
- } finally {
- if (out != null) {
- out.close()
- }
- if (in != null) {
- in.close()
+ val in = new FileInputStream(partitionWriters(i).fileSegment().file)
+ util.Utils.tryWithSafeFinally {
+ lengths(i) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
+ } {
+ in.close()
+ }
}
+ } {
+ out.close()
context.taskMetrics.shuffleWriteMetrics.foreach(
_.incShuffleWriteTime(System.nanoTime - writeStartTime))
}
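
The ExternalSorter change above nests the helper: the outer call guards the appending output stream, while each per-partition input stream gets its own inner call, replacing the previous null-tracking cleanup. A sketch of that nesting with illustrative file and method names, again assuming code inside the `org.apache.spark` package for access to `Utils`:

    import java.io.{File, FileInputStream, FileOutputStream}
    import org.apache.spark.util.Utils

    // Append several part files to a single output file, returning the number
    // of bytes copied from each part. The inner finally closes each input even
    // if its copy fails; the outer finally closes the output either way.
    def concatenateParts(parts: Seq[File], outputFile: File): Seq[Long] = {
      val out = new FileOutputStream(outputFile, true)
      Utils.tryWithSafeFinally {
        parts.map { part =>
          val in = new FileInputStream(part)
          Utils.tryWithSafeFinally {
            Utils.copyStream(in, out, closeStreams = false)
          } {
            in.close()
          }
        }
      } {
        out.close()
      }
    }
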