author    Tathagata Das <tathagata.das1565@gmail.com>  2013-01-07 12:11:27 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2013-01-07 12:11:27 -0800
commit    1346126485444afc065bf4951c4bedebe5c95ce4 (patch)
tree      5ff3f0cf3ebe3ba0a93f4c57b8cbacc166a985d6 /core
parent    af8738dfb592eb37d4d6c91e42624e844d4e493b (diff)
Changed cleanup to clearOldValues for TimeStampedHashMap and TimeStampedHashSet.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/CacheTracker.scala              | 4
-rw-r--r--  core/src/main/scala/spark/MapOutputTracker.scala          | 4
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala    | 6
-rw-r--r--  core/src/main/scala/spark/scheduler/ResultTask.scala      | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala  | 2
-rw-r--r--  core/src/main/scala/spark/util/TimeStampedHashMap.scala   | 7
-rw-r--r--  core/src/main/scala/spark/util/TimeStampedHashSet.scala   | 5
7 files changed, 18 insertions(+), 12 deletions(-)
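
The rename is mechanical at each call site: every TimeStampedHashMap or TimeStampedHashSet registered with a MetadataCleaner now passes clearOldValues as the cleanup function instead of cleanup. As a rough illustration of the pattern, here is a minimal sketch of such a periodic cleaner; the diff only shows the constructor taking a name and a (Long => Unit) function, so the timer behaviour and the 60-second delay below are assumptions for illustration, not taken from this commit.

import java.util.{Timer, TimerTask}

// Hypothetical stand-in for spark.util.MetadataCleaner. The diff shows its
// constructor taking a name and a (Long => Unit) cleanup function; the
// periodic-timer behaviour and delay value below are assumptions.
class MetadataCleanerSketch(name: String, cleanupFunc: Long => Unit) {
  private val delaySeconds = 60L
  private val timer = new Timer(name + " cleanup timer", true)

  timer.schedule(new TimerTask {
    // Periodically drop entries older than (now - delaySeconds).
    def run() { cleanupFunc(System.currentTimeMillis() - delaySeconds * 1000) }
  }, delaySeconds * 1000, delaySeconds * 1000)

  def cancel() { timer.cancel() }
}

// After this commit, call sites read for example:
//   val metadataCleaner = new MetadataCleaner("CacheTracker", registeredRddIds.clearOldValues)
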
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 7d320c4fe5..86ad737583 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -39,7 +39,7 @@ private[spark] class CacheTrackerActor extends Actor with Logging {
private val slaveCapacity = new HashMap[String, Long]
private val slaveUsage = new HashMap[String, Long]
- private val metadataCleaner = new MetadataCleaner("CacheTrackerActor", locs.cleanup)
+ private val metadataCleaner = new MetadataCleaner("CacheTrackerActor", locs.clearOldValues)
private def getCacheUsage(host: String): Long = slaveUsage.getOrElse(host, 0L)
private def getCacheCapacity(host: String): Long = slaveCapacity.getOrElse(host, 0L)
@@ -120,7 +120,7 @@ private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, b
// Remembers which splits are currently being loaded (on worker nodes)
val loading = new HashSet[String]
- val metadataCleaner = new MetadataCleaner("CacheTracker", registeredRddIds.cleanup)
+ val metadataCleaner = new MetadataCleaner("CacheTracker", registeredRddIds.clearOldValues)
// Send a message to the trackerActor and get its result within a default timeout, or
// throw a SparkException if this fails.
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 5ebdba0fc8..a2fa2d1ea7 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -178,8 +178,8 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
}
def cleanup(cleanupTime: Long) {
- mapStatuses.cleanup(cleanupTime)
- cachedSerializedStatuses.cleanup(cleanupTime)
+ mapStatuses.clearOldValues(cleanupTime)
+ cachedSerializedStatuses.clearOldValues(cleanupTime)
}
def stop() {
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 9387ba19a3..59f2099e91 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -599,15 +599,15 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
def cleanup(cleanupTime: Long) {
var sizeBefore = idToStage.size
- idToStage.cleanup(cleanupTime)
+ idToStage.clearOldValues(cleanupTime)
logInfo("idToStage " + sizeBefore + " --> " + idToStage.size)
sizeBefore = shuffleToMapStage.size
- shuffleToMapStage.cleanup(cleanupTime)
+ shuffleToMapStage.clearOldValues(cleanupTime)
logInfo("shuffleToMapStage " + sizeBefore + " --> " + shuffleToMapStage.size)
sizeBefore = pendingTasks.size
- pendingTasks.cleanup(cleanupTime)
+ pendingTasks.clearOldValues(cleanupTime)
logInfo("pendingTasks " + sizeBefore + " --> " + pendingTasks.size)
}
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index 7ec6564105..74a63c1af1 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -12,7 +12,7 @@ private[spark] object ResultTask {
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
- val metadataCleaner = new MetadataCleaner("ResultTask", serializedInfoCache.cleanup)
+ val metadataCleaner = new MetadataCleaner("ResultTask", serializedInfoCache.clearOldValues)
def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
synchronized {
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index feb63abb61..19f5328eee 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -23,7 +23,7 @@ private[spark] object ShuffleMapTask {
// expensive on the master node if it needs to launch thousands of tasks.
val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
- val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.cleanup)
+ val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.clearOldValues)
def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
synchronized {
diff --git a/core/src/main/scala/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
index 7e785182ea..bb7c5c01c8 100644
--- a/core/src/main/scala/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/spark/util/TimeStampedHashMap.scala
@@ -7,7 +7,7 @@ import scala.collection.mutable.Map
/**
* This is a custom implementation of scala.collection.mutable.Map which stores the insertion
* time stamp along with each key-value pair. Key-value pairs that are older than a particular
- * threshold time can them be removed using the cleanup method. This is intended to be a drop-in
+ * threshold time can then be removed using the clearOldValues method. This is intended to be a drop-in
* replacement of scala.collection.mutable.HashMap.
*/
class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging {
@@ -74,7 +74,10 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging {
}
}
- def cleanup(threshTime: Long) {
+ /**
+ * Removes old key-value pairs that have timestamp earlier than `threshTime`
+ */
+ def clearOldValues(threshTime: Long) {
val iterator = internalMap.entrySet().iterator()
while(iterator.hasNext) {
val entry = iterator.next()
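
For context on what clearOldValues iterates over, here is a self-contained sketch of the map's timestamped storage, assuming internalMap is a java.util.concurrent.ConcurrentHashMap pairing each value with its insertion time, as the hunk above suggests. It omits the full scala.collection.mutable.Map forwarding that the real class provides.

import java.util.concurrent.ConcurrentHashMap

// Minimal sketch, not the real spark.util.TimeStampedHashMap: each value is
// stored alongside the millisecond timestamp of its insertion.
class TimeStampedMapSketch[A, B] {
  private val internalMap = new ConcurrentHashMap[A, (B, Long)]()

  def update(key: A, value: B) {
    internalMap.put(key, (value, System.currentTimeMillis()))
  }

  def get(key: A): Option[B] = Option(internalMap.get(key)).map(_._1)

  // Remove every pair whose insertion timestamp is older than threshTime.
  def clearOldValues(threshTime: Long) {
    val iterator = internalMap.entrySet().iterator()
    while (iterator.hasNext) {
      val entry = iterator.next()
      if (entry.getValue._2 < threshTime) {
        iterator.remove()  // safe while iterating a ConcurrentHashMap view
      }
    }
  }
}
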
diff --git a/core/src/main/scala/spark/util/TimeStampedHashSet.scala b/core/src/main/scala/spark/util/TimeStampedHashSet.scala
index 539dd75844..5f1cc93752 100644
--- a/core/src/main/scala/spark/util/TimeStampedHashSet.scala
+++ b/core/src/main/scala/spark/util/TimeStampedHashSet.scala
@@ -52,7 +52,10 @@ class TimeStampedHashSet[A] extends Set[A] {
}
}
- def cleanup(threshTime: Long) {
+ /**
+ * Removes old values that have timestamp earlier than `threshTime`
+ */
+ def clearOldValues(threshTime: Long) {
val iterator = internalMap.entrySet().iterator()
while(iterator.hasNext) {
val entry = iterator.next()
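
A hypothetical call site for the renamed methods; the variable names and the 30-minute retention window below are illustrative, not taken from the commit:

import spark.util.{TimeStampedHashMap, TimeStampedHashSet}

val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
val recentStageIds = new TimeStampedHashSet[Int]

// Drop anything inserted more than 30 minutes ago (assumed window).
val threshTime = System.currentTimeMillis() - 30 * 60 * 1000L
serializedInfoCache.clearOldValues(threshTime)
recentStageIds.clearOldValues(threshTime)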