aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author Tathagata Das <tathagata.das1565@gmail.com> 2012-12-03 22:37:31 -0800
committer Tathagata Das <tathagata.das1565@gmail.com> 2012-12-03 22:37:31 -0800
commit a69a82be2682148f5d1ebbdede15a47c90eea73d (patch)
tree 79d26d309b3d333478de396dd0f4c1f96fea583c /core
parent 609e00d599d3f429a838f598b3f32c5fdbd7ec5e (diff)
downloadspark-a69a82be2682148f5d1ebbdede15a47c90eea73d.tar.gz
spark-a69a82be2682148f5d1ebbdede15a47c90eea73d.tar.bz2
spark-a69a82be2682148f5d1ebbdede15a47c90eea73d.zip
Added metadata cleaner to HttpBroadcast to clean up old broadcast files.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/broadcast/HttpBroadcast.scala 24
1 files changed, 24 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index 7eb4ddb74f..fef264aab1 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -11,6 +11,7 @@ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import spark._
import spark.storage.StorageLevel
+import util.{MetadataCleaner, TimeStampedHashSet}
private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {
@@ -64,6 +65,10 @@ private object HttpBroadcast extends Logging {
private var serverUri: String = null
private var server: HttpServer = null
+ private val files = new TimeStampedHashSet[String]
+ private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup)
+
+
def initialize(isMaster: Boolean) {
synchronized {
if (!initialized) {
@@ -85,6 +90,7 @@ private object HttpBroadcast extends Logging {
server = null
}
initialized = false
+ cleaner.cancel()
}
}
@@ -108,6 +114,7 @@ private object HttpBroadcast extends Logging {
val serOut = ser.serializeStream(out)
serOut.writeObject(value)
serOut.close()
+ files += file.getAbsolutePath
}
def read[T](id: Long): T = {
@@ -123,4 +130,21 @@ private object HttpBroadcast extends Logging {
serIn.close()
obj
}
+
+ def cleanup(cleanupTime: Long) {
+ val iterator = files.internalMap.entrySet().iterator()
+ while(iterator.hasNext) {
+ val entry = iterator.next()
+ val (file, time) = (entry.getKey, entry.getValue)
+ if (time < cleanupTime) {
+ try {
+ iterator.remove()
+ new File(file.toString).delete()
+ logInfo("Deleted broadcast file '" + file + "'")
+ } catch {
+ case e: Exception => logWarning("Could not delete broadcast file '" + file + "'", e)
+ }
+ }
+ }
+ }
}