author     Andrew Or <andrewor14@gmail.com>    2014-07-27 16:08:16 -0700
committer  Matei Zaharia <matei@databricks.com>    2014-07-27 16:08:16 -0700
commit     ecf30ee7e78ea59c462c54db0fde5328f997466c (patch)
tree       38c2819e040322b0bdd3ba54c9164aebe72cc72b
parent     f6ff2a61d00d12481bfb211ae13d6992daacdcc2 (diff)
[SPARK-1777] Prevent OOMs from single partitions
**Problem.** When caching, we currently unroll the entire RDD partition before making sure we have enough free memory. This is a common cause of OOMs, especially when (1) the BlockManager has little free space left in memory, and (2) the partition is large.

**Solution.** We maintain a global memory pool of `M` bytes shared across all threads, similar to the way we currently manage memory for shuffle aggregation. Then, while we unroll each partition, we periodically check whether there is enough space to continue. If not, we drop enough RDD blocks to ensure we have at least `M` bytes to work with, then try again. If we still don't have enough space to unroll the partition, we give up and drop the block directly to disk, if applicable.

**New configurations.**
- `spark.storage.unrollFraction` - the value of `M` as a fraction of the storage memory. (default: 0.2)
- `spark.storage.safetyFraction` - a margin of safety in case size estimation is slightly off. This is the equivalent of the existing `spark.shuffle.safetyFraction`. (default: 0.9)

For more detail, see the [design document](https://issues.apache.org/jira/secure/attachment/12651793/spark-1777-design-doc.pdf). Tests are pending for performance and memory usage patterns.

Author: Andrew Or <andrewor14@gmail.com>

Closes #1165 from andrewor14/them-rdd-memories and squashes the following commits:

e77f451 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
c7c8832 [Andrew Or] Simplify logic + update a few comments
269d07b [Andrew Or] Very minor changes to tests
6645a8a [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
b7e165c [Andrew Or] Add new tests for unrolling blocks
f12916d [Andrew Or] Slightly clean up tests
71672a7 [Andrew Or] Update unrollSafely tests
369ad07 [Andrew Or] Correct ensureFreeSpace and requestMemory behavior
f4d035c [Andrew Or] Allow one thread to unroll multiple blocks
a66fbd2 [Andrew Or] Rename a few things + update comments
68730b3 [Andrew Or] Fix weird scalatest behavior
e40c60d [Andrew Or] Fix MIMA excludes
ff77aa1 [Andrew Or] Fix tests
1a43c06 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
b9a6eee [Andrew Or] Simplify locking behavior on unrollMemoryMap
ed6cda4 [Andrew Or] Formatting fix (super minor)
f9ff82e [Andrew Or] putValues -> putIterator + putArray
beb368f [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
8448c9b [Andrew Or] Fix tests
a49ba4d [Andrew Or] Do not expose unroll memory check period
69bc0a5 [Andrew Or] Always synchronize on putLock before unrollMemoryMap
3f5a083 [Andrew Or] Simplify signature of ensureFreeSpace
dce55c8 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
8288228 [Andrew Or] Synchronize put and unroll properly
4f18a3d [Andrew Or] bufferFraction -> unrollFraction
28edfa3 [Andrew Or] Update a few comments / log messages
728323b [Andrew Or] Do not synchronize every 1000 elements
5ab2329 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
129c441 [Andrew Or] Fix bug: Use toArray rather than array
9a65245 [Andrew Or] Update a few comments + minor control flow changes
57f8d85 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
abeae4f [Andrew Or] Add comment clarifying the MEMORY_AND_DISK case
3dd96aa [Andrew Or] AppendOnlyBuffer -> Vector (+ a few small changes)
f920531 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
0871835 [Andrew Or] Add an effective storage level interface to BlockManager
64e7d4c [Andrew Or] Add/modify a few comments (minor)
8af2f35 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
4f4834e [Andrew Or] Use original storage level for blocks dropped to disk
ecc8c2d [Andrew Or] Fix binary incompatibility
24185ea [Andrew Or] Avoid dropping a block back to disk if reading from disk
2b7ee66 [Andrew Or] Fix bug in SizeTracking*
9b9a273 [Andrew Or] Fix tests
20eb3e5 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
649bdb3 [Andrew Or] Document spark.storage.bufferFraction
a10b0e7 [Andrew Or] Add initial memory request threshold + rename a few things
e9c3cb0 [Andrew Or] cacheMemoryMap -> unrollMemoryMap
198e374 [Andrew Or] Unfold -> unroll
0d50155 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
d9d02a8 [Andrew Or] Remove unused param in unfoldSafely
ec728d8 [Andrew Or] Add tests for safe unfolding of blocks
22b2209 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
078eb83 [Andrew Or] Add check for hasNext in PrimitiveVector.iterator
0871535 [Andrew Or] Fix tests in BlockManagerSuite
d68f31e [Andrew Or] Safely unfold blocks for all memory puts
5961f50 [Andrew Or] Fix tests
195abd7 [Andrew Or] Refactor: move unfold logic to MemoryStore
1e82d00 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
3ce413e [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
d5dd3b4 [Andrew Or] Free buffer memory in finally
ea02eec [Andrew Or] Fix tests
b8e1d9c [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
a8704c1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
e1b8b25 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
87aa75c [Andrew Or] Fix mima excludes again (typo)
11eb921 [Andrew Or] Clarify comment (minor)
50cae44 [Andrew Or] Remove now duplicate mima exclude
7de5ef9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
df47265 [Andrew Or] Fix binary incompatibility
6d05a81 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
f94f5af [Andrew Or] Update a few comments (minor)
776aec9 [Andrew Or] Prevent OOM if a single RDD partition is too large
bbd3eea [Andrew Or] Fix CacheManagerSuite to use Array
97ea499 [Andrew Or] Change BlockManager interface to use Arrays
c12f093 [Andrew Or] Add SizeTrackingAppendOnlyBuffer and tests
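The gist of the solution, as a minimal standalone sketch. The object name, pool size, and fixed per-element cost here are illustrative assumptions, not Spark APIs; the real implementation, `MemoryStore.unrollSafely` in the diff below, additionally drops existing blocks to free up space and estimates sizes with a `SizeTracker` rather than assuming a fixed cost.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

object UnrollSketch {
  // Illustrative shared pool of M bytes (the real pool is sized by
  // spark.storage.unrollFraction and guarded by the MemoryStore's lock).
  private var poolRemaining: Long = 16L * 1024 * 1024

  private def reserve(bytes: Long): Boolean = synchronized {
    if (poolRemaining >= bytes) { poolRemaining -= bytes; true } else false
  }

  /**
   * Unroll `values`, re-checking the pool every `checkPeriod` elements.
   * Returns Left(array) if the whole partition fit, otherwise Right(iterator)
   * over everything seen so far plus the rest, which the caller may spill to disk.
   */
  def unroll[T: ClassTag](
      values: Iterator[T],
      bytesPerElement: Long): Either[Array[T], Iterator[T]] = {
    val checkPeriod = 16
    val buffer = new ArrayBuffer[T]
    var keepGoing = reserve(checkPeriod * bytesPerElement)
    var sinceCheck = 0
    while (values.hasNext && keepGoing) {
      buffer += values.next()
      sinceCheck += 1
      if (sinceCheck == checkPeriod) {
        sinceCheck = 0
        keepGoing = reserve(checkPeriod * bytesPerElement)
      }
    }
    if (keepGoing) Left(buffer.toArray)   // fully unrolled: safe to cache in memory
    else Right(buffer.iterator ++ values) // out of space: fall back, e.g. to disk
  }
}
```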
-rw-r--r--  core/src/main/scala/org/apache/spark/CacheManager.scala  72
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala  4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManager.scala  110
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockStore.scala  6
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/DiskStore.scala  12
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/MemoryStore.scala  256
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonStore.scala  12
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/SizeEstimator.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala  15
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala  105
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala  71
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala  46
-rw-r--r--  core/src/test/scala/org/apache/spark/CacheManagerSuite.scala  25
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala  594
-rw-r--r--  core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala  120
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala  204
-rw-r--r--  docs/configuration.md  9
-rw-r--r--  project/MimaExcludes.scala  10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala  5
21 files changed, 1165 insertions(+), 517 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 8f867686a0..5ddda4d695 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -17,9 +17,9 @@
package org.apache.spark
-import scala.collection.mutable.{ArrayBuffer, HashSet}
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.executor.InputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
@@ -30,7 +30,7 @@ import org.apache.spark.storage._
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
/** Keys of RDD partitions that are being computed/loaded. */
- private val loading = new HashSet[RDDBlockId]()
+ private val loading = new mutable.HashSet[RDDBlockId]
/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](
@@ -118,21 +118,29 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}
/**
- * Cache the values of a partition, keeping track of any updates in the storage statuses
- * of other blocks along the way.
+ * Cache the values of a partition, keeping track of any updates in the storage statuses of
+ * other blocks along the way.
+ *
+ * The effective storage level refers to the level that actually specifies BlockManager put
+ * behavior, not the level originally specified by the user. This is mainly for forcing a
+ * MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
+ * while preserving the original semantics of the RDD as specified by the application.
*/
private def putInBlockManager[T](
key: BlockId,
values: Iterator[T],
- storageLevel: StorageLevel,
- updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = {
-
- if (!storageLevel.useMemory) {
- /* This RDD is not to be cached in memory, so we can just pass the computed values
- * as an iterator directly to the BlockManager, rather than first fully unrolling
- * it in memory. The latter option potentially uses much more memory and risks OOM
- * exceptions that can be avoided. */
- updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
+ level: StorageLevel,
+ updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
+ effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
+
+ val putLevel = effectiveStorageLevel.getOrElse(level)
+ if (!putLevel.useMemory) {
+ /*
+ * This RDD is not to be cached in memory, so we can just pass the computed values as an
+ * iterator directly to the BlockManager rather than first fully unrolling it in memory.
+ */
+ updatedBlocks ++=
+ blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
blockManager.get(key) match {
case Some(v) => v.data.asInstanceOf[Iterator[T]]
case None =>
@@ -140,14 +148,36 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
throw new BlockException(key, s"Block manager failed to return cached value for $key!")
}
} else {
- /* This RDD is to be cached in memory. In this case we cannot pass the computed values
+ /*
+ * This RDD is to be cached in memory. In this case we cannot pass the computed values
* to the BlockManager as an iterator and expect to read it back later. This is because
- * we may end up dropping a partition from memory store before getting it back, e.g.
- * when the entirety of the RDD does not fit in memory. */
- val elements = new ArrayBuffer[Any]
- elements ++= values
- updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
- elements.iterator.asInstanceOf[Iterator[T]]
+ * we may end up dropping a partition from memory store before getting it back.
+ *
+ * In addition, we must be careful to not unroll the entire partition in memory at once.
+ * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
+ * single partition. Instead, we unroll the values cautiously, potentially aborting and
+ * dropping the partition to disk if applicable.
+ */
+ blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
+ case Left(arr) =>
+ // We have successfully unrolled the entire partition, so cache it in memory
+ updatedBlocks ++=
+ blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
+ arr.iterator.asInstanceOf[Iterator[T]]
+ case Right(it) =>
+ // There is not enough space to cache this partition in memory
+ logWarning(s"Not enough space to cache partition $key in memory! " +
+ s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
+ val returnValues = it.asInstanceOf[Iterator[T]]
+ if (putLevel.useDisk) {
+ logWarning(s"Persisting partition $key to disk instead.")
+ val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
+ useOffHeap = false, deserialized = false, putLevel.replication)
+ putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
+ } else {
+ returnValues
+ }
+ }
}
}
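To make the new `effectiveStorageLevel` plumbing concrete, here is a small sketch (not part of the patch; the object wrapper is added only so the snippet compiles on its own). It shows how the disk-only fallback above relates to a user-specified MEMORY_AND_DISK level: the memory flags are cleared while replication is preserved.

```scala
import org.apache.spark.storage.StorageLevel

object EffectiveLevelSketch {
  def main(args: Array[String]): Unit = {
    // A user asks for MEMORY_AND_DISK; if the partition cannot be unrolled,
    // putInBlockManager retries with this effective disk-only level.
    val userLevel = StorageLevel.MEMORY_AND_DISK
    val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
      useOffHeap = false, deserialized = false, userLevel.replication)
    assert(diskOnlyLevel.useDisk && !diskOnlyLevel.useMemory)
    // The RDD itself keeps its original user-facing storage level.
  }
}
```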
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 8f70744d80..6ee731b22c 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -67,7 +67,7 @@ class SparkEnv (
val metricsSystem: MetricsSystem,
val conf: SparkConf) extends Logging {
- // A mapping of thread ID to amount of memory used for shuffle in bytes
+ // A mapping of thread ID to amount of memory, in bytes, used for shuffle aggregations
// All accesses should be manually synchronized
val shuffleMemoryMap = mutable.HashMap[Long, Long]()
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index b16133b20c..3b69bc4ca4 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -266,11 +266,13 @@ private[spark] class Executor(
}
}
} finally {
- // TODO: Unregister shuffle memory only for ResultTask
+ // Release memory used by this thread for shuffles
val shuffleMemoryMap = env.shuffleMemoryMap
shuffleMemoryMap.synchronized {
shuffleMemoryMap.remove(Thread.currentThread().getId)
}
+ // Release memory used by this thread for unrolling blocks
+ env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
runningTasks.remove(taskId)
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0db0a5bc73..d746526639 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -38,7 +38,7 @@ import org.apache.spark.util._
private[spark] sealed trait BlockValues
private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
-private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues
+private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
@@ -71,9 +71,9 @@ private[spark] class BlockManager(
// Actual storage of where blocks are kept
private var tachyonInitialized = false
- private[storage] val memoryStore = new MemoryStore(this, maxMemory)
- private[storage] val diskStore = new DiskStore(this, diskBlockManager)
- private[storage] lazy val tachyonStore: TachyonStore = {
+ private[spark] val memoryStore = new MemoryStore(this, maxMemory)
+ private[spark] val diskStore = new DiskStore(this, diskBlockManager)
+ private[spark] lazy val tachyonStore: TachyonStore = {
val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
val appFolderName = conf.get("spark.tachyonStore.folderName")
val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"
@@ -463,16 +463,17 @@ private[spark] class BlockManager(
val values = dataDeserialize(blockId, bytes)
if (level.deserialized) {
// Cache the values before returning them
- // TODO: Consider creating a putValues that also takes in a iterator?
- val valuesBuffer = new ArrayBuffer[Any]
- valuesBuffer ++= values
- memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
- match {
- case Left(values2) =>
- return Some(new BlockResult(values2, DataReadMethod.Disk, info.size))
- case _ =>
- throw new SparkException("Memory store did not return back an iterator")
- }
+ val putResult = memoryStore.putIterator(
+ blockId, values, level, returnValues = true, allowPersistToDisk = false)
+ // The put may or may not have succeeded, depending on whether there was enough
+ // space to unroll the block. Either way, the put here should return an iterator.
+ putResult.data match {
+ case Left(it) =>
+ return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
+ case _ =>
+ // This only happens if we dropped the values back to disk (which is never)
+ throw new SparkException("Memory store did not return an iterator!")
+ }
} else {
return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
}
@@ -561,13 +562,14 @@ private[spark] class BlockManager(
iter
}
- def put(
+ def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
- tellMaster: Boolean): Seq[(BlockId, BlockStatus)] = {
+ tellMaster: Boolean = true,
+ effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(values != null, "Values is null")
- doPut(blockId, IteratorValues(values), level, tellMaster)
+ doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
}
/**
@@ -589,13 +591,14 @@ private[spark] class BlockManager(
* Put a new block of values to the block manager.
* Return a list of blocks updated as a result of this put.
*/
- def put(
+ def putArray(
blockId: BlockId,
- values: ArrayBuffer[Any],
+ values: Array[Any],
level: StorageLevel,
- tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+ tellMaster: Boolean = true,
+ effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(values != null, "Values is null")
- doPut(blockId, ArrayBufferValues(values), level, tellMaster)
+ doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
}
/**
@@ -606,19 +609,33 @@ private[spark] class BlockManager(
blockId: BlockId,
bytes: ByteBuffer,
level: StorageLevel,
- tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+ tellMaster: Boolean = true,
+ effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(bytes != null, "Bytes is null")
- doPut(blockId, ByteBufferValues(bytes), level, tellMaster)
+ doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
}
+ /**
+ * Put the given block according to the given level in one of the block stores, replicating
+ * the values if necessary.
+ *
+ * The effective storage level refers to the level according to which the block will actually be
+ * handled. This allows the caller to specify an alternate behavior of doPut while preserving
+ * the original level specified by the user.
+ */
private def doPut(
blockId: BlockId,
data: BlockValues,
level: StorageLevel,
- tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+ tellMaster: Boolean = true,
+ effectiveStorageLevel: Option[StorageLevel] = None)
+ : Seq[(BlockId, BlockStatus)] = {
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
+ effectiveStorageLevel.foreach { level =>
+ require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
+ }
// Return value
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
@@ -657,13 +674,16 @@ private[spark] class BlockManager(
// Size of the block in bytes
var size = 0L
+ // The level we actually use to put the block
+ val putLevel = effectiveStorageLevel.getOrElse(level)
+
// If we're storing bytes, then initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = data match {
- case b: ByteBufferValues if level.replication > 1 =>
+ case b: ByteBufferValues if putLevel.replication > 1 =>
// Duplicate doesn't copy the bytes, but just creates a wrapper
val bufferView = b.buffer.duplicate()
- Future { replicate(blockId, bufferView, level) }
+ Future { replicate(blockId, bufferView, putLevel) }
case _ => null
}
@@ -676,18 +696,18 @@ private[spark] class BlockManager(
// returnValues - Whether to return the values put
// blockStore - The type of storage to put these values into
val (returnValues, blockStore: BlockStore) = {
- if (level.useMemory) {
+ if (putLevel.useMemory) {
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
(true, memoryStore)
- } else if (level.useOffHeap) {
+ } else if (putLevel.useOffHeap) {
// Use tachyon for off-heap storage
(false, tachyonStore)
- } else if (level.useDisk) {
+ } else if (putLevel.useDisk) {
// Don't get back the bytes from put unless we replicate them
- (level.replication > 1, diskStore)
+ (putLevel.replication > 1, diskStore)
} else {
- assert(level == StorageLevel.NONE)
+ assert(putLevel == StorageLevel.NONE)
throw new BlockException(
blockId, s"Attempted to put block $blockId without specifying storage level!")
}
@@ -696,22 +716,22 @@ private[spark] class BlockManager(
// Actually put the values
val result = data match {
case IteratorValues(iterator) =>
- blockStore.putValues(blockId, iterator, level, returnValues)
- case ArrayBufferValues(array) =>
- blockStore.putValues(blockId, array, level, returnValues)
+ blockStore.putIterator(blockId, iterator, putLevel, returnValues)
+ case ArrayValues(array) =>
+ blockStore.putArray(blockId, array, putLevel, returnValues)
case ByteBufferValues(bytes) =>
bytes.rewind()
- blockStore.putBytes(blockId, bytes, level)
+ blockStore.putBytes(blockId, bytes, putLevel)
}
size = result.size
result.data match {
- case Left (newIterator) if level.useMemory => valuesAfterPut = newIterator
+ case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
case Right (newBytes) => bytesAfterPut = newBytes
case _ =>
}
// Keep track of which blocks are dropped from memory
- if (level.useMemory) {
+ if (putLevel.useMemory) {
result.droppedBlocks.foreach { updatedBlocks += _ }
}
@@ -742,7 +762,7 @@ private[spark] class BlockManager(
// Either we're storing bytes and we asynchronously started replication, or we're storing
// values and need to serialize and replicate them now:
- if (level.replication > 1) {
+ if (putLevel.replication > 1) {
data match {
case ByteBufferValues(bytes) =>
if (replicationFuture != null) {
@@ -758,7 +778,7 @@ private[spark] class BlockManager(
}
bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
}
- replicate(blockId, bytesAfterPut, level)
+ replicate(blockId, bytesAfterPut, putLevel)
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
@@ -766,7 +786,7 @@ private[spark] class BlockManager(
BlockManager.dispose(bytesAfterPut)
- if (level.replication > 1) {
+ if (putLevel.replication > 1) {
logDebug("Putting block %s with replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
} else {
@@ -818,7 +838,7 @@ private[spark] class BlockManager(
value: Any,
level: StorageLevel,
tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
- put(blockId, Iterator(value), level, tellMaster)
+ putIterator(blockId, Iterator(value), level, tellMaster)
}
/**
@@ -829,7 +849,7 @@ private[spark] class BlockManager(
*/
def dropFromMemory(
blockId: BlockId,
- data: Either[ArrayBuffer[Any], ByteBuffer]): Option[BlockStatus] = {
+ data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
logInfo(s"Dropping block $blockId from memory")
val info = blockInfo.get(blockId).orNull
@@ -853,7 +873,7 @@ private[spark] class BlockManager(
logInfo(s"Writing block $blockId to disk")
data match {
case Left(elements) =>
- diskStore.putValues(blockId, elements, level, returnValues = false)
+ diskStore.putArray(blockId, elements, level, returnValues = false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}
@@ -1068,9 +1088,11 @@ private[spark] class BlockManager(
private[spark] object BlockManager extends Logging {
private val ID_GENERATOR = new IdGenerator
+ /** Return the total amount of storage memory available. */
private def getMaxMemory(conf: SparkConf): Long = {
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
- (Runtime.getRuntime.maxMemory * memoryFraction).toLong
+ val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
+ (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}
def getHeartBeatFrequency(conf: SparkConf): Long =
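As a quick sanity check on the fractions involved here (illustrative arithmetic only, assuming an 8 GB executor heap and the defaults shown in this patch):

```scala
// Storage memory = heap * spark.storage.memoryFraction * spark.storage.safetyFraction
val maxHeap       = 8L * 1024 * 1024 * 1024        // stands in for Runtime.getRuntime.maxMemory
val storageMemory = (maxHeap * 0.6 * 0.9).toLong   // ~4.32 GB usable for storage
// Of that, spark.storage.unrollFraction bounds the shared unroll pool:
val maxUnrollMemory = (storageMemory * 0.2).toLong // ~0.86 GB for unrolling blocks
```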
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index b9b53b1a2f..69985c9759 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -37,15 +37,15 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends
* @return a PutResult that contains the size of the data, as well as the values put if
* returnValues is true (if not, the result's data field can be null)
*/
- def putValues(
+ def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean): PutResult
- def putValues(
+ def putArray(
blockId: BlockId,
- values: ArrayBuffer[Any],
+ values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index ebff0cb5ba..c83261dd91 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -21,8 +21,6 @@ import java.io.{FileOutputStream, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
-import scala.collection.mutable.ArrayBuffer
-
import org.apache.spark.Logging
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.Utils
@@ -30,7 +28,7 @@ import org.apache.spark.util.Utils
/**
* Stores BlockManager blocks on disk.
*/
-private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
+private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
extends BlockStore(blockManager) with Logging {
val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L)
@@ -57,15 +55,15 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
PutResult(bytes.limit(), Right(bytes.duplicate()))
}
- override def putValues(
+ override def putArray(
blockId: BlockId,
- values: ArrayBuffer[Any],
+ values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
- putValues(blockId, values.toIterator, level, returnValues)
+ putIterator(blockId, values.toIterator, level, returnValues)
}
- override def putValues(
+ override def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 71f66c826c..28f675c2bb 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -20,27 +20,45 @@ package org.apache.spark.storage
import java.nio.ByteBuffer
import java.util.LinkedHashMap
+import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.{SizeEstimator, Utils}
+import org.apache.spark.util.collection.SizeTrackingVector
private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
/**
- * Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as
+ * Stores blocks in memory, either as Arrays of deserialized Java objects or as
* serialized ByteBuffers.
*/
-private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
+private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
extends BlockStore(blockManager) {
+ private val conf = blockManager.conf
private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
+
@volatile private var currentMemory = 0L
- // Object used to ensure that only one thread is putting blocks and if necessary, dropping
- // blocks from the memory store.
- private val putLock = new Object()
+
+ // Ensure only one thread is putting, and if necessary, dropping blocks at any given time
+ private val accountingLock = new Object
+
+ // A mapping from thread ID to amount of memory used for unrolling a block (in bytes)
+ // All accesses of this map are assumed to have manually synchronized on `accountingLock`
+ private val unrollMemoryMap = mutable.HashMap[Long, Long]()
+
+ /**
+ * The amount of space ensured for unrolling values in memory, shared across all cores.
+ * This space is not reserved in advance, but allocated dynamically by dropping existing blocks.
+ */
+ private val maxUnrollMemory: Long = {
+ val unrollFraction = conf.getDouble("spark.storage.unrollFraction", 0.2)
+ (maxMemory * unrollFraction).toLong
+ }
logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory)))
+ /** Free memory not occupied by existing blocks. Note that this does not include unroll memory. */
def freeMemory: Long = maxMemory - currentMemory
override def getSize(blockId: BlockId): Long = {
@@ -55,20 +73,16 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
bytes.rewind()
if (level.deserialized) {
val values = blockManager.dataDeserialize(blockId, bytes)
- val elements = new ArrayBuffer[Any]
- elements ++= values
- val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
- val putAttempt = tryToPut(blockId, elements, sizeEstimate, deserialized = true)
- PutResult(sizeEstimate, Left(values.toIterator), putAttempt.droppedBlocks)
+ putIterator(blockId, values, level, returnValues = true)
} else {
val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
}
}
- override def putValues(
+ override def putArray(
blockId: BlockId,
- values: ArrayBuffer[Any],
+ values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
if (level.deserialized) {
@@ -82,14 +96,52 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}
- override def putValues(
+ override def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
- val valueEntries = new ArrayBuffer[Any]()
- valueEntries ++= values
- putValues(blockId, valueEntries, level, returnValues)
+ putIterator(blockId, values, level, returnValues, allowPersistToDisk = true)
+ }
+
+ /**
+ * Attempt to put the given block in memory store.
+ *
+ * There may not be enough space to fully unroll the iterator in memory, in which case we
+ * optionally drop the values to disk if
+ * (1) the block's storage level specifies useDisk, and
+ * (2) `allowPersistToDisk` is true.
+ *
+ * One scenario in which `allowPersistToDisk` is false is when the BlockManager reads a block
+ * back from disk and attempts to cache it in memory. In this case, we should not persist the
+ * block back on disk again, as it is already in disk store.
+ */
+ private[storage] def putIterator(
+ blockId: BlockId,
+ values: Iterator[Any],
+ level: StorageLevel,
+ returnValues: Boolean,
+ allowPersistToDisk: Boolean): PutResult = {
+ val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+ val unrolledValues = unrollSafely(blockId, values, droppedBlocks)
+ unrolledValues match {
+ case Left(arrayValues) =>
+ // Values are fully unrolled in memory, so store them as an array
+ val res = putArray(blockId, arrayValues, level, returnValues)
+ droppedBlocks ++= res.droppedBlocks
+ PutResult(res.size, res.data, droppedBlocks)
+ case Right(iteratorValues) =>
+ // Not enough space to unroll this block; drop to disk if applicable
+ logWarning(s"Not enough space to store block $blockId in memory! " +
+ s"Free memory is $freeMemory bytes.")
+ if (level.useDisk && allowPersistToDisk) {
+ logWarning(s"Persisting block $blockId to disk instead.")
+ val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
+ PutResult(res.size, res.data, droppedBlocks)
+ } else {
+ PutResult(0, Left(iteratorValues), droppedBlocks)
+ }
+ }
}
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
@@ -99,7 +151,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
if (entry == null) {
None
} else if (entry.deserialized) {
- Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator))
+ Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[Array[Any]].iterator))
} else {
Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
}
@@ -112,7 +164,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
if (entry == null) {
None
} else if (entry.deserialized) {
- Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
+ Some(entry.value.asInstanceOf[Array[Any]].iterator)
} else {
val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
Some(blockManager.dataDeserialize(blockId, buffer))
@@ -141,6 +193,93 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
/**
+ * Unroll the given block in memory safely.
+ *
+ * The safety of this operation refers to avoiding potential OOM exceptions caused by
+ * unrolling the entirety of the block in memory at once. This is achieved by periodically
+ * checking whether the memory restrictions for unrolling blocks are still satisfied,
+ * stopping immediately if not. This check is a safeguard against the scenario in which
+ * there is not enough free memory to accommodate the entirety of a single block.
+ *
+ * This method returns either an array with the contents of the entire block or an iterator
+ * containing the values of the block (if the array would have exceeded available memory).
+ */
+ def unrollSafely(
+ blockId: BlockId,
+ values: Iterator[Any],
+ droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
+ : Either[Array[Any], Iterator[Any]] = {
+
+ // Number of elements unrolled so far
+ var elementsUnrolled = 0
+ // Whether there is still enough memory for us to continue unrolling this block
+ var keepUnrolling = true
+ // Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing.
+ val initialMemoryThreshold = conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
+ // How often to check whether we need to request more memory
+ val memoryCheckPeriod = 16
+ // Memory currently reserved by this thread for this particular unrolling operation
+ var memoryThreshold = initialMemoryThreshold
+ // Memory to request as a multiple of current vector size
+ val memoryGrowthFactor = 1.5
+ // Previous unroll memory held by this thread, for releasing later (only at the very end)
+ val previousMemoryReserved = currentUnrollMemoryForThisThread
+ // Underlying vector for unrolling the block
+ var vector = new SizeTrackingVector[Any]
+
+ // Request enough memory to begin unrolling
+ keepUnrolling = reserveUnrollMemoryForThisThread(initialMemoryThreshold)
+
+ // Unroll this block safely, checking whether we have exceeded our threshold periodically
+ try {
+ while (values.hasNext && keepUnrolling) {
+ vector += values.next()
+ if (elementsUnrolled % memoryCheckPeriod == 0) {
+ // If our vector's size has exceeded the threshold, request more memory
+ val currentSize = vector.estimateSize()
+ if (currentSize >= memoryThreshold) {
+ val amountToRequest = (currentSize * (memoryGrowthFactor - 1)).toLong
+ // Hold the accounting lock, in case another thread concurrently puts a block that
+ // takes up the unrolling space we just ensured here
+ accountingLock.synchronized {
+ if (!reserveUnrollMemoryForThisThread(amountToRequest)) {
+ // If the first request is not granted, try again after ensuring free space
+ // If there is still not enough space, give up and drop the partition
+ val spaceToEnsure = maxUnrollMemory - currentUnrollMemory
+ if (spaceToEnsure > 0) {
+ val result = ensureFreeSpace(blockId, spaceToEnsure)
+ droppedBlocks ++= result.droppedBlocks
+ }
+ keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest)
+ }
+ }
+ // New threshold is currentSize * memoryGrowthFactor
+ memoryThreshold = currentSize + amountToRequest
+ }
+ }
+ elementsUnrolled += 1
+ }
+
+ if (keepUnrolling) {
+ // We successfully unrolled the entirety of this block
+ Left(vector.toArray)
+ } else {
+ // We ran out of space while unrolling the values for this block
+ Right(vector.iterator ++ values)
+ }
+
+ } finally {
+ // If we return an array, the values returned do not depend on the underlying vector and
+ // we can immediately free up space for other threads. Otherwise, if we return an iterator,
+ // we release the memory claimed by this thread later on when the task finishes.
+ if (keepUnrolling) {
+ val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
+ releaseUnrollMemoryForThisThread(amountToRelease)
+ }
+ }
+ }
+
+ /**
* Return the RDD ID that a given block ID is from, or None if it is not an RDD block.
*/
private def getRddId(blockId: BlockId): Option[Int] = {
@@ -149,10 +288,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
/**
* Try to put in a set of values, if we can free up enough space. The value should either be
- * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
- * size must also be passed by the caller.
+ * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
+ * must also be passed by the caller.
*
- * Lock on the object putLock to ensure that all the put requests and its associated block
+ * Synchronize on `accountingLock` to ensure that all the put requests and its associated block
* dropping is done by only one thread at a time. Otherwise while one thread is dropping
* blocks to free memory for one block, another thread may use up the freed space for
* another block.
@@ -174,7 +313,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
var putSuccess = false
val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
- putLock.synchronized {
+ accountingLock.synchronized {
val freeSpaceResult = ensureFreeSpace(blockId, size)
val enoughFreeSpace = freeSpaceResult.success
droppedBlocks ++= freeSpaceResult.droppedBlocks
@@ -193,7 +332,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// Tell the block manager that we couldn't put it in memory so that it can drop it to
// disk if the block allows disk storage.
val data = if (deserialized) {
- Left(value.asInstanceOf[ArrayBuffer[Any]])
+ Left(value.asInstanceOf[Array[Any]])
} else {
Right(value.asInstanceOf[ByteBuffer].duplicate())
}
@@ -210,12 +349,14 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
* don't fit into memory that we want to avoid).
*
- * Assume that a lock is held by the caller to ensure only one thread is dropping blocks.
- * Otherwise, the freed space may fill up before the caller puts in their new value.
+ * Assume that `accountingLock` is held by the caller to ensure only one thread is dropping
+ * blocks. Otherwise, the freed space may fill up before the caller puts in their new value.
*
* Return whether there is enough free space, along with the blocks dropped in the process.
*/
- private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks = {
+ private def ensureFreeSpace(
+ blockIdToAdd: BlockId,
+ space: Long): ResultWithDroppedBlocks = {
logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory")
val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
@@ -225,9 +366,12 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
return ResultWithDroppedBlocks(success = false, droppedBlocks)
}
- if (maxMemory - currentMemory < space) {
+ // Take into account the amount of memory currently occupied by unrolling blocks
+ val actualFreeMemory = freeMemory - currentUnrollMemory
+
+ if (actualFreeMemory < space) {
val rddToAdd = getRddId(blockIdToAdd)
- val selectedBlocks = new ArrayBuffer[BlockId]()
+ val selectedBlocks = new ArrayBuffer[BlockId]
var selectedMemory = 0L
// This is synchronized to ensure that the set of entries is not changed
@@ -235,7 +379,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// can lead to exceptions.
entries.synchronized {
val iterator = entries.entrySet().iterator()
- while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
+ while (actualFreeMemory + selectedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
@@ -245,7 +389,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}
- if (maxMemory - (currentMemory - selectedMemory) >= space) {
+ if (actualFreeMemory + selectedMemory >= space) {
logInfo(s"${selectedBlocks.size} blocks selected for dropping")
for (blockId <- selectedBlocks) {
val entry = entries.synchronized { entries.get(blockId) }
@@ -254,7 +398,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// future safety.
if (entry != null) {
val data = if (entry.deserialized) {
- Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
+ Left(entry.value.asInstanceOf[Array[Any]])
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
}
@@ -275,8 +419,56 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
override def contains(blockId: BlockId): Boolean = {
entries.synchronized { entries.containsKey(blockId) }
}
+
+ /**
+ * Reserve additional memory for unrolling blocks used by this thread.
+ * Return whether the request is granted.
+ */
+ private[spark] def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
+ accountingLock.synchronized {
+ val granted = freeMemory > currentUnrollMemory + memory
+ if (granted) {
+ val threadId = Thread.currentThread().getId
+ unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + memory
+ }
+ granted
+ }
+ }
+
+ /**
+ * Release memory used by this thread for unrolling blocks.
+ * If the amount is not specified, remove the current thread's allocation altogether.
+ */
+ private[spark] def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
+ val threadId = Thread.currentThread().getId
+ accountingLock.synchronized {
+ if (memory < 0) {
+ unrollMemoryMap.remove(threadId)
+ } else {
+ unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, memory) - memory
+ // If this thread claims no more unroll memory, release it completely
+ if (unrollMemoryMap(threadId) <= 0) {
+ unrollMemoryMap.remove(threadId)
+ }
+ }
+ }
+ }
+
+ /**
+ * Return the amount of memory currently occupied for unrolling blocks across all threads.
+ */
+ private[spark] def currentUnrollMemory: Long = accountingLock.synchronized {
+ unrollMemoryMap.values.sum
+ }
+
+ /**
+ * Return the amount of memory currently occupied for unrolling blocks by this thread.
+ */
+ private[spark] def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized {
+ unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L)
+ }
}
-private case class ResultWithDroppedBlocks(
+private[spark] case class ResultWithDroppedBlocks(
success: Boolean,
droppedBlocks: Seq[(BlockId, BlockStatus)])
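The per-thread accounting above is easiest to see in isolation. Below is a self-contained sketch of the same pattern; the object name and fixed cap are illustrative, and the real `MemoryStore` additionally checks free storage memory and may drop blocks before denying a request.

```scala
import scala.collection.mutable

object UnrollAccountingSketch {
  private val accountingLock = new Object
  private val unrollMemoryMap = mutable.HashMap[Long, Long]()
  private val maxUnrollMemory = 64L * 1024 * 1024 // illustrative fixed cap

  /** Reserve `bytes` for the current thread; return whether the request is granted. */
  def reserveForThisThread(bytes: Long): Boolean = accountingLock.synchronized {
    val granted = unrollMemoryMap.values.sum + bytes <= maxUnrollMemory
    if (granted) {
      val threadId = Thread.currentThread().getId
      unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + bytes
    }
    granted
  }

  /** Drop the current thread's allocation entirely. */
  def releaseAllForThisThread(): Unit = accountingLock.synchronized {
    unrollMemoryMap.remove(Thread.currentThread().getId)
  }
}
```

A caller would pair every successful reserve with a release in a finally block, mirroring the Executor change earlier in this patch.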
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
index d8ff4ff6bd..932b561604 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -20,8 +20,6 @@ package org.apache.spark.storage
import java.io.IOException
import java.nio.ByteBuffer
-import scala.collection.mutable.ArrayBuffer
-
import tachyon.client.{ReadType, WriteType}
import org.apache.spark.Logging
@@ -30,7 +28,7 @@ import org.apache.spark.util.Utils
/**
* Stores BlockManager blocks on Tachyon.
*/
-private class TachyonStore(
+private[spark] class TachyonStore(
blockManager: BlockManager,
tachyonManager: TachyonBlockManager)
extends BlockStore(blockManager: BlockManager) with Logging {
@@ -45,15 +43,15 @@ private class TachyonStore(
putIntoTachyonStore(blockId, bytes, returnValues = true)
}
- override def putValues(
+ override def putArray(
blockId: BlockId,
- values: ArrayBuffer[Any],
+ values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
- putValues(blockId, values.toIterator, level, returnValues)
+ putIterator(blockId, values.toIterator, level, returnValues)
}
- override def putValues(
+ override def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index 328be158db..75c2e09a6b 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -48,7 +48,7 @@ private[spark] object ThreadingTest {
val block = (1 to blockSize).map(_ => Random.nextInt())
val level = randomLevel()
val startTime = System.currentTimeMillis()
- manager.put(blockId, block.iterator, level, tellMaster = true)
+ manager.putIterator(blockId, block.iterator, level, tellMaster = true)
println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
queue.add((blockId, block))
}
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 0846557530..bce3b3afe9 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -180,7 +180,7 @@ private[spark] object SizeEstimator extends Logging {
}
}
- // Estimat the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
+ // Estimate the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
private val ARRAY_SIZE_FOR_SAMPLING = 200
private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
index b84eb65c62..7e76d060d6 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
@@ -36,7 +36,7 @@ class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize:
_array(index)
}
- def +=(value: V) {
+ def +=(value: V): Unit = {
if (_numElements == _array.length) {
resize(_array.length * 2)
}
@@ -50,6 +50,19 @@ class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize:
def size: Int = _numElements
+ def iterator: Iterator[V] = new Iterator[V] {
+ var index = 0
+ override def hasNext: Boolean = index < _numElements
+ override def next(): V = {
+ if (!hasNext) {
+ throw new NoSuchElementException
+ }
+ val value = _array(index)
+ index += 1
+ value
+ }
+ }
+
/** Gets the underlying array backing this vector. */
def array: Array[V] = _array
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala
new file mode 100644
index 0000000000..3eb1010dc1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+import org.apache.spark.util.SizeEstimator
+
+/**
+ * A general interface for collections to keep track of their estimated sizes in bytes.
+ * We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
+ * as each call to SizeEstimator is somewhat expensive (order of a few milliseconds).
+ */
+private[spark] trait SizeTracker {
+
+ import SizeTracker._
+
+ /**
+ * Controls the base of the exponential which governs the rate of sampling.
+ * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
+ */
+ private val SAMPLE_GROWTH_RATE = 1.1
+
+ /** Samples taken since last resetSamples(). Only the last two are kept for extrapolation. */
+ private val samples = new mutable.Queue[Sample]
+
+ /** The average number of bytes per update between our last two samples. */
+ private var bytesPerUpdate: Double = _
+
+ /** Total number of insertions and updates into the map since the last resetSamples(). */
+ private var numUpdates: Long = _
+
+ /** The value of 'numUpdates' at which we will take our next sample. */
+ private var nextSampleNum: Long = _
+
+ resetSamples()
+
+ /**
+ * Reset samples collected so far.
+ * This should be called after the collection undergoes a dramatic change in size.
+ */
+ protected def resetSamples(): Unit = {
+ numUpdates = 1
+ nextSampleNum = 1
+ samples.clear()
+ takeSample()
+ }
+
+ /**
+ * Callback to be invoked after every update.
+ */
+ protected def afterUpdate(): Unit = {
+ numUpdates += 1
+ if (nextSampleNum == numUpdates) {
+ takeSample()
+ }
+ }
+
+ /**
+ * Take a new sample of the current collection's size.
+ */
+ private def takeSample(): Unit = {
+ samples.enqueue(Sample(SizeEstimator.estimate(this), numUpdates))
+ // Only use the last two samples to extrapolate
+ if (samples.size > 2) {
+ samples.dequeue()
+ }
+ val bytesDelta = samples.toList.reverse match {
+ case latest :: previous :: tail =>
+ (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
+ // If fewer than 2 samples, assume no change
+ case _ => 0
+ }
+ bytesPerUpdate = math.max(0, bytesDelta)
+ nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
+ }
+
+ /**
+ * Estimate the current size of the collection in bytes. O(1) time.
+ */
+ def estimateSize(): Long = {
+ assert(samples.nonEmpty)
+ val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
+ (samples.last.size + extrapolatedDelta).toLong
+ }
+}
+
+private object SizeTracker {
+ case class Sample(size: Long, numUpdates: Long)
+}
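Two bits of arithmetic make the trait's behavior concrete (illustrative numbers, not from the patch):

```scala
// Sampling schedule for SAMPLE_GROWTH_RATE = 1.1: nextSampleNum = ceil(numUpdates * 1.1),
// so samples land at updates 1, 2, 3, ..., 10, 11, 13, 15, 17, 19, ...
val schedule = Iterator.iterate(1L)(n => math.ceil(n * 1.1).toLong).take(16).toList

// estimateSize() extrapolates from the last sample: if the last sample measured
// 10240 bytes at update 100 and bytesPerUpdate is 64.0, then at update 120:
val estimate = (10240 + 64.0 * (120 - 100)).toLong // = 11520 bytes
```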
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
index 204330dad4..de61e1d17f 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
@@ -17,85 +17,24 @@
package org.apache.spark.util.collection
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.util.SizeEstimator
-import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.Sample
-
/**
- * Append-only map that keeps track of its estimated size in bytes.
- * We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
- * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds).
+ * An append-only map that keeps track of its estimated size in bytes.
*/
-private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] {
-
- /**
- * Controls the base of the exponential which governs the rate of sampling.
- * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
- */
- private val SAMPLE_GROWTH_RATE = 1.1
-
- /** All samples taken since last resetSamples(). Only the last two are used for extrapolation. */
- private val samples = new ArrayBuffer[Sample]()
-
- /** Total number of insertions and updates into the map since the last resetSamples(). */
- private var numUpdates: Long = _
-
- /** The value of 'numUpdates' at which we will take our next sample. */
- private var nextSampleNum: Long = _
-
- /** The average number of bytes per update between our last two samples. */
- private var bytesPerUpdate: Double = _
-
- resetSamples()
-
- /** Called after the map grows in size, as this can be a dramatic change for small objects. */
- def resetSamples() {
- numUpdates = 1
- nextSampleNum = 1
- samples.clear()
- takeSample()
- }
+private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] with SizeTracker {
override def update(key: K, value: V): Unit = {
super.update(key, value)
- numUpdates += 1
- if (nextSampleNum == numUpdates) { takeSample() }
+ super.afterUpdate()
}
override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
val newValue = super.changeValue(key, updateFunc)
- numUpdates += 1
- if (nextSampleNum == numUpdates) { takeSample() }
+ super.afterUpdate()
newValue
}
- /** Takes a new sample of the current map's size. */
- def takeSample() {
- samples += Sample(SizeEstimator.estimate(this), numUpdates)
- // Only use the last two samples to extrapolate. If fewer than 2 samples, assume no change.
- bytesPerUpdate = math.max(0, samples.toSeq.reverse match {
- case latest :: previous :: tail =>
- (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
- case _ =>
- 0
- })
- nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
- }
-
- override protected def growTable() {
+ override protected def growTable(): Unit = {
super.growTable()
resetSamples()
}
-
- /** Estimates the current size of the map in bytes. O(1) time. */
- def estimateSize(): Long = {
- assert(samples.nonEmpty)
- val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
- (samples.last.size + extrapolatedDelta).toLong
- }
-}
-
-private object SizeTrackingAppendOnlyMap {
- case class Sample(size: Long, numUpdates: Long)
}
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala
new file mode 100644
index 0000000000..65a7b4e0d4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect.ClassTag
+
+/**
+ * An append-only buffer that keeps track of its estimated size in bytes.
+ */
+private[spark] class SizeTrackingVector[T: ClassTag]
+ extends PrimitiveVector[T]
+ with SizeTracker {
+
+ override def +=(value: T): Unit = {
+ super.+=(value)
+ super.afterUpdate()
+ }
+
+ override def resize(newLength: Int): PrimitiveVector[T] = {
+ super.resize(newLength)
+ resetSamples()
+ this
+ }
+
+ /**
+ * Return a trimmed version of the underlying array.
+ */
+ def toArray: Array[T] = {
+ super.iterator.toArray
+ }
+}
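
For orientation, a small usage sketch of the new vector (values are illustrative; estimateSize() comes from the SizeTracker mixin):

// Append elements and read a cheap size estimate as you go.
val vector = new SizeTrackingVector[Array[Byte]]
for (_ <- 0 until 1000) {
  vector += new Array[Byte](100)
}
val approxBytes = vector.estimateSize() // O(1) estimate, not an exact byte count
val trimmed = vector.toArray            // trimmed copy of the underlying array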
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 7f5d0b061e..9c5f394d38 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark
-import scala.collection.mutable.ArrayBuffer
-
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.mock.EasyMockSugar
@@ -52,22 +50,21 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
}
test("get uncached rdd") {
- expecting {
- blockManager.get(RDDBlockId(0, 0)).andReturn(None)
- blockManager.put(RDDBlockId(0, 0), ArrayBuffer[Any](1, 2, 3, 4), StorageLevel.MEMORY_ONLY,
- true).andStubReturn(Seq[(BlockId, BlockStatus)]())
- }
-
- whenExecuting(blockManager) {
- val context = new TaskContext(0, 0, 0)
- val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
- assert(value.toList === List(1, 2, 3, 4))
- }
+ // Do not mock this test, because attempting to match Array[Any], which is not covariant,
+ // in blockManager.put is a losing battle. You have been warned.
+ blockManager = sc.env.blockManager
+ cacheManager = sc.env.cacheManager
+ val context = new TaskContext(0, 0, 0)
+ val computeValue = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
+ val getValue = blockManager.get(RDDBlockId(rdd.id, split.index))
+ assert(computeValue.toList === List(1, 2, 3, 4))
+ assert(getValue.isDefined, "Block cached from getOrCompute is not found!")
+ assert(getValue.get.data.toList === List(1, 2, 3, 4))
}
test("get cached rdd") {
expecting {
- val result = new BlockResult(ArrayBuffer(5, 6, 7).iterator, DataReadMethod.Memory, 12)
+ val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result))
}
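
The covariance warning above boils down to Scala array invariance; a standalone illustration (not part of the patch):

// Scala arrays are invariant: an Array[Int] is not an Array[Any], so a mock
// expectation recorded against Array[Any](1, 2, 3) never matches the
// Array[Int] the code under test actually passes.
val ints: Array[Int] = Array(1, 2, 3)
// val anys: Array[Any] = ints              // does not compile
val boxed: Array[Any] = Array[Any](1, 2, 3) // different runtime type (Object[] vs int[])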
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 23cb6905bf..dd4fd535d3 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.concurrent.Timeouts._
import org.scalatest.Matchers
import org.scalatest.time.SpanSugar._
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
@@ -43,6 +43,7 @@ import scala.language.postfixOps
class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
with PrivateMethodTester {
+
private val conf = new SparkConf(false)
var store: BlockManager = null
var store2: BlockManager = null
@@ -61,21 +62,29 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
+ private def makeBlockManager(maxMem: Long, name: String = "<driver>"): BlockManager = {
+ new BlockManager(
+ name, actorSystem, master, serializer, maxMem, conf, securityMgr, mapOutputTracker)
+ }
+
before {
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, conf = conf,
- securityManager = securityMgr)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+ "test", "localhost", 0, conf = conf, securityManager = securityMgr)
this.actorSystem = actorSystem
- conf.set("spark.driver.port", boundPort.toString)
-
- master = new BlockManagerMaster(
- actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
- conf)
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
conf.set("os.arch", "amd64")
conf.set("spark.test.useCompressedOops", "true")
conf.set("spark.storage.disableBlockManagerHeartBeat", "true")
+ conf.set("spark.driver.port", boundPort.toString)
+ conf.set("spark.storage.unrollFraction", "0.4")
+ conf.set("spark.storage.unrollMemoryThreshold", "512")
+
+ master = new BlockManagerMaster(
+ actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+ conf)
+
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
}
@@ -138,11 +147,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("master + 1 manager interaction") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(20000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -169,10 +177,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("master + 2 managers interaction") {
- store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer(conf), 2000, conf,
- securityMgr, mapOutputTracker)
+ store = makeBlockManager(2000, "exec1")
+ store2 = makeBlockManager(2000, "exec2")
val peers = master.getPeers(store.blockManagerId, 1)
assert(peers.size === 1, "master did not return the other manager as a peer")
@@ -187,11 +193,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("removing block") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(20000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
@@ -200,8 +205,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
// Checking whether blocks are in memory and memory size
val memStatus = master.getMemoryStatus.head._2
- assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
- assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should <= 1200")
+ assert(memStatus._1 == 20000L, "total memory " + memStatus._1 + " should equal 20000")
+ assert(memStatus._2 <= 12000L, "remaining memory " + memStatus._2 + " should <= 12000")
assert(store.getSingle("a1-to-remove").isDefined, "a1 was not in store")
assert(store.getSingle("a2-to-remove").isDefined, "a2 was not in store")
assert(store.getSingle("a3-to-remove").isDefined, "a3 was not in store")
@@ -230,17 +235,16 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
val memStatus = master.getMemoryStatus.head._2
- memStatus._1 should equal (2000L)
- memStatus._2 should equal (2000L)
+ memStatus._1 should equal (20000L)
+ memStatus._2 should equal (20000L)
}
}
test("removing rdd") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(20000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
// Putting a1, a2 and a3 in memory.
store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
@@ -270,11 +274,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("removing broadcast") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
+ store = makeBlockManager(2000)
val driverStore = store
- val executorStore = new BlockManager("executor", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
+ val executorStore = makeBlockManager(2000, "executor")
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -343,8 +345,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
test("reregistration on heart beat") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
+ store = makeBlockManager(2000)
val a1 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -380,8 +381,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
test("reregistration doesn't dead lock") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
+ store = makeBlockManager(2000)
val a1 = new Array[Byte](400)
val a2 = List(new Array[Byte](400))
@@ -390,7 +390,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
master.removeExecutor(store.blockManagerId.executorId)
val t1 = new Thread {
override def run() {
- store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
}
val t2 = new Thread {
@@ -418,19 +418,14 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("correct BlockResult returned from get() calls") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr,
- mapOutputTracker)
- val list1 = List(new Array[Byte](200), new Array[Byte](200))
- val list1ForSizeEstimate = new ArrayBuffer[Any]
- list1ForSizeEstimate ++= list1.iterator
- val list1SizeEstimate = SizeEstimator.estimate(list1ForSizeEstimate)
- val list2 = List(new Array[Byte](50), new Array[Byte](100), new Array[Byte](150))
- val list2ForSizeEstimate = new ArrayBuffer[Any]
- list2ForSizeEstimate ++= list2.iterator
- val list2SizeEstimate = SizeEstimator.estimate(list2ForSizeEstimate)
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.put("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.put("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+ store = makeBlockManager(12000)
+ val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
+ val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500))
+ val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
+ val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray)
+ store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
val list1Get = store.get("list1")
assert(list1Get.isDefined, "list1 expected to be in store")
assert(list1Get.get.data.size === 2)
@@ -451,11 +446,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("in-memory LRU storage") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(12000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY)
@@ -471,11 +465,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("in-memory LRU storage with serialization") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(12000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER)
@@ -491,11 +484,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("in-memory LRU for partitions of same RDD") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(12000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
@@ -511,11 +503,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("in-memory LRU for partitions of multiple RDDs") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
- store.putSingle(rdd(0, 2), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
- store.putSingle(rdd(1, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ store = makeBlockManager(12000)
+ store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// At this point rdd_1_1 should've replaced rdd_0_1
assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
@@ -523,8 +514,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
// Do a get() on rdd_0_2 so that it is the most recently used item
assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
// Put in more partitions from RDD 0; they should replace rdd_1_1
- store.putSingle(rdd(0, 3), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
- store.putSingle(rdd(0, 4), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
// when we try to add rdd_0_4.
assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store")
@@ -538,8 +529,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
// TODO Make the spark.test.tachyon.enable true after using tachyon 0.5.0 testing jar.
val tachyonUnitTestEnabled = conf.getBoolean("spark.test.tachyon.enable", false)
if (tachyonUnitTestEnabled) {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
+ store = makeBlockManager(1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -555,8 +545,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("on-disk storage") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
+ store = makeBlockManager(1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -569,11 +558,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("disk and memory storage") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(12000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
@@ -585,11 +573,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("disk and memory storage with getLocalBytes") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(12000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
@@ -601,11 +588,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("disk and memory storage with serialization") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(12000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
@@ -617,11 +603,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("disk and memory storage with serialization and getLocalBytes") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(12000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
@@ -633,12 +618,11 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("LRU with mixed storage levels") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
- val a4 = new Array[Byte](400)
+ store = makeBlockManager(12000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
+ val a4 = new Array[Byte](4000)
// First store a1 and a2, both in memory, and a3, on disk only
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
@@ -656,14 +640,13 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("in-memory LRU with streams") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val list1 = List(new Array[Byte](200), new Array[Byte](200))
- val list2 = List(new Array[Byte](200), new Array[Byte](200))
- val list3 = List(new Array[Byte](200), new Array[Byte](200))
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store = makeBlockManager(12000)
+ val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
+ val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
+ val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
+ store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.get("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.data.size === 2)
assert(store.get("list3").isDefined, "list3 was not in store")
@@ -672,7 +655,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
assert(store.get("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.data.size === 2)
// At this point list2 was accessed last, so LRU will get rid of list3
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.get("list1").isDefined, "list1 was not in store")
assert(store.get("list1").get.data.size === 2)
assert(store.get("list2").isDefined, "list2 was not in store")
@@ -681,16 +664,15 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("LRU with mixed storage levels and streams") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val list1 = List(new Array[Byte](200), new Array[Byte](200))
- val list2 = List(new Array[Byte](200), new Array[Byte](200))
- val list3 = List(new Array[Byte](200), new Array[Byte](200))
- val list4 = List(new Array[Byte](200), new Array[Byte](200))
+ store = makeBlockManager(12000)
+ val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
+ val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
+ val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
+ val list4 = List(new Array[Byte](2000), new Array[Byte](2000))
// First store list1 and list2, both in memory, and list3, on disk only
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
- store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
- store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+ store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+ store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+ store.putIterator("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
val listForSizeEstimate = new ArrayBuffer[Any]
listForSizeEstimate ++= list1.iterator
val listSize = SizeEstimator.estimate(listForSizeEstimate)
@@ -708,7 +690,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
assert(store.get("list3").isDefined, "list3 was not in store")
assert(store.get("list3").get.data.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
- store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
+ store.putIterator("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
assert(store.get("list1") === None, "list1 was in store")
assert(store.get("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.data.size === 2)
@@ -731,11 +713,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("overly large block") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 500, conf,
- securityMgr, mapOutputTracker)
- store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
+ store = makeBlockManager(5000)
+ store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") === None, "a1 was in store")
- store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
+ store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
assert(store.getSingle("a2").isDefined, "a2 was not in store")
}
@@ -743,8 +724,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
test("block compression") {
try {
conf.set("spark.shuffle.compress", "true")
- store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
+ store = makeBlockManager(20000, "exec1")
store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
"shuffle_0_0_0 was not compressed")
@@ -752,52 +732,46 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
store = null
conf.set("spark.shuffle.compress", "false")
- store = new BlockManager("exec2", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
- assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 1000,
+ store = makeBlockManager(20000, "exec2")
+ store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
"shuffle_0_0_0 was compressed")
store.stop()
store = null
conf.set("spark.broadcast.compress", "true")
- store = new BlockManager("exec3", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
- assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 100,
+ store = makeBlockManager(20000, "exec3")
+ store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
"broadcast_0 was not compressed")
store.stop()
store = null
conf.set("spark.broadcast.compress", "false")
- store = new BlockManager("exec4", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
- assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 1000, "broadcast_0 was compressed")
+ store = makeBlockManager(20000, "exec4")
+ store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed")
store.stop()
store = null
conf.set("spark.rdd.compress", "true")
- store = new BlockManager("exec5", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
- assert(store.memoryStore.getSize(rdd(0, 0)) <= 100, "rdd_0_0 was not compressed")
+ store = makeBlockManager(20000, "exec5")
+ store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
store.stop()
store = null
conf.set("spark.rdd.compress", "false")
- store = new BlockManager("exec6", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
- assert(store.memoryStore.getSize(rdd(0, 0)) >= 1000, "rdd_0_0 was compressed")
+ store = makeBlockManager(20000, "exec6")
+ store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
store.stop()
store = null
// Check that any other block types are also kept uncompressed
- store = new BlockManager("exec7", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
- assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
+ store = makeBlockManager(20000, "exec7")
+ store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+ assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed")
store.stop()
store = null
} finally {
@@ -871,30 +845,29 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
assert(Arrays.equals(mappedAsArray, bytes))
assert(Arrays.equals(notMappedAsArray, bytes))
}
-
+
test("updated block statuses") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val list = List.fill(2)(new Array[Byte](200))
- val bigList = List.fill(8)(new Array[Byte](200))
+ store = makeBlockManager(12000)
+ val list = List.fill(2)(new Array[Byte](2000))
+ val bigList = List.fill(8)(new Array[Byte](2000))
// 1 updated block (i.e. list1)
val updatedBlocks1 =
- store.put("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(updatedBlocks1.size === 1)
assert(updatedBlocks1.head._1 === TestBlockId("list1"))
assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
// 1 updated block (i.e. list2)
val updatedBlocks2 =
- store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
assert(updatedBlocks2.size === 1)
assert(updatedBlocks2.head._1 === TestBlockId("list2"))
assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
// 2 updated blocks - list1 is kicked out of memory while list3 is added
val updatedBlocks3 =
- store.put("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(updatedBlocks3.size === 2)
updatedBlocks3.foreach { case (id, status) =>
id match {
@@ -903,11 +876,11 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
case _ => fail("Updated block is neither list1 nor list3")
}
}
- assert(store.get("list3").isDefined, "list3 was not in store")
+ assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
// 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
val updatedBlocks4 =
- store.put("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(updatedBlocks4.size === 2)
updatedBlocks4.foreach { case (id, status) =>
id match {
@@ -916,26 +889,37 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
case _ => fail("Updated block is neither list2 nor list4")
}
}
- assert(store.get("list4").isDefined, "list4 was not in store")
+ assert(store.diskStore.contains("list2"), "list2 was not in disk store")
+ assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
- // No updated blocks - nothing is kicked out of memory because list5 is too big to be added
+ // No updated blocks - list5 is too big to fit in store and nothing is kicked out
val updatedBlocks5 =
- store.put("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(updatedBlocks5.size === 0)
- assert(store.get("list2").isDefined, "list2 was not in store")
- assert(store.get("list4").isDefined, "list4 was not in store")
- assert(!store.get("list5").isDefined, "list5 was in store")
+
+ // memory store contains only list3 and list4
+ assert(!store.memoryStore.contains("list1"), "list1 was in memory store")
+ assert(!store.memoryStore.contains("list2"), "list2 was in memory store")
+ assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
+ assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
+ assert(!store.memoryStore.contains("list5"), "list5 was in memory store")
+
+ // disk store contains only list2
+ assert(!store.diskStore.contains("list1"), "list1 was in disk store")
+ assert(store.diskStore.contains("list2"), "list2 was not in disk store")
+ assert(!store.diskStore.contains("list3"), "list3 was in disk store")
+ assert(!store.diskStore.contains("list4"), "list4 was in disk store")
+ assert(!store.diskStore.contains("list5"), "list5 was in disk store")
}
test("query block statuses") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val list = List.fill(2)(new Array[Byte](200))
+ store = makeBlockManager(12000)
+ val list = List.fill(2)(new Array[Byte](2000))
// Tell master. By LRU, only list2 and list3 remain.
- store.put("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.put("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
// getLocations and getBlockStatus should yield the same locations
assert(store.master.getLocations("list1").size === 0)
@@ -949,9 +933,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)
// This time don't tell master and see what happens. By LRU, only list5 and list6 remain.
- store.put("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
- store.put("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
- store.put("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+ store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+ store.putIterator("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+ store.putIterator("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
// getLocations should return nothing because the master is not informed
// getBlockStatus without asking slaves should have the same result
@@ -968,23 +952,22 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("get matching blocks") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val list = List.fill(2)(new Array[Byte](10))
+ store = makeBlockManager(12000)
+ val list = List.fill(2)(new Array[Byte](100))
// insert some blocks
- store.put("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.put("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ store.putIterator("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ store.putIterator("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
// getMatchingBlockIds should return the blocks that match the given filter
assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size === 3)
assert(store.master.getMatchingBlockIds(_.toString.contains("list1"), askSlaves = false).size === 1)
// insert some more blocks
- store.put("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.put("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
- store.put("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+ store.putIterator("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ store.putIterator("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+ store.putIterator("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
// Only newlist1 was reported to the master, so only it matches without asking slaves
assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size === 1)
@@ -992,7 +975,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
blockIds.foreach { blockId =>
- store.put(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
val matchedBlockIds = store.master.getMatchingBlockIds(_ match {
case RDDBlockId(1, _) => true
@@ -1002,17 +985,240 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- store.putSingle(rdd(0, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
- store.putSingle(rdd(1, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ store = makeBlockManager(12000)
+ store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// Access rdd_1_0 to ensure it's not least recently used.
assert(store.getSingle(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
// According to the same-RDD rule, rdd_1_0 should be replaced here.
- store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// rdd_1_0 should have been replaced, even though it's not the least recently used.
assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store")
}
+
+ test("reserve/release unroll memory") {
+ store = makeBlockManager(12000)
+ val memoryStore = store.memoryStore
+ assert(memoryStore.currentUnrollMemory === 0)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+ // Reserve
+ memoryStore.reserveUnrollMemoryForThisThread(100)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 100)
+ memoryStore.reserveUnrollMemoryForThisThread(200)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 300)
+ memoryStore.reserveUnrollMemoryForThisThread(500)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 800)
+ memoryStore.reserveUnrollMemoryForThisThread(1000000)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 800) // not granted
+ // Release
+ memoryStore.releaseUnrollMemoryForThisThread(100)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 700)
+ memoryStore.releaseUnrollMemoryForThisThread(100)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 600)
+ // Reserve again
+ memoryStore.reserveUnrollMemoryForThisThread(4400)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 5000)
+ memoryStore.reserveUnrollMemoryForThisThread(20000)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 5000) // not granted
+ // Release again
+ memoryStore.releaseUnrollMemoryForThisThread(1000)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 4000)
+ memoryStore.releaseUnrollMemoryForThisThread() // release all
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+ }
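+
+ These assertions pin down the accounting model: each thread reserves against a shared budget, a request is granted only in full, and a no-arg release drops everything the thread holds. A minimal sketch of that bookkeeping, assuming a fixed budget and hypothetical class and field names (the real MemoryStore additionally coordinates with block eviction):
+
+ class UnrollMemoryPool(maxUnrollMemory: Long) {
+   // Map from thread ID to bytes reserved by that thread.
+   private val unrollMemoryMap = scala.collection.mutable.HashMap[Long, Long]()
+
+   def currentUnrollMemory: Long = synchronized { unrollMemoryMap.values.sum }
+
+   def currentUnrollMemoryForThisThread: Long = synchronized {
+     unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L)
+   }
+
+   /** Reserve `memory` bytes for this thread; refuse if the budget would be exceeded. */
+   def reserveUnrollMemoryForThisThread(memory: Long): Boolean = synchronized {
+     val granted = currentUnrollMemory + memory <= maxUnrollMemory
+     if (granted) {
+       val threadId = Thread.currentThread().getId
+       unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + memory
+     }
+     granted
+   }
+
+   /** Release `memory` bytes, or everything this thread holds if no amount is given. */
+   def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = synchronized {
+     val threadId = Thread.currentThread().getId
+     if (memory < 0) {
+       unrollMemoryMap.remove(threadId)
+     } else {
+       unrollMemoryMap(threadId) = math.max(0L, unrollMemoryMap.getOrElse(threadId, 0L) - memory)
+     }
+   }
+ }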
+
+ /**
+ * Verify the result of MemoryStore#unrollSafely is as expected.
+ */
+ private def verifyUnroll(
+ expected: Iterator[Any],
+ result: Either[Array[Any], Iterator[Any]],
+ shouldBeArray: Boolean): Unit = {
+ val actual: Iterator[Any] = result match {
+ case Left(arr: Array[Any]) =>
+ assert(shouldBeArray, "expected iterator from unroll!")
+ arr.iterator
+ case Right(it: Iterator[Any]) =>
+ assert(!shouldBeArray, "expected array from unroll!")
+ it
+ case _ =>
+ fail("unroll returned neither an iterator nor an array...")
+ }
+ expected.zip(actual).foreach { case (e, a) =>
+ assert(e === a, "unroll did not return original values!")
+ }
+ }
+
+ test("safely unroll blocks") {
+ store = makeBlockManager(12000)
+ val smallList = List.fill(40)(new Array[Byte](100))
+ val bigList = List.fill(40)(new Array[Byte](1000))
+ val memoryStore = store.memoryStore
+ val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+ // Unroll with all the space in the world. This should succeed and return an array.
+ var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
+ verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+ // Unroll with not enough space. This should succeed after kicking out someBlock1.
+ store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
+ verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+ assert(droppedBlocks.size === 1)
+ assert(droppedBlocks.head._1 === TestBlockId("someBlock1"))
+ droppedBlocks.clear()
+
+ // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
+ // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
+ // In the meantime, however, we kicked out someBlock2 before giving up.
+ store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator, droppedBlocks)
+ verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
+ assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator
+ assert(droppedBlocks.size === 1)
+ assert(droppedBlocks.head._1 === TestBlockId("someBlock2"))
+ droppedBlocks.clear()
+ }
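+
+ The shape of the result encodes the outcome: Left(array) when the whole partition fit in the reserved memory, Right(iterator) when it did not. A simplified sketch of the periodic-check loop these tests exercise, built on the pool sketched earlier; the check period and growth factor below are illustrative guesses, not the committed values:
+
+ // Unroll an iterator, periodically checking that the reservation still covers
+ // the vector's estimated size, and growing the reservation when it does not.
+ def unrollSafely(values: Iterator[Any], pool: UnrollMemoryPool): Either[Array[Any], Iterator[Any]] = {
+   val checkPeriod = 16            // re-check memory every N elements (a guess)
+   var memoryThreshold = 1024L     // initial reservation (a guess)
+   var keepUnrolling = pool.reserveUnrollMemoryForThisThread(memoryThreshold)
+   val vector = new SizeTrackingVector[Any]
+   var elementsUnrolled = 0L
+
+   while (values.hasNext && keepUnrolling) {
+     vector += values.next()
+     elementsUnrolled += 1
+     if (elementsUnrolled % checkPeriod == 0) {
+       val currentSize = vector.estimateSize()
+       if (currentSize >= memoryThreshold) {
+         // The real store may first drop other blocks to free up space here.
+         val amountToRequest = currentSize * 2 - memoryThreshold
+         keepUnrolling = pool.reserveUnrollMemoryForThisThread(amountToRequest)
+         if (keepUnrolling) { memoryThreshold += amountToRequest }
+       }
+     }
+   }
+   if (keepUnrolling) {
+     Left(vector.toArray)             // fits: hand back the materialized array
+   } else {
+     Right(vector.iterator ++ values) // too big: partial vector plus the remainder
+   }
+ }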
+
+ test("safely unroll blocks through putIterator") {
+ store = makeBlockManager(12000)
+ val memOnly = StorageLevel.MEMORY_ONLY
+ val memoryStore = store.memoryStore
+ val smallList = List.fill(40)(new Array[Byte](100))
+ val bigList = List.fill(40)(new Array[Byte](1000))
+ def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]]
+ def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]]
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+ // Unroll with plenty of space. This should succeed and cache both blocks.
+ val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
+ val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+ assert(memoryStore.contains("b1"))
+ assert(memoryStore.contains("b2"))
+ assert(result1.size > 0) // unroll was successful
+ assert(result2.size > 0)
+ assert(result1.data.isLeft) // unroll did not drop this block to disk
+ assert(result2.data.isLeft)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+ // Re-put these two blocks so the block manager knows about them too. Otherwise, it
+ // would not know how to drop them from memory later.
+ memoryStore.remove("b1")
+ memoryStore.remove("b2")
+ store.putIterator("b1", smallIterator, memOnly)
+ store.putIterator("b2", smallIterator, memOnly)
+
+ // Unroll with not enough space. This should succeed but kick out b1 in the process.
+ val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
+ assert(result3.size > 0)
+ assert(result3.data.isLeft)
+ assert(!memoryStore.contains("b1"))
+ assert(memoryStore.contains("b2"))
+ assert(memoryStore.contains("b3"))
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+ memoryStore.remove("b3")
+ store.putIterator("b3", smallIterator, memOnly)
+
+ // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
+ val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
+ assert(result4.size === 0) // unroll was unsuccessful
+ assert(result4.data.isLeft)
+ assert(!memoryStore.contains("b1"))
+ assert(!memoryStore.contains("b2"))
+ assert(memoryStore.contains("b3"))
+ assert(!memoryStore.contains("b4"))
+ assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator
+ }
+
+ /**
+ * This test is essentially identical to the preceding one, except that it uses MEMORY_AND_DISK.
+ */
+ test("safely unroll blocks through putIterator (disk)") {
+ store = makeBlockManager(12000)
+ val memAndDisk = StorageLevel.MEMORY_AND_DISK
+ val memoryStore = store.memoryStore
+ val diskStore = store.diskStore
+ val smallList = List.fill(40)(new Array[Byte](100))
+ val bigList = List.fill(40)(new Array[Byte](1000))
+ def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]]
+ def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]]
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+ store.putIterator("b1", smallIterator, memAndDisk)
+ store.putIterator("b2", smallIterator, memAndDisk)
+
+ // Unroll with not enough space. This should succeed but kick out b1 in the process.
+ // Memory store should contain b2 and b3, while disk store should contain only b1
+ val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, returnValues = true)
+ assert(result3.size > 0)
+ assert(!memoryStore.contains("b1"))
+ assert(memoryStore.contains("b2"))
+ assert(memoryStore.contains("b3"))
+ assert(diskStore.contains("b1"))
+ assert(!diskStore.contains("b2"))
+ assert(!diskStore.contains("b3"))
+ memoryStore.remove("b3")
+ store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+ // Unroll huge block with not enough space. This should fail and drop the new block to disk
+ // directly in addition to kicking out b2 in the process. Memory store should contain only
+ // b3, while disk store should contain b1, b2 and b4.
+ val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, returnValues = true)
+ assert(result4.size > 0)
+ assert(result4.data.isRight) // unroll returned bytes from disk
+ assert(!memoryStore.contains("b1"))
+ assert(!memoryStore.contains("b2"))
+ assert(memoryStore.contains("b3"))
+ assert(!memoryStore.contains("b4"))
+ assert(diskStore.contains("b1"))
+ assert(diskStore.contains("b2"))
+ assert(!diskStore.contains("b3"))
+ assert(diskStore.contains("b4"))
+ assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator
+ }
+
+ test("multiple unrolls by the same thread") {
+ store = makeBlockManager(12000)
+ val memOnly = StorageLevel.MEMORY_ONLY
+ val memoryStore = store.memoryStore
+ val smallList = List.fill(40)(new Array[Byte](100))
+ def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]]
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+ // All unroll memory used is released because unrollSafely returned an array
+ memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+ memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+ // Unroll memory is not released because unrollSafely returned an iterator
+ // that still depends on the underlying vector used in the process
+ memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
+ val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisThread
+ assert(unrollMemoryAfterB3 > 0)
+
+ // The unroll memory owned by this thread builds on top of its value after the previous unrolls
+ memoryStore.putIterator("b4", smallIterator, memOnly, returnValues = true)
+ val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisThread
+ assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
+
+ // ... but only to a certain extent (until we run out of free space to grant new unroll memory)
+ memoryStore.putIterator("b5", smallIterator, memOnly, returnValues = true)
+ val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisThread
+ memoryStore.putIterator("b6", smallIterator, memOnly, returnValues = true)
+ val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisThread
+ memoryStore.putIterator("b7", smallIterator, memOnly, returnValues = true)
+ val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisThread
+ assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
+ assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
+ assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
deleted file mode 100644
index 93f0c6a8e6..0000000000
--- a/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.util.Random
-
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
-
-import org.apache.spark.util.SizeTrackingAppendOnlyMapSuite.LargeDummyClass
-import org.apache.spark.util.collection.{AppendOnlyMap, SizeTrackingAppendOnlyMap}
-
-class SizeTrackingAppendOnlyMapSuite extends FunSuite with BeforeAndAfterAll {
- val NORMAL_ERROR = 0.20
- val HIGH_ERROR = 0.30
-
- test("fixed size insertions") {
- testWith[Int, Long](10000, i => (i, i.toLong))
- testWith[Int, (Long, Long)](10000, i => (i, (i.toLong, i.toLong)))
- testWith[Int, LargeDummyClass](10000, i => (i, new LargeDummyClass()))
- }
-
- test("variable size insertions") {
- val rand = new Random(123456789)
- def randString(minLen: Int, maxLen: Int): String = {
- "a" * (rand.nextInt(maxLen - minLen) + minLen)
- }
- testWith[Int, String](10000, i => (i, randString(0, 10)))
- testWith[Int, String](10000, i => (i, randString(0, 100)))
- testWith[Int, String](10000, i => (i, randString(90, 100)))
- }
-
- test("updates") {
- val rand = new Random(123456789)
- def randString(minLen: Int, maxLen: Int): String = {
- "a" * (rand.nextInt(maxLen - minLen) + minLen)
- }
- testWith[String, Int](10000, i => (randString(0, 10000), i))
- }
-
- def testWith[K, V](numElements: Int, makeElement: (Int) => (K, V)) {
- val map = new SizeTrackingAppendOnlyMap[K, V]()
- for (i <- 0 until numElements) {
- val (k, v) = makeElement(i)
- map(k) = v
- expectWithinError(map, map.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
- }
- }
-
- def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) {
- val betterEstimatedSize = SizeEstimator.estimate(obj)
- assert(betterEstimatedSize * (1 - error) < estimatedSize,
- s"Estimated size $estimatedSize was less than expected size $betterEstimatedSize")
- assert(betterEstimatedSize * (1 + 2 * error) > estimatedSize,
- s"Estimated size $estimatedSize was greater than expected size $betterEstimatedSize")
- }
-}
-
-object SizeTrackingAppendOnlyMapSuite {
- // Speed test, for reproducibility of results.
- // These could be highly non-deterministic in general, however.
- // Results:
- // AppendOnlyMap: 31 ms
- // SizeTracker: 54 ms
- // SizeEstimator: 1500 ms
- def main(args: Array[String]) {
- val numElements = 100000
-
- val baseTimes = for (i <- 0 until 10) yield time {
- val map = new AppendOnlyMap[Int, LargeDummyClass]()
- for (i <- 0 until numElements) {
- map(i) = new LargeDummyClass()
- }
- }
-
- val sampledTimes = for (i <- 0 until 10) yield time {
- val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]()
- for (i <- 0 until numElements) {
- map(i) = new LargeDummyClass()
- map.estimateSize()
- }
- }
-
- val unsampledTimes = for (i <- 0 until 3) yield time {
- val map = new AppendOnlyMap[Int, LargeDummyClass]()
- for (i <- 0 until numElements) {
- map(i) = new LargeDummyClass()
- SizeEstimator.estimate(map)
- }
- }
-
- println("Base: " + baseTimes)
- println("SizeTracker (sampled): " + sampledTimes)
- println("SizeEstimator (unsampled): " + unsampledTimes)
- }
-
- def time(f: => Unit): Long = {
- val start = System.currentTimeMillis()
- f
- System.currentTimeMillis() - start
- }
-
- private class LargeDummyClass {
- val arr = new Array[Int](100)
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala
new file mode 100644
index 0000000000..1f33967249
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.util.SizeEstimator
+
+class SizeTrackerSuite extends FunSuite {
+ val NORMAL_ERROR = 0.20
+ val HIGH_ERROR = 0.30
+
+ import SizeTrackerSuite._
+
+ test("vector fixed size insertions") {
+ testVector[Long](10000, i => i.toLong)
+ testVector[(Long, Long)](10000, i => (i.toLong, i.toLong))
+ testVector[LargeDummyClass](10000, i => new LargeDummyClass)
+ }
+
+ test("vector variable size insertions") {
+ val rand = new Random(123456789)
+ def randString(minLen: Int, maxLen: Int): String = {
+ "a" * (rand.nextInt(maxLen - minLen) + minLen)
+ }
+ testVector[String](10000, i => randString(0, 10))
+ testVector[String](10000, i => randString(0, 100))
+ testVector[String](10000, i => randString(90, 100))
+ }
+
+ test("map fixed size insertions") {
+ testMap[Int, Long](10000, i => (i, i.toLong))
+ testMap[Int, (Long, Long)](10000, i => (i, (i.toLong, i.toLong)))
+ testMap[Int, LargeDummyClass](10000, i => (i, new LargeDummyClass))
+ }
+
+ test("map variable size insertions") {
+ val rand = new Random(123456789)
+ def randString(minLen: Int, maxLen: Int): String = {
+ "a" * (rand.nextInt(maxLen - minLen) + minLen)
+ }
+ testMap[Int, String](10000, i => (i, randString(0, 10)))
+ testMap[Int, String](10000, i => (i, randString(0, 100)))
+ testMap[Int, String](10000, i => (i, randString(90, 100)))
+ }
+
+ test("map updates") {
+ val rand = new Random(123456789)
+ def randString(minLen: Int, maxLen: Int): String = {
+ "a" * (rand.nextInt(maxLen - minLen) + minLen)
+ }
+ testMap[String, Int](10000, i => (randString(0, 10000), i))
+ }
+
+ def testVector[T: ClassTag](numElements: Int, makeElement: Int => T) {
+ val vector = new SizeTrackingVector[T]
+ for (i <- 0 until numElements) {
+ val item = makeElement(i)
+ vector += item
+ expectWithinError(vector, vector.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
+ }
+ }
+
+ def testMap[K, V](numElements: Int, makeElement: (Int) => (K, V)) {
+ val map = new SizeTrackingAppendOnlyMap[K, V]
+ for (i <- 0 until numElements) {
+ val (k, v) = makeElement(i)
+ map(k) = v
+ expectWithinError(map, map.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
+ }
+ }
+
+ def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) {
+ val betterEstimatedSize = SizeEstimator.estimate(obj)
+ assert(betterEstimatedSize * (1 - error) < estimatedSize,
+ s"Estimated size $estimatedSize was less than expected size $betterEstimatedSize")
+ assert(betterEstimatedSize * (1 + 2 * error) > estimatedSize,
+ s"Estimated size $estimatedSize was greater than expected size $betterEstimatedSize")
+ }
+}
+
+private object SizeTrackerSuite {
+
+ /**
+ * Run speed tests for size tracking collections.
+ */
+ def main(args: Array[String]): Unit = {
+ if (args.size < 1) {
+ println("Usage: SizeTrackerSuite [num elements]")
+ System.exit(1)
+ }
+ val numElements = args(0).toInt
+ vectorSpeedTest(numElements)
+ mapSpeedTest(numElements)
+ }
+
+ /**
+ * Speed test for SizeTrackingVector.
+ *
+ * Results for 100000 elements (possibly non-deterministic):
+ * PrimitiveVector 15 ms
+ * SizeTracker 51 ms
+ * SizeEstimator 2000 ms
+ */
+ def vectorSpeedTest(numElements: Int): Unit = {
+ val baseTimes = for (i <- 0 until 10) yield time {
+ val vector = new PrimitiveVector[LargeDummyClass]
+ for (i <- 0 until numElements) {
+ vector += new LargeDummyClass
+ }
+ }
+ val sampledTimes = for (i <- 0 until 10) yield time {
+ val vector = new SizeTrackingVector[LargeDummyClass]
+ for (i <- 0 until numElements) {
+ vector += new LargeDummyClass
+ vector.estimateSize()
+ }
+ }
+ val unsampledTimes = for (i <- 0 until 3) yield time {
+ val vector = new PrimitiveVector[LargeDummyClass]
+ for (i <- 0 until numElements) {
+ vector += new LargeDummyClass
+ SizeEstimator.estimate(vector)
+ }
+ }
+ printSpeedTestResult("SizeTrackingVector", baseTimes, sampledTimes, unsampledTimes)
+ }
+
+ /**
+ * Speed test for SizeTrackingAppendOnlyMap.
+ *
+ * Results for 100000 elements (possibly non-deterministic):
+ * AppendOnlyMap 30 ms
+ * SizeTracker 41 ms
+ * SizeEstimator 1666 ms
+ */
+ def mapSpeedTest(numElements: Int): Unit = {
+ val baseTimes = for (i <- 0 until 10) yield time {
+ val map = new AppendOnlyMap[Int, LargeDummyClass]
+ for (i <- 0 until numElements) {
+ map(i) = new LargeDummyClass
+ }
+ }
+ val sampledTimes = for (i <- 0 until 10) yield time {
+ val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]
+ for (i <- 0 until numElements) {
+ map(i) = new LargeDummyClass
+ map.estimateSize()
+ }
+ }
+ val unsampledTimes = for (i <- 0 until 3) yield time {
+ val map = new AppendOnlyMap[Int, LargeDummyClass]
+ for (i <- 0 until numElements) {
+ map(i) = new LargeDummyClass
+ SizeEstimator.estimate(map)
+ }
+ }
+ printSpeedTestResult("SizeTrackingAppendOnlyMap", baseTimes, sampledTimes, unsampledTimes)
+ }
+
+ def printSpeedTestResult(
+ testName: String,
+ baseTimes: Seq[Long],
+ sampledTimes: Seq[Long],
+ unsampledTimes: Seq[Long]): Unit = {
+ println(s"Average times for $testName (ms):")
+ println(" Base - " + averageTime(baseTimes))
+ println(" SizeTracker (sampled) - " + averageTime(sampledTimes))
+ println(" SizeEstimator (unsampled) - " + averageTime(unsampledTimes))
+ println()
+ }
+
+ def time(f: => Unit): Long = {
+ val start = System.currentTimeMillis()
+ f
+ System.currentTimeMillis() - start
+ }
+
+ def averageTime(v: Seq[Long]): Long = {
+ v.sum / v.size
+ }
+
+ private class LargeDummyClass {
+ val arr = new Array[Int](100)
+ }
+}
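
The tolerances in the suite above (NORMAL_ERROR after warm-up, HIGH_ERROR for the first 32 insertions) follow from how size tracking works: instead of re-running the expensive SizeEstimator.estimate on every update (the "unsampled" rows in the speed-test comments), the tracker samples it at exponentially spaced update counts and extrapolates linearly in between. A minimal sketch of that sampling scheme, assuming only Spark's SizeEstimator on the classpath; the class name, field names, and the 1.1 growth rate here are illustrative, not the actual SizeTracker internals:

    import org.apache.spark.util.SizeEstimator

    // Sketch: sample the real estimator at exponentially spaced update
    // counts and extrapolate linearly in between.
    class SampledSizeTracker(obj: AnyRef) {
      private val SAMPLE_GROWTH_RATE = 1.1
      private var numUpdates = 0L      // updates seen so far
      private var lastSampleSize = 0L  // size at the last real sample
      private var lastSampleNum = 0L   // update count at the last sample
      private var nextSampleNum = 1L   // update count to sample at next
      private var bytesPerUpdate = 0.0 // slope between the last two samples

      /** Call after every insertion into the tracked collection. */
      def afterUpdate(): Unit = {
        numUpdates += 1
        if (numUpdates >= nextSampleNum) {
          val size = SizeEstimator.estimate(obj)
          if (numUpdates > lastSampleNum) {
            bytesPerUpdate = math.max(
              0.0, (size - lastSampleSize).toDouble / (numUpdates - lastSampleNum))
          }
          lastSampleSize = size
          lastSampleNum = numUpdates
          nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
        }
      }

      /** Cheap estimate: last real sample plus a linear extrapolation. */
      def estimateSize(): Long =
        (lastSampleSize + bytesPerUpdate * (numUpdates - lastSampleNum)).toLong
    }

Wiring afterUpdate into each insertion and exposing estimateSize is, in spirit, what SizeTrackingVector and SizeTrackingAppendOnlyMap do, which is why the sampled columns in the speed-test comments stay within a small constant factor of the base times while the unsampled ones are one to two orders of magnitude slower.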
diff --git a/docs/configuration.md b/docs/configuration.md
index 46e3dd914b..2e6c85cc2b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -481,6 +481,15 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td><code>spark.storage.unrollFraction</code></td>
+ <td>0.2</td>
+ <td>
+ Fraction of <code>spark.storage.memoryFraction</code> to use for unrolling blocks in memory.
+ This is dynamically allocated by dropping existing blocks when there is not enough free
+ storage space to unroll the new block in its entirety.
+ </td>
+</tr>
+<tr>
<td><code>spark.tachyonStore.baseDir</code></td>
<td>System.getProperty("java.io.tmpdir")</td>
<td>
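
To make the new setting concrete: spark.storage.unrollFraction is a fraction of the storage memory (itself governed by spark.storage.memoryFraction), not of the whole heap. A hedged example of raising it on a SparkConf; only the key names come from this patch, and the app name and values are illustrative:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("unroll-tuning-example")         // hypothetical app name
      .set("spark.storage.memoryFraction", "0.6")  // existing setting
      .set("spark.storage.unrollFraction", "0.3")  // added by this patch (default 0.2)

Ignoring safety margins, a 10 GB heap under these settings leaves roughly 10 GB x 0.6 x 0.3 ≈ 1.8 GB of storage memory available for unrolling new blocks.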
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index e9220db6b1..5ff88f0dd1 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -31,7 +31,6 @@ import com.typesafe.tools.mima.core._
* MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap")
*/
object MimaExcludes {
-
def excludes(version: String) =
version match {
case v if v.startsWith("1.1") =>
@@ -63,6 +62,15 @@ object MimaExcludes {
"org.apache.spark.storage.MemoryStore.Entry")
) ++
Seq(
+ // Renamed putValues -> putArray + putIterator
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.storage.MemoryStore.putValues"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.storage.DiskStore.putValues"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.storage.TachyonStore.putValues")
+ ) ++
+ Seq(
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this")
) ++
Seq( // Ignore some private methods in ALS.
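
For reference, the exclusion shape in the hunk above generalizes to any public method rename: MiMa reports the old name as a MissingMethodProblem on every class that exposed it, so each store needs its own filter. A sketch of the pattern, where SomeStore is a placeholder rather than one of the real entries:

    import com.typesafe.tools.mima.core._

    // One filter per class that lost the old method name; the string is
    // "<fully.qualified.Class>.<method>". SomeStore is hypothetical.
    ProblemFilters.exclude[MissingMethodProblem](
      "org.apache.spark.storage.SomeStore.putValues")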
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index ce8316bb14..d934b9cbfc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -110,8 +110,7 @@ private[streaming] class ReceiverSupervisorImpl(
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val time = System.currentTimeMillis
- blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]],
- storageLevel, tellMaster = true)
+ blockManager.putArray(blockId, arrayBuffer.toArray[Any], storageLevel, tellMaster = true)
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
}
@@ -124,7 +123,7 @@ private[streaming] class ReceiverSupervisorImpl(
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val time = System.currentTimeMillis
- blockManager.put(blockId, iterator, storageLevel, tellMaster = true)
+ blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
reportPushedBlock(blockId, -1, optionalMetadata)
}