aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorlisurprise <zhichao.li@intel.com>2015-04-13 12:18:05 +0100
committerSean Owen <sowen@cloudera.com>2015-04-13 12:18:05 +0100
commitcadd7d72c52ccc8d2def405a77dcf807fb5c17c2 (patch)
treebba6add9a480636dea61ae760e9e7b58fd70e0c7 /streaming
parent950645d597dbc5a8c5010bcb1a9b51c6abad86ea (diff)
downloadspark-cadd7d72c52ccc8d2def405a77dcf807fb5c17c2.tar.gz
spark-cadd7d72c52ccc8d2def405a77dcf807fb5c17c2.tar.bz2
spark-cadd7d72c52ccc8d2def405a77dcf807fb5c17c2.zip
[SPARK-6762]Fix potential resource leaks in CheckPoint CheckpointWriter and CheckpointReader
The close action should be placed within finally block to avoid the potential resource leaks Author: lisurprise <zhichao.li@intel.com> Closes #5407 from zhichao-li/master and squashes the following commits: 065999f [lisurprise] add guard for null ef862d6 [lisurprise] remove fs.close a754adc [lisurprise] refactor with tryWithSafeFinally 824adb3 [lisurprise] close before validation c877da7 [lisurprise] Fix potential resource leaks
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala47
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala3
2 files changed, 31 insertions, 19 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 28703ef812..0a50485118 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkException, SparkConf, Logging}
import org.apache.spark.io.CompressionCodec
-import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.util.{MetadataCleaner, Utils}
import org.apache.spark.streaming.scheduler.JobGenerator
@@ -139,8 +139,11 @@ class CheckpointWriter(
// Write checkpoint to temp file
fs.delete(tempFile, true) // just in case it exists
val fos = fs.create(tempFile)
- fos.write(bytes)
- fos.close()
+ Utils.tryWithSafeFinally {
+ fos.write(bytes)
+ } {
+ fos.close()
+ }
// If the checkpoint file exists, back it up
// If the backup exists as well, just delete it, otherwise rename will fail
@@ -187,9 +190,11 @@ class CheckpointWriter(
val bos = new ByteArrayOutputStream()
val zos = compressionCodec.compressedOutputStream(bos)
val oos = new ObjectOutputStream(zos)
- oos.writeObject(checkpoint)
- oos.close()
- bos.close()
+ Utils.tryWithSafeFinally {
+ oos.writeObject(checkpoint)
+ } {
+ oos.close()
+ }
try {
executor.execute(new CheckpointWriteHandler(
checkpoint.checkpointTime, bos.toByteArray, clearCheckpointDataLater))
@@ -248,18 +253,24 @@ object CheckpointReader extends Logging {
checkpointFiles.foreach(file => {
logInfo("Attempting to load checkpoint from file " + file)
try {
- val fis = fs.open(file)
- // ObjectInputStream uses the last defined user-defined class loader in the stack
- // to find classes, which maybe the wrong class loader. Hence, a inherited version
- // of ObjectInputStream is used to explicitly use the current thread's default class
- // loader to find and load classes. This is a well know Java issue and has popped up
- // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
- val zis = compressionCodec.compressedInputStream(fis)
- val ois = new ObjectInputStreamWithLoader(zis,
- Thread.currentThread().getContextClassLoader)
- val cp = ois.readObject.asInstanceOf[Checkpoint]
- ois.close()
- fs.close()
+ var ois: ObjectInputStreamWithLoader = null
+ var cp: Checkpoint = null
+ Utils.tryWithSafeFinally {
+ val fis = fs.open(file)
+ // ObjectInputStream uses the last defined user-defined class loader in the stack
+ // to find classes, which maybe the wrong class loader. Hence, a inherited version
+ // of ObjectInputStream is used to explicitly use the current thread's default class
+ // loader to find and load classes. This is a well know Java issue and has popped up
+ // in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
+ val zis = compressionCodec.compressedInputStream(fis)
+ ois = new ObjectInputStreamWithLoader(zis,
+ Thread.currentThread().getContextClassLoader)
+ cp = ois.readObject.asInstanceOf[Checkpoint]
+ } {
+ if (ois != null) {
+ ois.close()
+ }
+ }
cp.validate()
logInfo("Checkpoint successfully loaded from file " + file)
logInfo("Checkpoint was generated at time " + cp.checkpointTime)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index a7850812bd..ca2f319f17 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -72,7 +72,8 @@ object RawTextSender extends Logging {
} catch {
case e: IOException =>
logError("Client disconnected")
- socket.close()
+ } finally {
+ socket.close()
}
}
}