Diffstat (limited to 'core/src/main/scala/spark/CacheTracker.scala')
-rw-r--r-- | core/src/main/scala/spark/CacheTracker.scala | 57
1 file changed, 20 insertions, 37 deletions
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 22110832f8..c5db6ce63a 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -15,19 +15,20 @@ import scala.collection.mutable.HashSet
 import spark.storage.BlockManager
 import spark.storage.StorageLevel
 
-sealed trait CacheTrackerMessage
-case class AddedToCache(rddId: Int, partition: Int, host: String, size: Long = 0L)
+private[spark] sealed trait CacheTrackerMessage
+
+private[spark] case class AddedToCache(rddId: Int, partition: Int, host: String, size: Long = 0L)
   extends CacheTrackerMessage
-case class DroppedFromCache(rddId: Int, partition: Int, host: String, size: Long = 0L)
+private[spark] case class DroppedFromCache(rddId: Int, partition: Int, host: String, size: Long = 0L)
   extends CacheTrackerMessage
-case class MemoryCacheLost(host: String) extends CacheTrackerMessage
-case class RegisterRDD(rddId: Int, numPartitions: Int) extends CacheTrackerMessage
-case class SlaveCacheStarted(host: String, size: Long) extends CacheTrackerMessage
-case object GetCacheStatus extends CacheTrackerMessage
-case object GetCacheLocations extends CacheTrackerMessage
-case object StopCacheTracker extends CacheTrackerMessage
-
-class CacheTrackerActor extends Actor with Logging {
+private[spark] case class MemoryCacheLost(host: String) extends CacheTrackerMessage
+private[spark] case class RegisterRDD(rddId: Int, numPartitions: Int) extends CacheTrackerMessage
+private[spark] case class SlaveCacheStarted(host: String, size: Long) extends CacheTrackerMessage
+private[spark] case object GetCacheStatus extends CacheTrackerMessage
+private[spark] case object GetCacheLocations extends CacheTrackerMessage
+private[spark] case object StopCacheTracker extends CacheTrackerMessage
+
+private[spark] class CacheTrackerActor extends Actor with Logging {
   // TODO: Should probably store (String, CacheType) tuples
 
   private val locs = new HashMap[Int, Array[List[String]]]
@@ -43,8 +44,6 @@ class CacheTrackerActor extends Actor with Logging {
 
   def receive = {
     case SlaveCacheStarted(host: String, size: Long) =>
-      logInfo("Started slave cache (size %s) on %s".format(
-        Utils.memoryBytesToString(size), host))
       slaveCapacity.put(host, size)
       slaveUsage.put(host, 0)
       sender ! true
@@ -56,22 +55,12 @@ class CacheTrackerActor extends Actor with Logging {
 
     case AddedToCache(rddId, partition, host, size) =>
       slaveUsage.put(host, getCacheUsage(host) + size)
-      logInfo("Cache entry added: (%s, %s) on %s (size added: %s, available: %s)".format(
-        rddId, partition, host, Utils.memoryBytesToString(size),
-        Utils.memoryBytesToString(getCacheAvailable(host))))
       locs(rddId)(partition) = host :: locs(rddId)(partition)
       sender ! true
 
     case DroppedFromCache(rddId, partition, host, size) =>
-      logInfo("Cache entry removed: (%s, %s) on %s (size dropped: %s, available: %s)".format(
-        rddId, partition, host, Utils.memoryBytesToString(size),
-        Utils.memoryBytesToString(getCacheAvailable(host))))
       slaveUsage.put(host, getCacheUsage(host) - size)
       // Do a sanity check to make sure usage is greater than 0.
-      val usage = getCacheUsage(host)
-      if (usage < 0) {
-        logError("Cache usage on %s is negative (%d)".format(host, usage))
-      }
       locs(rddId)(partition) = locs(rddId)(partition).filterNot(_ == host)
       sender ! true
 
@@ -101,7 +90,7 @@ class CacheTrackerActor extends Actor with Logging {
   }
 }
 
-class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: BlockManager)
+private[spark] class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: BlockManager)
   extends Logging {
 
   // Tracker actor on the master, or remote reference to it on workers
@@ -151,7 +140,6 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
         logInfo("Registering RDD ID " + rddId + " with cache")
         registeredRddIds += rddId
         communicate(RegisterRDD(rddId, numPartitions))
-        logInfo(RegisterRDD(rddId, numPartitions) + " successful")
       }
     }
   }
@@ -169,9 +157,8 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
   }
 
   // For BlockManager.scala only
-  def notifyTheCacheTrackerFromBlockManager(t: AddedToCache) {
+  def notifyFromBlockManager(t: AddedToCache) {
     communicate(t)
-    logInfo("notifyTheCacheTrackerFromBlockManager successful")
   }
 
   // Get a snapshot of the currently known locations
@@ -181,7 +168,7 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
 
   // Gets or computes an RDD split
   def getOrCompute[T](rdd: RDD[T], split: Split, storageLevel: StorageLevel): Iterator[T] = {
-    val key = "rdd:%d:%d".format(rdd.id, split.index)
+    val key = "rdd_%d_%d".format(rdd.id, split.index)
     logInfo("Cache key is " + key)
     blockManager.get(key) match {
       case Some(cachedValues) =>
@@ -221,23 +208,19 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
       // TODO: fetch any remote copy of the split that may be available
       // TODO: also register a listener for when it unloads
       logInfo("Computing partition " + split)
+      val elements = new ArrayBuffer[Any]
+      elements ++= rdd.compute(split)
       try {
-        // BlockManager will iterate over results from compute to create RDD
-        blockManager.put(key, rdd.compute(split), storageLevel, false)
+        // Try to put this block in the blockManager
+        blockManager.put(key, elements, storageLevel, true)
         //future.apply() // Wait for the reply from the cache tracker
-        blockManager.get(key) match {
-          case Some(values) =>
-            return values.asInstanceOf[Iterator[T]]
-          case None =>
-            logWarning("loading partition failed after computing it " + key)
-            return null
-        }
       } finally {
         loading.synchronized {
           loading.remove(key)
           loading.notifyAll()
         }
       }
+      return elements.iterator.asInstanceOf[Iterator[T]]
     }
   }
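
The interesting behavioral change above is in getOrCompute: the partition is now materialized into an ArrayBuffer before being handed to the BlockManager, so the method can return elements.iterator directly instead of re-reading the block it just wrote and handling a possible None. The sketch below shows that get-or-compute pattern in isolation, including the loading-set handshake that keeps two threads from computing the same key. The store map, the getOrCompute(key)(compute) signature, and all other scaffolding are illustrative stand-ins, not Spark's actual API.

    import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

    // Minimal sketch of the get-or-compute pattern; `store` stands in for
    // the BlockManager and `compute` for rdd.compute(split).
    object GetOrComputeSketch {
      private val store = new HashMap[String, ArrayBuffer[Any]]
      private val loading = new HashSet[String] // keys currently being computed

      def getOrCompute[T](key: String)(compute: => Iterator[T]): Iterator[T] = {
        // Fast path: the block is already cached.
        store.synchronized {
          for (values <- store.get(key)) {
            return values.iterator.asInstanceOf[Iterator[T]]
          }
        }
        loading.synchronized {
          if (loading.contains(key)) {
            // Another thread is computing this key: wait for it to finish,
            // then re-check the store before falling through to compute.
            while (loading.contains(key)) {
              loading.wait()
            }
            store.synchronized {
              for (values <- store.get(key)) {
                return values.iterator.asInstanceOf[Iterator[T]]
              }
            }
          }
          loading.add(key) // this thread now owns the computation for `key`
        }
        // Materialize the partition fully, as the new code does, so the
        // returned iterator no longer depends on a second store lookup.
        val elements = new ArrayBuffer[Any]
        elements ++= compute
        try {
          store.synchronized { store.put(key, elements) }
        } finally {
          loading.synchronized {
            loading.remove(key)
            loading.notifyAll() // wake any threads waiting on this key
          }
        }
        elements.iterator.asInstanceOf[Iterator[T]]
      }
    }

Returning the locally built iterator is also why the old logWarning("loading partition failed after computing it") fallback disappears: the result no longer round-trips through the store. The flipped final argument to blockManager.put (false to true, presumably a tell-the-master flag) is orthogonal to this sketch.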