diff options
-rw-r--r-- | src/scala/spark/Broadcast.scala | 7 | ||||
-rw-r--r-- | src/scala/spark/Cache.scala | 63 | ||||
-rw-r--r-- | src/scala/spark/Executor.scala | 5 | ||||
-rw-r--r-- | src/scala/spark/RDD.scala | 5 | ||||
-rw-r--r-- | src/scala/spark/SoftReferenceCache.scala | 13 | ||||
-rw-r--r-- | src/scala/spark/SparkContext.scala | 3 | ||||
-rw-r--r-- | src/scala/spark/WeakReferenceCache.scala | 14 |
7 files changed, 98 insertions, 12 deletions
diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index df7ab3f19f..5089dca82e 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -4,8 +4,6 @@ import java.io._ import java.net._ import java.util.{UUID, PriorityQueue, Comparator} -import com.google.common.collect.MapMaker - import java.util.concurrent.{Executors, ExecutorService} import scala.actors.Actor @@ -232,8 +230,7 @@ private object Broadcast { } private object BroadcastCS extends Logging { - val values = new MapMaker ().softValues ().makeMap[UUID, Any] - // val valueInfos = new MapMaker ().softValues ().makeMap[UUID, Any] + val values = Cache.newKeySpace() // private var valueToPort = Map[UUID, Int] () @@ -739,7 +736,7 @@ private object BroadcastCS extends Logging { } private object BroadcastCH extends Logging { - val values = new MapMaker ().softValues ().makeMap[UUID, Any] + val values = Cache.newKeySpace() private var initialized = false diff --git a/src/scala/spark/Cache.scala b/src/scala/spark/Cache.scala new file mode 100644 index 0000000000..9887520758 --- /dev/null +++ b/src/scala/spark/Cache.scala @@ -0,0 +1,63 @@ +package spark + +import java.util.concurrent.atomic.AtomicLong + + +/** + * An interface for caches in Spark, to allow for multiple implementations. + * Caches are used to store both partitions of cached RDDs and broadcast + * variables on Spark executors. + * + * A single Cache instance gets created on each machine and is shared by all + * caches (i.e. both the RDD split cache and the broadcast variable cache), + * to enable global replacement policies. However, because these several + * independent modules all perform caching, it is important to give them + * separate key namespaces, so that an RDD and a broadcast variable (for + * example) do not use the same key. For this purpose, Cache has the + * notion of KeySpaces. Each client module must first ask for a KeySpace, + * and then call get() and put() on that space using its own keys. + * This abstract class handles the creation of key spaces, so that subclasses + * need only deal with keys that are unique across modules. + */ +abstract class Cache { + private val nextKeySpaceId = new AtomicLong(0) + private def newKeySpaceId() = nextKeySpaceId.getAndIncrement() + + def newKeySpace() = new KeySpace(this, newKeySpaceId()) + + def get(key: Any): Any + def put(key: Any, value: Any): Unit +} + + +/** + * A key namespace in a Cache. + */ +class KeySpace(cache: Cache, id: Long) { + def get(key: Any): Any = cache.get((id, key)) + def put(key: Any, value: Any): Unit = cache.put((id, key), value) +} + + +/** + * The Cache object maintains a global Cache instance, of the type specified + * by the spark.cache.class property. + */ +object Cache { + private var instance: Cache = null + + def initialize() { + val cacheClass = System.getProperty("spark.cache.class", + "spark.SoftReferenceCache") + instance = Class.forName(cacheClass).newInstance().asInstanceOf[Cache] + } + + def getInstance(): Cache = { + if (instance == null) { + throw new SparkException("Cache.getInstance called before initialize") + } + instance + } + + def newKeySpace(): KeySpace = getInstance().newKeySpace() +} diff --git a/src/scala/spark/Executor.scala b/src/scala/spark/Executor.scala index e47d3757b6..b4d023b428 100644 --- a/src/scala/spark/Executor.scala +++ b/src/scala/spark/Executor.scala @@ -21,8 +21,9 @@ class Executor extends mesos.Executor with Logging { val props = Utils.deserialize[Array[(String, String)]](args.getData) for ((key, value) <- props) System.setProperty(key, value) - - // Initialize broadcast system (uses some properties read above) + + // Initialize cache and broadcast system (uses some properties read above) + Cache.initialize() Broadcast.initialize(false) // Create our ClassLoader (using spark properties) and set it on this thread diff --git a/src/scala/spark/RDD.scala b/src/scala/spark/RDD.scala index 9bfc8c9c62..3519114306 100644 --- a/src/scala/spark/RDD.scala +++ b/src/scala/spark/RDD.scala @@ -1,7 +1,6 @@ package spark import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.ConcurrentHashMap import java.util.HashSet import java.util.Random @@ -11,8 +10,6 @@ import scala.collection.mutable.HashMap import mesos._ -import com.google.common.collect.MapMaker - @serializable abstract class RDD[T: ClassManifest](@transient sc: SparkContext) { @@ -267,7 +264,7 @@ private object CachedRDD { def newId() = nextId.getAndIncrement() // Stores map results for various splits locally (on workers) - val cache = new MapMaker().softValues().makeMap[String, AnyRef]() + val cache = Cache.newKeySpace() // Remembers which splits are currently being loaded (on workers) val loading = new HashSet[String] diff --git a/src/scala/spark/SoftReferenceCache.scala b/src/scala/spark/SoftReferenceCache.scala new file mode 100644 index 0000000000..e84aa57efa --- /dev/null +++ b/src/scala/spark/SoftReferenceCache.scala @@ -0,0 +1,13 @@ +package spark + +import com.google.common.collect.MapMaker + +/** + * An implementation of Cache that uses soft references. + */ +class SoftReferenceCache extends Cache { + val map = new MapMaker().softValues().makeMap[Any, Any]() + + override def get(key: Any): Any = map.get(key) + override def put(key: Any, value: Any) = map.put(key, value) +} diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala index e85b26e238..24fe0e9bbb 100644 --- a/src/scala/spark/SparkContext.scala +++ b/src/scala/spark/SparkContext.scala @@ -30,8 +30,9 @@ extends Logging { private val isLocal = scheduler.isInstanceOf[LocalScheduler] - // Start the scheduler and the broadcast system + // Start the scheduler, the cache and the broadcast system scheduler.start() + Cache.initialize() Broadcast.initialize(true) // Methods for creating RDDs diff --git a/src/scala/spark/WeakReferenceCache.scala b/src/scala/spark/WeakReferenceCache.scala new file mode 100644 index 0000000000..ddca065454 --- /dev/null +++ b/src/scala/spark/WeakReferenceCache.scala @@ -0,0 +1,14 @@ +package spark + +import com.google.common.collect.MapMaker + +/** + * An implementation of Cache that uses weak references. + */ +class WeakReferenceCache extends Cache { + val map = new MapMaker().weakValues().makeMap[Any, Any]() + + override def get(key: Any): Any = map.get(key) + override def put(key: Any, value: Any) = map.put(key, value) +} + |