author    Tathagata Das <tathagata.das1565@gmail.com>  2012-11-29 02:06:33 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2012-11-29 02:06:33 -0800
commit    6fcd09f499dca66d255aa7196839156433aae442 (patch)
tree      2aa4175794c21a4446347785b3fb50f9d39e3794 /core
parent    c9789751bfc496d24e8369a0035d57f0ed8dcb58 (diff)
Added TimeStampedHashSet and used it to clean up the list of registered RDD IDs in CacheTracker.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/CacheTracker.scala            | 10 +++++++---
-rw-r--r--  core/src/main/scala/spark/util/TimeStampedHashMap.scala | 14 ++++++++------
-rw-r--r--  core/src/main/scala/spark/util/TimeStampedHashSet.scala | 66 ++++++++++++++++++++
3 files changed, 81 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 9888f061d9..cb54e12257 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -14,7 +14,7 @@ import scala.collection.mutable.HashSet
 import spark.storage.BlockManager
 import spark.storage.StorageLevel
 
-import util.{MetadataCleaner, TimeStampedHashMap}
+import util.{TimeStampedHashSet, MetadataCleaner, TimeStampedHashMap}
 
 
 private[spark] sealed trait CacheTrackerMessage
@@ -39,7 +39,7 @@ private[spark] class CacheTrackerActor extends Actor with Logging {
   private val slaveCapacity = new HashMap[String, Long]
   private val slaveUsage = new HashMap[String, Long]
 
-  private val metadataCleaner = new MetadataCleaner("CacheTracker", locs.cleanup)
+  private val metadataCleaner = new MetadataCleaner("CacheTrackerActor", locs.cleanup)
 
   private def getCacheUsage(host: String): Long = slaveUsage.getOrElse(host, 0L)
   private def getCacheCapacity(host: String): Long = slaveCapacity.getOrElse(host, 0L)
@@ -113,11 +113,15 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b
     actorSystem.actorFor(url)
   }
 
-  val registeredRddIds = new HashSet[Int]
+  // TODO: Consider removing this HashSet completely, as `locs` in CacheTrackerActor
+  // already keeps track of registered RDDs
+  val registeredRddIds = new TimeStampedHashSet[Int]
 
   // Remembers which splits are currently being loaded (on worker nodes)
   val loading = new HashSet[String]
 
+  val metadataCleaner = new MetadataCleaner("CacheTracker", registeredRddIds.cleanup)
+
   // Send a message to the trackerActor and get its result within a default timeout, or
   // throw a SparkException if this fails.
   def askTracker(message: Any): Any = {
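
MetadataCleaner itself is not shown in this diff. The pattern being wired up above is a
periodic timer that invokes the registered cleanup function with a threshold timestamp.
A minimal sketch of that idea, with hypothetical names (PeriodicCleaner, delaySeconds)
standing in for the real MetadataCleaner:

    import java.util.{Timer, TimerTask}

    // Hypothetical stand-in for MetadataCleaner: every delaySeconds, prune
    // entries whose timestamp is older than delaySeconds ago.
    class PeriodicCleaner(name: String, cleanupFunc: Long => Unit, delaySeconds: Int) {
      private val timer = new Timer(name + " cleanup timer", true)  // daemon thread
      private val task = new TimerTask {
        def run() {
          cleanupFunc(System.currentTimeMillis() - delaySeconds * 1000L)
        }
      }
      if (delaySeconds > 0) {
        timer.schedule(task, delaySeconds * 1000L, delaySeconds * 1000L)
      }
      def cancel() { timer.cancel() }
    }

Under a scheme like this, registeredRddIds.cleanup(threshTime) runs on a schedule, so RDD
IDs that have not been re-registered since threshTime are dropped automatically.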
diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
index 7a22b80a20..9bcc9245c0 100644
--- a/core/src/main/scala/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
@@ -1,7 +1,7 @@
 package spark.util
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashMap, Map}
+import scala.collection.JavaConversions
+import scala.collection.mutable.Map
 import java.util.concurrent.ConcurrentHashMap
 
 /**
@@ -20,7 +20,7 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() {
   def iterator: Iterator[(A, B)] = {
     val jIterator = internalMap.entrySet().iterator()
-    jIterator.map(kv => (kv.getKey, kv.getValue._1))
+    JavaConversions.asScalaIterator(jIterator).map(kv => (kv.getKey, kv.getValue._1))
   }
 
   override def + [B1 >: B](kv: (A, B1)): Map[A, B1] = {
@@ -31,8 +31,10 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() {
   }
 
   override def - (key: A): Map[A, B] = {
-    internalMap.remove(key)
-    this
+    val newMap = new TimeStampedHashMap[A, B]
+    newMap.internalMap.putAll(this.internalMap)
+    newMap.internalMap.remove(key)
+    newMap
   }
 
   override def += (kv: (A, B)): this.type = {
@@ -56,7 +58,7 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() {
   }
 
   override def filter(p: ((A, B)) => Boolean): Map[A, B] = {
-    internalMap.map(kv => (kv._1, kv._2._1)).filter(p)
+    JavaConversions.asScalaConcurrentMap(internalMap).map(kv => (kv._1, kv._2._1)).filter(p)
   }
 
   override def empty: Map[A, B] = new TimeStampedHashMap[A, B]()
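
The new "-" above fixes a contract issue: on scala.collection.mutable.Map, "-" is expected
to return a new collection, but the old code mutated the receiver and returned this. A quick
illustrative snippet (not part of this commit) of the corrected semantics:

    val m = new TimeStampedHashMap[String, Int]
    m += ("a" -> 1)
    m += ("b" -> 2)

    val without = m - "a"            // returns a copy with "a" removed...
    assert(m.contains("a"))          // ...leaving the original untouched
    assert(!without.contains("a"))

    m -= "a"                         // -= still mutates in place
    assert(!m.contains("a"))

The full copy makes "-" O(n), which is acceptable here since the in-place "-=" remains the
common path.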
diff --git a/core/src/main/scala/spark/util/TimeStampedHashSet.scala b/core/src/main/scala/spark/util/TimeStampedHashSet.scala
new file mode 100644
index 0000000000..539dd75844
--- /dev/null
+++ b/core/src/main/scala/spark/util/TimeStampedHashSet.scala
@@ -0,0 +1,66 @@
+package spark.util
+
+import scala.collection.mutable.Set
+import scala.collection.JavaConversions
+import java.util.concurrent.ConcurrentHashMap
+
+/** A Set where each element is timestamped on insertion, enabling time-based cleanup. */
+class TimeStampedHashSet[A] extends Set[A] {
+  val internalMap = new ConcurrentHashMap[A, Long]()
+
+  def contains(key: A): Boolean = {
+    internalMap.containsKey(key)
+  }
+
+  def iterator: Iterator[A] = {
+    val jIterator = internalMap.entrySet().iterator()
+    JavaConversions.asScalaIterator(jIterator).map(_.getKey)
+  }
+
+  override def + (elem: A): Set[A] = {
+    val newSet = new TimeStampedHashSet[A]
+    newSet ++= this
+    newSet += elem
+    newSet
+  }
+
+  override def - (elem: A): Set[A] = {
+    val newSet = new TimeStampedHashSet[A]
+    newSet ++= this
+    newSet -= elem
+    newSet
+  }
+
+  override def += (key: A): this.type = {
+    internalMap.put(key, currentTime)
+    this
+  }
+
+  override def -= (key: A): this.type = {
+    internalMap.remove(key)
+    this
+  }
+
+  override def empty: Set[A] = new TimeStampedHashSet[A]()
+
+  override def size(): Int = internalMap.size()
+
+  override def foreach[U](f: (A) => U): Unit = {
+    val iterator = internalMap.entrySet().iterator()
+    while (iterator.hasNext) {
+      f(iterator.next.getKey)
+    }
+  }
+  // Removes all entries whose insertion timestamp is older than threshTime.
+  def cleanup(threshTime: Long) {
+    val iterator = internalMap.entrySet().iterator()
+    while (iterator.hasNext) {
+      val entry = iterator.next()
+      if (entry.getValue < threshTime) {
+        iterator.remove()
+      }
+    }
+  }
+
+  private def currentTime: Long = System.currentTimeMillis()
+}
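
A minimal usage sketch of the new set (illustrative, not in the commit): entries stamped
before the threshold passed to cleanup() are pruned, later ones survive.

    val ids = new TimeStampedHashSet[Int]
    ids += 1
    ids += 2
    Thread.sleep(10)                        // ensure a strictly older timestamp
    val cutoff = System.currentTimeMillis()
    ids += 3                                // stamped at or after the cutoff

    ids.cleanup(cutoff)                     // drops entries with timestamp < cutoff
    assert(!ids.contains(1) && !ids.contains(2))
    assert(ids.contains(3))

This mirrors the TTL-style pruning TimeStampedHashMap already provides for maps; a
scheduler like MetadataCleaner only has to call cleanup(currentTime - delay) periodically.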