aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2010-10-24 19:14:35 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2010-10-24 19:14:35 -0700
commitdd7c5d8e34c743b9766542f0645745de12b0ac26 (patch)
tree16c4863ede2a8c4bc0430593179b6598c37a3abc /src
parentedf86fdb277eaa4992bd0b90b572f3c0e89e5975 (diff)
downloadspark-dd7c5d8e34c743b9766542f0645745de12b0ac26.tar.gz
spark-dd7c5d8e34c743b9766542f0645745de12b0ac26.tar.bz2
spark-dd7c5d8e34c743b9766542f0645745de12b0ac26.zip
Added initial attempt at a BoundedMemoryCache
Diffstat (limited to 'src')
-rw-r--r--src/scala/spark/BoundedMemoryCache.scala69
1 file changed, 69 insertions, 0 deletions
diff --git a/src/scala/spark/BoundedMemoryCache.scala b/src/scala/spark/BoundedMemoryCache.scala
new file mode 100644
index 0000000000..19d9bebfe5
--- /dev/null
+++ b/src/scala/spark/BoundedMemoryCache.scala
@@ -0,0 +1,69 @@
+package spark
+
+import java.util.LinkedHashMap
+
+/**
+ * An implementation of Cache that estimates the sizes of its entries and
+ * attempts to limit its total memory usage to a fraction of the JVM heap.
+ * Objects' sizes are estimated using SizeEstimator, which has limitations;
+ * most notably, we will overestimate total memory used if some cache
+ * entries have pointers to a shared object. Nonetheless, this Cache should
+ * work well when most of the space is used by arrays of primitives or of
+ * simple classes.
+ */
+class BoundedMemoryCache extends Cache with Logging {
+  // Upper bound on the total estimated size (in bytes) of cached values,
+  // computed once at construction from the JVM heap and a configurable fraction.
+  // NOTE(review): getMaxBytes reads Runtime.totalMemory (the *current* heap),
+  // not maxMemory (-Xmx); if the heap has not yet grown to its maximum, this
+  // bound will be smaller than the fraction of -Xmx presumably intended — confirm.
+ private val maxBytes: Long = getMaxBytes()
+ logInfo("BoundedMemoryCache.maxBytes = " + maxBytes)
+
+  // Running total of the estimated sizes of all entries currently in `map`.
+  // Guarded by synchronization on this cache instance.
+ private var currentBytes = 0L
+  // Third constructor arg (accessOrder = true) makes the LinkedHashMap iterate
+  // least-recently-accessed first, which ensureFreeSpace relies on for LRU eviction.
+ private val map = new LinkedHashMap[Any, Entry](32, 0.75f, true)
+
+ // An entry in our map; stores a cached object and its size in bytes
+ class Entry(val value: Any, val size: Long) {}
+
+  /**
+   * Look up a cached value, returning null when the key is absent (matching
+   * java-style Cache semantics). The map.get on an access-ordered
+   * LinkedHashMap also marks the entry as most recently used.
+   */
+ override def get(key: Any): Any = {
+ synchronized {
+ val entry = map.get(key)
+ if (entry != null) entry.value else null
+ }
+ }
+
+  /**
+   * Insert a value, evicting least-recently-used entries first if needed to
+   * stay under maxBytes. Size estimation (potentially slow) is deliberately
+   * done outside the synchronized block so it does not serialize readers.
+   * NOTE(review): if the estimated size of a single value exceeds maxBytes,
+   * ensureFreeSpace drains the whole cache and the value is still inserted,
+   * so currentBytes can exceed maxBytes afterwards — confirm this is intended.
+   */
+ override def put(key: Any, value: Any) {
+ logInfo("Asked to add key " + key)
+ val startTime = System.currentTimeMillis
+ val size = SizeEstimator.estimate(value.asInstanceOf[AnyRef])
+ val timeTaken = System.currentTimeMillis - startTime
+ logInfo("Estimated size for key %s is %d".format(key, size))
+ logInfo("Size estimation for key %s took %d ms".format(key, timeTaken))
+ synchronized {
+ ensureFreeSpace(size)
+ logInfo("Adding key " + key)
+ map.put(key, new Entry(value, size))
+ currentBytes += size
+ logInfo("Number of entries is now " + map.size)
+ }
+ }
+
+  // Compute the byte budget: a configurable fraction (default 0.75) of the
+  // JVM's current heap size. See NOTE(review) on maxBytes above about
+  // totalMemory vs maxMemory.
+ private def getMaxBytes(): Long = {
+ val memoryFractionToUse = System.getProperty(
+ "spark.boundedMemoryCache.memoryFraction", "0.75").toDouble
+ (Runtime.getRuntime.totalMemory * memoryFractionToUse).toLong
+ }
+
+ /**
+ * Remove least recently used entries from the map until at least space
+ * bytes are free. Assumes that a lock is held on the BoundedMemoryCache.
+ */
+ private def ensureFreeSpace(space: Long) {
+ logInfo("ensureFreeSpace(%d) called with curBytes=%d, maxBytes=%d".format(
+ space, currentBytes, maxBytes))
+  // Iteration order is LRU-first because the map was built with accessOrder = true,
+  // so this loop evicts the coldest entries until `space` bytes fit (or the map is empty).
+ val iter = map.entrySet.iterator
+ while (maxBytes - currentBytes < space && iter.hasNext) {
+ val mapEntry = iter.next()
+ logInfo("Dropping key %s of size %d to make space".format(
+ mapEntry.getKey, mapEntry.getValue.size))
+ currentBytes -= mapEntry.getValue.size
+ iter.remove()
+ }
+ }
+}