commit    a481e237616906120921f1c42d40112db4d5fb0f (patch)
author    Matei Zaharia <matei@eecs.berkeley.edu>  2010-10-23 17:54:25 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2010-10-23 17:54:25 -0700
tree      46c2719fa2d6f1cb527fca29677459b04e818eda /src
parent    93a200bc7efd1a867d9cc3759e82327cfdf325b0 (diff)
Made caching pluggable and added soft reference and weak reference caches.
Diffstat (limited to 'src')
-rw-r--r--  src/scala/spark/Broadcast.scala            7
-rw-r--r--  src/scala/spark/Cache.scala               63
-rw-r--r--  src/scala/spark/Executor.scala             5
-rw-r--r--  src/scala/spark/RDD.scala                  5
-rw-r--r--  src/scala/spark/SoftReferenceCache.scala  13
-rw-r--r--  src/scala/spark/SparkContext.scala         3
-rw-r--r--  src/scala/spark/WeakReferenceCache.scala  14
7 files changed, 98 insertions(+), 12 deletions(-)
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala
index df7ab3f19f..5089dca82e 100644
--- a/src/scala/spark/Broadcast.scala
+++ b/src/scala/spark/Broadcast.scala
@@ -4,8 +4,6 @@ import java.io._
import java.net._
import java.util.{UUID, PriorityQueue, Comparator}
-import com.google.common.collect.MapMaker
-
import java.util.concurrent.{Executors, ExecutorService}
import scala.actors.Actor
@@ -232,8 +230,7 @@ private object Broadcast {
}
private object BroadcastCS extends Logging {
- val values = new MapMaker ().softValues ().makeMap[UUID, Any]
- // val valueInfos = new MapMaker ().softValues ().makeMap[UUID, Any]
+ val values = Cache.newKeySpace()
// private var valueToPort = Map[UUID, Int] ()
@@ -739,7 +736,7 @@ private object BroadcastCS extends Logging {
}
private object BroadcastCH extends Logging {
- val values = new MapMaker ().softValues ().makeMap[UUID, Any]
+ val values = Cache.newKeySpace()
private var initialized = false
diff --git a/src/scala/spark/Cache.scala b/src/scala/spark/Cache.scala
new file mode 100644
index 0000000000..9887520758
--- /dev/null
+++ b/src/scala/spark/Cache.scala
@@ -0,0 +1,63 @@
+package spark
+
+import java.util.concurrent.atomic.AtomicLong
+
+
+/**
+ * An interface for caches in Spark, to allow for multiple implementations.
+ * Caches are used to store both partitions of cached RDDs and broadcast
+ * variables on Spark executors.
+ *
+ * A single Cache instance gets created on each machine and is shared by all
+ * of its clients (i.e. both the RDD split cache and the broadcast variable
+ * cache), to enable global replacement policies. However, because several
+ * independent modules all perform caching, it is important to give them
+ * separate key namespaces, so that an RDD and a broadcast variable (for
+ * example) do not end up using the same key. For this purpose, Cache has
+ * the notion of KeySpaces: each client module must first ask for a KeySpace,
+ * and then call get() and put() on that space using its own keys.
+ * This abstract class handles the creation of key spaces, so that subclasses
+ * need only deal with keys that are unique across modules.
+ */
+abstract class Cache {
+ private val nextKeySpaceId = new AtomicLong(0)
+ private def newKeySpaceId() = nextKeySpaceId.getAndIncrement()
+
+ def newKeySpace() = new KeySpace(this, newKeySpaceId())
+
+ def get(key: Any): Any
+ def put(key: Any, value: Any): Unit
+}
+
+
+/**
+ * A key namespace in a Cache.
+ */
+class KeySpace(cache: Cache, id: Long) {
+ def get(key: Any): Any = cache.get((id, key))
+ def put(key: Any, value: Any): Unit = cache.put((id, key), value)
+}
+
+
+/**
+ * The Cache object maintains a global Cache instance, of the type specified
+ * by the spark.cache.class property.
+ */
+object Cache {
+ private var instance: Cache = null
+
+ def initialize() {
+ val cacheClass = System.getProperty("spark.cache.class",
+ "spark.SoftReferenceCache")
+ instance = Class.forName(cacheClass).newInstance().asInstanceOf[Cache]
+ }
+
+ def getInstance(): Cache = {
+ if (instance == null) {
+ throw new SparkException("Cache.getInstance called before initialize")
+ }
+ instance
+ }
+
+ def newKeySpace(): KeySpace = getInstance().newKeySpace()
+}
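For illustration, a minimal sketch of how a client module uses this API once Cache.initialize() has run; ExampleModule and expensiveCompute are hypothetical names, not part of the commit:

    object ExampleModule {
      // Ask for a KeySpace once; keys from different modules can then
      // never collide inside the shared Cache instance.
      val values = Cache.newKeySpace()

      def lookup(key: String): Any = {
        val cached = values.get(key)
        if (cached != null) {
          cached
        } else {
          val result = expensiveCompute(key)  // hypothetical helper
          values.put(key, result)
          result
        }
      }

      private def expensiveCompute(key: String): Any = key.reverse
    }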
diff --git a/src/scala/spark/Executor.scala b/src/scala/spark/Executor.scala
index e47d3757b6..b4d023b428 100644
--- a/src/scala/spark/Executor.scala
+++ b/src/scala/spark/Executor.scala
@@ -21,8 +21,9 @@ class Executor extends mesos.Executor with Logging {
val props = Utils.deserialize[Array[(String, String)]](args.getData)
for ((key, value) <- props)
System.setProperty(key, value)
-
- // Initialize broadcast system (uses some properties read above)
+
+ // Initialize cache and broadcast system (uses some properties read above)
+ Cache.initialize()
Broadcast.initialize(false)
// Create our ClassLoader (using spark properties) and set it on this thread
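Because Cache.initialize() reads the spark.cache.class property, and executors receive the driver's properties just above, swapping cache implementations is a configuration change rather than a code change. A hedged sketch, assuming the property is set before initialization:

    // Select the weak-reference cache instead of the default
    // spark.SoftReferenceCache; must happen before Cache.initialize().
    System.setProperty("spark.cache.class", "spark.WeakReferenceCache")
    Cache.initialize()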
diff --git a/src/scala/spark/RDD.scala b/src/scala/spark/RDD.scala
index 9bfc8c9c62..3519114306 100644
--- a/src/scala/spark/RDD.scala
+++ b/src/scala/spark/RDD.scala
@@ -1,7 +1,6 @@
package spark
import java.util.concurrent.atomic.AtomicLong
-import java.util.concurrent.ConcurrentHashMap
import java.util.HashSet
import java.util.Random
@@ -11,8 +10,6 @@ import scala.collection.mutable.HashMap
import mesos._
-import com.google.common.collect.MapMaker
-
@serializable
abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
@@ -267,7 +264,7 @@ private object CachedRDD {
def newId() = nextId.getAndIncrement()
// Stores map results for various splits locally (on workers)
- val cache = new MapMaker().softValues().makeMap[String, AnyRef]()
+ val cache = Cache.newKeySpace()
// Remembers which splits are currently being loaded (on workers)
val loading = new HashSet[String]
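The key space plus the loading set suggest the usual get-or-compute pattern on workers. The sketch below is illustrative only: getOrCompute is a hypothetical helper, and the real CachedRDD logic is outside this hunk.

    // Hypothetical: return a cached result for a split, or compute and
    // cache it. The loading set would additionally let concurrent tasks
    // wait for an in-flight computation rather than duplicating it.
    def getOrCompute(key: String)(compute: => AnyRef): AnyRef = {
      val cached = cache.get(key)
      if (cached != null) {
        cached.asInstanceOf[AnyRef]
      } else {
        val result = compute
        cache.put(key, result)
        result
      }
    }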
diff --git a/src/scala/spark/SoftReferenceCache.scala b/src/scala/spark/SoftReferenceCache.scala
new file mode 100644
index 0000000000..e84aa57efa
--- /dev/null
+++ b/src/scala/spark/SoftReferenceCache.scala
@@ -0,0 +1,13 @@
+package spark
+
+import com.google.common.collect.MapMaker
+
+/**
+ * An implementation of Cache that uses soft references.
+ */
+class SoftReferenceCache extends Cache {
+ val map = new MapMaker().softValues().makeMap[Any, Any]()
+
+ override def get(key: Any): Any = map.get(key)
+ override def put(key: Any, value: Any) = map.put(key, value)
+}
diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala
index e85b26e238..24fe0e9bbb 100644
--- a/src/scala/spark/SparkContext.scala
+++ b/src/scala/spark/SparkContext.scala
@@ -30,8 +30,9 @@ extends Logging {
private val isLocal = scheduler.isInstanceOf[LocalScheduler]
- // Start the scheduler and the broadcast system
+ // Start the scheduler, the cache and the broadcast system
scheduler.start()
+ Cache.initialize()
Broadcast.initialize(true)
// Methods for creating RDDs
diff --git a/src/scala/spark/WeakReferenceCache.scala b/src/scala/spark/WeakReferenceCache.scala
new file mode 100644
index 0000000000..ddca065454
--- /dev/null
+++ b/src/scala/spark/WeakReferenceCache.scala
@@ -0,0 +1,14 @@
+package spark
+
+import com.google.common.collect.MapMaker
+
+/**
+ * An implementation of Cache that uses weak references.
+ */
+class WeakReferenceCache extends Cache {
+ val map = new MapMaker().weakValues().makeMap[Any, Any]()
+
+ override def get(key: Any): Any = map.get(key)
+ override def put(key: Any, value: Any) = map.put(key, value)
+}
+
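The practical difference between the two implementations is when the JVM clears entries: weak values become collectable as soon as no strong references remain, while soft values are kept until memory runs low, which makes SoftReferenceCache the safer default for caching. A standalone sketch of that behavior (GC timing is not guaranteed, so the printed result is only typical):

    import java.lang.ref.{SoftReference, WeakReference}

    object ReferenceDemo {
      def main(args: Array[String]) {
        val soft = new SoftReference(new Array[Byte](1 << 20))
        val weak = new WeakReference(new Array[Byte](1 << 20))
        System.gc()
        // Typically prints "soft cleared: false" and "weak cleared: true":
        // an ordinary GC clears weakly reachable values but keeps softly
        // reachable ones while memory is plentiful.
        println("soft cleared: " + (soft.get == null))
        println("weak cleared: " + (weak.get == null))
      }
    }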