aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2015-07-26 13:35:16 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-07-26 13:35:16 -0700
commit6b2baec04fa3d928f0ee84af8c2723ac03a4648c (patch)
tree88c74fff12e6ebc7579e9b5b5e9c84a0be70fb1b /core
parent1cf19760d61a5a17bd175a906d34a2940141b76d (diff)
downloadspark-6b2baec04fa3d928f0ee84af8c2723ac03a4648c.tar.gz
spark-6b2baec04fa3d928f0ee84af8c2723ac03a4648c.tar.bz2
spark-6b2baec04fa3d928f0ee84af8c2723ac03a4648c.zip
[SPARK-9326] Close lock file used for file downloads.
A lock file is used to ensure multiple executors running on the same machine don't download the same file concurrently. Spark never closes these lock files (releasing the lock does not close the underlying file); this commit fixes that. cc vanzin (looks like you've been involved in various other fixes surrounding these lock files) Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #7650 from kayousterhout/SPARK-9326 and squashes the following commits: 0401bd1 [Kay Ousterhout] Close lock file used for file downloads.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala5
1 files changed, 3 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c5816949cd..c4012d0e83 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -443,11 +443,11 @@ private[spark] object Utils extends Logging {
val lockFileName = s"${url.hashCode}${timestamp}_lock"
val localDir = new File(getLocalDir(conf))
val lockFile = new File(localDir, lockFileName)
- val raf = new RandomAccessFile(lockFile, "rw")
+ val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel()
// Only one executor entry.
// The FileLock is only used to control synchronization for executors download file,
// it's always safe regardless of lock type (mandatory or advisory).
- val lock = raf.getChannel().lock()
+ val lock = lockFileChannel.lock()
val cachedFile = new File(localDir, cachedFileName)
try {
if (!cachedFile.exists()) {
@@ -455,6 +455,7 @@ private[spark] object Utils extends Logging {
}
} finally {
lock.release()
+ lockFileChannel.close()
}
copyFile(
url,