From 22684653a55d00dce448aaf1619a75879ccb5af0 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Tue, 2 Oct 2012 12:01:32 -0700 Subject: Revert "Place Spray repo ahead of Cloudera in Maven search path" This reverts commit 42e0a68082327c78dbd0fd313145124d9b8a9d98. --- .../main/scala/spark/storage/BlockManager.scala | 222 ++++++++++----------- core/src/main/scala/spark/storage/BlockStore.scala | 2 - core/src/main/scala/spark/storage/DiskStore.scala | 6 +- .../src/main/scala/spark/storage/MemoryStore.scala | 170 +++++++--------- .../scala/spark/storage/BlockManagerSuite.scala | 6 +- project/SparkBuild.scala | 4 +- 6 files changed, 180 insertions(+), 230 deletions(-) diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 3b9cd8b5fc..37d5862575 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -59,31 +59,15 @@ class BlockLocker(numLockers: Int) { class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long) extends Logging { - class BlockInfo(val level: StorageLevel, val tellMaster: Boolean, var pending: Boolean = true) { - def waitForReady() { - if (pending) { - synchronized { - while (pending) this.wait() - } - } - } - - def markReady() { - pending = false - synchronized { - this.notifyAll() - } - } - } + case class BlockInfo(level: StorageLevel, tellMaster: Boolean) private val NUM_LOCKS = 337 private val locker = new BlockLocker(NUM_LOCKS) private val blockInfo = new ConcurrentHashMap[String, BlockInfo]() - private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory) - private[storage] val diskStore: BlockStore = - new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))) + private[storage] val diskStore: BlockStore = new DiskStore(this, + System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))) val connectionManager = new ConnectionManager(0) implicit val futureExecContext = connectionManager.futureExecContext @@ -95,6 +79,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m var cacheTracker: CacheTracker = null val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties + val compress = System.getProperty("spark.blockManager.compress", "false").toBoolean initialize() @@ -125,32 +110,45 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } /** - * Tell the master about the current storage status of a block. This will send a heartbeat - * message reflecting the current status, *not* the desired storage level in its block info. - * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk. + * Change the storage level for a local block in the block info meta data, and + * tell the master if necessary. Note that this is only a meta data change and + * does NOT actually change the storage of the block. If the new level is + * invalid, then block info (if exists) will be silently removed. */ - def reportBlockStatus(blockId: String) { - locker.getLock(blockId).synchronized { - val curLevel = blockInfo.get(blockId) match { - case null => - StorageLevel.NONE - case info => - info.level match { - case null => - StorageLevel.NONE - case level => - val inMem = level.useMemory && memoryStore.contains(blockId) - val onDisk = level.useDisk && diskStore.contains(blockId) - new StorageLevel(onDisk, inMem, level.deserialized, level.replication) - } - } + private[spark] def setLevelAndTellMaster( + blockId: String, level: StorageLevel, tellMaster: Boolean = true) { + + if (level == null) { + throw new IllegalArgumentException("Storage level is null") + } + + // If there was earlier info about the block, then use earlier tellMaster + val oldInfo = blockInfo.get(blockId) + val newTellMaster = if (oldInfo != null) oldInfo.tellMaster else tellMaster + if (oldInfo != null && oldInfo.tellMaster != tellMaster) { + logWarning("Ignoring tellMaster setting as it is different from earlier setting") + } + + // If level is valid, store the block info, else remove the block info + if (level.isValid) { + blockInfo.put(blockId, new BlockInfo(level, newTellMaster)) + logDebug("Info for block " + blockId + " updated with new level as " + level) + } else { + blockInfo.remove(blockId) + logDebug("Info for block " + blockId + " removed as new level is null or invalid") + } + + // Tell master if necessary + if (newTellMaster) { master.mustHeartBeat(HeartBeat( blockManagerId, blockId, - curLevel, - if (curLevel.useMemory) memoryStore.getSize(blockId) else 0L, - if (curLevel.useDisk) diskStore.getSize(blockId) else 0L)) + level, + if (level.isValid && level.useMemory) memoryStore.getSize(blockId) else 0, + if (level.isValid && level.useDisk) diskStore.getSize(blockId) else 0)) logDebug("Told master about block " + blockId) + } else { + logDebug("Did not tell master about block " + blockId) } } @@ -182,21 +180,22 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m def getLocal(blockId: String): Option[Iterator[Any]] = { logDebug("Getting local block " + blockId) locker.getLock(blockId).synchronized { - val info = blockInfo.get(blockId) - if (info != null) { - info.waitForReady() // In case the block is still being put() by another thread - val level = info.level - logDebug("Level for block " + blockId + " is " + level) + // Check storage level of block + val level = getLevel(blockId) + if (level != null) { + logDebug("Level for block " + blockId + " is " + level + " on local machine") // Look for the block in memory if (level.useMemory) { logDebug("Getting block " + blockId + " from memory") memoryStore.getValues(blockId) match { - case Some(iterator) => + case Some(iterator) => { logDebug("Block " + blockId + " found in memory") return Some(iterator) - case None => + } + case None => { logDebug("Block " + blockId + " not found in memory") + } } } @@ -204,12 +203,14 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m if (level.useDisk) { logDebug("Getting block " + blockId + " from disk") diskStore.getValues(blockId) match { - case Some(iterator) => + case Some(iterator) => { logDebug("Block " + blockId + " found in disk") return Some(iterator) - case None => + } + case None => { throw new Exception("Block " + blockId + " not found on disk, though it should be") return None + } } } } else { @@ -225,21 +226,22 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m def getLocalBytes(blockId: String): Option[ByteBuffer] = { logDebug("Getting local block " + blockId + " as bytes") locker.getLock(blockId).synchronized { - val info = blockInfo.get(blockId) - if (info != null) { - info.waitForReady() // In case the block is still being put() by another thread - val level = info.level + // Check storage level of block + val level = getLevel(blockId) + if (level != null) { logDebug("Level for block " + blockId + " is " + level + " on local machine") // Look for the block in memory if (level.useMemory) { logDebug("Getting block " + blockId + " from memory") memoryStore.getBytes(blockId) match { - case Some(bytes) => + case Some(bytes) => { logDebug("Block " + blockId + " found in memory") return Some(bytes) - case None => + } + case None => { logDebug("Block " + blockId + " not found in memory") + } } } @@ -247,12 +249,14 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m if (level.useDisk) { logDebug("Getting block " + blockId + " from disk") diskStore.getBytes(blockId) match { - case Some(bytes) => + case Some(bytes) => { logDebug("Block " + blockId + " found in disk") return Some(bytes) - case None => + } + case None => { throw new Exception("Block " + blockId + " not found on disk, though it should be") return None + } } } } else { @@ -427,17 +431,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m throw new IllegalArgumentException("Storage level is null or invalid") } - if (blockInfo.containsKey(blockId)) { - logWarning("Block " + blockId + " already exists on this machine; not re-adding it") - return - } - - // Remember the block's storage level so that we can correctly drop it to disk if it needs - // to be dropped right after it got put into memory. Note, however, that other threads will - // not be able to get() this block until we call markReady on its BlockInfo. - val myInfo = new BlockInfo(level, tellMaster) - blockInfo.put(blockId, myInfo) - val startTimeMs = System.currentTimeMillis var bytes: ByteBuffer = null @@ -451,15 +444,32 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs) + " to get into synchronized block") - if (level.useMemory) { - // Save it just to memory first, even if it also has useDisk set to true; we will later - // drop it to disk if the memory store can't hold it. + // Check and warn if block with same id already exists + if (getLevel(blockId) != null) { + logWarning("Block " + blockId + " already exists in local machine") + return + } + + if (level.useMemory && level.useDisk) { + // If saving to both memory and disk, then serialize only once + memoryStore.putValues(blockId, values, level, true) match { + case Left(newValues) => + diskStore.putValues(blockId, newValues, level, true) match { + case Right(newBytes) => bytes = newBytes + case _ => throw new Exception("Unexpected return value") + } + case Right(newBytes) => + bytes = newBytes + diskStore.putBytes(blockId, newBytes, level) + } + } else if (level.useMemory) { + // If only save to memory memoryStore.putValues(blockId, values, level, true) match { case Right(newBytes) => bytes = newBytes case Left(newIterator) => valuesAfterPut = newIterator } } else { - // Save directly to disk. + // If only save to disk val askForBytes = level.replication > 1 // Don't get back the bytes unless we replicate them diskStore.putValues(blockId, values, level, askForBytes) match { case Right(newBytes) => bytes = newBytes @@ -467,12 +477,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } } - // Now that the block is in either the memory or disk store, let other threads read it, - // and tell the master about it. - myInfo.markReady() - if (tellMaster) { - reportBlockStatus(blockId) - } + // Store the storage level + setLevelAndTellMaster(blockId, level, tellMaster) } logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs)) @@ -515,17 +521,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m throw new IllegalArgumentException("Storage level is null or invalid") } - if (blockInfo.containsKey(blockId)) { - logWarning("Block " + blockId + " already exists on this machine; not re-adding it") - return - } - - // Remember the block's storage level so that we can correctly drop it to disk if it needs - // to be dropped right after it got put into memory. Note, however, that other threads will - // not be able to get() this block until we call markReady on its BlockInfo. - val myInfo = new BlockInfo(level, tellMaster) - blockInfo.put(blockId, myInfo) - val startTimeMs = System.currentTimeMillis // Initiate the replication before storing it locally. This is faster as @@ -542,22 +537,22 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m locker.getLock(blockId).synchronized { logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs) + " to get into synchronized block") + if (getLevel(blockId) != null) { + logWarning("Block " + blockId + " already exists") + return + } if (level.useMemory) { - // Store it only in memory at first, even if useDisk is also set to true bytes.rewind() memoryStore.putBytes(blockId, bytes, level) - } else { + } + if (level.useDisk) { bytes.rewind() diskStore.putBytes(blockId, bytes, level) } - // Now that the block is in either the memory or disk store, let other threads read it, - // and tell the master about it. - myInfo.markReady() - if (tellMaster) { - reportBlockStatus(blockId) - } + // Store the storage level + setLevelAndTellMaster(blockId, level, tellMaster) } // TODO: This code will be removed when CacheTracker is gone. @@ -631,31 +626,22 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } /** - * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory - * store reaches its limit and needs to free up space. + * Drop block from memory (called when memory store has reached it limit) */ - def dropFromMemory(blockId: String, data: Either[Iterator[_], ByteBuffer]) { - logInfo("Dropping block " + blockId + " from memory") + def dropFromMemory(blockId: String) { locker.getLock(blockId).synchronized { - val info = blockInfo.get(blockId) - val level = info.level - if (level.useDisk && !diskStore.contains(blockId)) { - logInfo("Writing block " + blockId + " to disk") - data match { - case Left(iterator) => - diskStore.putValues(blockId, iterator, level, false) - case Right(bytes) => - diskStore.putBytes(blockId, bytes, level) - } + val level = getLevel(blockId) + if (level == null) { + logWarning("Block " + blockId + " cannot be removed from memory as it does not exist") + return } - memoryStore.remove(blockId) - if (info.tellMaster) { - reportBlockStatus(blockId) - } - if (!level.useDisk) { - // The block is completely gone from this node; forget it so we can put() it again later. - blockInfo.remove(blockId) + if (!level.useMemory) { + logWarning("Block " + blockId + " cannot be removed from memory as it is not in memory") + return } + memoryStore.remove(blockId) + val newLevel = new StorageLevel(level.useDisk, false, level.deserialized, level.replication) + setLevelAndTellMaster(blockId, newLevel) } } diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala index ff482ff66b..5f123aca78 100644 --- a/core/src/main/scala/spark/storage/BlockStore.scala +++ b/core/src/main/scala/spark/storage/BlockStore.scala @@ -31,7 +31,5 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging { def remove(blockId: String) - def contains(blockId: String): Boolean - def clear() { } } diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index d0c592ccb1..d9965f4306 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -26,7 +26,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) addShutdownHook() override def getSize(blockId: String): Long = { - getFile(blockId).length() + getFile(blockId).length } override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) { @@ -93,10 +93,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) } } - override def contains(blockId: String): Boolean = { - getFile(blockId).exists() - } - private def createFile(blockId: String): File = { val file = getFile(blockId) if (file.exists()) { diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala index 74ef326038..ea6f3c4fcc 100644 --- a/core/src/main/scala/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/spark/storage/MemoryStore.scala @@ -18,12 +18,29 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true) private var currentMemory = 0L + //private val blockDropper = Executors.newSingleThreadExecutor() + private val blocksToDrop = new ArrayBlockingQueue[String](10000, true) + private val blockDropper = new Thread("memory store - block dropper") { + override def run() { + try { + while (true) { + val blockId = blocksToDrop.take() + logDebug("Block " + blockId + " ready to be dropped") + blockManager.dropFromMemory(blockId) + } + } catch { + case ie: InterruptedException => + logInfo("Shutting down block dropper") + } + } + } + blockDropper.start() logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory))) def freeMemory: Long = maxMemory - currentMemory override def getSize(blockId: String): Long = { - synchronized { + entries.synchronized { entries.get(blockId).size } } @@ -35,12 +52,19 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) val elements = new ArrayBuffer[Any] elements ++= values val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef]) - tryToPut(blockId, elements, sizeEstimate, true) + ensureFreeSpace(sizeEstimate) + val entry = new Entry(elements, sizeEstimate, true) + entries.synchronized { entries.put(blockId, entry) } + currentMemory += sizeEstimate + logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format( + blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory))) } else { val entry = new Entry(bytes, bytes.limit, false) - ensureFreeSpace(blockId, bytes.limit) - synchronized { entries.put(blockId, entry) } - tryToPut(blockId, bytes, bytes.limit, false) + ensureFreeSpace(bytes.limit) + entries.synchronized { entries.put(blockId, entry) } + currentMemory += bytes.limit + logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format( + blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory))) } } @@ -55,17 +79,27 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) val elements = new ArrayBuffer[Any] elements ++= values val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef]) - tryToPut(blockId, elements, sizeEstimate, true) + ensureFreeSpace(sizeEstimate) + val entry = new Entry(elements, sizeEstimate, true) + entries.synchronized { entries.put(blockId, entry) } + currentMemory += sizeEstimate + logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format( + blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory))) Left(elements.iterator) } else { val bytes = blockManager.dataSerialize(values) - tryToPut(blockId, bytes, bytes.limit, false) + ensureFreeSpace(bytes.limit) + val entry = new Entry(bytes, bytes.limit, false) + entries.synchronized { entries.put(blockId, entry) } + currentMemory += bytes.limit + logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format( + blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory))) Right(bytes) } } override def getBytes(blockId: String): Option[ByteBuffer] = { - val entry = synchronized { + val entry = entries.synchronized { entries.get(blockId) } if (entry == null) { @@ -78,7 +112,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } override def getValues(blockId: String): Option[Iterator[Any]] = { - val entry = synchronized { + val entry = entries.synchronized { entries.get(blockId) } if (entry == null) { @@ -92,7 +126,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } override def remove(blockId: String) { - synchronized { + entries.synchronized { val entry = entries.get(blockId) if (entry != null) { entries.remove(blockId) @@ -100,118 +134,54 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) logInfo("Block %s of size %d dropped from memory (free %d)".format( blockId, entry.size, freeMemory)) } else { - logWarning("Block " + blockId + " could not be removed as it does not exist") + logWarning("Block " + blockId + " could not be removed as it doesnt exist") } } } override def clear() { - synchronized { + entries.synchronized { entries.clear() } + blockDropper.interrupt() logInfo("MemoryStore cleared") } - /** - * Return the RDD ID that a given block ID is from, or null if it is not an RDD block. - */ - private def getRddId(blockId: String): String = { - if (blockId.startsWith("rdd_")) { - blockId.split('_')(1) - } else { - null - } - } - - /** - * Try to put in a set of values, if we can free up enough space. The value should either be - * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) - * size must also be passed by the caller. - */ - private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = { - synchronized { - if (ensureFreeSpace(blockId, size)) { - val entry = new Entry(value, size, deserialized) - entries.put(blockId, entry) - currentMemory += size - if (deserialized) { - logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format( - blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory))) - } else { - logInfo("Block %s stored as bytes to memory (size %s, free %s)".format( - blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory))) - } - true - } else { - // Tell the block manager that we couldn't put it in memory so that it can drop it to - // disk if the block allows disk storage. - val data = if (deserialized) { - Left(value.asInstanceOf[ArrayBuffer[Any]].iterator) - } else { - Right(value.asInstanceOf[ByteBuffer].duplicate()) - } - blockManager.dropFromMemory(blockId, data) - false - } - } - } - - /** - * Tries to free up a given amount of space to store a particular block, but can fail and return - * false if either the block is bigger than our memory or it would require replacing another - * block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that - * don't fit into memory that we want to avoid). - * - * Assumes that a lock on entries is held by the caller. - */ - private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = { + // TODO: This should be able to return false if the space is larger than our total memory, + // or if adding this block would require evicting another one from the same RDD + private def ensureFreeSpace(space: Long) { logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format( space, currentMemory, maxMemory)) - if (space > maxMemory) { - logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit") - return false - } - if (maxMemory - currentMemory < space) { - val rddToAdd = getRddId(blockIdToAdd) + val selectedBlocks = new ArrayBuffer[String]() var selectedMemory = 0L - val iterator = entries.entrySet().iterator() - while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) { - val pair = iterator.next() - val blockId = pair.getKey - if (rddToAdd != null && rddToAdd == getRddId(blockId)) { - logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " + - "block from the same RDD") - return false + entries.synchronized { + val iter = entries.entrySet().iterator() + while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) { + val pair = iter.next() + val blockId = pair.getKey + val entry = pair.getValue + if (!entry.dropPending) { + selectedBlocks += blockId + entry.dropPending = true + } + selectedMemory += pair.getValue.size + logInfo("Block " + blockId + " selected for dropping") } - selectedBlocks += blockId - selectedMemory += pair.getValue.size } - if (maxMemory - (currentMemory - selectedMemory) >= space) { - logInfo(selectedBlocks.size + " blocks selected for dropping") - for (blockId <- selectedBlocks) { - val entry = entries.get(blockId) - val data = if (entry.deserialized) { - Left(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator) - } else { - Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) - } - blockManager.dropFromMemory(blockId, data) - } - return true - } else { - return false + logInfo("" + selectedBlocks.size + " new blocks selected for dropping, " + + blocksToDrop.size + " blocks pending") + var i = 0 + while (i < selectedBlocks.size) { + blocksToDrop.add(selectedBlocks(i)) + i += 1 } + selectedBlocks.clear() } - return true - } - - override def contains(blockId: String): Boolean = { - synchronized { entries.containsKey(blockId) } } } diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index 8f4b9d395f..d15d7285a7 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -69,9 +69,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2") assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3") - // Drop a1 and a2 from memory; this should be reported back to the master - store.dropFromMemory("a1", null) - store.dropFromMemory("a2", null) + // Setting storage level of a1 and a2 to invalid; they should be removed from store and master + store.setLevelAndTellMaster("a1", new StorageLevel(false, false, false, 1)) + store.setLevelAndTellMaster("a2", new StorageLevel(true, false, false, 0)) assert(store.getSingle("a1") === None, "a1 not removed from store") assert(store.getSingle("a2") === None, "a2 not removed from store") assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1") diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index be57173db1..15467115b2 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -61,8 +61,8 @@ object SparkBuild extends Build { resolvers ++= Seq( "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/", "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/", - "Spray Repository" at "http://repo.spray.cc/", - "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/" + "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/", + "Spray Repository" at "http://repo.spray.cc/" ), libraryDependencies ++= Seq( "com.google.guava" % "guava" % "11.0.1", -- cgit v1.2.3