diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-12-03 22:37:31 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-12-03 22:37:31 -0800 |
commit | a69a82be2682148f5d1ebbdede15a47c90eea73d (patch) | |
tree | 79d26d309b3d333478de396dd0f4c1f96fea583c | |
parent | 609e00d599d3f429a838f598b3f32c5fdbd7ec5e (diff) | |
download | spark-a69a82be2682148f5d1ebbdede15a47c90eea73d.tar.gz spark-a69a82be2682148f5d1ebbdede15a47c90eea73d.tar.bz2 spark-a69a82be2682148f5d1ebbdede15a47c90eea73d.zip |
Added metadata cleaner to HttpBroadcast to clean up old broacast files.
-rw-r--r-- | core/src/main/scala/spark/broadcast/HttpBroadcast.scala | 24 |
1 files changed, 24 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala index 7eb4ddb74f..fef264aab1 100644 --- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala @@ -11,6 +11,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream import spark._ import spark.storage.StorageLevel +import util.{MetadataCleaner, TimeStampedHashSet} private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long) extends Broadcast[T](id) with Logging with Serializable { @@ -64,6 +65,10 @@ private object HttpBroadcast extends Logging { private var serverUri: String = null private var server: HttpServer = null + private val files = new TimeStampedHashSet[String] + private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup) + + def initialize(isMaster: Boolean) { synchronized { if (!initialized) { @@ -85,6 +90,7 @@ private object HttpBroadcast extends Logging { server = null } initialized = false + cleaner.cancel() } } @@ -108,6 +114,7 @@ private object HttpBroadcast extends Logging { val serOut = ser.serializeStream(out) serOut.writeObject(value) serOut.close() + files += file.getAbsolutePath } def read[T](id: Long): T = { @@ -123,4 +130,21 @@ private object HttpBroadcast extends Logging { serIn.close() obj } + + def cleanup(cleanupTime: Long) { + val iterator = files.internalMap.entrySet().iterator() + while(iterator.hasNext) { + val entry = iterator.next() + val (file, time) = (entry.getKey, entry.getValue) + if (time < cleanupTime) { + try { + iterator.remove() + new File(file.toString).delete() + logInfo("Deleted broadcast file '" + file + "'") + } catch { + case e: Exception => logWarning("Could not delete broadcast file '" + file + "'", e) + } + } + } + } } |