author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-07-30 13:54:57 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-07-30 13:54:57 -0700
commit    400221f851cfda884c47b9cae55f347438ae007d (patch)
tree      b32ecb73f0e06c4e64db351288d11b275aecf7d0
parent    f471c82558347216985a8b7015309c4cce099cc4 (diff)
parent    cf429699e1073812c29308d24270768640063bf8 (diff)
Merge branch 'dev' of git://github.com/tdas/spark into dev
-rw-r--r--  core/src/main/scala/spark/BlockRDD.scala               42
-rw-r--r--  core/src/main/scala/spark/BoundedMemoryCache.scala     10
-rw-r--r--  core/src/main/scala/spark/RDD.scala                    19
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala    4
-rw-r--r--  core/src/main/scala/spark/storage/BlockStore.scala     83
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala                 7
6 files changed, 130 insertions(+), 35 deletions(-)
diff --git a/core/src/main/scala/spark/BlockRDD.scala b/core/src/main/scala/spark/BlockRDD.scala
new file mode 100644
index 0000000000..ea009f0f4f
--- /dev/null
+++ b/core/src/main/scala/spark/BlockRDD.scala
@@ -0,0 +1,42 @@
+package spark
+
+import scala.collection.mutable.HashMap
+
+class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
+ val index = idx
+}
+
+
+class BlockRDD[T: ClassManifest](sc: SparkContext, blockIds: Array[String]) extends RDD[T](sc) {
+
+ @transient
+ val splits_ = (0 until blockIds.size).map(i => {
+ new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split]
+ }).toArray
+
+ @transient
+ lazy val locations_ = {
+ val blockManager = SparkEnv.get.blockManager
+ /*val locations = blockIds.map(id => blockManager.getLocations(id))*/
+ val locations = blockManager.getLocations(blockIds)
+ HashMap(blockIds.zip(locations):_*)
+ }
+
+ override def splits = splits_
+
+ override def compute(split: Split): Iterator[T] = {
+ val blockManager = SparkEnv.get.blockManager
+ val blockId = split.asInstanceOf[BlockRDDSplit].blockId
+ blockManager.get(blockId) match {
+ case Some(block) => block.asInstanceOf[Iterator[T]]
+ case None =>
+ throw new Exception("Could not compute split, block " + blockId + " not found")
+ }
+ }
+
+ override def preferredLocations(split: Split) =
+ locations_(split.asInstanceOf[BlockRDDSplit].blockId)
+
+ override val dependencies: List[Dependency[_]] = Nil
+}
+
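For orientation, a minimal usage sketch of the new class, not part of the patch: it assumes a live SparkContext named sc and blocks already registered with the BlockManager under the illustrative ids below (the rdd:<rddId>:<splitIndex> naming matches the key format used by checkpoint() further down in this diff).

    // Hypothetical sketch: build an RDD directly over existing blocks.
    val blockIds = Array("rdd:1:0", "rdd:1:1")       // ids assumed to exist already
    val fromBlocks = new BlockRDD[Int](sc, blockIds)
    fromBlocks.collect()                             // each split is served by blockManager.get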
diff --git a/core/src/main/scala/spark/BoundedMemoryCache.scala b/core/src/main/scala/spark/BoundedMemoryCache.scala
index fa5dcee7bb..6fe0b94297 100644
--- a/core/src/main/scala/spark/BoundedMemoryCache.scala
+++ b/core/src/main/scala/spark/BoundedMemoryCache.scala
@@ -91,7 +91,15 @@ class BoundedMemoryCache(maxBytes: Long) extends Cache with Logging {
protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) {
logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
// TODO: remove BoundedMemoryCache
- SparkEnv.get.cacheTracker.dropEntry(datasetId.asInstanceOf[(Int, Int)]._2, partition)
+
+ val (keySpaceId, innerDatasetId) = datasetId.asInstanceOf[(Any, Any)]
+ innerDatasetId match {
+ case rddId: Int =>
+ SparkEnv.get.cacheTracker.dropEntry(rddId, partition)
+ case broadcastUUID: java.util.UUID =>
+ // TODO: Maybe something should be done if the broadcasted variable falls out of cache
+ case _ =>
+ }
}
}
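The match above exists because the cache is shared between two key spaces; a sketch of the two key shapes the new code distinguishes (the outer keySpaceId values here are hypothetical, only the inner types matter):

    // Illustrative keys only: RDD partitions carry an Int rdd id,
    // broadcast variables a java.util.UUID.
    val rddKey: (Any, Any)       = (0, 42)                          // dropEntry(42, partition) fires
    val broadcastKey: (Any, Any) = (1, java.util.UUID.randomUUID()) // currently ignored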
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 429e9c936f..8a79e85cf9 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -94,6 +94,25 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
def getStorageLevel = storageLevel
+ def checkpoint(level: StorageLevel = StorageLevel.DISK_AND_MEMORY_DESER): RDD[T] = {
+ if (!level.useDisk && level.replication < 2) {
+ throw new Exception("Cannot checkpoint without using disk or replication (level requested was " + level + ")")
+ }
+
+ // This is a hack. Ideally this should re-use the code used by the CacheTracker
+ // to generate the key.
+ def getSplitKey(split: Split) = "rdd:%d:%d".format(this.id, split.index)
+
+ persist(level)
+ sc.runJob(this, (iter: Iterator[T]) => {} )
+
+ val p = this.partitioner
+
+ new BlockRDD[T](sc, splits.map(getSplitKey).toArray) {
+ override val partitioner = p
+ }
+ }
+
// Read this RDD; will read from cache if applicable, or otherwise compute
final def iterator(split: Split): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
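A hedged usage sketch of the new method (assuming a live SparkContext named sc; everything else comes from the diff above): checkpoint() persists the RDD, forces a job so every partition is materialized as a block, and hands back a BlockRDD whose dependency list is Nil, cutting the lineage.

    // Sketch: materialize, then continue from the checkpointed blocks.
    val doubled = sc.makeRDD(1 to 100, 4).map(_ * 2)
    val stable  = doubled.checkpoint()  // default level DISK_AND_MEMORY_DESER
    stable.count()                      // reads blocks; map(_ * 2) is not recomputed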
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 15131960d6..9faa0e62f2 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -5,7 +5,7 @@ import java.nio._
import java.nio.channels.FileChannel.MapMode
import java.util.{HashMap => JHashMap}
import java.util.LinkedHashMap
-import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
import java.util.Collections
import scala.actors._
@@ -74,7 +74,7 @@ class BlockManager(maxMemory: Long, val serializer: Serializer) extends Logging
private val NUM_LOCKS = 337
private val locker = new BlockLocker(NUM_LOCKS)
- private val blockInfo = Collections.synchronizedMap(new JHashMap[String, BlockInfo])
+ private val blockInfo = new ConcurrentHashMap[String, BlockInfo]()
private val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private val diskStore: BlockStore = new DiskStore(this,
System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
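The map swap above removes the global monitor that Collections.synchronizedMap imposes on every access; a self-contained sketch of what ConcurrentHashMap buys (plain JDK types, no Spark classes):

    import java.util.concurrent.ConcurrentHashMap

    // Reads don't contend with writes, and putIfAbsent gives an atomic
    // check-then-insert that a synchronized wrapper needs an external
    // lock to reproduce.
    val info = new ConcurrentHashMap[String, String]()
    info.putIfAbsent("rdd:1:0", "in-memory")
    val level = info.get("rdd:1:0")  // null if the block is unknown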
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 8672a5376e..17f4f51aa8 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -1,16 +1,15 @@
package spark.storage
import spark.{Utils, Logging, Serializer, SizeEstimator}
-
import scala.collection.mutable.ArrayBuffer
-
import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
import java.util.{UUID, LinkedHashMap}
import java.util.concurrent.Executors
-
+import java.util.concurrent.ConcurrentHashMap
import it.unimi.dsi.fastutil.io._
+import java.util.concurrent.ArrayBlockingQueue
/**
* Abstract class to store blocks
@@ -41,13 +40,29 @@ abstract class BlockStore(blockManager: BlockManager) extends Logging {
class MemoryStore(blockManager: BlockManager, maxMemory: Long)
extends BlockStore(blockManager) {
- class Entry(var value: Any, val size: Long, val deserialized: Boolean)
+ case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false)
private val memoryStore = new LinkedHashMap[String, Entry](32, 0.75f, true)
private var currentMemory = 0L
- private val blockDropper = Executors.newSingleThreadExecutor()
-
+ //private val blockDropper = Executors.newSingleThreadExecutor()
+ private val blocksToDrop = new ArrayBlockingQueue[String](10000, true)
+ private val blockDropper = new Thread("memory store - block dropper") {
+ override def run() {
+ try {
+ while (true) {
+ val blockId = blocksToDrop.take()
+ logDebug("Block " + blockId + " ready to be dropped")
+ blockManager.dropFromMemory(blockId)
+ }
+ } catch {
+ case ie: InterruptedException =>
+ logInfo("Shutting down block dropper")
+ }
+ }
+ }
+ blockDropper.start()
+
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
if (level.deserialized) {
bytes.rewind()
@@ -124,41 +139,45 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
memoryStore.synchronized {
memoryStore.clear()
}
- blockDropper.shutdown()
+ //blockDropper.shutdown()
+ blockDropper.interrupt()
logInfo("MemoryStore cleared")
}
- private def drop(blockId: String) {
- blockDropper.submit(new Runnable() {
- def run() {
- blockManager.dropFromMemory(blockId)
- }
- })
- }
-
private def ensureFreeSpace(space: Long) {
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
space, currentMemory, maxMemory))
- val droppedBlockIds = new ArrayBuffer[String]()
- var droppedMemory = 0L
-
- memoryStore.synchronized {
- val iter = memoryStore.entrySet().iterator()
- while (maxMemory - (currentMemory - droppedMemory) < space && iter.hasNext) {
- val pair = iter.next()
- val blockId = pair.getKey
- droppedBlockIds += blockId
- droppedMemory += pair.getValue.size
- logDebug("Decided to drop " + blockId)
+ if (maxMemory - currentMemory < space) {
+
+ val selectedBlocks = new ArrayBuffer[String]()
+ var selectedMemory = 0L
+
+ memoryStore.synchronized {
+ val iter = memoryStore.entrySet().iterator()
+ while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) {
+ val pair = iter.next()
+ val blockId = pair.getKey
+ val entry = pair.getValue()
+ if (!entry.dropPending) {
+ selectedBlocks += blockId
+ entry.dropPending = true
+ }
+ selectedMemory += pair.getValue.size
+ logDebug("Block " + blockId + " selected for dropping")
+ }
+ }
+
+ logDebug("" + selectedBlocks.size + " new blocks selected for dropping, " +
+ blocksToDrop.size + " blocks pending")
+ var i = 0
+ while (i < selectedBlocks.size) {
+ blocksToDrop.add(selectedBlocks(i))
+ i += 1
}
- }
-
- for (blockId <- droppedBlockIds) {
- drop(blockId)
+ selectedBlocks.clear()
}
- droppedBlockIds.clear()
- }
+ }
}
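The rewrite above turns block dropping into an explicit producer/consumer pair: ensureFreeSpace marks entries dropPending and enqueues their ids, a dedicated thread drains the queue, and interrupt() is the shutdown signal. A self-contained sketch of the same pattern, independent of the Spark types:

    import java.util.concurrent.ArrayBlockingQueue

    val queue = new ArrayBlockingQueue[String](10000, true)
    val dropper = new Thread("dropper sketch") {
      override def run() {
        try {
          while (true) {
            val id = queue.take()      // blocks until an id arrives
            println("dropping " + id)  // stand-in for blockManager.dropFromMemory(id)
          }
        } catch {
          case _: InterruptedException => println("dropper shut down")
        }
      }
    }
    dropper.start()
    queue.add("rdd:1:0")  // producer side, as in ensureFreeSpace
    dropper.interrupt()   // consumer shutdown, as in clear()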
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 4a79c086e9..20638bba92 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -59,4 +59,11 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
val result = pairs.aggregate(emptyMap)(mergeElement, mergeMaps)
assert(result.toSet === Set(("a", 6), ("b", 2), ("c", 5)))
}
+
+ test("checkpointing") {
+ val sc = new SparkContext("local", "test")
+ val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).flatMap(x => 1 to x).checkpoint()
+ assert(rdd.collect().toList === List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4))
+ sc.stop()
+ }
}