 core/src/main/scala/org/apache/spark/CacheManager.scala                              |  72
 core/src/main/scala/org/apache/spark/SparkEnv.scala                                  |   2
 core/src/main/scala/org/apache/spark/executor/Executor.scala                         |   4
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala                      | 110
 core/src/main/scala/org/apache/spark/storage/BlockStore.scala                        |   6
 core/src/main/scala/org/apache/spark/storage/DiskStore.scala                         |  12
 core/src/main/scala/org/apache/spark/storage/MemoryStore.scala                       | 256
 core/src/main/scala/org/apache/spark/storage/TachyonStore.scala                      |  12
 core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala                     |   2
 core/src/main/scala/org/apache/spark/util/SizeEstimator.scala                        |   2
 core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala           |  15
 core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala               | 105
 core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala |  71
 core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala        |  46
 core/src/test/scala/org/apache/spark/CacheManagerSuite.scala                         |  25
 core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala                 | 594
 core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala       | 120
 core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala          | 204
 docs/configuration.md                                                                |   9
 project/MimaExcludes.scala                                                           |  10
 streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala |   5
 21 files changed, 1165 insertions(+), 517 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 8f867686a0..5ddda4d695 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -17,9 +17,9 @@
package org.apache.spark
-import scala.collection.mutable.{ArrayBuffer, HashSet}
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.executor.InputMetrics
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
@@ -30,7 +30,7 @@ import org.apache.spark.storage._
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
/** Keys of RDD partitions that are being computed/loaded. */
- private val loading = new HashSet[RDDBlockId]()
+ private val loading = new mutable.HashSet[RDDBlockId]
/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](
@@ -118,21 +118,29 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}
/**
- * Cache the values of a partition, keeping track of any updates in the storage statuses
- * of other blocks along the way.
+ * Cache the values of a partition, keeping track of any updates in the storage statuses of
+ * other blocks along the way.
+ *
+ * The effective storage level refers to the level that actually specifies BlockManager put
+ * behavior, not the level originally specified by the user. This is mainly for forcing a
+ * MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
+ * while preserving the original semantics of the RDD as specified by the application.
*/
private def putInBlockManager[T](
key: BlockId,
values: Iterator[T],
- storageLevel: StorageLevel,
- updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = {
-
- if (!storageLevel.useMemory) {
- /* This RDD is not to be cached in memory, so we can just pass the computed values
- * as an iterator directly to the BlockManager, rather than first fully unrolling
- * it in memory. The latter option potentially uses much more memory and risks OOM
- * exceptions that can be avoided. */
- updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
+ level: StorageLevel,
+ updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
+ effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
+
+ val putLevel = effectiveStorageLevel.getOrElse(level)
+ if (!putLevel.useMemory) {
+ /*
+ * This RDD is not to be cached in memory, so we can just pass the computed values as an
+ * iterator directly to the BlockManager rather than first fully unrolling it in memory.
+ */
+ updatedBlocks ++=
+ blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
blockManager.get(key) match {
case Some(v) => v.data.asInstanceOf[Iterator[T]]
case None =>
@@ -140,14 +148,36 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
throw new BlockException(key, s"Block manager failed to return cached value for $key!")
}
} else {
- /* This RDD is to be cached in memory. In this case we cannot pass the computed values
+ /*
+ * This RDD is to be cached in memory. In this case we cannot pass the computed values
* to the BlockManager as an iterator and expect to read it back later. This is because
- * we may end up dropping a partition from memory store before getting it back, e.g.
- * when the entirety of the RDD does not fit in memory. */
- val elements = new ArrayBuffer[Any]
- elements ++= values
- updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
- elements.iterator.asInstanceOf[Iterator[T]]
+ * we may end up dropping a partition from memory store before getting it back.
+ *
+ * In addition, we must be careful to not unroll the entire partition in memory at once.
+ * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
+ * single partition. Instead, we unroll the values cautiously, potentially aborting and
+ * dropping the partition to disk if applicable.
+ */
+ blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
+ case Left(arr) =>
+ // We have successfully unrolled the entire partition, so cache it in memory
+ updatedBlocks ++=
+ blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
+ arr.iterator.asInstanceOf[Iterator[T]]
+ case Right(it) =>
+ // There is not enough space to cache this partition in memory
+ logWarning(s"Not enough space to cache partition $key in memory! " +
+ s"Free memory is ${blockManager.memoryStore.freeMemory} bytes.")
+ val returnValues = it.asInstanceOf[Iterator[T]]
+ if (putLevel.useDisk) {
+ logWarning(s"Persisting partition $key to disk instead.")
+ val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
+ useOffHeap = false, deserialized = false, putLevel.replication)
+ putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
+ } else {
+ returnValues
+ }
+ }
}
}
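
To make the control flow above concrete, here is a minimal, self-contained sketch of the effective storage level fallback. The toy Level class, tryUnroll helper, and element budget are illustrative only, not Spark APIs:

// A toy model of putInBlockManager's fallback. The user-specified `level` is
// kept for bookkeeping, while `putLevel` decides what this particular put does.
object EffectiveLevelSketch {
  case class Level(useMemory: Boolean, useDisk: Boolean)

  // Pretend unrolling succeeds only when the partition fits a small budget.
  def tryUnroll(values: Seq[Int], budget: Int): Option[Array[Int]] =
    if (values.length <= budget) Some(values.toArray) else None

  def put(values: Seq[Int], level: Level, effective: Option[Level] = None): String = {
    val putLevel = effective.getOrElse(level) // the effective level, if any, wins
    if (!putLevel.useMemory) {
      "streamed straight to disk, no unrolling"
    } else {
      tryUnroll(values, budget = 4) match {
        case Some(arr) => s"cached ${arr.length} elements in memory"
        case None if putLevel.useDisk =>
          // Retry with a disk-only effective level. The original `level` is
          // untouched, so the block still reports the user's storage level.
          put(values, level, Some(Level(useMemory = false, useDisk = true)))
        case None => "not cached; values handed back to the caller"
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val memoryAndDisk = Level(useMemory = true, useDisk = true)
    println(put(Seq(1, 2, 3), memoryAndDisk))               // fits: cached in memory
    println(put(Seq.tabulate(10)(identity), memoryAndDisk)) // too big: falls back to disk
  }
}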
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 8f70744d80..6ee731b22c 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -67,7 +67,7 @@ class SparkEnv (
val metricsSystem: MetricsSystem,
val conf: SparkConf) extends Logging {
- // A mapping of thread ID to amount of memory used for shuffle in bytes
+ // A mapping of thread ID to amount of memory, in bytes, used for shuffle aggregations
// All accesses should be manually synchronized
val shuffleMemoryMap = mutable.HashMap[Long, Long]()
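
As the comment says, this map is a plain mutable HashMap guarded by manual synchronization rather than a concurrent collection. A minimal sketch of the intended access pattern, with illustrative names (ShuffleMemorySketch and its methods are not Spark code):

import scala.collection.mutable

object ShuffleMemorySketch {
  // Stand-in for SparkEnv.shuffleMemoryMap: thread ID -> bytes used for shuffles.
  val shuffleMemoryMap = mutable.HashMap[Long, Long]()

  // Every access takes the map's own monitor, as the comment above requires.
  def addShuffleMemory(bytes: Long): Unit = shuffleMemoryMap.synchronized {
    val threadId = Thread.currentThread().getId
    shuffleMemoryMap(threadId) = shuffleMemoryMap.getOrElse(threadId, 0L) + bytes
  }

  // Drop the current thread's entry when its task finishes (cf. Executor below).
  def clearShuffleMemory(): Unit = shuffleMemoryMap.synchronized {
    shuffleMemoryMap.remove(Thread.currentThread().getId)
  }

  def main(args: Array[String]): Unit = {
    addShuffleMemory(1024)
    addShuffleMemory(2048)
    println(shuffleMemoryMap.synchronized(shuffleMemoryMap.toMap)) // Map(<threadId> -> 3072)
    clearShuffleMemory()
  }
}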
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index b16133b20c..3b69bc4ca4 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -266,11 +266,13 @@ private[spark] class Executor(
}
}
} finally {
- // TODO: Unregister shuffle memory only for ResultTask
+ // Release memory used by this thread for shuffles
val shuffleMemoryMap = env.shuffleMemoryMap
shuffleMemoryMap.synchronized {
shuffleMemoryMap.remove(Thread.currentThread().getId)
}
+ // Release memory used by this thread for unrolling blocks
+ env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
runningTasks.remove(taskId)
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0db0a5bc73..d746526639 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -38,7 +38,7 @@ import org.apache.spark.util._
private[spark] sealed trait BlockValues
private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
-private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues
+private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
@@ -71,9 +71,9 @@ private[spark] class BlockManager(
// Actual storage of where blocks are kept
private var tachyonInitialized = false
- private[storage] val memoryStore = new MemoryStore(this, maxMemory)
- private[storage] val diskStore = new DiskStore(this, diskBlockManager)
- private[storage] lazy val tachyonStore: TachyonStore = {
+ private[spark] val memoryStore = new MemoryStore(this, maxMemory)
+ private[spark] val diskStore = new DiskStore(this, diskBlockManager)
+ private[spark] lazy val tachyonStore: TachyonStore = {
val storeDir = conf.get("spark.tachyonStore.baseDir", "/tmp_spark_tachyon")
val appFolderName = conf.get("spark.tachyonStore.folderName")
val tachyonStorePath = s"$storeDir/$appFolderName/${this.executorId}"
@@ -463,16 +463,17 @@ private[spark] class BlockManager(
val values = dataDeserialize(blockId, bytes)
if (level.deserialized) {
// Cache the values before returning them
- // TODO: Consider creating a putValues that also takes in a iterator?
- val valuesBuffer = new ArrayBuffer[Any]
- valuesBuffer ++= values
- memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
- match {
- case Left(values2) =>
- return Some(new BlockResult(values2, DataReadMethod.Disk, info.size))
- case _ =>
- throw new SparkException("Memory store did not return back an iterator")
- }
+ val putResult = memoryStore.putIterator(
+ blockId, values, level, returnValues = true, allowPersistToDisk = false)
+ // The put may or may not have succeeded, depending on whether there was enough
+ // space to unroll the block. Either way, the put here should return an iterator.
+ putResult.data match {
+ case Left(it) =>
+ return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
+ case _ =>
+          // This can only happen if we dropped the values back to disk, which never occurs here
+ throw new SparkException("Memory store did not return an iterator!")
+ }
} else {
return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
}
@@ -561,13 +562,14 @@ private[spark] class BlockManager(
iter
}
- def put(
+ def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
- tellMaster: Boolean): Seq[(BlockId, BlockStatus)] = {
+ tellMaster: Boolean = true,
+ effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(values != null, "Values is null")
- doPut(blockId, IteratorValues(values), level, tellMaster)
+ doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
}
/**
@@ -589,13 +591,14 @@ private[spark] class BlockManager(
* Put a new block of values to the block manager.
* Return a list of blocks updated as a result of this put.
*/
- def put(
+ def putArray(
blockId: BlockId,
- values: ArrayBuffer[Any],
+ values: Array[Any],
level: StorageLevel,
- tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+ tellMaster: Boolean = true,
+ effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(values != null, "Values is null")
- doPut(blockId, ArrayBufferValues(values), level, tellMaster)
+ doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
}
/**
@@ -606,19 +609,33 @@ private[spark] class BlockManager(
blockId: BlockId,
bytes: ByteBuffer,
level: StorageLevel,
- tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+ tellMaster: Boolean = true,
+ effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
require(bytes != null, "Bytes is null")
- doPut(blockId, ByteBufferValues(bytes), level, tellMaster)
+ doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
}
+ /**
+ * Put the given block according to the given level in one of the block stores, replicating
+ * the values if necessary.
+ *
+ * The effective storage level refers to the level according to which the block will actually be
+ * handled. This allows the caller to specify an alternate behavior of doPut while preserving
+ * the original level specified by the user.
+ */
private def doPut(
blockId: BlockId,
data: BlockValues,
level: StorageLevel,
- tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
+ tellMaster: Boolean = true,
+ effectiveStorageLevel: Option[StorageLevel] = None)
+ : Seq[(BlockId, BlockStatus)] = {
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
+ effectiveStorageLevel.foreach { level =>
+ require(level != null && level.isValid, "Effective StorageLevel is null or invalid")
+ }
// Return value
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
@@ -657,13 +674,16 @@ private[spark] class BlockManager(
// Size of the block in bytes
var size = 0L
+ // The level we actually use to put the block
+ val putLevel = effectiveStorageLevel.getOrElse(level)
+
// If we're storing bytes, then initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = data match {
- case b: ByteBufferValues if level.replication > 1 =>
+ case b: ByteBufferValues if putLevel.replication > 1 =>
// Duplicate doesn't copy the bytes, but just creates a wrapper
val bufferView = b.buffer.duplicate()
- Future { replicate(blockId, bufferView, level) }
+ Future { replicate(blockId, bufferView, putLevel) }
case _ => null
}
@@ -676,18 +696,18 @@ private[spark] class BlockManager(
// returnValues - Whether to return the values put
// blockStore - The type of storage to put these values into
val (returnValues, blockStore: BlockStore) = {
- if (level.useMemory) {
+ if (putLevel.useMemory) {
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
(true, memoryStore)
- } else if (level.useOffHeap) {
+ } else if (putLevel.useOffHeap) {
// Use tachyon for off-heap storage
(false, tachyonStore)
- } else if (level.useDisk) {
+ } else if (putLevel.useDisk) {
// Don't get back the bytes from put unless we replicate them
- (level.replication > 1, diskStore)
+ (putLevel.replication > 1, diskStore)
} else {
- assert(level == StorageLevel.NONE)
+ assert(putLevel == StorageLevel.NONE)
throw new BlockException(
blockId, s"Attempted to put block $blockId without specifying storage level!")
}
@@ -696,22 +716,22 @@ private[spark] class BlockManager(
// Actually put the values
val result = data match {
case IteratorValues(iterator) =>
- blockStore.putValues(blockId, iterator, level, returnValues)
- case ArrayBufferValues(array) =>
- blockStore.putValues(blockId, array, level, returnValues)
+ blockStore.putIterator(blockId, iterator, putLevel, returnValues)
+ case ArrayValues(array) =>
+ blockStore.putArray(blockId, array, putLevel, returnValues)
case ByteBufferValues(bytes) =>
bytes.rewind()
- blockStore.putBytes(blockId, bytes, level)
+ blockStore.putBytes(blockId, bytes, putLevel)
}
size = result.size
result.data match {
- case Left (newIterator) if level.useMemory => valuesAfterPut = newIterator
+ case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIterator
case Right (newBytes) => bytesAfterPut = newBytes
case _ =>
}
// Keep track of which blocks are dropped from memory
- if (level.useMemory) {
+ if (putLevel.useMemory) {
result.droppedBlocks.foreach { updatedBlocks += _ }
}
@@ -742,7 +762,7 @@ private[spark] class BlockManager(
// Either we're storing bytes and we asynchronously started replication, or we're storing
// values and need to serialize and replicate them now:
- if (level.replication > 1) {
+ if (putLevel.replication > 1) {
data match {
case ByteBufferValues(bytes) =>
if (replicationFuture != null) {
@@ -758,7 +778,7 @@ private[spark] class BlockManager(
}
bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
}
- replicate(blockId, bytesAfterPut, level)
+ replicate(blockId, bytesAfterPut, putLevel)
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
@@ -766,7 +786,7 @@ private[spark] class BlockManager(
BlockManager.dispose(bytesAfterPut)
- if (level.replication > 1) {
+ if (putLevel.replication > 1) {
logDebug("Putting block %s with replication took %s"
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
} else {
@@ -818,7 +838,7 @@ private[spark] class BlockManager(
value: Any,
level: StorageLevel,
tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = {
- put(blockId, Iterator(value), level, tellMaster)
+ putIterator(blockId, Iterator(value), level, tellMaster)
}
/**
@@ -829,7 +849,7 @@ private[spark] class BlockManager(
*/
def dropFromMemory(
blockId: BlockId,
- data: Either[ArrayBuffer[Any], ByteBuffer]): Option[BlockStatus] = {
+ data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
logInfo(s"Dropping block $blockId from memory")
val info = blockInfo.get(blockId).orNull
@@ -853,7 +873,7 @@ private[spark] class BlockManager(
logInfo(s"Writing block $blockId to disk")
data match {
case Left(elements) =>
- diskStore.putValues(blockId, elements, level, returnValues = false)
+ diskStore.putArray(blockId, elements, level, returnValues = false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}
@@ -1068,9 +1088,11 @@ private[spark] class BlockManager(
private[spark] object BlockManager extends Logging {
private val ID_GENERATOR = new IdGenerator
+ /** Return the total amount of storage memory available. */
private def getMaxMemory(conf: SparkConf): Long = {
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
- (Runtime.getRuntime.maxMemory * memoryFraction).toLong
+ val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
+ (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}
def getHeartBeatFrequency(conf: SparkConf): Long =
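
The new safetyFraction means storage memory defaults to memoryFraction * safetyFraction = 0.6 * 0.9 = 54% of the heap, rather than 60%. A quick sketch of the same arithmetic with an assumed 1 GB heap (the heap size is illustrative; the two config keys are the ones used above):

object StorageMemorySketch {
  def main(args: Array[String]): Unit = {
    val maxHeap = 1024L * 1024 * 1024  // assume a 1 GB executor heap
    val memoryFraction = 0.6           // spark.storage.memoryFraction (default)
    val safetyFraction = 0.9           // spark.storage.safetyFraction (default)
    val storageMemory = (maxHeap * memoryFraction * safetyFraction).toLong
    println(storageMemory)             // 579820584 bytes, i.e. 54% of the heap
  }
}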
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
index b9b53b1a2f..69985c9759 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -37,15 +37,15 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends
* @return a PutResult that contains the size of the data, as well as the values put if
* returnValues is true (if not, the result's data field can be null)
*/
- def putValues(
+ def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean): PutResult
- def putValues(
+ def putArray(
blockId: BlockId,
- values: ArrayBuffer[Any],
+ values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index ebff0cb5ba..c83261dd91 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -21,8 +21,6 @@ import java.io.{FileOutputStream, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
-import scala.collection.mutable.ArrayBuffer
-
import org.apache.spark.Logging
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.Utils
@@ -30,7 +28,7 @@ import org.apache.spark.util.Utils
/**
* Stores BlockManager blocks on disk.
*/
-private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
+private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
extends BlockStore(blockManager) with Logging {
val minMemoryMapBytes = blockManager.conf.getLong("spark.storage.memoryMapThreshold", 2 * 4096L)
@@ -57,15 +55,15 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
PutResult(bytes.limit(), Right(bytes.duplicate()))
}
- override def putValues(
+ override def putArray(
blockId: BlockId,
- values: ArrayBuffer[Any],
+ values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
- putValues(blockId, values.toIterator, level, returnValues)
+ putIterator(blockId, values.toIterator, level, returnValues)
}
- override def putValues(
+ override def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 71f66c826c..28f675c2bb 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -20,27 +20,45 @@ package org.apache.spark.storage
import java.nio.ByteBuffer
import java.util.LinkedHashMap
+import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.{SizeEstimator, Utils}
+import org.apache.spark.util.collection.SizeTrackingVector
private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
/**
- * Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as
+ * Stores blocks in memory, either as Arrays of deserialized Java objects or as
* serialized ByteBuffers.
*/
-private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
+private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
extends BlockStore(blockManager) {
+ private val conf = blockManager.conf
private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
+
@volatile private var currentMemory = 0L
- // Object used to ensure that only one thread is putting blocks and if necessary, dropping
- // blocks from the memory store.
- private val putLock = new Object()
+
+ // Ensure only one thread is putting, and if necessary, dropping blocks at any given time
+ private val accountingLock = new Object
+
+ // A mapping from thread ID to amount of memory used for unrolling a block (in bytes)
+  // All accesses of this map are assumed to manually synchronize on `accountingLock`
+ private val unrollMemoryMap = mutable.HashMap[Long, Long]()
+
+ /**
+ * The amount of space ensured for unrolling values in memory, shared across all cores.
+ * This space is not reserved in advance, but allocated dynamically by dropping existing blocks.
+ */
+ private val maxUnrollMemory: Long = {
+ val unrollFraction = conf.getDouble("spark.storage.unrollFraction", 0.2)
+ (maxMemory * unrollFraction).toLong
+ }
logInfo("MemoryStore started with capacity %s".format(Utils.bytesToString(maxMemory)))
+ /** Free memory not occupied by existing blocks. Note that this does not include unroll memory. */
def freeMemory: Long = maxMemory - currentMemory
override def getSize(blockId: BlockId): Long = {
@@ -55,20 +73,16 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
bytes.rewind()
if (level.deserialized) {
val values = blockManager.dataDeserialize(blockId, bytes)
- val elements = new ArrayBuffer[Any]
- elements ++= values
- val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
- val putAttempt = tryToPut(blockId, elements, sizeEstimate, deserialized = true)
- PutResult(sizeEstimate, Left(values.toIterator), putAttempt.droppedBlocks)
+ putIterator(blockId, values, level, returnValues = true)
} else {
val putAttempt = tryToPut(blockId, bytes, bytes.limit, deserialized = false)
PutResult(bytes.limit(), Right(bytes.duplicate()), putAttempt.droppedBlocks)
}
}
- override def putValues(
+ override def putArray(
blockId: BlockId,
- values: ArrayBuffer[Any],
+ values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
if (level.deserialized) {
@@ -82,14 +96,52 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}
- override def putValues(
+ override def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
- val valueEntries = new ArrayBuffer[Any]()
- valueEntries ++= values
- putValues(blockId, valueEntries, level, returnValues)
+ putIterator(blockId, values, level, returnValues, allowPersistToDisk = true)
+ }
+
+ /**
+ * Attempt to put the given block in memory store.
+ *
+ * There may not be enough space to fully unroll the iterator in memory, in which case we
+ * optionally drop the values to disk if
+ * (1) the block's storage level specifies useDisk, and
+ * (2) `allowPersistToDisk` is true.
+ *
+ * One scenario in which `allowPersistToDisk` is false is when the BlockManager reads a block
+ * back from disk and attempts to cache it in memory. In this case, we should not persist the
+   * block back to disk again, as it is already in the disk store.
+ */
+ private[storage] def putIterator(
+ blockId: BlockId,
+ values: Iterator[Any],
+ level: StorageLevel,
+ returnValues: Boolean,
+ allowPersistToDisk: Boolean): PutResult = {
+ val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+ val unrolledValues = unrollSafely(blockId, values, droppedBlocks)
+ unrolledValues match {
+ case Left(arrayValues) =>
+ // Values are fully unrolled in memory, so store them as an array
+ val res = putArray(blockId, arrayValues, level, returnValues)
+ droppedBlocks ++= res.droppedBlocks
+ PutResult(res.size, res.data, droppedBlocks)
+ case Right(iteratorValues) =>
+ // Not enough space to unroll this block; drop to disk if applicable
+ logWarning(s"Not enough space to store block $blockId in memory! " +
+ s"Free memory is $freeMemory bytes.")
+ if (level.useDisk && allowPersistToDisk) {
+ logWarning(s"Persisting block $blockId to disk instead.")
+ val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues)
+ PutResult(res.size, res.data, droppedBlocks)
+ } else {
+ PutResult(0, Left(iteratorValues), droppedBlocks)
+ }
+ }
}
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
@@ -99,7 +151,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
if (entry == null) {
None
} else if (entry.deserialized) {
- Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[ArrayBuffer[Any]].iterator))
+ Some(blockManager.dataSerialize(blockId, entry.value.asInstanceOf[Array[Any]].iterator))
} else {
Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't actually copy the data
}
@@ -112,7 +164,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
if (entry == null) {
None
} else if (entry.deserialized) {
- Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
+ Some(entry.value.asInstanceOf[Array[Any]].iterator)
} else {
val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
Some(blockManager.dataDeserialize(blockId, buffer))
@@ -141,6 +193,93 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
/**
+ * Unroll the given block in memory safely.
+ *
+ * The safety of this operation refers to avoiding potential OOM exceptions caused by
+ * unrolling the entirety of the block in memory at once. This is achieved by periodically
+ * checking whether the memory restrictions for unrolling blocks are still satisfied,
+ * stopping immediately if not. This check is a safeguard against the scenario in which
+ * there is not enough free memory to accommodate the entirety of a single block.
+ *
+ * This method returns either an array with the contents of the entire block or an iterator
+ * containing the values of the block (if the array would have exceeded available memory).
+ */
+ def unrollSafely(
+ blockId: BlockId,
+ values: Iterator[Any],
+ droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
+ : Either[Array[Any], Iterator[Any]] = {
+
+ // Number of elements unrolled so far
+ var elementsUnrolled = 0
+ // Whether there is still enough memory for us to continue unrolling this block
+ var keepUnrolling = true
+ // Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing.
+ val initialMemoryThreshold = conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
+ // How often to check whether we need to request more memory
+ val memoryCheckPeriod = 16
+ // Memory currently reserved by this thread for this particular unrolling operation
+ var memoryThreshold = initialMemoryThreshold
+ // Memory to request as a multiple of current vector size
+ val memoryGrowthFactor = 1.5
+ // Previous unroll memory held by this thread, for releasing later (only at the very end)
+ val previousMemoryReserved = currentUnrollMemoryForThisThread
+ // Underlying vector for unrolling the block
+ var vector = new SizeTrackingVector[Any]
+
+ // Request enough memory to begin unrolling
+ keepUnrolling = reserveUnrollMemoryForThisThread(initialMemoryThreshold)
+
+ // Unroll this block safely, checking whether we have exceeded our threshold periodically
+ try {
+ while (values.hasNext && keepUnrolling) {
+ vector += values.next()
+ if (elementsUnrolled % memoryCheckPeriod == 0) {
+ // If our vector's size has exceeded the threshold, request more memory
+ val currentSize = vector.estimateSize()
+ if (currentSize >= memoryThreshold) {
+ val amountToRequest = (currentSize * (memoryGrowthFactor - 1)).toLong
+ // Hold the accounting lock, in case another thread concurrently puts a block that
+ // takes up the unrolling space we just ensured here
+ accountingLock.synchronized {
+ if (!reserveUnrollMemoryForThisThread(amountToRequest)) {
+ // If the first request is not granted, try again after ensuring free space
+ // If there is still not enough space, give up and drop the partition
+ val spaceToEnsure = maxUnrollMemory - currentUnrollMemory
+ if (spaceToEnsure > 0) {
+ val result = ensureFreeSpace(blockId, spaceToEnsure)
+ droppedBlocks ++= result.droppedBlocks
+ }
+ keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest)
+ }
+ }
+ // New threshold is currentSize * memoryGrowthFactor
+ memoryThreshold = currentSize + amountToRequest
+ }
+ }
+ elementsUnrolled += 1
+ }
+
+ if (keepUnrolling) {
+ // We successfully unrolled the entirety of this block
+ Left(vector.toArray)
+ } else {
+ // We ran out of space while unrolling the values for this block
+ Right(vector.iterator ++ values)
+ }
+
+ } finally {
+ // If we return an array, the values returned do not depend on the underlying vector and
+ // we can immediately free up space for other threads. Otherwise, if we return an iterator,
+ // we release the memory claimed by this thread later on when the task finishes.
+ if (keepUnrolling) {
+ val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
+ releaseUnrollMemoryForThisThread(amountToRelease)
+ }
+ }
+ }
+
+ /**
* Return the RDD ID that a given block ID is from, or None if it is not an RDD block.
*/
private def getRddId(blockId: BlockId): Option[Int] = {
@@ -149,10 +288,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
/**
* Try to put in a set of values, if we can free up enough space. The value should either be
- * an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
- * size must also be passed by the caller.
+ * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
+ * must also be passed by the caller.
*
- * Lock on the object putLock to ensure that all the put requests and its associated block
+   * Synchronize on `accountingLock` to ensure that all the put requests and their associated block
   * dropping is done by only one thread at a time. Otherwise while one thread is dropping
* blocks to free memory for one block, another thread may use up the freed space for
* another block.
@@ -174,7 +313,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
var putSuccess = false
val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
- putLock.synchronized {
+ accountingLock.synchronized {
val freeSpaceResult = ensureFreeSpace(blockId, size)
val enoughFreeSpace = freeSpaceResult.success
droppedBlocks ++= freeSpaceResult.droppedBlocks
@@ -193,7 +332,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// Tell the block manager that we couldn't put it in memory so that it can drop it to
// disk if the block allows disk storage.
val data = if (deserialized) {
- Left(value.asInstanceOf[ArrayBuffer[Any]])
+ Left(value.asInstanceOf[Array[Any]])
} else {
Right(value.asInstanceOf[ByteBuffer].duplicate())
}
@@ -210,12 +349,14 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
* don't fit into memory that we want to avoid).
*
- * Assume that a lock is held by the caller to ensure only one thread is dropping blocks.
- * Otherwise, the freed space may fill up before the caller puts in their new value.
+ * Assume that `accountingLock` is held by the caller to ensure only one thread is dropping
+ * blocks. Otherwise, the freed space may fill up before the caller puts in their new value.
*
* Return whether there is enough free space, along with the blocks dropped in the process.
*/
- private def ensureFreeSpace(blockIdToAdd: BlockId, space: Long): ResultWithDroppedBlocks = {
+ private def ensureFreeSpace(
+ blockIdToAdd: BlockId,
+ space: Long): ResultWithDroppedBlocks = {
logInfo(s"ensureFreeSpace($space) called with curMem=$currentMemory, maxMem=$maxMemory")
val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
@@ -225,9 +366,12 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
return ResultWithDroppedBlocks(success = false, droppedBlocks)
}
- if (maxMemory - currentMemory < space) {
+ // Take into account the amount of memory currently occupied by unrolling blocks
+ val actualFreeMemory = freeMemory - currentUnrollMemory
+
+ if (actualFreeMemory < space) {
val rddToAdd = getRddId(blockIdToAdd)
- val selectedBlocks = new ArrayBuffer[BlockId]()
+ val selectedBlocks = new ArrayBuffer[BlockId]
var selectedMemory = 0L
// This is synchronized to ensure that the set of entries is not changed
@@ -235,7 +379,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// can lead to exceptions.
entries.synchronized {
val iterator = entries.entrySet().iterator()
- while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
+ while (actualFreeMemory + selectedMemory < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
if (rddToAdd.isEmpty || rddToAdd != getRddId(blockId)) {
@@ -245,7 +389,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}
- if (maxMemory - (currentMemory - selectedMemory) >= space) {
+ if (actualFreeMemory + selectedMemory >= space) {
logInfo(s"${selectedBlocks.size} blocks selected for dropping")
for (blockId <- selectedBlocks) {
val entry = entries.synchronized { entries.get(blockId) }
@@ -254,7 +398,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// future safety.
if (entry != null) {
val data = if (entry.deserialized) {
- Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
+ Left(entry.value.asInstanceOf[Array[Any]])
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
}
@@ -275,8 +419,56 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
override def contains(blockId: BlockId): Boolean = {
entries.synchronized { entries.containsKey(blockId) }
}
+
+ /**
+ * Reserve additional memory for unrolling blocks used by this thread.
+ * Return whether the request is granted.
+ */
+ private[spark] def reserveUnrollMemoryForThisThread(memory: Long): Boolean = {
+ accountingLock.synchronized {
+ val granted = freeMemory > currentUnrollMemory + memory
+ if (granted) {
+ val threadId = Thread.currentThread().getId
+ unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, 0L) + memory
+ }
+ granted
+ }
+ }
+
+ /**
+ * Release memory used by this thread for unrolling blocks.
+ * If the amount is not specified, remove the current thread's allocation altogether.
+ */
+ private[spark] def releaseUnrollMemoryForThisThread(memory: Long = -1L): Unit = {
+ val threadId = Thread.currentThread().getId
+ accountingLock.synchronized {
+ if (memory < 0) {
+ unrollMemoryMap.remove(threadId)
+ } else {
+ unrollMemoryMap(threadId) = unrollMemoryMap.getOrElse(threadId, memory) - memory
+ // If this thread claims no more unroll memory, release it completely
+ if (unrollMemoryMap(threadId) <= 0) {
+ unrollMemoryMap.remove(threadId)
+ }
+ }
+ }
+ }
+
+ /**
+ * Return the amount of memory currently occupied for unrolling blocks across all threads.
+ */
+ private[spark] def currentUnrollMemory: Long = accountingLock.synchronized {
+ unrollMemoryMap.values.sum
+ }
+
+ /**
+ * Return the amount of memory currently occupied for unrolling blocks by this thread.
+ */
+ private[spark] def currentUnrollMemoryForThisThread: Long = accountingLock.synchronized {
+ unrollMemoryMap.getOrElse(Thread.currentThread().getId, 0L)
+ }
}
-private case class ResultWithDroppedBlocks(
+private[spark] case class ResultWithDroppedBlocks(
success: Boolean,
droppedBlocks: Seq[(BlockId, BlockStatus)])
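
For intuition, here is a simplified, self-contained sketch of the unrollSafely loop above: append into a buffer, check the (estimated) size only every memoryCheckPeriod elements, and grow the reservation by memoryGrowthFactor until a budget is exhausted. The element-count "size", the fixed budget, and all names are illustrative; the real code uses SizeTrackingVector estimates and the per-thread unrollMemoryMap accounting shown above:

import scala.collection.mutable.ArrayBuffer

object UnrollSketch {
  val memoryCheckPeriod = 16
  val memoryGrowthFactor = 1.5

  def unrollSafely(values: Iterator[Int], budget: Long): Either[Array[Int], Iterator[Int]] = {
    val buffer = new ArrayBuffer[Int]
    var reserved = math.min(4L, budget) // initial reservation
    var keepUnrolling = reserved > 0
    var i = 0
    while (values.hasNext && keepUnrolling) {
      buffer += values.next()
      i += 1
      if (i % memoryCheckPeriod == 0 && buffer.length >= reserved) {
        // Request enough extra room to reach currentSize * memoryGrowthFactor
        val request = (buffer.length * (memoryGrowthFactor - 1)).toLong
        if (reserved + request <= budget) reserved += request
        else keepUnrolling = false // budget exhausted: stop unrolling immediately
      }
    }
    if (keepUnrolling) Left(buffer.toArray)
    else Right(buffer.iterator ++ values) // what we unrolled so far, plus the rest
  }

  def main(args: Array[String]): Unit = {
    println(unrollSafely(Iterator.range(0, 10), budget = 100))  // Left: fully unrolled
    println(unrollSafely(Iterator.range(0, 1000), budget = 20)) // Right: gave up partway
  }
}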
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
index d8ff4ff6bd..932b561604 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonStore.scala
@@ -20,8 +20,6 @@ package org.apache.spark.storage
import java.io.IOException
import java.nio.ByteBuffer
-import scala.collection.mutable.ArrayBuffer
-
import tachyon.client.{ReadType, WriteType}
import org.apache.spark.Logging
@@ -30,7 +28,7 @@ import org.apache.spark.util.Utils
/**
* Stores BlockManager blocks on Tachyon.
*/
-private class TachyonStore(
+private[spark] class TachyonStore(
blockManager: BlockManager,
tachyonManager: TachyonBlockManager)
extends BlockStore(blockManager: BlockManager) with Logging {
@@ -45,15 +43,15 @@ private class TachyonStore(
putIntoTachyonStore(blockId, bytes, returnValues = true)
}
- override def putValues(
+ override def putArray(
blockId: BlockId,
- values: ArrayBuffer[Any],
+ values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
- putValues(blockId, values.toIterator, level, returnValues)
+ putIterator(blockId, values.toIterator, level, returnValues)
}
- override def putValues(
+ override def putIterator(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index 328be158db..75c2e09a6b 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -48,7 +48,7 @@ private[spark] object ThreadingTest {
val block = (1 to blockSize).map(_ => Random.nextInt())
val level = randomLevel()
val startTime = System.currentTimeMillis()
- manager.put(blockId, block.iterator, level, tellMaster = true)
+ manager.putIterator(blockId, block.iterator, level, tellMaster = true)
println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
queue.add((blockId, block))
}
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 0846557530..bce3b3afe9 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -180,7 +180,7 @@ private[spark] object SizeEstimator extends Logging {
}
}
- // Estimat the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
+ // Estimate the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
private val ARRAY_SIZE_FOR_SAMPLING = 200
private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
index b84eb65c62..7e76d060d6 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
@@ -36,7 +36,7 @@ class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize:
_array(index)
}
- def +=(value: V) {
+ def +=(value: V): Unit = {
if (_numElements == _array.length) {
resize(_array.length * 2)
}
@@ -50,6 +50,19 @@ class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize:
def size: Int = _numElements
+ def iterator: Iterator[V] = new Iterator[V] {
+ var index = 0
+ override def hasNext: Boolean = index < _numElements
+ override def next(): V = {
+ if (!hasNext) {
+ throw new NoSuchElementException
+ }
+ val value = _array(index)
+ index += 1
+ value
+ }
+ }
+
/** Gets the underlying array backing this vector. */
def array: Array[V] = _array
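
A small usage sketch of the new iterator, assuming only the PrimitiveVector API shown above; the backing array may be longer than size after resizes, while iterator stops at the logical end:

import org.apache.spark.util.collection.PrimitiveVector

object PrimitiveVectorExample {
  def main(args: Array[String]): Unit = {
    val v = new PrimitiveVector[Int]
    (1 to 5).foreach(v += _)
    println(v.size)            // 5
    println(v.iterator.toList) // List(1, 2, 3, 4, 5), unused capacity never exposed
  }
}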
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala
new file mode 100644
index 0000000000..3eb1010dc1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable
+
+import org.apache.spark.util.SizeEstimator
+
+/**
+ * A general interface for collections to keep track of their estimated sizes in bytes.
+ * We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
+ * as each call to SizeEstimator is somewhat expensive (order of a few milliseconds).
+ */
+private[spark] trait SizeTracker {
+
+ import SizeTracker._
+
+ /**
+ * Controls the base of the exponential which governs the rate of sampling.
+ * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
+ */
+ private val SAMPLE_GROWTH_RATE = 1.1
+
+ /** Samples taken since last resetSamples(). Only the last two are kept for extrapolation. */
+ private val samples = new mutable.Queue[Sample]
+
+ /** The average number of bytes per update between our last two samples. */
+ private var bytesPerUpdate: Double = _
+
+ /** Total number of insertions and updates into the map since the last resetSamples(). */
+ private var numUpdates: Long = _
+
+ /** The value of 'numUpdates' at which we will take our next sample. */
+ private var nextSampleNum: Long = _
+
+ resetSamples()
+
+ /**
+ * Reset samples collected so far.
+ * This should be called after the collection undergoes a dramatic change in size.
+ */
+ protected def resetSamples(): Unit = {
+ numUpdates = 1
+ nextSampleNum = 1
+ samples.clear()
+ takeSample()
+ }
+
+ /**
+ * Callback to be invoked after every update.
+ */
+ protected def afterUpdate(): Unit = {
+ numUpdates += 1
+ if (nextSampleNum == numUpdates) {
+ takeSample()
+ }
+ }
+
+ /**
+ * Take a new sample of the current collection's size.
+ */
+ private def takeSample(): Unit = {
+ samples.enqueue(Sample(SizeEstimator.estimate(this), numUpdates))
+ // Only use the last two samples to extrapolate
+ if (samples.size > 2) {
+ samples.dequeue()
+ }
+ val bytesDelta = samples.toList.reverse match {
+ case latest :: previous :: tail =>
+ (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
+ // If fewer than 2 samples, assume no change
+ case _ => 0
+ }
+ bytesPerUpdate = math.max(0, bytesDelta)
+ nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
+ }
+
+ /**
+ * Estimate the current size of the collection in bytes. O(1) time.
+ */
+ def estimateSize(): Long = {
+ assert(samples.nonEmpty)
+ val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
+ (samples.last.size + extrapolatedDelta).toLong
+ }
+}
+
+private object SizeTracker {
+ case class Sample(size: Long, numUpdates: Long)
+}
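
estimateSize() above is a linear extrapolation: the last sampled size plus bytesPerUpdate times the updates since that sample. A worked sketch of the same arithmetic with made-up sample values:

object SizeTrackerMath {
  def main(args: Array[String]): Unit = {
    // Suppose the last two samples were 4000 bytes at update 100 and
    // 4600 bytes at update 120, and the collection is now at update 150.
    val (prevSize, prevUpdates) = (4000L, 100L)
    val (lastSize, lastUpdates) = (4600L, 120L)
    val numUpdates = 150L

    // Mirrors takeSample(): average growth between the last two samples.
    val bytesPerUpdate = math.max(0.0,
      (lastSize - prevSize).toDouble / (lastUpdates - prevUpdates)) // 30.0
    // Mirrors estimateSize(): extrapolate from the most recent sample.
    val estimate = (lastSize + bytesPerUpdate * (numUpdates - lastUpdates)).toLong
    println(estimate) // 4600 + 30 * 30 = 5500 bytes
  }
}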
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
index 204330dad4..de61e1d17f 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingAppendOnlyMap.scala
@@ -17,85 +17,24 @@
package org.apache.spark.util.collection
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.util.SizeEstimator
-import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.Sample
-
/**
- * Append-only map that keeps track of its estimated size in bytes.
- * We sample with a slow exponential back-off using the SizeEstimator to amortize the time,
- * as each call to SizeEstimator can take a sizable amount of time (order of a few milliseconds).
+ * An append-only map that keeps track of its estimated size in bytes.
*/
-private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] {
-
- /**
- * Controls the base of the exponential which governs the rate of sampling.
- * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
- */
- private val SAMPLE_GROWTH_RATE = 1.1
-
- /** All samples taken since last resetSamples(). Only the last two are used for extrapolation. */
- private val samples = new ArrayBuffer[Sample]()
-
- /** Total number of insertions and updates into the map since the last resetSamples(). */
- private var numUpdates: Long = _
-
- /** The value of 'numUpdates' at which we will take our next sample. */
- private var nextSampleNum: Long = _
-
- /** The average number of bytes per update between our last two samples. */
- private var bytesPerUpdate: Double = _
-
- resetSamples()
-
- /** Called after the map grows in size, as this can be a dramatic change for small objects. */
- def resetSamples() {
- numUpdates = 1
- nextSampleNum = 1
- samples.clear()
- takeSample()
- }
+private[spark] class SizeTrackingAppendOnlyMap[K, V] extends AppendOnlyMap[K, V] with SizeTracker {
override def update(key: K, value: V): Unit = {
super.update(key, value)
- numUpdates += 1
- if (nextSampleNum == numUpdates) { takeSample() }
+ super.afterUpdate()
}
override def changeValue(key: K, updateFunc: (Boolean, V) => V): V = {
val newValue = super.changeValue(key, updateFunc)
- numUpdates += 1
- if (nextSampleNum == numUpdates) { takeSample() }
+ super.afterUpdate()
newValue
}
- /** Takes a new sample of the current map's size. */
- def takeSample() {
- samples += Sample(SizeEstimator.estimate(this), numUpdates)
- // Only use the last two samples to extrapolate. If fewer than 2 samples, assume no change.
- bytesPerUpdate = math.max(0, samples.toSeq.reverse match {
- case latest :: previous :: tail =>
- (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
- case _ =>
- 0
- })
- nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
- }
-
- override protected def growTable() {
+ override protected def growTable(): Unit = {
super.growTable()
resetSamples()
}
-
- /** Estimates the current size of the map in bytes. O(1) time. */
- def estimateSize(): Long = {
- assert(samples.nonEmpty)
- val extrapolatedDelta = bytesPerUpdate * (numUpdates - samples.last.numUpdates)
- (samples.last.size + extrapolatedDelta).toLong
- }
-}
-
-private object SizeTrackingAppendOnlyMap {
- case class Sample(size: Long, numUpdates: Long)
}
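
With the sampling logic moved into the SizeTracker trait, the map only forwards its mutation hooks. A brief usage sketch, assuming only the public API shown in this patch:

import org.apache.spark.util.collection.SizeTrackingAppendOnlyMap

object SizeTrackingMapExample {
  def main(args: Array[String]): Unit = {
    val map = new SizeTrackingAppendOnlyMap[String, Int]
    (1 to 1000).foreach(i => map.update(s"key-$i", i))
    // O(1): extrapolated from periodic SizeEstimator samples, not a full scan.
    println(s"estimated size: ${map.estimateSize()} bytes")
  }
}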
diff --git a/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala
new file mode 100644
index 0000000000..65a7b4e0d4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/SizeTrackingVector.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect.ClassTag
+
+/**
+ * An append-only buffer that keeps track of its estimated size in bytes.
+ */
+private[spark] class SizeTrackingVector[T: ClassTag]
+ extends PrimitiveVector[T]
+ with SizeTracker {
+
+ override def +=(value: T): Unit = {
+ super.+=(value)
+ super.afterUpdate()
+ }
+
+ override def resize(newLength: Int): PrimitiveVector[T] = {
+ super.resize(newLength)
+ resetSamples()
+ this
+ }
+
+ /**
+ * Return a trimmed version of the underlying array.
+ */
+ def toArray: Array[T] = {
+ super.iterator.toArray
+ }
+}
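
SizeTrackingVector is the buffer unrollSafely appends into: estimateSize() provides the cheap periodic check and toArray the trimmed result. A usage sketch along the same lines (the 16-element period and 1 MB threshold echo unrollSafely's defaults; the loop itself is illustrative):

import org.apache.spark.util.collection.SizeTrackingVector

object SizeTrackingVectorExample {
  def main(args: Array[String]): Unit = {
    val vector = new SizeTrackingVector[Any]
    var i = 0
    while (i < 10000) {
      vector += i
      // Mirror unrollSafely: consult the size estimate only every 16 appends.
      if (i % 16 == 0 && vector.estimateSize() > 1024 * 1024) {
        println(s"would request more unroll memory at element $i")
      }
      i += 1
    }
    println(s"unrolled ${vector.toArray.length} elements, ~${vector.estimateSize()} bytes")
  }
}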
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 7f5d0b061e..9c5f394d38 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark
-import scala.collection.mutable.ArrayBuffer
-
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.scalatest.mock.EasyMockSugar
@@ -52,22 +50,21 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
}
test("get uncached rdd") {
- expecting {
- blockManager.get(RDDBlockId(0, 0)).andReturn(None)
- blockManager.put(RDDBlockId(0, 0), ArrayBuffer[Any](1, 2, 3, 4), StorageLevel.MEMORY_ONLY,
- true).andStubReturn(Seq[(BlockId, BlockStatus)]())
- }
-
- whenExecuting(blockManager) {
- val context = new TaskContext(0, 0, 0)
- val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
- assert(value.toList === List(1, 2, 3, 4))
- }
+ // Do not mock this test, because attempting to match Array[Any], which is not covariant,
+ // in blockManager.put is a losing battle. You have been warned.
+ blockManager = sc.env.blockManager
+ cacheManager = sc.env.cacheManager
+ val context = new TaskContext(0, 0, 0)
+ val computeValue = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
+ val getValue = blockManager.get(RDDBlockId(rdd.id, split.index))
+ assert(computeValue.toList === List(1, 2, 3, 4))
+ assert(getValue.isDefined, "Block cached from getOrCompute is not found!")
+ assert(getValue.get.data.toList === List(1, 2, 3, 4))
}
test("get cached rdd") {
expecting {
- val result = new BlockResult(ArrayBuffer(5, 6, 7).iterator, DataReadMethod.Memory, 12)
+ val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result))
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 23cb6905bf..dd4fd535d3 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.concurrent.Timeouts._
import org.scalatest.Matchers
import org.scalatest.time.SpanSugar._
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
@@ -43,6 +43,7 @@ import scala.language.postfixOps
class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
with PrivateMethodTester {
+
private val conf = new SparkConf(false)
var store: BlockManager = null
var store2: BlockManager = null
@@ -61,21 +62,29 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
def rdd(rddId: Int, splitId: Int) = RDDBlockId(rddId, splitId)
+ private def makeBlockManager(maxMem: Long, name: String = "<driver>"): BlockManager = {
+ new BlockManager(
+ name, actorSystem, master, serializer, maxMem, conf, securityMgr, mapOutputTracker)
+ }
+
before {
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0, conf = conf,
- securityManager = securityMgr)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+ "test", "localhost", 0, conf = conf, securityManager = securityMgr)
this.actorSystem = actorSystem
- conf.set("spark.driver.port", boundPort.toString)
-
- master = new BlockManagerMaster(
- actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
- conf)
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
conf.set("os.arch", "amd64")
conf.set("spark.test.useCompressedOops", "true")
conf.set("spark.storage.disableBlockManagerHeartBeat", "true")
+ conf.set("spark.driver.port", boundPort.toString)
+ conf.set("spark.storage.unrollFraction", "0.4")
+ conf.set("spark.storage.unrollMemoryThreshold", "512")
+
+ master = new BlockManagerMaster(
+ actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+ conf)
+
val initialize = PrivateMethod[Unit]('initialize)
SizeEstimator invokePrivate initialize()
}
@@ -138,11 +147,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("master + 1 manager interaction") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(20000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -169,10 +177,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("master + 2 managers interaction") {
- store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer(conf), 2000, conf,
- securityMgr, mapOutputTracker)
+ store = makeBlockManager(2000, "exec1")
+ store2 = makeBlockManager(2000, "exec2")
val peers = master.getPeers(store.blockManagerId, 1)
assert(peers.size === 1, "master did not return the other manager as a peer")
@@ -187,11 +193,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("removing block") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(20000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
@@ -200,8 +205,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
// Checking whether blocks are in memory and memory size
val memStatus = master.getMemoryStatus.head._2
- assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
- assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should <= 1200")
+ assert(memStatus._1 == 20000L, "total memory " + memStatus._1 + " should equal 20000")
+ assert(memStatus._2 <= 12000L, "remaining memory " + memStatus._2 + " should <= 12000")
assert(store.getSingle("a1-to-remove").isDefined, "a1 was not in store")
assert(store.getSingle("a2-to-remove").isDefined, "a2 was not in store")
assert(store.getSingle("a3-to-remove").isDefined, "a3 was not in store")
@@ -230,17 +235,16 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
val memStatus = master.getMemoryStatus.head._2
- memStatus._1 should equal (2000L)
- memStatus._2 should equal (2000L)
+ memStatus._1 should equal (20000L)
+ memStatus._2 should equal (20000L)
}
}
test("removing rdd") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(20000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
// Putting a1, a2 and a3 in memory.
store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
@@ -270,11 +274,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("removing broadcast") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
+ store = makeBlockManager(2000)
val driverStore = store
- val executorStore = new BlockManager("executor", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
+ val executorStore = makeBlockManager(2000, "executor")
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -343,8 +345,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
test("reregistration on heart beat") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
+ store = makeBlockManager(2000)
val a1 = new Array[Byte](400)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
@@ -380,8 +381,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
test("reregistration doesn't dead lock") {
val heartBeat = PrivateMethod[Unit]('heartBeat)
- store = new BlockManager("<driver>", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
+ store = makeBlockManager(2000)
val a1 = new Array[Byte](400)
val a2 = List(new Array[Byte](400))
@@ -390,7 +390,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
master.removeExecutor(store.blockManagerId.executorId)
val t1 = new Thread {
override def run() {
- store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
}
val t2 = new Thread {
@@ -418,19 +418,14 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("correct BlockResult returned from get() calls") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr,
- mapOutputTracker)
- val list1 = List(new Array[Byte](200), new Array[Byte](200))
- val list1ForSizeEstimate = new ArrayBuffer[Any]
- list1ForSizeEstimate ++= list1.iterator
- val list1SizeEstimate = SizeEstimator.estimate(list1ForSizeEstimate)
- val list2 = List(new Array[Byte](50), new Array[Byte](100), new Array[Byte](150))
- val list2ForSizeEstimate = new ArrayBuffer[Any]
- list2ForSizeEstimate ++= list2.iterator
- val list2SizeEstimate = SizeEstimator.estimate(list2ForSizeEstimate)
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.put("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.put("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+ store = makeBlockManager(12000)
+ val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
+ val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500))
+ val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
+ val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray)
+ store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
val list1Get = store.get("list1")
assert(list1Get.isDefined, "list1 expected to be in store")
assert(list1Get.get.data.size === 2)
@@ -451,11 +446,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("in-memory LRU storage") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(12000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY)
@@ -471,11 +465,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("in-memory LRU storage with serialization") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(12000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER)
@@ -491,11 +484,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("in-memory LRU for partitions of same RDD") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(12000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
@@ -511,11 +503,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("in-memory LRU for partitions of multiple RDDs") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
- store.putSingle(rdd(0, 2), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
- store.putSingle(rdd(1, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ store = makeBlockManager(12000)
+ store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// At this point rdd_1_1 should've replaced rdd_0_1
assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
@@ -523,8 +514,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
// Do a get() on rdd_0_2 so that it is the most recently used item
assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
// Put in more partitions from RDD 0; they should replace rdd_1_1
- store.putSingle(rdd(0, 3), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
- store.putSingle(rdd(0, 4), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
// when we try to add rdd_0_4.
assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store")
@@ -538,8 +529,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
// TODO: Set spark.test.tachyon.enable to true once the Tachyon 0.5.0 testing jar is in use.
val tachyonUnitTestEnabled = conf.getBoolean("spark.test.tachyon.enable", false)
if (tachyonUnitTestEnabled) {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
+ store = makeBlockManager(1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -555,8 +545,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("on-disk storage") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
+ store = makeBlockManager(1200)
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
@@ -569,11 +558,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("disk and memory storage") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(12000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
@@ -585,11 +573,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("disk and memory storage with getLocalBytes") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(12000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
@@ -601,11 +588,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("disk and memory storage with serialization") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(12000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
@@ -617,11 +603,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("disk and memory storage with serialization and getLocalBytes") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
+ store = makeBlockManager(12000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
@@ -633,12 +618,11 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("LRU with mixed storage levels") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val a1 = new Array[Byte](400)
- val a2 = new Array[Byte](400)
- val a3 = new Array[Byte](400)
- val a4 = new Array[Byte](400)
+ store = makeBlockManager(12000)
+ val a1 = new Array[Byte](4000)
+ val a2 = new Array[Byte](4000)
+ val a3 = new Array[Byte](4000)
+ val a4 = new Array[Byte](4000)
// First store a1 and a2, both in memory, and a3, on disk only
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
@@ -656,14 +640,13 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("in-memory LRU with streams") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val list1 = List(new Array[Byte](200), new Array[Byte](200))
- val list2 = List(new Array[Byte](200), new Array[Byte](200))
- val list3 = List(new Array[Byte](200), new Array[Byte](200))
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store = makeBlockManager(12000)
+ val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
+ val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
+ val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
+ store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.get("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.data.size === 2)
assert(store.get("list3").isDefined, "list3 was not in store")
@@ -672,7 +655,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
assert(store.get("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.data.size === 2)
// At this point list2 was accessed last, so LRU will get rid of list3
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.get("list1").isDefined, "list1 was not in store")
assert(store.get("list1").get.data.size === 2)
assert(store.get("list2").isDefined, "list2 was not in store")
@@ -681,16 +664,15 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("LRU with mixed storage levels and streams") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val list1 = List(new Array[Byte](200), new Array[Byte](200))
- val list2 = List(new Array[Byte](200), new Array[Byte](200))
- val list3 = List(new Array[Byte](200), new Array[Byte](200))
- val list4 = List(new Array[Byte](200), new Array[Byte](200))
+ store = makeBlockManager(12000)
+ val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
+ val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
+ val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
+ val list4 = List(new Array[Byte](2000), new Array[Byte](2000))
// First store list1 and list2, both in memory, and list3, on disk only
- store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
- store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
- store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+ store.putIterator("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+ store.putIterator("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+ store.putIterator("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
val listForSizeEstimate = new ArrayBuffer[Any]
listForSizeEstimate ++= list1.iterator
val listSize = SizeEstimator.estimate(listForSizeEstimate)
@@ -708,7 +690,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
assert(store.get("list3").isDefined, "list3 was not in store")
assert(store.get("list3").get.data.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
- store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
+ store.putIterator("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
assert(store.get("list1") === None, "list1 was in store")
assert(store.get("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.data.size === 2)
@@ -731,11 +713,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("overly large block") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 500, conf,
- securityMgr, mapOutputTracker)
- store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
+ store = makeBlockManager(5000)
+ store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
assert(store.getSingle("a1") === None, "a1 was in store")
- store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
+ store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
assert(store.getSingle("a2").isDefined, "a2 was not in store")
}
@@ -743,8 +724,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
test("block compression") {
try {
conf.set("spark.shuffle.compress", "true")
- store = new BlockManager("exec1", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
+ store = makeBlockManager(20000, "exec1")
store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
"shuffle_0_0_0 was not compressed")
@@ -752,52 +732,46 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
store = null
conf.set("spark.shuffle.compress", "false")
- store = new BlockManager("exec2", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
- assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 1000,
+ store = makeBlockManager(20000, "exec2")
+ store.putSingle(ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
"shuffle_0_0_0 was compressed")
store.stop()
store = null
conf.set("spark.broadcast.compress", "true")
- store = new BlockManager("exec3", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
- assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 100,
+ store = makeBlockManager(20000, "exec3")
+ store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
"broadcast_0 was not compressed")
store.stop()
store = null
conf.set("spark.broadcast.compress", "false")
- store = new BlockManager("exec4", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- store.putSingle(BroadcastBlockId(0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
- assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 1000, "broadcast_0 was compressed")
+ store = makeBlockManager(20000, "exec4")
+ store.putSingle(BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed")
store.stop()
store = null
conf.set("spark.rdd.compress", "true")
- store = new BlockManager("exec5", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
- assert(store.memoryStore.getSize(rdd(0, 0)) <= 100, "rdd_0_0 was not compressed")
+ store = makeBlockManager(20000, "exec5")
+ store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
store.stop()
store = null
conf.set("spark.rdd.compress", "false")
- store = new BlockManager("exec6", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- store.putSingle(rdd(0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
- assert(store.memoryStore.getSize(rdd(0, 0)) >= 1000, "rdd_0_0 was compressed")
+ store = makeBlockManager(20000, "exec6")
+ store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
+ assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
store.stop()
store = null
// Check that any other block types are also kept uncompressed
- store = new BlockManager("exec7", actorSystem, master, serializer, 2000, conf,
- securityMgr, mapOutputTracker)
- store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
- assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
+ store = makeBlockManager(20000, "exec7")
+ store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
+ assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed")
store.stop()
store = null
} finally {
@@ -871,30 +845,29 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
assert(Arrays.equals(mappedAsArray, bytes))
assert(Arrays.equals(notMappedAsArray, bytes))
}
-
+
test("updated block statuses") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val list = List.fill(2)(new Array[Byte](200))
- val bigList = List.fill(8)(new Array[Byte](200))
+ store = makeBlockManager(12000)
+ val list = List.fill(2)(new Array[Byte](2000))
+ val bigList = List.fill(8)(new Array[Byte](2000))
// 1 updated block (i.e. list1)
val updatedBlocks1 =
- store.put("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(updatedBlocks1.size === 1)
assert(updatedBlocks1.head._1 === TestBlockId("list1"))
assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
// 1 updated block (i.e. list2)
val updatedBlocks2 =
- store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
assert(updatedBlocks2.size === 1)
assert(updatedBlocks2.head._1 === TestBlockId("list2"))
assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY)
// 2 updated blocks - list1 is kicked out of memory while list3 is added
val updatedBlocks3 =
- store.put("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(updatedBlocks3.size === 2)
updatedBlocks3.foreach { case (id, status) =>
id match {
@@ -903,11 +876,11 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
case _ => fail("Updated block is neither list1 nor list3")
}
}
- assert(store.get("list3").isDefined, "list3 was not in store")
+ assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
// 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
val updatedBlocks4 =
- store.put("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(updatedBlocks4.size === 2)
updatedBlocks4.foreach { case (id, status) =>
id match {
@@ -916,26 +889,37 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
case _ => fail("Updated block is neither list2 nor list4")
}
}
- assert(store.get("list4").isDefined, "list4 was not in store")
+ assert(store.diskStore.contains("list2"), "list2 was not in disk store")
+ assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
- // No updated blocks - nothing is kicked out of memory because list5 is too big to be added
+ // No updated blocks - list5 is too big to fit in the store, so nothing is kicked out
val updatedBlocks5 =
- store.put("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(updatedBlocks5.size === 0)
- assert(store.get("list2").isDefined, "list2 was not in store")
- assert(store.get("list4").isDefined, "list4 was not in store")
- assert(!store.get("list5").isDefined, "list5 was in store")
+
+ // memory store contains only list3 and list4
+ assert(!store.memoryStore.contains("list1"), "list1 was in memory store")
+ assert(!store.memoryStore.contains("list2"), "list2 was in memory store")
+ assert(store.memoryStore.contains("list3"), "list3 was not in memory store")
+ assert(store.memoryStore.contains("list4"), "list4 was not in memory store")
+ assert(!store.memoryStore.contains("list5"), "list5 was in memory store")
+
+ // disk store contains only list2
+ assert(!store.diskStore.contains("list1"), "list1 was in disk store")
+ assert(store.diskStore.contains("list2"), "list2 was not in disk store")
+ assert(!store.diskStore.contains("list3"), "list3 was in disk store")
+ assert(!store.diskStore.contains("list4"), "list4 was in disk store")
+ assert(!store.diskStore.contains("list5"), "list5 was in disk store")
}
test("query block statuses") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val list = List.fill(2)(new Array[Byte](200))
+ store = makeBlockManager(12000)
+ val list = List.fill(2)(new Array[Byte](2000))
// Tell the master. By LRU, only list2 and list3 remain.
- store.put("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.put("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
// getLocations and getBlockStatus should yield the same locations
assert(store.master.getLocations("list1").size === 0)
@@ -949,9 +933,9 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)
// This time don't tell the master and see what happens. By LRU, only list5 and list6 remain.
- store.put("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
- store.put("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
- store.put("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+ store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
+ store.putIterator("list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+ store.putIterator("list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
// getLocations should return nothing because the master is not informed
// getBlockStatus without asking slaves should have the same result
@@ -968,23 +952,22 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("get matching blocks") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- val list = List.fill(2)(new Array[Byte](10))
+ store = makeBlockManager(12000)
+ val list = List.fill(2)(new Array[Byte](100))
// insert some blocks
- store.put("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.put("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.put("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ store.putIterator("list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ store.putIterator("list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
// getMatchingBlockIds should find all blocks that were reported to the master
assert(store.master.getMatchingBlockIds(_.toString.contains("list"), askSlaves = false).size === 3)
assert(store.master.getMatchingBlockIds(_.toString.contains("list1"), askSlaves = false).size === 1)
// insert some more blocks
- store.put("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
- store.put("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
- store.put("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+ store.putIterator("newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
+ store.putIterator("newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
+ store.putIterator("newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
// Without asking slaves, only blocks reported to the master are visible
assert(store.master.getMatchingBlockIds(_.toString.contains("newlist"), askSlaves = false).size === 1)
@@ -992,7 +975,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
blockIds.foreach { blockId =>
- store.put(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+ store.putIterator(blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
val matchedBlockIds = store.master.getMatchingBlockIds(_ match {
case RDDBlockId(1, _) => true
@@ -1002,17 +985,240 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
}
test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
- store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
- securityMgr, mapOutputTracker)
- store.putSingle(rdd(0, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
- store.putSingle(rdd(1, 0), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ store = makeBlockManager(12000)
+ store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// Access rdd_1_0 to ensure it's not least recently used.
assert(store.getSingle(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
// According to the same-RDD rule, rdd_1_0 should be replaced here.
- store.putSingle(rdd(0, 1), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+ store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// rdd_1_0 should have been replaced, even though it's not the least recently used.
assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
assert(!store.memoryStore.contains(rdd(1, 0)), "rdd_1_0 was in store")
}
+
+ test("reserve/release unroll memory") {
+ store = makeBlockManager(12000)
+ val memoryStore = store.memoryStore
+ assert(memoryStore.currentUnrollMemory === 0)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+ // Reserve
+ memoryStore.reserveUnrollMemoryForThisThread(100)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 100)
+ memoryStore.reserveUnrollMemoryForThisThread(200)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 300)
+ memoryStore.reserveUnrollMemoryForThisThread(500)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 800)
+ memoryStore.reserveUnrollMemoryForThisThread(1000000)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 800) // not granted
+ // Release
+ memoryStore.releaseUnrollMemoryForThisThread(100)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 700)
+ memoryStore.releaseUnrollMemoryForThisThread(100)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 600)
+ // Reserve again
+ memoryStore.reserveUnrollMemoryForThisThread(4400)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 5000)
+ memoryStore.reserveUnrollMemoryForThisThread(20000)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 5000) // not granted
+ // Release again
+ memoryStore.releaseUnrollMemoryForThisThread(1000)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 4000)
+ memoryStore.releaseUnrollMemoryForThisThread() // release all
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+ }
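
A minimal sketch of the per-thread reserve/release accounting exercised above, assuming simplified internals rather than the actual MemoryStore (the class name and cap are illustrative): a reservation is granted only while the sum of all threads' reservations stays under a global cap, and a release with no argument returns everything the calling thread holds.

    import scala.collection.mutable

    class UnrollMemoryPool(maxUnrollBytes: Long) {
      private val byThread = mutable.Map[Long, Long]().withDefaultValue(0L)

      // Grant the request only if the global total stays under the cap.
      def reserve(bytes: Long): Boolean = synchronized {
        val granted = byThread.values.sum + bytes <= maxUnrollBytes
        if (granted) byThread(Thread.currentThread().getId) += bytes
        granted
      }

      // A negative (default) argument releases all memory held by this thread.
      def release(bytes: Long = -1L): Unit = synchronized {
        val id = Thread.currentThread().getId
        if (bytes < 0) byThread.remove(id)
        else byThread(id) = math.max(0L, byThread(id) - bytes)
      }

      def currentForThisThread: Long = synchronized {
        byThread(Thread.currentThread().getId)
      }
    }

Under this model, oversized requests like the 1000000-byte reservation above are simply not granted, which is exactly what the assertions check.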
+
+ /**
+ * Verify the result of MemoryStore#unrollSafely is as expected.
+ */
+ private def verifyUnroll(
+ expected: Iterator[Any],
+ result: Either[Array[Any], Iterator[Any]],
+ shouldBeArray: Boolean): Unit = {
+ val actual: Iterator[Any] = result match {
+ case Left(arr: Array[Any]) =>
+ assert(shouldBeArray, "expected iterator from unroll!")
+ arr.iterator
+ case Right(it: Iterator[Any]) =>
+ assert(!shouldBeArray, "expected array from unroll!")
+ it
+ case _ =>
+ fail("unroll returned neither an iterator nor an array...")
+ }
+ expected.zip(actual).foreach { case (e, a) =>
+ assert(e === a, "unroll did not return original values!")
+ }
+ }
+
+ test("safely unroll blocks") {
+ store = makeBlockManager(12000)
+ val smallList = List.fill(40)(new Array[Byte](100))
+ val bigList = List.fill(40)(new Array[Byte](1000))
+ val memoryStore = store.memoryStore
+ val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+ // Unroll with all the space in the world. This should succeed and return an array.
+ var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
+ verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+ // Unroll with not enough space. This should succeed after kicking out someBlock1.
+ store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks)
+ verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+ assert(droppedBlocks.size === 1)
+ assert(droppedBlocks.head._1 === TestBlockId("someBlock1"))
+ droppedBlocks.clear()
+
+ // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
+ // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
+ // In the meantime, however, we kicked out someBlock2 before giving up.
+ store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
+ unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator, droppedBlocks)
+ verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
+ assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator
+ assert(droppedBlocks.size === 1)
+ assert(droppedBlocks.head._1 === TestBlockId("someBlock2"))
+ droppedBlocks.clear()
+ }
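
The Either returned by unrollSafely can be consumed as in this hedged usage sketch, built only from the calls already made in this test (the implicit String-to-BlockId conversion is defined at the top of the suite):

    memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks) match {
      case Left(array) =>
        // Everything fit in memory: safe to cache the materialized values.
        println(s"unrolled ${array.length} values")
      case Right(iterator) =>
        // Not enough room: stream the remaining values without caching them.
        println(s"fell back to an iterator after dropping ${droppedBlocks.size} blocks")
    }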
+
+ test("safely unroll blocks through putIterator") {
+ store = makeBlockManager(12000)
+ val memOnly = StorageLevel.MEMORY_ONLY
+ val memoryStore = store.memoryStore
+ val smallList = List.fill(40)(new Array[Byte](100))
+ val bigList = List.fill(40)(new Array[Byte](1000))
+ def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]]
+ def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]]
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+ // Unroll with plenty of space. This should succeed and cache both blocks.
+ val result1 = memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
+ val result2 = memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+ assert(memoryStore.contains("b1"))
+ assert(memoryStore.contains("b2"))
+ assert(result1.size > 0) // unroll was successful
+ assert(result2.size > 0)
+ assert(result1.data.isLeft) // unroll did not drop this block to disk
+ assert(result2.data.isLeft)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+ // Re-put these two blocks so the block manager knows about them too. Otherwise, the block
+ // manager would not know how to drop them from memory later.
+ memoryStore.remove("b1")
+ memoryStore.remove("b2")
+ store.putIterator("b1", smallIterator, memOnly)
+ store.putIterator("b2", smallIterator, memOnly)
+
+ // Unroll with not enough space. This should succeed but kick out b1 in the process.
+ val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
+ assert(result3.size > 0)
+ assert(result3.data.isLeft)
+ assert(!memoryStore.contains("b1"))
+ assert(memoryStore.contains("b2"))
+ assert(memoryStore.contains("b3"))
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+ memoryStore.remove("b3")
+ store.putIterator("b3", smallIterator, memOnly)
+
+ // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
+ val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
+ assert(result4.size === 0) // unroll was unsuccessful
+ assert(result4.data.isLeft)
+ assert(!memoryStore.contains("b1"))
+ assert(!memoryStore.contains("b2"))
+ assert(memoryStore.contains("b3"))
+ assert(!memoryStore.contains("b4"))
+ assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator
+ }
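
A short sketch of reading the PutResult fields the assertions above rely on; the field meanings are inferred from this test rather than documented here, so treat them as assumptions:

    val res = memoryStore.putIterator("bX", smallIterator, memOnly, returnValues = true)
    val unrolled = res.size > 0  // a non-zero size means the unroll succeeded
    res.data match {
      case Left(values) => println(s"values kept in memory (unrolled = $unrolled)")
      case Right(bytes) => println("block was written out as serialized bytes")
    }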
+
+ /**
+ * This test is essentially identical to the preceding one, except that it uses MEMORY_AND_DISK.
+ */
+ test("safely unroll blocks through putIterator (disk)") {
+ store = makeBlockManager(12000)
+ val memAndDisk = StorageLevel.MEMORY_AND_DISK
+ val memoryStore = store.memoryStore
+ val diskStore = store.diskStore
+ val smallList = List.fill(40)(new Array[Byte](100))
+ val bigList = List.fill(40)(new Array[Byte](1000))
+ def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]]
+ def bigIterator = bigList.iterator.asInstanceOf[Iterator[Any]]
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+ store.putIterator("b1", smallIterator, memAndDisk)
+ store.putIterator("b2", smallIterator, memAndDisk)
+
+ // Unroll with not enough space. This should succeed but kick out b1 in the process.
+ // Memory store should contain b2 and b3, while disk store should contain only b1
+ val result3 = memoryStore.putIterator("b3", smallIterator, memAndDisk, returnValues = true)
+ assert(result3.size > 0)
+ assert(!memoryStore.contains("b1"))
+ assert(memoryStore.contains("b2"))
+ assert(memoryStore.contains("b3"))
+ assert(diskStore.contains("b1"))
+ assert(!diskStore.contains("b2"))
+ assert(!diskStore.contains("b3"))
+ memoryStore.remove("b3")
+ store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+ // Unroll huge block with not enough space. This should fail and drop the new block to disk
+ // directly in addition to kicking out b2 in the process. Memory store should contain only
+ // b3, while disk store should contain b1, b2 and b4.
+ val result4 = memoryStore.putIterator("b4", bigIterator, memAndDisk, returnValues = true)
+ assert(result4.size > 0)
+ assert(result4.data.isRight) // unroll returned bytes from disk
+ assert(!memoryStore.contains("b1"))
+ assert(!memoryStore.contains("b2"))
+ assert(memoryStore.contains("b3"))
+ assert(!memoryStore.contains("b4"))
+ assert(diskStore.contains("b1"))
+ assert(diskStore.contains("b2"))
+ assert(!diskStore.contains("b3"))
+ assert(diskStore.contains("b4"))
+ assert(memoryStore.currentUnrollMemoryForThisThread > 0) // we returned an iterator
+ }
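
A self-contained sketch of the fallback decision this test observes, simplified from the behavior asserted above rather than taken from the MemoryStore source:

    sealed trait PutOutcome
    case object CachedInMemory extends PutOutcome  // unroll succeeded
    case object SpilledToDisk extends PutOutcome   // MEMORY_AND_DISK: write through
    case object Rejected extends PutOutcome        // MEMORY_ONLY: hand the iterator back

    def outcomeOf[T](unrolled: Either[Array[T], Iterator[T]], useDisk: Boolean): PutOutcome =
      unrolled match {
        case Left(_) => CachedInMemory
        case Right(_) if useDisk => SpilledToDisk
        case Right(_) => Rejected
      }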
+
+ test("multiple unrolls by the same thread") {
+ store = makeBlockManager(12000)
+ val memOnly = StorageLevel.MEMORY_ONLY
+ val memoryStore = store.memoryStore
+ val smallList = List.fill(40)(new Array[Byte](100))
+ def smallIterator = smallList.iterator.asInstanceOf[Iterator[Any]]
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+ // All unroll memory used is released because unrollSafely returned an array
+ memoryStore.putIterator("b1", smallIterator, memOnly, returnValues = true)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+ memoryStore.putIterator("b2", smallIterator, memOnly, returnValues = true)
+ assert(memoryStore.currentUnrollMemoryForThisThread === 0)
+
+ // Unroll memory is not released because unrollSafely returned an iterator
+ // that still depends on the underlying vector used in the process
+ memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
+ val unrollMemoryAfterB3 = memoryStore.currentUnrollMemoryForThisThread
+ assert(unrollMemoryAfterB3 > 0)
+
+ // The unroll memory owned by this thread builds on top of its value after the previous unrolls
+ memoryStore.putIterator("b4", smallIterator, memOnly, returnValues = true)
+ val unrollMemoryAfterB4 = memoryStore.currentUnrollMemoryForThisThread
+ assert(unrollMemoryAfterB4 > unrollMemoryAfterB3)
+
+ // ... but only to a certain extent (until we run out of free space to grant new unroll memory)
+ memoryStore.putIterator("b5", smallIterator, memOnly, returnValues = true)
+ val unrollMemoryAfterB5 = memoryStore.currentUnrollMemoryForThisThread
+ memoryStore.putIterator("b6", smallIterator, memOnly, returnValues = true)
+ val unrollMemoryAfterB6 = memoryStore.currentUnrollMemoryForThisThread
+ memoryStore.putIterator("b7", smallIterator, memOnly, returnValues = true)
+ val unrollMemoryAfterB7 = memoryStore.currentUnrollMemoryForThisThread
+ assert(unrollMemoryAfterB5 === unrollMemoryAfterB4)
+ assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
+ assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
+ }
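
Once the iterators produced by the pending unrolls above have been fully consumed downstream, the thread can return its unroll memory to the pool; a brief sketch using the same methods the reserve/release test exercises:

    memoryStore.releaseUnrollMemoryForThisThread()  // release everything this thread holds
    assert(memoryStore.currentUnrollMemoryForThisThread === 0)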
}
diff --git a/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
deleted file mode 100644
index 93f0c6a8e6..0000000000
--- a/core/src/test/scala/org/apache/spark/util/SizeTrackingAppendOnlyMapSuite.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.util.Random
-
-import org.scalatest.{BeforeAndAfterAll, FunSuite}
-
-import org.apache.spark.util.SizeTrackingAppendOnlyMapSuite.LargeDummyClass
-import org.apache.spark.util.collection.{AppendOnlyMap, SizeTrackingAppendOnlyMap}
-
-class SizeTrackingAppendOnlyMapSuite extends FunSuite with BeforeAndAfterAll {
- val NORMAL_ERROR = 0.20
- val HIGH_ERROR = 0.30
-
- test("fixed size insertions") {
- testWith[Int, Long](10000, i => (i, i.toLong))
- testWith[Int, (Long, Long)](10000, i => (i, (i.toLong, i.toLong)))
- testWith[Int, LargeDummyClass](10000, i => (i, new LargeDummyClass()))
- }
-
- test("variable size insertions") {
- val rand = new Random(123456789)
- def randString(minLen: Int, maxLen: Int): String = {
- "a" * (rand.nextInt(maxLen - minLen) + minLen)
- }
- testWith[Int, String](10000, i => (i, randString(0, 10)))
- testWith[Int, String](10000, i => (i, randString(0, 100)))
- testWith[Int, String](10000, i => (i, randString(90, 100)))
- }
-
- test("updates") {
- val rand = new Random(123456789)
- def randString(minLen: Int, maxLen: Int): String = {
- "a" * (rand.nextInt(maxLen - minLen) + minLen)
- }
- testWith[String, Int](10000, i => (randString(0, 10000), i))
- }
-
- def testWith[K, V](numElements: Int, makeElement: (Int) => (K, V)) {
- val map = new SizeTrackingAppendOnlyMap[K, V]()
- for (i <- 0 until numElements) {
- val (k, v) = makeElement(i)
- map(k) = v
- expectWithinError(map, map.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
- }
- }
-
- def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) {
- val betterEstimatedSize = SizeEstimator.estimate(obj)
- assert(betterEstimatedSize * (1 - error) < estimatedSize,
- s"Estimated size $estimatedSize was less than expected size $betterEstimatedSize")
- assert(betterEstimatedSize * (1 + 2 * error) > estimatedSize,
- s"Estimated size $estimatedSize was greater than expected size $betterEstimatedSize")
- }
-}
-
-object SizeTrackingAppendOnlyMapSuite {
- // Speed test, for reproducibility of results.
- // These could be highly non-deterministic in general, however.
- // Results:
- // AppendOnlyMap: 31 ms
- // SizeTracker: 54 ms
- // SizeEstimator: 1500 ms
- def main(args: Array[String]) {
- val numElements = 100000
-
- val baseTimes = for (i <- 0 until 10) yield time {
- val map = new AppendOnlyMap[Int, LargeDummyClass]()
- for (i <- 0 until numElements) {
- map(i) = new LargeDummyClass()
- }
- }
-
- val sampledTimes = for (i <- 0 until 10) yield time {
- val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]()
- for (i <- 0 until numElements) {
- map(i) = new LargeDummyClass()
- map.estimateSize()
- }
- }
-
- val unsampledTimes = for (i <- 0 until 3) yield time {
- val map = new AppendOnlyMap[Int, LargeDummyClass]()
- for (i <- 0 until numElements) {
- map(i) = new LargeDummyClass()
- SizeEstimator.estimate(map)
- }
- }
-
- println("Base: " + baseTimes)
- println("SizeTracker (sampled): " + sampledTimes)
- println("SizeEstimator (unsampled): " + unsampledTimes)
- }
-
- def time(f: => Unit): Long = {
- val start = System.currentTimeMillis()
- f
- System.currentTimeMillis() - start
- }
-
- private class LargeDummyClass {
- val arr = new Array[Int](100)
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala
new file mode 100644
index 0000000000..1f33967249
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/SizeTrackerSuite.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.reflect.ClassTag
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.util.SizeEstimator
+
+class SizeTrackerSuite extends FunSuite {
+ val NORMAL_ERROR = 0.20
+ val HIGH_ERROR = 0.30
+
+ import SizeTrackerSuite._
+
+ test("vector fixed size insertions") {
+ testVector[Long](10000, i => i.toLong)
+ testVector[(Long, Long)](10000, i => (i.toLong, i.toLong))
+ testVector[LargeDummyClass](10000, i => new LargeDummyClass)
+ }
+
+ test("vector variable size insertions") {
+ val rand = new Random(123456789)
+ def randString(minLen: Int, maxLen: Int): String = {
+ "a" * (rand.nextInt(maxLen - minLen) + minLen)
+ }
+ testVector[String](10000, i => randString(0, 10))
+ testVector[String](10000, i => randString(0, 100))
+ testVector[String](10000, i => randString(90, 100))
+ }
+
+ test("map fixed size insertions") {
+ testMap[Int, Long](10000, i => (i, i.toLong))
+ testMap[Int, (Long, Long)](10000, i => (i, (i.toLong, i.toLong)))
+ testMap[Int, LargeDummyClass](10000, i => (i, new LargeDummyClass))
+ }
+
+ test("map variable size insertions") {
+ val rand = new Random(123456789)
+ def randString(minLen: Int, maxLen: Int): String = {
+ "a" * (rand.nextInt(maxLen - minLen) + minLen)
+ }
+ testMap[Int, String](10000, i => (i, randString(0, 10)))
+ testMap[Int, String](10000, i => (i, randString(0, 100)))
+ testMap[Int, String](10000, i => (i, randString(90, 100)))
+ }
+
+ test("map updates") {
+ val rand = new Random(123456789)
+ def randString(minLen: Int, maxLen: Int): String = {
+ "a" * (rand.nextInt(maxLen - minLen) + minLen)
+ }
+ testMap[String, Int](10000, i => (randString(0, 10000), i))
+ }
+
+ def testVector[T: ClassTag](numElements: Int, makeElement: Int => T) {
+ val vector = new SizeTrackingVector[T]
+ for (i <- 0 until numElements) {
+ val item = makeElement(i)
+ vector += item
+ expectWithinError(vector, vector.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
+ }
+ }
+
+ def testMap[K, V](numElements: Int, makeElement: (Int) => (K, V)) {
+ val map = new SizeTrackingAppendOnlyMap[K, V]
+ for (i <- 0 until numElements) {
+ val (k, v) = makeElement(i)
+ map(k) = v
+ expectWithinError(map, map.estimateSize(), if (i < 32) HIGH_ERROR else NORMAL_ERROR)
+ }
+ }
+
+ def expectWithinError(obj: AnyRef, estimatedSize: Long, error: Double) {
+ val betterEstimatedSize = SizeEstimator.estimate(obj)
+ assert(betterEstimatedSize * (1 - error) < estimatedSize,
+ s"Estimated size $estimatedSize was less than expected size $betterEstimatedSize")
+ assert(betterEstimatedSize * (1 + 2 * error) > estimatedSize,
+ s"Estimated size $estimatedSize was greater than expected size $betterEstimatedSize")
+ }
+}
+
+private object SizeTrackerSuite {
+
+ /**
+ * Run speed tests for size tracking collections.
+ */
+ def main(args: Array[String]): Unit = {
+ if (args.size < 1) {
+ println("Usage: SizeTrackerSuite [num elements]")
+ System.exit(1)
+ }
+ val numElements = args(0).toInt
+ vectorSpeedTest(numElements)
+ mapSpeedTest(numElements)
+ }
+
+ /**
+ * Speed test for SizeTrackingVector.
+ *
+ * Results for 100000 elements (possibly non-deterministic):
+ * PrimitiveVector 15 ms
+ * SizeTracker 51 ms
+ * SizeEstimator 2000 ms
+ */
+ def vectorSpeedTest(numElements: Int): Unit = {
+ val baseTimes = for (i <- 0 until 10) yield time {
+ val vector = new PrimitiveVector[LargeDummyClass]
+ for (i <- 0 until numElements) {
+ vector += new LargeDummyClass
+ }
+ }
+ val sampledTimes = for (i <- 0 until 10) yield time {
+ val vector = new SizeTrackingVector[LargeDummyClass]
+ for (i <- 0 until numElements) {
+ vector += new LargeDummyClass
+ vector.estimateSize()
+ }
+ }
+ val unsampledTimes = for (i <- 0 until 3) yield time {
+ val vector = new PrimitiveVector[LargeDummyClass]
+ for (i <- 0 until numElements) {
+ vector += new LargeDummyClass
+ SizeEstimator.estimate(vector)
+ }
+ }
+ printSpeedTestResult("SizeTrackingVector", baseTimes, sampledTimes, unsampledTimes)
+ }
+
+ /**
+ * Speed test for SizeTrackingAppendOnlyMap.
+ *
+ * Results for 100000 elements (possibly non-deterministic):
+ * AppendOnlyMap 30 ms
+ * SizeTracker 41 ms
+ * SizeEstimator 1666 ms
+ */
+ def mapSpeedTest(numElements: Int): Unit = {
+ val baseTimes = for (i <- 0 until 10) yield time {
+ val map = new AppendOnlyMap[Int, LargeDummyClass]
+ for (i <- 0 until numElements) {
+ map(i) = new LargeDummyClass
+ }
+ }
+ val sampledTimes = for (i <- 0 until 10) yield time {
+ val map = new SizeTrackingAppendOnlyMap[Int, LargeDummyClass]
+ for (i <- 0 until numElements) {
+ map(i) = new LargeDummyClass
+ map.estimateSize()
+ }
+ }
+ val unsampledTimes = for (i <- 0 until 3) yield time {
+ val map = new AppendOnlyMap[Int, LargeDummyClass]
+ for (i <- 0 until numElements) {
+ map(i) = new LargeDummyClass
+ SizeEstimator.estimate(map)
+ }
+ }
+ printSpeedTestResult("SizeTrackingAppendOnlyMap", baseTimes, sampledTimes, unsampledTimes)
+ }
+
+ def printSpeedTestResult(
+ testName: String,
+ baseTimes: Seq[Long],
+ sampledTimes: Seq[Long],
+ unsampledTimes: Seq[Long]): Unit = {
+ println(s"Average times for $testName (ms):")
+ println(" Base - " + averageTime(baseTimes))
+ println(" SizeTracker (sampled) - " + averageTime(sampledTimes))
+ println(" SizeEstimator (unsampled) - " + averageTime(unsampledTimes))
+ println()
+ }
+
+ def time(f: => Unit): Long = {
+ val start = System.currentTimeMillis()
+ f
+ System.currentTimeMillis() - start
+ }
+
+ def averageTime(v: Seq[Long]): Long = {
+ v.sum / v.size
+ }
+
+ private class LargeDummyClass {
+ val arr = new Array[Int](100)
+ }
+}
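
The gap these benchmarks measure comes from sampling: a hedged sketch of the extrapolation idea (an assumption about SizeTracker's approach, with an illustrative growth rate, not its actual source):

    import org.apache.spark.util.SizeEstimator

    class SampledSizeTracker(obj: AnyRef) {
      private var numUpdates = 0L
      private var nextSampleAt = 1L
      private var bytesPerUpdate = 0.0

      // Pay for a full SizeEstimator pass only at exponentially spaced updates.
      def afterUpdate(): Unit = {
        numUpdates += 1
        if (numUpdates == nextSampleAt) {
          bytesPerUpdate = SizeEstimator.estimate(obj).toDouble / numUpdates
          nextSampleAt = math.ceil(numUpdates * 1.1).toLong  // growth rate is illustrative
        }
      }

      // Extrapolate between samples instead of re-estimating on every call.
      def estimateSize(): Long = (bytesPerUpdate * numUpdates).toLong
    }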
diff --git a/docs/configuration.md b/docs/configuration.md
index 46e3dd914b..2e6c85cc2b 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -481,6 +481,15 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td><code>spark.storage.unrollFraction</code></td>
+ <td>0.2</td>
+ <td>
+ Fraction of <code>spark.storage.memoryFraction</code> to use for unrolling blocks in memory.
+ This memory is dynamically allocated by dropping existing blocks when there is not enough
+ free storage space to unroll the new block in its entirety.
+ </td>
+</tr>
+<tr>
<td><code>spark.tachyonStore.baseDir</code></td>
<td>System.getProperty("java.io.tmpdir")</td>
<td>
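
A minimal sketch of tuning the new setting from application code; the values are illustrative and the property names are the ones documented above:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.storage.memoryFraction", "0.6")  // storage share of the heap
      .set("spark.storage.unrollFraction", "0.2")  // share of that reserved for unrolling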
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index e9220db6b1..5ff88f0dd1 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -31,7 +31,6 @@ import com.typesafe.tools.mima.core._
* MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap")
*/
object MimaExcludes {
-
def excludes(version: String) =
version match {
case v if v.startsWith("1.1") =>
@@ -63,6 +62,15 @@ object MimaExcludes {
"org.apache.spark.storage.MemoryStore.Entry")
) ++
Seq(
+ // Renamed putValues -> putArray + putIterator
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.storage.MemoryStore.putValues"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.storage.DiskStore.putValues"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.storage.TachyonStore.putValues")
+ ) ++
+ Seq(
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this")
) ++
Seq( // Ignore some private methods in ALS.
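
For reference, a hedged sketch of the rename these exclusions cover; putIterator appears in the suite earlier in this diff, the putArray signature is assumed by analogy, and the identifiers below are illustrative:

    // Before (removed): store.putValues(blockId, values, level, returnValues)
    memoryStore.putIterator(blockId, iterator, level, returnValues = true)
    memoryStore.putArray(blockId, array, level, returnValues = true)  // assumed counterpart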
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index ce8316bb14..d934b9cbfc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -110,8 +110,7 @@ private[streaming] class ReceiverSupervisorImpl(
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val time = System.currentTimeMillis
- blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]],
- storageLevel, tellMaster = true)
+ blockManager.putArray(blockId, arrayBuffer.toArray[Any], storageLevel, tellMaster = true)
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
}
@@ -124,7 +123,7 @@ private[streaming] class ReceiverSupervisorImpl(
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val time = System.currentTimeMillis
- blockManager.put(blockId, iterator, storageLevel, tellMaster = true)
+ blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
reportPushedBlock(blockId, -1, optionalMetadata)
}