aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvan Jones <ejones@twitter.com>2015-05-08 22:00:39 +0100
committerSean Owen <sowen@cloudera.com>2015-05-08 22:01:01 +0100
commit62308097b2a7376f7d2e3c3c28fa694125c9bddf (patch)
tree27e9ce39c95c408a8917f0a7e8b237245fe85142
parent82be68f105b58439e0fb2249b45d6447f5dd753a (diff)
downloadspark-62308097b2a7376f7d2e3c3c28fa694125c9bddf.tar.gz
spark-62308097b2a7376f7d2e3c3c28fa694125c9bddf.tar.bz2
spark-62308097b2a7376f7d2e3c3c28fa694125c9bddf.zip
[SPARK-7490] [CORE] [Minor] MapOutputTracker.deserializeMapStatuses: close input streams
GZIPInputStream allocates native memory that is not freed until close() or when the finalizer runs. It is best to close() these streams explicitly. stephenh made the same change for serializeMapStatuses in commit b0d884f0. This is the same change for deserialize. (I ran the unit test suite! it seems to have passed. I did not make a JIRA since this seems "trivial", and the guidelines suggest it is not required for trivial changes) Author: Evan Jones <ejones@twitter.com> Closes #5982 from evanj/master and squashes the following commits: 0d76e85 [Evan Jones] [CORE] MapOutputTracker.deserializeMapStatuses: close input streams (cherry picked from commit 25889d8d97094325f10fbf52f3b36412f212eeb2) Signed-off-by: Sean Owen <sowen@cloudera.com>
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputTracker.scala6
1 files changed, 5 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 16072283ed..018422827e 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -367,7 +367,11 @@ private[spark] object MapOutputTracker extends Logging {
// Opposite of serializeMapStatuses.
def deserializeMapStatuses(bytes: Array[Byte]): Array[MapStatus] = {
val objIn = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))
- objIn.readObject().asInstanceOf[Array[MapStatus]]
+ Utils.tryWithSafeFinally {
+ objIn.readObject().asInstanceOf[Array[MapStatus]]
+ } {
+ objIn.close()
+ }
}
// Convert an array of MapStatuses to locations and sizes for a given reduce ID. If