author     Reynold Xin <rxin@cs.berkeley.edu>   2012-05-14 18:39:04 -0700
committer  Reynold Xin <rxin@cs.berkeley.edu>   2012-05-14 18:39:04 -0700
commit     019e48833fbcad66607f3f9bcf4053adbe8697b4 (patch)
tree       cf93b560501b9e950566fd0612df9ab63ba79954 /core
parent     f48742683adf8ed18b0d25a724a13c66b3fc12e9 (diff)
Added the capacity to report cache usage status back to the cache tracker.
This is essential for building a dashboard to see the status of caches on all slaves.
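
For context, a minimal sketch of how such a dashboard could consume the new status report. The helper below is hypothetical and not part of this commit; it only relies on the CacheTracker.getCacheStatus() and Utils.sizeWithSuffix() methods added in this diff:

    // Hypothetical driver-side helper: turn the (host, capacity, usage) tuples
    // returned by CacheTracker.getCacheStatus() into human-readable lines.
    def formatCacheStatus(tracker: spark.CacheTracker): Seq[String] =
      tracker.getCacheStatus().map { case (host, capacity, usage) =>
        val percent = if (capacity > 0) 100.0 * usage / capacity else 0.0
        "%s: %s used of %s (%.1f%%)".format(
          host, spark.Utils.sizeWithSuffix(usage),
          spark.Utils.sizeWithSuffix(capacity), percent)
      }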
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/BoundedMemoryCache.scala  |  8
-rw-r--r--  core/src/main/scala/spark/Cache.scala                | 25
-rw-r--r--  core/src/main/scala/spark/CacheTracker.scala         | 97
-rw-r--r--  core/src/main/scala/spark/DiskSpillingCache.scala    |  2
-rw-r--r--  core/src/main/scala/spark/SerializingCache.scala     |  2
-rw-r--r--  core/src/main/scala/spark/SoftReferenceCache.scala   |  4
-rw-r--r--  core/src/main/scala/spark/Utils.scala                | 24
-rw-r--r--  core/src/test/scala/spark/Utils.scala                | 18
8 files changed, 150 insertions, 30 deletions
diff --git a/core/src/main/scala/spark/BoundedMemoryCache.scala b/core/src/main/scala/spark/BoundedMemoryCache.scala
index c49be803e4..c25d0a62df 100644
--- a/core/src/main/scala/spark/BoundedMemoryCache.scala
+++ b/core/src/main/scala/spark/BoundedMemoryCache.scala
@@ -30,7 +30,7 @@ class BoundedMemoryCache extends Cache with Logging {
}
}
- override def put(datasetId: Any, partition: Int, value: Any): Boolean = {
+ override def put(datasetId: Any, partition: Int, value: Any): Long = {
val key = (datasetId, partition)
logInfo("Asked to add key " + key)
val startTime = System.currentTimeMillis
@@ -44,14 +44,16 @@ class BoundedMemoryCache extends Cache with Logging {
map.put(key, new Entry(value, size))
currentBytes += size
logInfo("Number of entries is now " + map.size)
- return true
+ return size
} else {
logInfo("Didn't add key " + key + " because we would have evicted part of same dataset")
- return false
+ return -1L
}
}
}
+ override def getCapacity: Long = maxBytes
+
private def getMaxBytes(): Long = {
val memoryFractionToUse = System.getProperty(
"spark.boundedMemoryCache.memoryFraction", "0.66").toDouble
diff --git a/core/src/main/scala/spark/Cache.scala b/core/src/main/scala/spark/Cache.scala
index 263761bb95..a65d3b478d 100644
--- a/core/src/main/scala/spark/Cache.scala
+++ b/core/src/main/scala/spark/Cache.scala
@@ -24,12 +24,25 @@ abstract class Cache {
def newKeySpace() = new KeySpace(this, newKeySpaceId())
- // Get the value for a given (datasetId, partition), or null if it is not found.
+ /**
+ * Get the value for a given (datasetId, partition), or null if it is not
+ * found.
+ */
def get(datasetId: Any, partition: Int): Any
- // Attempt to put a value in the cache; returns false if this was not successful (e.g. because
- // the cache replacement policy forbids it).
- def put(datasetId: Any, partition: Int, value: Any): Boolean
+ /**
+ * Attempt to put a value in the cache; returns a negative number if this was
+ * not successful (e.g. because the cache replacement policy forbids it). If
+ * size estimation is available, the cache implementation should return the
+ * estimated size of the partition if the partition is successfully cached.
+ */
+ def put(datasetId: Any, partition: Int, value: Any): Long
+
+ /**
+ * Report the capacity of the cache, in bytes. By default this just reports
+ * zero. Specific implementations can choose to provide a real capacity number.
+ */
+ def getCapacity: Long = 0L
}
/**
@@ -39,6 +52,8 @@ class KeySpace(cache: Cache, val keySpaceId: Int) {
def get(datasetId: Any, partition: Int): Any =
cache.get((keySpaceId, datasetId), partition)
- def put(datasetId: Any, partition: Int, value: Any): Boolean =
+ def put(datasetId: Any, partition: Int, value: Any): Long =
cache.put((keySpaceId, datasetId), partition, value)
+
+ def getCapacity: Long = cache.getCapacity
}
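
Under the new contract, put reports an estimated size (any non-negative value counts as success, as SoftReferenceCache below illustrates by returning 0), and getCapacity is optional. A minimal sketch of a conforming implementation, purely for illustration and not part of this commit:

    // Hypothetical cache that satisfies the new Cache interface without doing
    // any size estimation: put always succeeds and reports a size of 0.
    class SimpleMapCache extends spark.Cache {
      private val map = new java.util.concurrent.ConcurrentHashMap[(Any, Int), Any]

      override def get(datasetId: Any, partition: Int): Any =
        map.get((datasetId, partition))  // null if not found, per the contract

      override def put(datasetId: Any, partition: Int, value: Any): Long = {
        map.put((datasetId, partition), value)
        0L  // no size estimate available; 0 still signals a successful put
      }
      // getCapacity keeps the default of 0, meaning "capacity unknown".
    }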
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index c399748af3..b472dc8070 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -7,15 +7,27 @@ import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
sealed trait CacheTrackerMessage
-case class AddedToCache(rddId: Int, partition: Int, host: String) extends CacheTrackerMessage
-case class DroppedFromCache(rddId: Int, partition: Int, host: String) extends CacheTrackerMessage
+case class AddedToCache(rddId: Int, partition: Int, host: String, size: Long = 0L)
+ extends CacheTrackerMessage
+case class DroppedFromCache(rddId: Int, partition: Int, host: String, size: Long = 0L)
+ extends CacheTrackerMessage
case class MemoryCacheLost(host: String) extends CacheTrackerMessage
case class RegisterRDD(rddId: Int, numPartitions: Int) extends CacheTrackerMessage
+case class SlaveCacheStarted(host: String, size: Long) extends CacheTrackerMessage
+case object GetCacheStatus extends CacheTrackerMessage
case object GetCacheLocations extends CacheTrackerMessage
case object StopCacheTracker extends CacheTrackerMessage
+
class CacheTrackerActor extends DaemonActor with Logging {
val locs = new HashMap[Int, Array[List[String]]]
+
+ /**
+ * Maps from a slave's host name to its cache capacity and current usage.
+ */
+ val slaveCapacity = new HashMap[String, Long]
+ val slaveUsage = new HashMap[String, Long]
+
// TODO: Should probably store (String, CacheType) tuples
def act() {
@@ -26,18 +38,42 @@ class CacheTrackerActor extends DaemonActor with Logging {
loop {
react {
+ case SlaveCacheStarted(host: String, size: Long) =>
+ logInfo("Started slave cache (size %s) on %s".format(Utils.sizeWithSuffix(size), host))
+ slaveCapacity.put(host, size)
+ slaveUsage.put(host, 0)
+ reply('OK)
+
case RegisterRDD(rddId: Int, numPartitions: Int) =>
logInfo("Registering RDD " + rddId + " with " + numPartitions + " partitions")
locs(rddId) = Array.fill[List[String]](numPartitions)(Nil)
reply('OK)
- case AddedToCache(rddId, partition, host) =>
- logInfo("Cache entry added: (%s, %s) on %s".format(rddId, partition, host))
+ case AddedToCache(rddId, partition, host, size) =>
+ if (size > 0) {
+ logInfo("Cache entry added: (%s, %s) on %s, size: %s".format(
+ rddId, partition, host, Utils.sizeWithSuffix(size)))
+ slaveUsage.put(host, slaveUsage.getOrElse(host, 0L) + size)
+ } else {
+ logInfo("Cache entry added: (%s, %s) on %s".format(rddId, partition, host))
+ }
locs(rddId)(partition) = host :: locs(rddId)(partition)
reply('OK)
- case DroppedFromCache(rddId, partition, host) =>
- logInfo("Cache entry removed: (%s, %s) on %s".format(rddId, partition, host))
+ case DroppedFromCache(rddId, partition, host, size) =>
+ if (size > 0) {
+ logInfo("Cache entry removed: (%s, %s) on %s, size: %s".format(
+ rddId, partition, host, Utils.sizeWithSuffix(size)))
+ slaveUsage.put(host, slaveUsage.getOrElse(host, 0L) - size)
+
+ // Do a sanity check to make sure usage is greater than 0.
+ val usage = slaveUsage.getOrElse(host, 0L)
+ if (usage < 0) {
+ logError("Cache usage on %s is negative (%d)".format(host, usage))
+ }
+ } else {
+ logInfo("Cache entry removed: (%s, %s) on %s".format(rddId, partition, host))
+ }
locs(rddId)(partition) = locs(rddId)(partition).filterNot(_ == host)
case MemoryCacheLost(host) =>
@@ -52,6 +88,12 @@ class CacheTrackerActor extends DaemonActor with Logging {
}
reply(locsCopy)
+ case GetCacheStatus =>
+ val status: Seq[Tuple3[String, Long, Long]] = slaveCapacity.keys.map { key =>
+ (key, slaveCapacity.getOrElse(key, 0L), slaveUsage.getOrElse(key, 0L))
+ }.toSeq
+ reply(status)
+
case StopCacheTracker =>
reply('OK)
exit()
@@ -60,10 +102,16 @@ class CacheTrackerActor extends DaemonActor with Logging {
}
}
+
class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
// Tracker actor on the master, or remote reference to it on workers
var trackerActor: AbstractActor = null
-
+
+ val registeredRddIds = new HashSet[Int]
+
+ // Stores map results for various splits locally
+ val cache = theCache.newKeySpace()
+
if (isMaster) {
val tracker = new CacheTrackerActor
tracker.start()
@@ -74,10 +122,10 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
trackerActor = RemoteActor.select(Node(host, port), 'CacheTracker)
}
- val registeredRddIds = new HashSet[Int]
-
- // Stores map results for various splits locally
- val cache = theCache.newKeySpace()
+ // Report the cache being started.
+ trackerActor !? SlaveCacheStarted(
+ System.getProperty("spark.hostname", Utils.localHostName),
+ cache.getCapacity)
// Remembers which splits are currently being loaded (on worker nodes)
val loading = new HashSet[(Int, Int)]
@@ -92,17 +140,30 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
}
}
}
-
+
// Get a snapshot of the currently known locations
def getLocationsSnapshot(): HashMap[Int, Array[List[String]]] = {
(trackerActor !? GetCacheLocations) match {
case h: HashMap[_, _] =>
h.asInstanceOf[HashMap[Int, Array[List[String]]]]
-
- case _ =>
+
+ case _ =>
throw new SparkException("Internal error: CacheTrackerActor did not reply with a HashMap")
}
}
+
+ // Get the usage status of slave caches. Each tuple in the returned sequence
+ // is in the form of (host name, capacity, usage).
+ def getCacheStatus(): Seq[Tuple3[String, Long, Long]] = {
+ (trackerActor !? GetCacheStatus) match {
+ case h: Seq[Tuple3[String, Long, Long]] =>
+ h.asInstanceOf[Seq[Tuple3[String, Long, Long]]]
+
+ case _ =>
+ throw new SparkException(
+ "Internal error: CacheTrackerActor did not reply with a Seq[Tuple3[String, Long, Long]")
+ }
+ }
// Gets or computes an RDD split
def getOrCompute[T](rdd: RDD[T], split: Split)(implicit m: ClassManifest[T]): Iterator[T] = {
@@ -138,10 +199,10 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
// TODO: fetch any remote copy of the split that may be available
logInfo("Computing partition " + split)
var array: Array[T] = null
- var putSuccessful: Boolean = false
+ var putRetval: Long = -1L
try {
array = rdd.compute(split).toArray(m)
- putSuccessful = cache.put(rdd.id, split.index, array)
+ putRetval = cache.put(rdd.id, split.index, array)
} finally {
// Tell other threads that we've finished our attempt to load the key (whether or not
// we've actually succeeded to put it in the map)
@@ -150,10 +211,10 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
loading.notifyAll()
}
}
- if (putSuccessful) {
+ if (putRetval >= 0) {
// Tell the master that we added the entry. Don't return until it replies so it can
// properly schedule future tasks that use this RDD.
- trackerActor !? AddedToCache(rdd.id, split.index, host)
+ trackerActor !? AddedToCache(rdd.id, split.index, host, putRetval)
}
return array.iterator
}
diff --git a/core/src/main/scala/spark/DiskSpillingCache.scala b/core/src/main/scala/spark/DiskSpillingCache.scala
index e4d0f991aa..037ed78688 100644
--- a/core/src/main/scala/spark/DiskSpillingCache.scala
+++ b/core/src/main/scala/spark/DiskSpillingCache.scala
@@ -44,7 +44,7 @@ class DiskSpillingCache extends BoundedMemoryCache {
}
}
- override def put(datasetId: Any, partition: Int, value: Any): Boolean = {
+ override def put(datasetId: Any, partition: Int, value: Any): Long = {
var ser = SparkEnv.get.serializer.newInstance()
super.put(datasetId, partition, ser.serialize(value))
}
diff --git a/core/src/main/scala/spark/SerializingCache.scala b/core/src/main/scala/spark/SerializingCache.scala
index f6964905c7..17dc735d5e 100644
--- a/core/src/main/scala/spark/SerializingCache.scala
+++ b/core/src/main/scala/spark/SerializingCache.scala
@@ -9,7 +9,7 @@ import java.io._
class SerializingCache extends Cache with Logging {
val bmc = new BoundedMemoryCache
- override def put(datasetId: Any, partition: Int, value: Any): Boolean = {
+ override def put(datasetId: Any, partition: Int, value: Any): Long = {
val ser = SparkEnv.get.serializer.newInstance()
bmc.put(datasetId, partition, ser.serialize(value))
}
diff --git a/core/src/main/scala/spark/SoftReferenceCache.scala b/core/src/main/scala/spark/SoftReferenceCache.scala
index c507df928b..cd2386eb83 100644
--- a/core/src/main/scala/spark/SoftReferenceCache.scala
+++ b/core/src/main/scala/spark/SoftReferenceCache.scala
@@ -11,8 +11,8 @@ class SoftReferenceCache extends Cache {
override def get(datasetId: Any, partition: Int): Any =
map.get((datasetId, partition))
- override def put(datasetId: Any, partition: Int, value: Any): Boolean = {
+ override def put(datasetId: Any, partition: Int, value: Any): Long = {
map.put((datasetId, partition), value)
- return true
+ return 0
}
}
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 58b5fa6bbd..5aecbdde7d 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -174,4 +174,28 @@ object Utils {
throw new IOException("Failed to delete: " + file)
}
}
+
+ /**
+ * Use unit suffixes (Byte, Kilobyte, Megabyte and Gigabyte) in order to
+ * reduce the number of digits to four or less. For example, 4,000,000
+ * is returned as 3.8MB.
+ */
+ def sizeWithSuffix(size: Long): String = {
+ val GB = 1L << 30
+ val MB = 1L << 20
+ val KB = 1L << 10
+ val B = 1L
+ val (value, unit) = {
+ if (size >= 2*GB) {
+ (size.asInstanceOf[Double] / GB, "GB")
+ } else if (size >= 2*MB) {
+ (size.asInstanceOf[Double] / MB, "MB")
+ } else if (size >= 2*KB) {
+ (size.asInstanceOf[Double] / KB, "KB")
+ } else {
+ (size.asInstanceOf[Double], "B")
+ }
+ }
+ "%.1f%s".format(value, unit)
+ }
}
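
A note on the thresholds above: a size is only promoted to the next unit once it reaches twice that unit, so 1500 bytes still prints as "1500.0B" rather than "1.5KB". As a worked example, 2,000,000 bytes is below 2 * 2^20 = 2,097,152, so it is formatted in KB: 2,000,000 / 1024 = 1953.1, i.e. "1953.1KB". The new test suite below pins down exactly these cases.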
diff --git a/core/src/test/scala/spark/Utils.scala b/core/src/test/scala/spark/Utils.scala
new file mode 100644
index 0000000000..b78b638bb1
--- /dev/null
+++ b/core/src/test/scala/spark/Utils.scala
@@ -0,0 +1,18 @@
+package spark
+
+import org.scalatest.FunSuite
+
+
+class UtilsSuite extends FunSuite {
+
+ test("sizeWithSuffix") {
+ assert(Utils.sizeWithSuffix(10) === "10.0B")
+ assert(Utils.sizeWithSuffix(1500) === "1500.0B")
+ assert(Utils.sizeWithSuffix(2000000) === "1953.1KB")
+ assert(Utils.sizeWithSuffix(2097152) === "2.0MB")
+ assert(Utils.sizeWithSuffix(2306867) === "2.2MB")
+ assert(Utils.sizeWithSuffix(5368709120L) === "5.0GB")
+ }
+
+}
+