diff options
Diffstat (limited to 'core/src/main/scala/spark/storage/BlockManager.scala')
-rw-r--r-- | core/src/main/scala/spark/storage/BlockManager.scala | 465 |
1 files changed, 212 insertions, 253 deletions
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index d3f6cd78dc..4bb4927b4a 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -2,10 +2,8 @@ package spark.storage import java.io.{InputStream, OutputStream} import java.nio.{ByteBuffer, MappedByteBuffer} -import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue} -import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue} -import scala.collection.JavaConversions._ +import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet} import akka.actor.{ActorSystem, Cancellable, Props} import scala.concurrent.{Await, Future} @@ -16,7 +14,7 @@ import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream -import spark.{Logging, SizeEstimator, SparkEnv, SparkException, Utils} +import spark.{Logging, SparkEnv, SparkException, Utils} import spark.network._ import spark.serializer.Serializer import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap} @@ -24,30 +22,35 @@ import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStam import sun.nio.ch.DirectBuffer -private[spark] -case class BlockException(blockId: String, message: String, ex: Exception = null) -extends Exception(message) - -private[spark] -class BlockManager( +private[spark] class BlockManager( executorId: String, actorSystem: ActorSystem, val master: BlockManagerMaster, - val serializer: Serializer, + val defaultSerializer: Serializer, maxMemory: Long) extends Logging { - class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) { - var pending: Boolean = true - var size: Long = -1L - var failed: Boolean = false + private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) { + @volatile var pending: Boolean = true + @volatile var size: Long = -1L + @volatile var initThread: Thread = null + @volatile var failed = false + + setInitThread() + + private def setInitThread() { + // Set current thread as init thread - waitForReady will not block this thread + // (in case there is non trivial initialization which ends up calling waitForReady as part of + // initialization itself) + this.initThread = Thread.currentThread() + } /** * Wait for this BlockInfo to be marked as ready (i.e. block is finished writing). * Return true if the block is available, false otherwise. */ def waitForReady(): Boolean = { - if (pending) { + if (initThread != Thread.currentThread() && pending) { synchronized { while (pending) this.wait() } @@ -57,35 +60,51 @@ class BlockManager( /** Mark this BlockInfo as ready (i.e. block is finished writing) */ def markReady(sizeInBytes: Long) { + assert (pending) + size = sizeInBytes + initThread = null + failed = false + initThread = null + pending = false synchronized { - pending = false - failed = false - size = sizeInBytes this.notifyAll() } } /** Mark this BlockInfo as ready but failed */ def markFailure() { + assert (pending) + size = 0 + initThread = null + failed = true + initThread = null + pending = false synchronized { - failed = true - pending = false this.notifyAll() } } } + val shuffleBlockManager = new ShuffleBlockManager(this) + private val blockInfo = new TimeStampedHashMap[String, BlockInfo] private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory) - private[storage] val diskStore: BlockStore = + private[storage] val diskStore: DiskStore = new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))) + // If we use Netty for shuffle, start a new Netty-based shuffle sender service. + private val nettyPort: Int = { + val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean + val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt + if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0 + } + val connectionManager = new ConnectionManager(0) implicit val futureExecContext = connectionManager.futureExecContext val blockManagerId = BlockManagerId( - executorId, connectionManager.id.host, connectionManager.id.port) + executorId, connectionManager.id.host, connectionManager.id.port, nettyPort) // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory // for receiving shuffle outputs) @@ -101,7 +120,7 @@ class BlockManager( val heartBeatFrequency = BlockManager.getHeartBeatFrequencyFromSystemProperties - val host = System.getProperty("spark.hostname", Utils.localHostName()) + val hostPort = Utils.localHostPort() val slaveActor = actorSystem.actorOf(Props(new BlockManagerSlaveActor(this)), name = "BlockManagerActor" + BlockManager.ID_GENERATOR.next) @@ -212,9 +231,12 @@ class BlockManager( * Tell the master about the current storage status of a block. This will send a block update * message reflecting the current status, *not* the desired storage level in its block info. * For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk. + * + * droppedMemorySize exists to account for when block is dropped from memory to disk (so it is still valid). + * This ensures that update in master will compensate for the increase in memory on slave. */ - def reportBlockStatus(blockId: String, info: BlockInfo) { - val needReregister = !tryToReportBlockStatus(blockId, info) + def reportBlockStatus(blockId: String, info: BlockInfo, droppedMemorySize: Long = 0L) { + val needReregister = !tryToReportBlockStatus(blockId, info, droppedMemorySize) if (needReregister) { logInfo("Got told to reregister updating block " + blockId) // Reregistering will report our new block for free. @@ -228,7 +250,7 @@ class BlockManager( * which will be true if the block was successfully recorded and false if * the slave needs to re-register. */ - private def tryToReportBlockStatus(blockId: String, info: BlockInfo): Boolean = { + private def tryToReportBlockStatus(blockId: String, info: BlockInfo, droppedMemorySize: Long = 0L): Boolean = { val (curLevel, inMemSize, onDiskSize, tellMaster) = info.synchronized { info.level match { case null => @@ -237,7 +259,7 @@ class BlockManager( val inMem = level.useMemory && memoryStore.contains(blockId) val onDisk = level.useDisk && diskStore.contains(blockId) val storageLevel = StorageLevel(onDisk, inMem, level.deserialized, level.replication) - val memSize = if (inMem) memoryStore.getSize(blockId) else 0L + val memSize = if (inMem) memoryStore.getSize(blockId) else droppedMemorySize val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L (storageLevel, memSize, diskSize, info.tellMaster) } @@ -250,26 +272,24 @@ class BlockManager( } } - /** - * Get locations of the block. + * Get locations of an array of blocks. */ - def getLocations(blockId: String): Seq[String] = { + def getLocationBlockIds(blockIds: Array[String]): Array[Seq[BlockManagerId]] = { val startTimeMs = System.currentTimeMillis - var managers = master.getLocations(blockId) - val locations = managers.map(_.ip) - logDebug("Got block locations in " + Utils.getUsedTimeMs(startTimeMs)) - return locations + val locations = master.getLocations(blockIds).toArray + logDebug("Got multiple block location in " + Utils.getUsedTimeMs(startTimeMs)) + locations } /** - * Get locations of an array of blocks. + * A short-circuited method to get blocks directly from disk. This is used for getting + * shuffle blocks. It is safe to do so without a lock on block info since disk store + * never deletes (recent) items. */ - def getLocations(blockIds: Array[String]): Array[Seq[String]] = { - val startTimeMs = System.currentTimeMillis - val locations = master.getLocations(blockIds).map(_.map(_.ip).toSeq).toArray - logDebug("Got multiple block location in " + Utils.getUsedTimeMs(startTimeMs)) - return locations + def getLocalFromDisk(blockId: String, serializer: Serializer): Option[Iterator[Any]] = { + diskStore.getValues(blockId, serializer).orElse( + sys.error("Block " + blockId + " not found on disk, though it should be")) } /** @@ -277,18 +297,6 @@ class BlockManager( */ def getLocal(blockId: String): Option[Iterator[Any]] = { logDebug("Getting local block " + blockId) - - // As an optimization for map output fetches, if the block is for a shuffle, return it - // without acquiring a lock; the disk store never deletes (recent) items so this should work - if (blockId.startsWith("shuffle_")) { - return diskStore.getValues(blockId) match { - case Some(iterator) => - Some(iterator) - case None => - throw new Exception("Block " + blockId + " not found on disk, though it should be") - } - } - val info = blockInfo.get(blockId).orNull if (info != null) { info.synchronized { @@ -339,6 +347,8 @@ class BlockManager( case Some(bytes) => // Put a copy of the block back in memory before returning it. Note that we can't // put the ByteBuffer returned by the disk store as that's a memory-mapped file. + // The use of rewind assumes this. + assert (0 == bytes.position()) val copyForMemory = ByteBuffer.allocate(bytes.limit) copyForMemory.put(bytes) memoryStore.putBytes(blockId, copyForMemory, level) @@ -372,7 +382,7 @@ class BlockManager( // As an optimization for map output fetches, if the block is for a shuffle, return it // without acquiring a lock; the disk store never deletes (recent) items so this should work - if (blockId.startsWith("shuffle_")) { + if (ShuffleBlockManager.isShuffle(blockId)) { return diskStore.getBytes(blockId) match { case Some(bytes) => Some(bytes) @@ -411,6 +421,7 @@ class BlockManager( // Read it as a byte buffer into memory first, then return it diskStore.getBytes(blockId) match { case Some(bytes) => + assert (0 == bytes.position()) if (level.useMemory) { if (level.deserialized) { memoryStore.putBytes(blockId, bytes, level) @@ -450,7 +461,7 @@ class BlockManager( for (loc <- locations) { logDebug("Getting remote block " + blockId + " from " + loc) val data = BlockManagerWorker.syncGetBlock( - GetBlock(blockId), ConnectionManagerId(loc.ip, loc.port)) + GetBlock(blockId), ConnectionManagerId(loc.host, loc.port)) if (data != null) { return Some(dataDeserialize(blockId, data)) } @@ -473,9 +484,19 @@ class BlockManager( * fashion as they're received. Expects a size in bytes to be provided for each block fetched, * so that we can control the maxMegabytesInFlight for the fetch. */ - def getMultiple(blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])]) + def getMultiple( + blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])], serializer: Serializer) : BlockFetcherIterator = { - return new BlockFetcherIterator(this, blocksByAddress) + + val iter = + if (System.getProperty("spark.shuffle.use.netty", "false").toBoolean) { + new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer) + } else { + new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer) + } + + iter.initialize() + iter } def put(blockId: String, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean) @@ -486,6 +507,22 @@ class BlockManager( } /** + * A short circuited method to get a block writer that can write data directly to disk. + * This is currently used for writing shuffle files out. Callers should handle error + * cases. + */ + def getDiskBlockWriter(blockId: String, serializer: Serializer, bufferSize: Int) + : BlockObjectWriter = { + val writer = diskStore.getBlockWriter(blockId, serializer, bufferSize) + writer.registerCloseEventHandler(() => { + val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false) + blockInfo.put(blockId, myInfo) + myInfo.markReady(writer.size()) + }) + writer + } + + /** * Put a new block of values to the block manager. Returns its (estimated) size in bytes. */ def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel, @@ -501,17 +538,26 @@ class BlockManager( throw new IllegalArgumentException("Storage level is null or invalid") } - val oldBlock = blockInfo.get(blockId).orNull - if (oldBlock != null && oldBlock.waitForReady()) { - logWarning("Block " + blockId + " already exists on this machine; not re-adding it") - return oldBlock.size - } - // Remember the block's storage level so that we can correctly drop it to disk if it needs // to be dropped right after it got put into memory. Note, however, that other threads will // not be able to get() this block until we call markReady on its BlockInfo. - val myInfo = new BlockInfo(level, tellMaster) - blockInfo.put(blockId, myInfo) + val myInfo = { + val tinfo = new BlockInfo(level, tellMaster) + // Do atomically ! + val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) + + if (oldBlockOpt.isDefined) { + if (oldBlockOpt.get.waitForReady()) { + logWarning("Block " + blockId + " already exists on this machine; not re-adding it") + return oldBlockOpt.get.size + } + + // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ? + oldBlockOpt.get + } else { + tinfo + } + } val startTimeMs = System.currentTimeMillis @@ -531,6 +577,7 @@ class BlockManager( logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs) + " to get into synchronized block") + var marked = false try { if (level.useMemory) { // Save it just to memory first, even if it also has useDisk set to true; we will later @@ -555,26 +602,25 @@ class BlockManager( // Now that the block is in either the memory or disk store, let other threads read it, // and tell the master about it. + marked = true myInfo.markReady(size) if (tellMaster) { reportBlockStatus(blockId, myInfo) } - } catch { + } finally { // If we failed at putting the block to memory/disk, notify other possible readers // that it has failed, and then remove it from the block info map. - case e: Exception => { + if (! marked) { // Note that the remove must happen before markFailure otherwise another thread // could've inserted a new BlockInfo before we remove it. blockInfo.remove(blockId) myInfo.markFailure() - logWarning("Putting block " + blockId + " failed", e) - throw e + logWarning("Putting block " + blockId + " failed") } } } logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs)) - // Replicate block if required if (level.replication > 1) { val remoteStartTime = System.currentTimeMillis @@ -611,16 +657,26 @@ class BlockManager( throw new IllegalArgumentException("Storage level is null or invalid") } - if (blockInfo.contains(blockId)) { - logWarning("Block " + blockId + " already exists on this machine; not re-adding it") - return - } - // Remember the block's storage level so that we can correctly drop it to disk if it needs // to be dropped right after it got put into memory. Note, however, that other threads will // not be able to get() this block until we call markReady on its BlockInfo. - val myInfo = new BlockInfo(level, tellMaster) - blockInfo.put(blockId, myInfo) + val myInfo = { + val tinfo = new BlockInfo(level, tellMaster) + // Do atomically ! + val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) + + if (oldBlockOpt.isDefined) { + if (oldBlockOpt.get.waitForReady()) { + logWarning("Block " + blockId + " already exists on this machine; not re-adding it") + return + } + + // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ? + oldBlockOpt.get + } else { + tinfo + } + } val startTimeMs = System.currentTimeMillis @@ -639,6 +695,7 @@ class BlockManager( logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs) + " to get into synchronized block") + var marked = false try { if (level.useMemory) { // Store it only in memory at first, even if useDisk is also set to true @@ -649,22 +706,24 @@ class BlockManager( diskStore.putBytes(blockId, bytes, level) } + // assert (0 == bytes.position(), "" + bytes) + // Now that the block is in either the memory or disk store, let other threads read it, // and tell the master about it. + marked = true myInfo.markReady(bytes.limit) if (tellMaster) { reportBlockStatus(blockId, myInfo) } - } catch { + } finally { // If we failed at putting the block to memory/disk, notify other possible readers // that it has failed, and then remove it from the block info map. - case e: Exception => { + if (! marked) { // Note that the remove must happen before markFailure otherwise another thread // could've inserted a new BlockInfo before we remove it. blockInfo.remove(blockId) myInfo.markFailure() - logWarning("Putting block " + blockId + " failed", e) - throw e + logWarning("Putting block " + blockId + " failed") } } } @@ -698,7 +757,7 @@ class BlockManager( logDebug("Try to replicate BlockId " + blockId + " once; The size of the data is " + data.limit() + " Bytes. To node: " + peer) if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel), - new ConnectionManagerId(peer.ip, peer.port))) { + new ConnectionManagerId(peer.host, peer.port))) { logError("Failed to call syncPutBlock to " + peer) } logDebug("Replicated BlockId " + blockId + " once used " + @@ -730,6 +789,14 @@ class BlockManager( val info = blockInfo.get(blockId).orNull if (info != null) { info.synchronized { + // required ? As of now, this will be invoked only for blocks which are ready + // But in case this changes in future, adding for consistency sake. + if (! info.waitForReady() ) { + // If we get here, the block write failed. + logWarning("Block " + blockId + " was marked as failure. Nothing to drop") + return + } + val level = info.level if (level.useDisk && !diskStore.contains(blockId)) { logInfo("Writing block " + blockId + " to disk") @@ -740,12 +807,13 @@ class BlockManager( diskStore.putBytes(blockId, bytes, level) } } + val droppedMemorySize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L val blockWasRemoved = memoryStore.remove(blockId) if (!blockWasRemoved) { logWarning("Block " + blockId + " could not be dropped from memory as it does not exist") } if (info.tellMaster) { - reportBlockStatus(blockId, info) + reportBlockStatus(blockId, info, droppedMemorySize) } if (!level.useDisk) { // The block is completely gone from this node; forget it so we can put() it again later. @@ -758,9 +826,23 @@ class BlockManager( } /** + * Remove all blocks belonging to the given RDD. + * @return The number of blocks removed. + */ + def removeRdd(rddId: Int): Int = { + // TODO: Instead of doing a linear scan on the blockInfo map, create another map that maps + // from RDD.id to blocks. + logInfo("Removing RDD " + rddId) + val rddPrefix = "rdd_" + rddId + "_" + val blocksToRemove = blockInfo.filter(_._1.startsWith(rddPrefix)).map(_._1) + blocksToRemove.foreach(blockId => removeBlock(blockId, false)) + blocksToRemove.size + } + + /** * Remove a block from both memory and disk. */ - def removeBlock(blockId: String) { + def removeBlock(blockId: String, tellMaster: Boolean = true) { logInfo("Removing block " + blockId) val info = blockInfo.get(blockId).orNull if (info != null) info.synchronized { @@ -772,7 +854,7 @@ class BlockManager( "the disk or memory store") } blockInfo.remove(blockId) - if (info.tellMaster) { + if (tellMaster && info.tellMaster) { reportBlockStatus(blockId, info) } } else { @@ -805,7 +887,7 @@ class BlockManager( } def shouldCompress(blockId: String): Boolean = { - if (blockId.startsWith("shuffle_")) { + if (ShuffleBlockManager.isShuffle(blockId)) { compressShuffle } else if (blockId.startsWith("broadcast_")) { compressBroadcast @@ -820,7 +902,11 @@ class BlockManager( * Wrap an output stream for compression if block compression is enabled for its block type */ def wrapForCompression(blockId: String, s: OutputStream): OutputStream = { - if (shouldCompress(blockId)) new LZFOutputStream(s) else s + if (shouldCompress(blockId)) { + (new LZFOutputStream(s)).setFinishBlockOnFlush(true) + } else { + s + } } /** @@ -830,7 +916,10 @@ class BlockManager( if (shouldCompress(blockId)) new LZFInputStream(s) else s } - def dataSerialize(blockId: String, values: Iterator[Any]): ByteBuffer = { + def dataSerialize( + blockId: String, + values: Iterator[Any], + serializer: Serializer = defaultSerializer): ByteBuffer = { val byteStream = new FastByteArrayOutputStream(4096) val ser = serializer.newInstance() ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() @@ -842,7 +931,10 @@ class BlockManager( * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of * the iterator is reached. */ - def dataDeserialize(blockId: String, bytes: ByteBuffer): Iterator[Any] = { + def dataDeserialize( + blockId: String, + bytes: ByteBuffer, + serializer: Serializer = defaultSerializer): Iterator[Any] = { bytes.rewind() val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true)) serializer.newInstance().deserializeStream(stream).asIterator @@ -862,8 +954,8 @@ class BlockManager( } } -private[spark] -object BlockManager extends Logging { + +private[spark] object BlockManager extends Logging { val ID_GENERATOR = new IdGenerator @@ -873,7 +965,8 @@ object BlockManager extends Logging { } def getHeartBeatFrequencyFromSystemProperties: Long = - System.getProperty("spark.storage.blockManagerHeartBeatMs", "10000").toLong + + System.getProperty("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4 def getDisableHeartBeatsForTesting: Boolean = System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean @@ -892,177 +985,43 @@ object BlockManager extends Logging { } } } -} - -class BlockFetcherIterator( - private val blockManager: BlockManager, - val blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])] -) extends Iterator[(String, Option[Iterator[Any]])] with Logging with BlockFetchTracker { - import blockManager._ - - private var _remoteBytesRead = 0l - private var _remoteFetchTime = 0l - private var _fetchWaitTime = 0l - - if (blocksByAddress == null) { - throw new IllegalArgumentException("BlocksByAddress is null") - } - val totalBlocks = blocksByAddress.map(_._2.size).sum - logDebug("Getting " + totalBlocks + " blocks") - var startTime = System.currentTimeMillis - val localBlockIds = new ArrayBuffer[String]() - val remoteBlockIds = new HashSet[String]() - - // A result of a fetch. Includes the block ID, size in bytes, and a function to deserialize - // the block (since we want all deserializaton to happen in the calling thread); can also - // represent a fetch failure if size == -1. - class FetchResult(val blockId: String, val size: Long, val deserialize: () => Iterator[Any]) { - def failed: Boolean = size == -1 - } - - // A queue to hold our results. - val results = new LinkedBlockingQueue[FetchResult] - - // A request to fetch one or more blocks, complete with their sizes - class FetchRequest(val address: BlockManagerId, val blocks: Seq[(String, Long)]) { - val size = blocks.map(_._2).sum - } + def blockIdsToExecutorLocations(blockIds: Array[String], env: SparkEnv, blockManagerMaster: BlockManagerMaster = null): HashMap[String, List[String]] = { + // env == null and blockManagerMaster != null is used in tests + assert (env != null || blockManagerMaster != null) + val locationBlockIds: Seq[Seq[BlockManagerId]] = + if (env != null) { + env.blockManager.getLocationBlockIds(blockIds) + } else { + blockManagerMaster.getLocations(blockIds) + } - // Queue of fetch requests to issue; we'll pull requests off this gradually to make sure that - // the number of bytes in flight is limited to maxBytesInFlight - val fetchRequests = new Queue[FetchRequest] + // Convert from block master locations to executor locations (we need that for task scheduling) + val executorLocations = new HashMap[String, List[String]]() + for (i <- 0 until blockIds.length) { + val blockId = blockIds(i) + val blockLocations = locationBlockIds(i) - // Current bytes in flight from our requests - var bytesInFlight = 0L + val executors = new HashSet[String]() - def sendRequest(req: FetchRequest) { - logDebug("Sending request for %d blocks (%s) from %s".format( - req.blocks.size, Utils.memoryBytesToString(req.size), req.address.ip)) - val cmId = new ConnectionManagerId(req.address.ip, req.address.port) - val blockMessageArray = new BlockMessageArray(req.blocks.map { - case (blockId, size) => BlockMessage.fromGetBlock(GetBlock(blockId)) - }) - bytesInFlight += req.size - val sizeMap = req.blocks.toMap // so we can look up the size of each blockID - val fetchStart = System.currentTimeMillis() - val future = connectionManager.sendMessageReliably(cmId, blockMessageArray.toBufferMessage) - future.onSuccess { - case Some(message) => { - val fetchDone = System.currentTimeMillis() - _remoteFetchTime += fetchDone - fetchStart - val bufferMessage = message.asInstanceOf[BufferMessage] - val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage) - for (blockMessage <- blockMessageArray) { - if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) { - throw new SparkException( - "Unexpected message " + blockMessage.getType + " received from " + cmId) - } - val blockId = blockMessage.getId - results.put(new FetchResult( - blockId, sizeMap(blockId), () => dataDeserialize(blockId, blockMessage.getData))) - _remoteBytesRead += req.size - logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) + if (env != null) { + for (bkLocation <- blockLocations) { + val executorHostPort = env.resolveExecutorIdToHostPort(bkLocation.executorId, bkLocation.host) + executors += executorHostPort + // logInfo("bkLocation = " + bkLocation + ", executorHostPort = " + executorHostPort) } - } - case None => { - logError("Could not get block(s) from " + cmId) - for ((blockId, size) <- req.blocks) { - results.put(new FetchResult(blockId, -1, null)) + } else { + // Typically while testing, etc - revert to simply using host. + for (bkLocation <- blockLocations) { + executors += bkLocation.host + // logInfo("bkLocation = " + bkLocation + ", executorHostPort = " + executorHostPort) } } - } - } - // Split local and remote blocks. Remote blocks are further split into FetchRequests of size - // at most maxBytesInFlight in order to limit the amount of data in flight. - val remoteRequests = new ArrayBuffer[FetchRequest] - for ((address, blockInfos) <- blocksByAddress) { - if (address == blockManagerId) { - localBlockIds ++= blockInfos.map(_._1) - } else { - remoteBlockIds ++= blockInfos.map(_._1) - // Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them - // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5 - // nodes, rather than blocking on reading output from one node. - val minRequestSize = math.max(maxBytesInFlight / 5, 1L) - logInfo("maxBytesInFlight: " + maxBytesInFlight + ", minRequest: " + minRequestSize) - val iterator = blockInfos.iterator - var curRequestSize = 0L - var curBlocks = new ArrayBuffer[(String, Long)] - while (iterator.hasNext) { - val (blockId, size) = iterator.next() - curBlocks += ((blockId, size)) - curRequestSize += size - if (curRequestSize >= minRequestSize) { - // Add this FetchRequest - remoteRequests += new FetchRequest(address, curBlocks) - curRequestSize = 0 - curBlocks = new ArrayBuffer[(String, Long)] - } - } - // Add in the final request - if (!curBlocks.isEmpty) { - remoteRequests += new FetchRequest(address, curBlocks) - } + executorLocations.put(blockId, executors.toSeq.toList) } - } - // Add the remote requests into our queue in a random order - fetchRequests ++= Utils.randomize(remoteRequests) - // Send out initial requests for blocks, up to our maxBytesInFlight - while (!fetchRequests.isEmpty && - (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { - sendRequest(fetchRequests.dequeue()) + executorLocations } - val numGets = remoteBlockIds.size - fetchRequests.size - logInfo("Started " + numGets + " remote gets in " + Utils.getUsedTimeMs(startTime)) - - // Get the local blocks while remote blocks are being fetched. Note that it's okay to do - // these all at once because they will just memory-map some files, so they won't consume - // any memory that might exceed our maxBytesInFlight - startTime = System.currentTimeMillis - for (id <- localBlockIds) { - getLocal(id) match { - case Some(iter) => { - results.put(new FetchResult(id, 0, () => iter)) // Pass 0 as size since it's not in flight - logDebug("Got local block " + id) - } - case None => { - throw new BlockException(id, "Could not get block " + id + " from local machine") - } - } - } - logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms") - - //an iterator that will read fetched blocks off the queue as they arrive. - var resultsGotten = 0 - - def hasNext: Boolean = resultsGotten < totalBlocks - - def next(): (String, Option[Iterator[Any]]) = { - resultsGotten += 1 - val startFetchWait = System.currentTimeMillis() - val result = results.take() - val stopFetchWait = System.currentTimeMillis() - _fetchWaitTime += (stopFetchWait - startFetchWait) - bytesInFlight -= result.size - while (!fetchRequests.isEmpty && - (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { - sendRequest(fetchRequests.dequeue()) - } - (result.blockId, if (result.failed) None else Some(result.deserialize())) - } - - - //methods to profile the block fetching - def numLocalBlocks = localBlockIds.size - def numRemoteBlocks = remoteBlockIds.size - - def remoteFetchTime = _remoteFetchTime - def fetchWaitTime = _fetchWaitTime - - def remoteBytesRead = _remoteBytesRead - } |