author     Tathagata Das <tathagata.das1565@gmail.com>  2012-11-28 18:48:14 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2012-11-28 18:48:14 -0800
commit     9e9e9e1d898387a1996e4c57128bafadb5938a9b (patch)
tree       314ba5b91a9241853d329debcf7caa3e49b009d1 /core
parent     e463ae492068d2922e1d50c051a87f8010953dff (diff)
download   spark-9e9e9e1d898387a1996e4c57128bafadb5938a9b.tar.gz
           spark-9e9e9e1d898387a1996e4c57128bafadb5938a9b.tar.bz2
           spark-9e9e9e1d898387a1996e4c57128bafadb5938a9b.zip
Renamed CleanupTask to MetadataCleaner.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/CacheTracker.scala              6
-rw-r--r--  core/src/main/scala/spark/MapOutputTracker.scala          6
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala    6
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala  5
-rw-r--r--  core/src/main/scala/spark/util/MetadataCleaner.scala (renamed from core/src/main/scala/spark/util/CleanupTask.scala)  6
5 files changed, 15 insertions, 14 deletions
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 0ee59bee0f..9888f061d9 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -14,7 +14,7 @@ import scala.collection.mutable.HashSet
 import spark.storage.BlockManager
 import spark.storage.StorageLevel
-import util.{CleanupTask, TimeStampedHashMap}
+import util.{MetadataCleaner, TimeStampedHashMap}
 
 private[spark] sealed trait CacheTrackerMessage
@@ -39,7 +39,7 @@ private[spark] class CacheTrackerActor extends Actor with Logging {
   private val slaveCapacity = new HashMap[String, Long]
   private val slaveUsage = new HashMap[String, Long]
 
-  private val cleanupTask = new CleanupTask("CacheTracker", locs.cleanup)
+  private val metadataCleaner = new MetadataCleaner("CacheTracker", locs.cleanup)
 
   private def getCacheUsage(host: String): Long = slaveUsage.getOrElse(host, 0L)
   private def getCacheCapacity(host: String): Long = slaveCapacity.getOrElse(host, 0L)
@@ -89,7 +89,7 @@ private[spark] class CacheTrackerActor extends Actor with Logging {
     case StopCacheTracker =>
       logInfo("Stopping CacheTrackerActor")
       sender ! true
-      cleanupTask.cancel()
+      metadataCleaner.cancel()
       context.stop(self)
   }
 }
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index d0be1bb913..20ff5431af 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -17,7 +17,7 @@ import scala.collection.mutable.HashSet
 import scheduler.MapStatus
 import spark.storage.BlockManagerId
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-import util.{CleanupTask, TimeStampedHashMap}
+import util.{MetadataCleaner, TimeStampedHashMap}
 
 private[spark] sealed trait MapOutputTrackerMessage
 private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
@@ -64,7 +64,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
     actorSystem.actorFor(url)
   }
 
-  val cleanupTask = new CleanupTask("MapOutputTracker", this.cleanup)
+  val metadataCleaner = new MetadataCleaner("MapOutputTracker", this.cleanup)
 
   // Send a message to the trackerActor and get its result within a default timeout, or
   // throw a SparkException if this fails.
@@ -175,7 +175,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
   def stop() {
     communicate(StopMapOutputTracker)
     mapStatuses.clear()
-    cleanupTask.cancel()
+    metadataCleaner.cancel()
     trackerActor = null
   }
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index affacb43ca..4b2570fa2b 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -14,7 +14,7 @@ import spark.partial.ApproximateEvaluator
 import spark.partial.PartialResult
 import spark.storage.BlockManagerMaster
 import spark.storage.BlockManagerId
-import util.{CleanupTask, TimeStampedHashMap}
+import util.{MetadataCleaner, TimeStampedHashMap}
 
 /**
  * A Scheduler subclass that implements stage-oriented scheduling. It computes a DAG of stages for
@@ -84,7 +84,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
   val activeJobs = new HashSet[ActiveJob]
   val resultStageToJob = new HashMap[Stage, ActiveJob]
 
-  val cleanupTask = new CleanupTask("DAGScheduler", this.cleanup)
+  val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup)
 
   // Start a thread to run the DAGScheduler event loop
   new Thread("DAGScheduler") {
@@ -610,7 +610,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
   def stop() {
     eventQueue.put(StopDAGScheduler)
-    cleanupTask.cancel()
+    metadataCleaner.cancel()
     taskSched.stop()
   }
 }
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index fbf618c906..683f5ebec3 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -14,7 +14,7 @@ import com.ning.compress.lzf.LZFOutputStream
 import spark._
 import spark.storage._
-import util.{TimeStampedHashMap, CleanupTask}
+import util.{TimeStampedHashMap, MetadataCleaner}
 
 private[spark] object ShuffleMapTask {
@@ -22,7 +22,8 @@ private[spark] object ShuffleMapTask {
   // Served as a cache for task serialization because serialization can be
   // expensive on the master node if it needs to launch thousands of tasks.
   val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
-  val cleanupTask = new CleanupTask("ShuffleMapTask", serializedInfoCache.cleanup)
+
+  val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.cleanup)
 
   def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
     synchronized {
diff --git a/core/src/main/scala/spark/util/CleanupTask.scala b/core/src/main/scala/spark/util/MetadataCleaner.scala
index a4357c62c6..71ac39864e 100644
--- a/core/src/main/scala/spark/util/CleanupTask.scala
+++ b/core/src/main/scala/spark/util/MetadataCleaner.scala
@@ -4,7 +4,7 @@ import java.util.concurrent.{TimeUnit, ScheduledFuture, Executors}
 import java.util.{TimerTask, Timer}
 import spark.Logging
 
-class CleanupTask(name: String, cleanupFunc: (Long) => Unit) extends Logging {
+class MetadataCleaner(name: String, cleanupFunc: (Long) => Unit) extends Logging {
   val delaySeconds = (System.getProperty("spark.cleanup.delay", "-100").toDouble * 60).toInt
   val periodSeconds = math.max(10, delaySeconds / 10)
   val timer = new Timer(name + " cleanup timer", true)
@@ -13,7 +13,7 @@ class CleanupTask(name: String, cleanupFunc: (Long) => Unit) extends Logging {
       try {
         if (delaySeconds > 0) {
           cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000))
-          logInfo("Ran cleanup task for " + name)
+          logInfo("Ran metadata cleaner for " + name)
         }
       } catch {
         case e: Exception => logError("Error running cleanup task for " + name, e)
@@ -21,7 +21,7 @@ class CleanupTask(name: String, cleanupFunc: (Long) => Unit) extends Logging {
     }
   }
   if (periodSeconds > 0) {
-    logInfo("Starting cleanup task for " + name + " with delay of " + delaySeconds + " seconds and "
+    logInfo("Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds and "
       + "period of " + periodSeconds + " secs")
     timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
   }
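
For readers who want to experiment with the renamed class outside of Spark, the following is a minimal, self-contained Scala sketch based only on the code visible in this diff. The println calls stand in for spark.Logging, the ConcurrentHashMap of (value, timestamp) pairs stands in for spark.util.TimeStampedHashMap, and MetadataCleanerDemo is a hypothetical caller; all three are illustrative assumptions, not the actual Spark classes.

import java.util.{Timer, TimerTask}
import java.util.concurrent.ConcurrentHashMap

// Simplified stand-in for the renamed spark.util.MetadataCleaner above: a
// daemon Timer periodically invokes cleanupFunc with a cutoff timestamp.
// println replaces spark.Logging (an assumption of this sketch).
class MetadataCleaner(name: String, cleanupFunc: Long => Unit) {
  // "spark.cleanup.delay" is read in minutes; a non-positive value disables cleanup.
  val delaySeconds: Int =
    (System.getProperty("spark.cleanup.delay", "-100").toDouble * 60).toInt
  val periodSeconds: Int = math.max(10, delaySeconds / 10)
  private val timer = new Timer(name + " cleanup timer", true)

  private val task = new TimerTask {
    override def run(): Unit = {
      try {
        if (delaySeconds > 0) {
          cleanupFunc(System.currentTimeMillis() - delaySeconds * 1000L)
          println("Ran metadata cleaner for " + name)
        }
      } catch {
        case e: Exception =>
          println("Error running cleanup task for " + name + ": " + e)
      }
    }
  }

  // periodSeconds is clamped to at least 10, so the timer is scheduled even
  // when cleanup is disabled; the task then just no-ops on every tick.
  if (periodSeconds > 0) {
    timer.schedule(task, periodSeconds * 1000L, periodSeconds * 1000L)
  }

  def cancel(): Unit = timer.cancel()
}

// Hypothetical caller mirroring how the trackers in this commit wire up the
// cleaner: a timestamped cache plus a cleanup(cutoff) function.
object MetadataCleanerDemo {
  def main(args: Array[String]): Unit = {
    System.setProperty("spark.cleanup.delay", "30") // 30 minutes, for this sketch

    // Stand-in for spark.util.TimeStampedHashMap: each value carries its
    // insertion time, and cleanup(cutoff) drops entries older than the cutoff.
    val serializedInfoCache = new ConcurrentHashMap[Int, (Array[Byte], Long)]()
    def cleanup(cutoff: Long): Unit =
      serializedInfoCache.entrySet().removeIf(e => e.getValue._2 < cutoff)

    val metadataCleaner = new MetadataCleaner("ShuffleMapTask", cleanup)
    serializedInfoCache.put(1, (Array[Byte](1, 2, 3), System.currentTimeMillis()))

    // ... do work; the daemon timer prunes stale entries in the background ...

    metadataCleaner.cancel() // mirrors the stop() paths in the trackers above
  }
}

Note the second argument to new Timer(name + " cleanup timer", true): the timer thread is created as a daemon, so a cleaner that is never cancelled will not by itself keep the JVM alive.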