diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-06 18:46:04 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-06 18:46:04 -0700 |
commit | 9a3b3f32a3ccb849293180a899377e8468f7544a (patch) | |
tree | ce09892ad1b77fe1e5e3ec2bdb385b7b49cbe204 /core | |
parent | 0e42832e6ae8d1e343d9b153af0a787fe8507602 (diff) | |
download | spark-9a3b3f32a3ccb849293180a899377e8468f7544a.tar.gz spark-9a3b3f32a3ccb849293180a899377e8468f7544a.tar.bz2 spark-9a3b3f32a3ccb849293180a899377e8468f7544a.zip |
Pass sizes of map outputs back to MapOutputTracker
Diffstat (limited to 'core')
9 files changed, 152 insertions, 83 deletions
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index fb65ba421a..048d1788c2 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -17,12 +17,12 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin val blockManager = SparkEnv.get.blockManager val startTime = System.currentTimeMillis - val addresses = SparkEnv.get.mapOutputTracker.getServerAddresses(shuffleId) + val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId) logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format( shuffleId, reduceId, System.currentTimeMillis - startTime)) val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[Int]] - for ((address, index) <- addresses.zipWithIndex) { + for (((address, size), index) <- statuses.zipWithIndex) { splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += index } @@ -44,7 +44,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r blockId match { case regex(shufId, mapId, reduceId) => - val addr = addresses(mapId.toInt) + val addr = statuses(mapId.toInt)._1 throw new FetchFailedException(addr, shufId.toInt, mapId.toInt, reduceId.toInt, null) case _ => throw new SparkException( diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala index 116d526854..1b4b5ed240 100644 --- a/core/src/main/scala/spark/MapOutputTracker.scala +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -1,6 +1,6 @@ package spark -import java.io.{DataInputStream, DataOutputStream, ByteArrayOutputStream, ByteArrayInputStream} +import java.io._ import java.util.concurrent.ConcurrentHashMap import akka.actor._ @@ -14,7 +14,9 @@ import akka.util.duration._ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet +import scheduler.MapStatus import spark.storage.BlockManagerId +import java.util.zip.{GZIPInputStream, GZIPOutputStream} private[spark] sealed trait MapOutputTrackerMessage private[spark] case class GetMapOutputLocations(shuffleId: Int) extends MapOutputTrackerMessage @@ -40,16 +42,16 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea val timeout = 10.seconds - var bmAddresses = new ConcurrentHashMap[Int, Array[BlockManagerId]] + var mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]] // Incremented every time a fetch fails so that client nodes know to clear // their cache of map output locations if this happens. private var generation: Long = 0 - private var generationLock = new java.lang.Object + private val generationLock = new java.lang.Object - // Cache a serialized version of the output locations for each shuffle to send them out faster + // Cache a serialized version of the output statuses for each shuffle to send them out faster var cacheGeneration = generation - val cachedSerializedLocs = new HashMap[Int, Array[Byte]] + val cachedSerializedStatuses = new HashMap[Int, Array[Byte]] var trackerActor: ActorRef = if (isMaster) { val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(this)), name = actorName) @@ -80,31 +82,34 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea } def registerShuffle(shuffleId: Int, numMaps: Int) { - if (bmAddresses.get(shuffleId) != null) { + if (mapStatuses.get(shuffleId) != null) { throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice") } - bmAddresses.put(shuffleId, new Array[BlockManagerId](numMaps)) + mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)) } - def registerMapOutput(shuffleId: Int, mapId: Int, bmAddress: BlockManagerId) { - var array = bmAddresses.get(shuffleId) + def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) { + var array = mapStatuses.get(shuffleId) array.synchronized { - array(mapId) = bmAddress + array(mapId) = status } } - def registerMapOutputs(shuffleId: Int, locs: Array[BlockManagerId], changeGeneration: Boolean = false) { - bmAddresses.put(shuffleId, Array[BlockManagerId]() ++ locs) + def registerMapOutputs( + shuffleId: Int, + statuses: Array[MapStatus], + changeGeneration: Boolean = false) { + mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses) if (changeGeneration) { incrementGeneration() } } def unregisterMapOutput(shuffleId: Int, mapId: Int, bmAddress: BlockManagerId) { - var array = bmAddresses.get(shuffleId) + var array = mapStatuses.get(shuffleId) if (array != null) { array.synchronized { - if (array(mapId) == bmAddress) { + if (array(mapId).address == bmAddress) { array(mapId) = null } } @@ -117,10 +122,10 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea // Remembers which map output locations are currently being fetched on a worker val fetching = new HashSet[Int] - // Called on possibly remote nodes to get the server URIs for a given shuffle - def getServerAddresses(shuffleId: Int): Array[BlockManagerId] = { - val locs = bmAddresses.get(shuffleId) - if (locs == null) { + // Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle + def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = { + val statuses = mapStatuses.get(shuffleId) + if (statuses == null) { logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them") fetching.synchronized { if (fetching.contains(shuffleId)) { @@ -129,10 +134,11 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea try { fetching.wait() } catch { - case _ => + case e: InterruptedException => } } - return bmAddresses.get(shuffleId) + return mapStatuses.get(shuffleId).map(status => + (status.address, MapOutputTracker.decompressSize(status.compressedSizes(reduceId)))) } else { fetching += shuffleId } @@ -140,23 +146,25 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea // We won the race to fetch the output locs; do so logInfo("Doing the fetch; tracker actor = " + trackerActor) val fetchedBytes = askTracker(GetMapOutputLocations(shuffleId)).asInstanceOf[Array[Byte]] - val fetchedLocs = deserializeLocations(fetchedBytes) + val fetchedStatuses = deserializeStatuses(fetchedBytes) logInfo("Got the output locations") - bmAddresses.put(shuffleId, fetchedLocs) + mapStatuses.put(shuffleId, fetchedStatuses) fetching.synchronized { fetching -= shuffleId fetching.notifyAll() } - return fetchedLocs + return fetchedStatuses.map(s => + (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId)))) } else { - return locs + return statuses.map(s => + (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId)))) } } def stop() { communicate(StopMapOutputTracker) - bmAddresses.clear() + mapStatuses.clear() trackerActor = null } @@ -182,75 +190,82 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea generationLock.synchronized { if (newGen > generation) { logInfo("Updating generation to " + newGen + " and clearing cache") - bmAddresses = new ConcurrentHashMap[Int, Array[BlockManagerId]] + mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]] generation = newGen } } } def getSerializedLocations(shuffleId: Int): Array[Byte] = { - var locs: Array[BlockManagerId] = null + var statuses: Array[MapStatus] = null var generationGotten: Long = -1 generationLock.synchronized { if (generation > cacheGeneration) { - cachedSerializedLocs.clear() + cachedSerializedStatuses.clear() cacheGeneration = generation } - cachedSerializedLocs.get(shuffleId) match { + cachedSerializedStatuses.get(shuffleId) match { case Some(bytes) => return bytes case None => - locs = bmAddresses.get(shuffleId) + statuses = mapStatuses.get(shuffleId) generationGotten = generation } } // If we got here, we failed to find the serialized locations in the cache, so we pulled // out a snapshot of the locations as "locs"; let's serialize and return that - val bytes = serializeLocations(locs) + val bytes = serializeStatuses(statuses) // Add them into the table only if the generation hasn't changed while we were working generationLock.synchronized { if (generation == generationGotten) { - cachedSerializedLocs(shuffleId) = bytes + cachedSerializedStatuses(shuffleId) = bytes } } return bytes } // Serialize an array of map output locations into an efficient byte format so that we can send - // it to reduce tasks. We do this by grouping together the locations by block manager ID. - def serializeLocations(locs: Array[BlockManagerId]): Array[Byte] = { + // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will + // generally be pretty compressible because many map outputs will be on the same hostname. + def serializeStatuses(statuses: Array[MapStatus]): Array[Byte] = { val out = new ByteArrayOutputStream - val dataOut = new DataOutputStream(out) - dataOut.writeInt(locs.length) - val grouped = locs.zipWithIndex.groupBy(_._1) - dataOut.writeInt(grouped.size) - for ((id, pairs) <- grouped if id != null) { - dataOut.writeUTF(id.ip) - dataOut.writeInt(id.port) - dataOut.writeInt(pairs.length) - for ((_, blockIndex) <- pairs) { - dataOut.writeInt(blockIndex) - } - } - dataOut.close() + val objOut = new ObjectOutputStream(new GZIPOutputStream(out)) + objOut.writeObject(statuses) + objOut.close() out.toByteArray } - // Opposite of serializeLocations. - def deserializeLocations(bytes: Array[Byte]): Array[BlockManagerId] = { - val dataIn = new DataInputStream(new ByteArrayInputStream(bytes)) - val length = dataIn.readInt() - val array = new Array[BlockManagerId](length) - val numGroups = dataIn.readInt() - for (i <- 0 until numGroups) { - val ip = dataIn.readUTF() - val port = dataIn.readInt() - val id = new BlockManagerId(ip, port) - val numBlocks = dataIn.readInt() - for (j <- 0 until numBlocks) { - array(dataIn.readInt()) = id - } + // Opposite of serializeStatuses. + def deserializeStatuses(bytes: Array[Byte]): Array[MapStatus] = { + val objIn = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes))) + objIn.readObject().asInstanceOf[Array[MapStatus]] + } +} + +private[spark] object MapOutputTracker { + private val LOG_BASE = 1.1 + + /** + * Compress a size in bytes to 8 bits for efficient reporting of map output sizes. + * We do this by encoding the log base 1.1 of the size as an integer, which can support + * sizes up to 35 GB with at most 10% error. + */ + def compressSize(size: Long): Byte = { + if (size <= 1L) { + 0 + } else { + math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte + } + } + + /** + * Decompress an 8-bit encoded block size, using the reverse operation of compressSize. + */ + def decompressSize(compressedSize: Byte): Long = { + if (compressedSize == 0) { + 1 + } else { + math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong } - array } } diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 9b666ed181..6f4c6bffd7 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -422,11 +422,11 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with case smt: ShuffleMapTask => val stage = idToStage(smt.stageId) - val bmAddress = event.result.asInstanceOf[BlockManagerId] - val host = bmAddress.ip + val status = event.result.asInstanceOf[MapStatus] + val host = status.address.ip logInfo("ShuffleMapTask finished with host " + host) if (!deadHosts.contains(host)) { // TODO: Make sure hostnames are consistent with Mesos - stage.addOutputLoc(smt.partition, bmAddress) + stage.addOutputLoc(smt.partition, status) } if (running.contains(stage) && pendingTasks(stage).isEmpty) { logInfo(stage + " (" + stage.origin + ") finished; looking for newly runnable stages") diff --git a/core/src/main/scala/spark/scheduler/MapStatus.scala b/core/src/main/scala/spark/scheduler/MapStatus.scala new file mode 100644 index 0000000000..4532d9497f --- /dev/null +++ b/core/src/main/scala/spark/scheduler/MapStatus.scala @@ -0,0 +1,27 @@ +package spark.scheduler + +import spark.storage.BlockManagerId +import java.io.{ObjectOutput, ObjectInput, Externalizable} + +/** + * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the + * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks. + * The map output sizes are compressed using MapOutputTracker.compressSize. + */ +private[spark] class MapStatus(var address: BlockManagerId, var compressedSizes: Array[Byte]) + extends Externalizable { + + def this() = this(null, null) // For deserialization only + + def writeExternal(out: ObjectOutput) { + address.writeExternal(out) + out.writeInt(compressedSizes.length) + out.write(compressedSizes) + } + + def readExternal(in: ObjectInput) { + address = new BlockManagerId(in) + compressedSizes = new Array[Byte](in.readInt()) + in.readFully(compressedSizes) + } +} diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index 9ca2c9e449..3e5ba10fd9 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -74,7 +74,7 @@ private[spark] class ShuffleMapTask( var dep: ShuffleDependency[_,_,_], var partition: Int, @transient var locs: Seq[String]) - extends Task[BlockManagerId](stageId) + extends Task[MapStatus](stageId) with Externalizable with Logging { @@ -109,7 +109,7 @@ private[spark] class ShuffleMapTask( split = in.readObject().asInstanceOf[Split] } - override def run(attemptId: Long): BlockManagerId = { + override def run(attemptId: Long): MapStatus = { val numOutputSplits = dep.partitioner.numPartitions val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]] val partitioner = dep.partitioner @@ -141,15 +141,18 @@ private[spark] class ShuffleMapTask( buckets.map(_.iterator) } + val compressedSizes = new Array[Byte](numOutputSplits) + val blockManager = SparkEnv.get.blockManager for (i <- 0 until numOutputSplits) { val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i // Get a Scala iterator from Java map val iter: Iterator[(Any, Any)] = bucketIterators(i) - blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false) + val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false) + compressedSizes(i) = MapOutputTracker.compressSize(size) } - return SparkEnv.get.blockManager.blockManagerId + return new MapStatus(blockManager.blockManagerId, compressedSizes) } override def preferredLocations: Seq[String] = locs diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala index 803dd1b97d..1149c00a23 100644 --- a/core/src/main/scala/spark/scheduler/Stage.scala +++ b/core/src/main/scala/spark/scheduler/Stage.scala @@ -29,29 +29,29 @@ private[spark] class Stage( val isShuffleMap = shuffleDep != None val numPartitions = rdd.splits.size - val outputLocs = Array.fill[List[BlockManagerId]](numPartitions)(Nil) + val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) var numAvailableOutputs = 0 private var nextAttemptId = 0 def isAvailable: Boolean = { - if (/*parents.size == 0 &&*/ !isShuffleMap) { + if (!isShuffleMap) { true } else { numAvailableOutputs == numPartitions } } - def addOutputLoc(partition: Int, bmAddress: BlockManagerId) { + def addOutputLoc(partition: Int, status: MapStatus) { val prevList = outputLocs(partition) - outputLocs(partition) = bmAddress :: prevList + outputLocs(partition) = status :: prevList if (prevList == Nil) numAvailableOutputs += 1 } def removeOutputLoc(partition: Int, bmAddress: BlockManagerId) { val prevList = outputLocs(partition) - val newList = prevList.filterNot(_ == bmAddress) + val newList = prevList.filterNot(_.address == bmAddress) outputLocs(partition) = newList if (prevList != Nil && newList == Nil) { numAvailableOutputs -= 1 @@ -62,7 +62,7 @@ private[spark] class Stage( var becameUnavailable = false for (partition <- 0 until numPartitions) { val prevList = outputLocs(partition) - val newList = prevList.filterNot(_.ip == host) + val newList = prevList.filterNot(_.address.ip == host) outputLocs(partition) = newList if (prevList != Nil && newList == Nil) { becameUnavailable = true diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index 31debdd0fb..c4b241bf5a 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -20,7 +20,9 @@ import sun.nio.ch.DirectBuffer private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable { - def this() = this(null, 0) + def this() = this(null, 0) // For deserialization only + + def this(in: ObjectInput) = this(in.readUTF(), in.readInt()) override def writeExternal(out: ObjectOutput) { out.writeUTF(ip) diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala index 26f4ddfb49..3e6f09257a 100644 --- a/core/src/main/scala/spark/storage/DiskStore.scala +++ b/core/src/main/scala/spark/storage/DiskStore.scala @@ -58,19 +58,18 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String) val objOut = blockManager.serializer.newInstance().serializeStream(fileOut) objOut.writeAll(values) objOut.close() - val finishTime = System.currentTimeMillis + val length = file.length() logDebug("Block %s stored as %s file on disk in %d ms".format( - blockId, Utils.memoryBytesToString(file.length()), (finishTime - startTime))) + blockId, Utils.memoryBytesToString(length), (System.currentTimeMillis - startTime))) if (returnValues) { // Return a byte buffer for the contents of the file val channel = new RandomAccessFile(file, "r").getChannel() - val length = channel.size() val buffer = channel.map(MapMode.READ_ONLY, 0, length) channel.close() PutResult(length, Right(buffer)) } else { - null + PutResult(length, null) } } diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala new file mode 100644 index 0000000000..d28b06c013 --- /dev/null +++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala @@ -0,0 +1,23 @@ +package spark + +import org.scalatest.FunSuite + +class MapOutputTrackerSuite extends FunSuite { + test("compressSize") { + assert(MapOutputTracker.compressSize(0L) === 0) + assert(MapOutputTracker.compressSize(1L) === 0) + assert(MapOutputTracker.compressSize(2L) === 8) + assert(MapOutputTracker.compressSize(10L) === 25) + assert((MapOutputTracker.compressSize(1000000L) & 0xFF) === 145) + assert((MapOutputTracker.compressSize(1000000000L) & 0xFF) === 218) + } + + test("decompressSize") { + assert(MapOutputTracker.decompressSize(0) === 1) + for (size <- Seq(2L, 10L, 100L, 50000L, 1000000L, 1000000000L)) { + val size2 = MapOutputTracker.decompressSize(MapOutputTracker.compressSize(size)) + assert(size2 >= 0.99 * size && size2 <= 1.11 * size, + "size " + size + " decompressed to " + size2 + ", which is out of range") + } + } +} |