diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-11-29 02:06:33 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-11-29 02:06:33 -0800 |
commit | 6fcd09f499dca66d255aa7196839156433aae442 (patch) | |
tree | 2aa4175794c21a4446347785b3fb50f9d39e3794 /core | |
parent | c9789751bfc496d24e8369a0035d57f0ed8dcb58 (diff) | |
download | spark-6fcd09f499dca66d255aa7196839156433aae442.tar.gz spark-6fcd09f499dca66d255aa7196839156433aae442.tar.bz2 spark-6fcd09f499dca66d255aa7196839156433aae442.zip |
Added TimeStampedHashSet and used that to cleanup the list of registered RDD IDs in CacheTracker.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/CacheTracker.scala | 10 | ||||
-rw-r--r-- | core/src/main/scala/spark/util/TimeStampedHashMap.scala | 14 | ||||
-rw-r--r-- | core/src/main/scala/spark/util/TimeStampedHashSet.scala | 66 |
3 files changed, 81 insertions, 9 deletions
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala index 9888f061d9..cb54e12257 100644 --- a/core/src/main/scala/spark/CacheTracker.scala +++ b/core/src/main/scala/spark/CacheTracker.scala @@ -14,7 +14,7 @@ import scala.collection.mutable.HashSet import spark.storage.BlockManager import spark.storage.StorageLevel -import util.{MetadataCleaner, TimeStampedHashMap} +import util.{TimeStampedHashSet, MetadataCleaner, TimeStampedHashMap} private[spark] sealed trait CacheTrackerMessage @@ -39,7 +39,7 @@ private[spark] class CacheTrackerActor extends Actor with Logging { private val slaveCapacity = new HashMap[String, Long] private val slaveUsage = new HashMap[String, Long] - private val metadataCleaner = new MetadataCleaner("CacheTracker", locs.cleanup) + private val metadataCleaner = new MetadataCleaner("CacheTrackerActor", locs.cleanup) private def getCacheUsage(host: String): Long = slaveUsage.getOrElse(host, 0L) private def getCacheCapacity(host: String): Long = slaveCapacity.getOrElse(host, 0L) @@ -113,11 +113,15 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b actorSystem.actorFor(url) } - val registeredRddIds = new HashSet[Int] + // TODO: Consider removing this HashSet completely as locs CacheTrackerActor already + // keeps track of registered RDDs + val registeredRddIds = new TimeStampedHashSet[Int] // Remembers which splits are currently being loaded (on worker nodes) val loading = new HashSet[String] + val metadataCleaner = new MetadataCleaner("CacheTracker", registeredRddIds.cleanup) + // Send a message to the trackerActor and get its result within a default timeout, or // throw a SparkException if this fails. def askTracker(message: Any): Any = { diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala index 7a22b80a20..9bcc9245c0 100644 --- a/core/src/main/scala/spark/util/TimeStampedHashMap.scala +++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala @@ -1,7 +1,7 @@ package spark.util -import scala.collection.JavaConversions._ -import scala.collection.mutable.{HashMap, Map} +import scala.collection.JavaConversions +import scala.collection.mutable.Map import java.util.concurrent.ConcurrentHashMap /** @@ -20,7 +20,7 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() { def iterator: Iterator[(A, B)] = { val jIterator = internalMap.entrySet().iterator() - jIterator.map(kv => (kv.getKey, kv.getValue._1)) + JavaConversions.asScalaIterator(jIterator).map(kv => (kv.getKey, kv.getValue._1)) } override def + [B1 >: B](kv: (A, B1)): Map[A, B1] = { @@ -31,8 +31,10 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() { } override def - (key: A): Map[A, B] = { - internalMap.remove(key) - this + val newMap = new TimeStampedHashMap[A, B] + newMap.internalMap.putAll(this.internalMap) + newMap.internalMap.remove(key) + newMap } override def += (kv: (A, B)): this.type = { @@ -56,7 +58,7 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() { } override def filter(p: ((A, B)) => Boolean): Map[A, B] = { - internalMap.map(kv => (kv._1, kv._2._1)).filter(p) + JavaConversions.asScalaConcurrentMap(internalMap).map(kv => (kv._1, kv._2._1)).filter(p) } override def empty: Map[A, B] = new TimeStampedHashMap[A, B]() diff --git a/core/src/main/scala/spark/util/TimeStampedHashSet.scala b/core/src/main/scala/spark/util/TimeStampedHashSet.scala new file mode 100644 index 0000000000..539dd75844 --- /dev/null +++ b/core/src/main/scala/spark/util/TimeStampedHashSet.scala @@ -0,0 +1,66 @@ +package spark.util + +import scala.collection.mutable.Set +import scala.collection.JavaConversions +import java.util.concurrent.ConcurrentHashMap + + +class TimeStampedHashSet[A] extends Set[A] { + val internalMap = new ConcurrentHashMap[A, Long]() + + def contains(key: A): Boolean = { + internalMap.contains(key) + } + + def iterator: Iterator[A] = { + val jIterator = internalMap.entrySet().iterator() + JavaConversions.asScalaIterator(jIterator).map(_.getKey) + } + + override def + (elem: A): Set[A] = { + val newSet = new TimeStampedHashSet[A] + newSet ++= this + newSet += elem + newSet + } + + override def - (elem: A): Set[A] = { + val newSet = new TimeStampedHashSet[A] + newSet ++= this + newSet -= elem + newSet + } + + override def += (key: A): this.type = { + internalMap.put(key, currentTime) + this + } + + override def -= (key: A): this.type = { + internalMap.remove(key) + this + } + + override def empty: Set[A] = new TimeStampedHashSet[A]() + + override def size(): Int = internalMap.size() + + override def foreach[U](f: (A) => U): Unit = { + val iterator = internalMap.entrySet().iterator() + while(iterator.hasNext) { + f(iterator.next.getKey) + } + } + + def cleanup(threshTime: Long) { + val iterator = internalMap.entrySet().iterator() + while(iterator.hasNext) { + val entry = iterator.next() + if (entry.getValue < threshTime) { + iterator.remove() + } + } + } + + private def currentTime: Long = System.currentTimeMillis() +} |