author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-06 18:46:04 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-06 18:46:04 -0700
commit    9a3b3f32a3ccb849293180a899377e8468f7544a (patch)
tree      ce09892ad1b77fe1e5e3ec2bdb385b7b49cbe204 /core/src/main
parent    0e42832e6ae8d1e343d9b153af0a787fe8507602 (diff)
Pass sizes of map outputs back to MapOutputTracker
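
In more detail: map outputs are now reported back as MapStatus objects, carrying the
block manager address plus the size of the output written for each reducer, rather
than bare BlockManagerIds. Sizes are compressed to one byte each using a log-base-1.1
encoding, and the tracker's cached per-shuffle replies are Java-serialized and
GZIP-compressed, which works well because many statuses share the same hostname.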
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/spark/BlockStoreShuffleFetcher.scala |   6
-rw-r--r--  core/src/main/scala/spark/MapOutputTracker.scala         | 139
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala   |   6
-rw-r--r--  core/src/main/scala/spark/scheduler/MapStatus.scala      |  27
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala |  11
-rw-r--r--  core/src/main/scala/spark/scheduler/Stage.scala          |  12
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala     |   4
-rw-r--r--  core/src/main/scala/spark/storage/DiskStore.scala        |   7
8 files changed, 129 insertions, 83 deletions
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index fb65ba421a..048d1788c2 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -17,12 +17,12 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
val blockManager = SparkEnv.get.blockManager
val startTime = System.currentTimeMillis
- val addresses = SparkEnv.get.mapOutputTracker.getServerAddresses(shuffleId)
+ val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
shuffleId, reduceId, System.currentTimeMillis - startTime))
val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[Int]]
- for ((address, index) <- addresses.zipWithIndex) {
+ for (((address, size), index) <- statuses.zipWithIndex) {
splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += index
}
@@ -44,7 +44,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r
blockId match {
case regex(shufId, mapId, reduceId) =>
- val addr = addresses(mapId.toInt)
+ val addr = statuses(mapId.toInt)._1
throw new FetchFailedException(addr, shufId.toInt, mapId.toInt, reduceId.toInt, null)
case _ =>
throw new SparkException(
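
With getServerStatuses returning a (BlockManagerId, Long) pair per map output,
the fetcher can reason about how many bytes each server owes it. A minimal sketch
of that idea, assuming only the signature above; the grouping mirrors the
splitsByAddress loop in this file, while the per-server byte totals are a
hypothetical use of the new sizes, not something this commit does:

    // Sketch only: group map outputs by server and total the bytes owed by each.
    // `statuses` stands in for mapOutputTracker.getServerStatuses(shuffleId, reduceId).
    import scala.collection.mutable.{ArrayBuffer, HashMap}
    import spark.storage.BlockManagerId

    def planFetches(statuses: Array[(BlockManagerId, Long)])
        : HashMap[BlockManagerId, (ArrayBuffer[Int], Long)] = {
      val byAddress = new HashMap[BlockManagerId, (ArrayBuffer[Int], Long)]
      for (((address, size), mapId) <- statuses.zipWithIndex) {
        val (mapIds, total) = byAddress.getOrElseUpdate(address, (ArrayBuffer[Int](), 0L))
        mapIds += mapId                           // record which map outputs live there
        byAddress(address) = (mapIds, total + size) // accumulate the bytes to fetch
      }
      byAddress
    }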
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 116d526854..1b4b5ed240 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -1,6 +1,6 @@
package spark
-import java.io.{DataInputStream, DataOutputStream, ByteArrayOutputStream, ByteArrayInputStream}
+import java.io._
import java.util.concurrent.ConcurrentHashMap
import akka.actor._
@@ -14,7 +14,9 @@ import akka.util.duration._
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
+import scheduler.MapStatus
import spark.storage.BlockManagerId
+import java.util.zip.{GZIPInputStream, GZIPOutputStream}
private[spark] sealed trait MapOutputTrackerMessage
private[spark] case class GetMapOutputLocations(shuffleId: Int) extends MapOutputTrackerMessage
@@ -40,16 +42,16 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logging {
val timeout = 10.seconds
- var bmAddresses = new ConcurrentHashMap[Int, Array[BlockManagerId]]
+ var mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]
// Incremented every time a fetch fails so that client nodes know to clear
// their cache of map output locations if this happens.
private var generation: Long = 0
- private var generationLock = new java.lang.Object
+ private val generationLock = new java.lang.Object
- // Cache a serialized version of the output locations for each shuffle to send them out faster
+ // Cache a serialized version of the output statuses for each shuffle to send them out faster
var cacheGeneration = generation
- val cachedSerializedLocs = new HashMap[Int, Array[Byte]]
+ val cachedSerializedStatuses = new HashMap[Int, Array[Byte]]
var trackerActor: ActorRef = if (isMaster) {
val actor = actorSystem.actorOf(Props(new MapOutputTrackerActor(this)), name = actorName)
@@ -80,31 +82,34 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logging {
}
def registerShuffle(shuffleId: Int, numMaps: Int) {
- if (bmAddresses.get(shuffleId) != null) {
+ if (mapStatuses.get(shuffleId) != null) {
throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
}
- bmAddresses.put(shuffleId, new Array[BlockManagerId](numMaps))
+ mapStatuses.put(shuffleId, new Array[MapStatus](numMaps))
}
- def registerMapOutput(shuffleId: Int, mapId: Int, bmAddress: BlockManagerId) {
- var array = bmAddresses.get(shuffleId)
+ def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) {
+ var array = mapStatuses.get(shuffleId)
array.synchronized {
- array(mapId) = bmAddress
+ array(mapId) = status
}
}
- def registerMapOutputs(shuffleId: Int, locs: Array[BlockManagerId], changeGeneration: Boolean = false) {
- bmAddresses.put(shuffleId, Array[BlockManagerId]() ++ locs)
+ def registerMapOutputs(
+ shuffleId: Int,
+ statuses: Array[MapStatus],
+ changeGeneration: Boolean = false) {
+ mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses)
if (changeGeneration) {
incrementGeneration()
}
}
def unregisterMapOutput(shuffleId: Int, mapId: Int, bmAddress: BlockManagerId) {
- var array = bmAddresses.get(shuffleId)
+ var array = mapStatuses.get(shuffleId)
if (array != null) {
array.synchronized {
- if (array(mapId) == bmAddress) {
+ if (array(mapId).address == bmAddress) {
array(mapId) = null
}
}
@@ -117,10 +122,10 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logging {
// Remembers which map output locations are currently being fetched on a worker
val fetching = new HashSet[Int]
- // Called on possibly remote nodes to get the server URIs for a given shuffle
- def getServerAddresses(shuffleId: Int): Array[BlockManagerId] = {
- val locs = bmAddresses.get(shuffleId)
- if (locs == null) {
+ // Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle
+ def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
+ val statuses = mapStatuses.get(shuffleId)
+ if (statuses == null) {
logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
fetching.synchronized {
if (fetching.contains(shuffleId)) {
@@ -129,10 +134,11 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logging {
try {
fetching.wait()
} catch {
- case _ =>
+ case e: InterruptedException =>
}
}
- return bmAddresses.get(shuffleId)
+ return mapStatuses.get(shuffleId).map(status =>
+ (status.address, MapOutputTracker.decompressSize(status.compressedSizes(reduceId))))
} else {
fetching += shuffleId
}
@@ -140,23 +146,25 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logging {
// We won the race to fetch the output locs; do so
logInfo("Doing the fetch; tracker actor = " + trackerActor)
val fetchedBytes = askTracker(GetMapOutputLocations(shuffleId)).asInstanceOf[Array[Byte]]
- val fetchedLocs = deserializeLocations(fetchedBytes)
+ val fetchedStatuses = deserializeStatuses(fetchedBytes)
logInfo("Got the output locations")
- bmAddresses.put(shuffleId, fetchedLocs)
+ mapStatuses.put(shuffleId, fetchedStatuses)
fetching.synchronized {
fetching -= shuffleId
fetching.notifyAll()
}
- return fetchedLocs
+ return fetchedStatuses.map(s =>
+ (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
} else {
- return locs
+ return statuses.map(s =>
+ (s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
}
}
def stop() {
communicate(StopMapOutputTracker)
- bmAddresses.clear()
+ mapStatuses.clear()
trackerActor = null
}
@@ -182,75 +190,82 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logging {
generationLock.synchronized {
if (newGen > generation) {
logInfo("Updating generation to " + newGen + " and clearing cache")
- bmAddresses = new ConcurrentHashMap[Int, Array[BlockManagerId]]
+ mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]
generation = newGen
}
}
}
def getSerializedLocations(shuffleId: Int): Array[Byte] = {
- var locs: Array[BlockManagerId] = null
+ var statuses: Array[MapStatus] = null
var generationGotten: Long = -1
generationLock.synchronized {
if (generation > cacheGeneration) {
- cachedSerializedLocs.clear()
+ cachedSerializedStatuses.clear()
cacheGeneration = generation
}
- cachedSerializedLocs.get(shuffleId) match {
+ cachedSerializedStatuses.get(shuffleId) match {
case Some(bytes) =>
return bytes
case None =>
- locs = bmAddresses.get(shuffleId)
+ statuses = mapStatuses.get(shuffleId)
generationGotten = generation
}
}
// If we got here, we failed to find the serialized locations in the cache, so we pulled
// out a snapshot of the locations as "locs"; let's serialize and return that
- val bytes = serializeLocations(locs)
+ val bytes = serializeStatuses(statuses)
// Add them into the table only if the generation hasn't changed while we were working
generationLock.synchronized {
if (generation == generationGotten) {
- cachedSerializedLocs(shuffleId) = bytes
+ cachedSerializedStatuses(shuffleId) = bytes
}
}
return bytes
}
// Serialize an array of map output locations into an efficient byte format so that we can send
- // it to reduce tasks. We do this by grouping together the locations by block manager ID.
- def serializeLocations(locs: Array[BlockManagerId]): Array[Byte] = {
+ // it to reduce tasks. We do this by compressing the serialized bytes using GZIP. They will
+ // generally be pretty compressible because many map outputs will be on the same hostname.
+ def serializeStatuses(statuses: Array[MapStatus]): Array[Byte] = {
val out = new ByteArrayOutputStream
- val dataOut = new DataOutputStream(out)
- dataOut.writeInt(locs.length)
- val grouped = locs.zipWithIndex.groupBy(_._1)
- dataOut.writeInt(grouped.size)
- for ((id, pairs) <- grouped if id != null) {
- dataOut.writeUTF(id.ip)
- dataOut.writeInt(id.port)
- dataOut.writeInt(pairs.length)
- for ((_, blockIndex) <- pairs) {
- dataOut.writeInt(blockIndex)
- }
- }
- dataOut.close()
+ val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
+ objOut.writeObject(statuses)
+ objOut.close()
out.toByteArray
}
- // Opposite of serializeLocations.
- def deserializeLocations(bytes: Array[Byte]): Array[BlockManagerId] = {
- val dataIn = new DataInputStream(new ByteArrayInputStream(bytes))
- val length = dataIn.readInt()
- val array = new Array[BlockManagerId](length)
- val numGroups = dataIn.readInt()
- for (i <- 0 until numGroups) {
- val ip = dataIn.readUTF()
- val port = dataIn.readInt()
- val id = new BlockManagerId(ip, port)
- val numBlocks = dataIn.readInt()
- for (j <- 0 until numBlocks) {
- array(dataIn.readInt()) = id
- }
+ // Opposite of serializeStatuses.
+ def deserializeStatuses(bytes: Array[Byte]): Array[MapStatus] = {
+ val objIn = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))
+ objIn.readObject().asInstanceOf[Array[MapStatus]]
+ }
+}
+
+private[spark] object MapOutputTracker {
+ private val LOG_BASE = 1.1
+
+ /**
+ * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
+ * We do this by encoding the log base 1.1 of the size as an integer, which can support
+ * sizes up to 35 GB with at most 10% error.
+ */
+ def compressSize(size: Long): Byte = {
+ if (size <= 1L) {
+ 0
+ } else {
+ math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
+ }
+ }
+
+ /**
+ * Decompress an 8-bit encoded block size, using the reverse operation of compressSize.
+ */
+ def decompressSize(compressedSize: Byte): Long = {
+ if (compressedSize == 0) {
+ 1
+ } else {
+ math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong
}
- array
}
}
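
A self-contained sketch of the size encoding above, showing the round trip and
its error bound: compressSize stores ceil(log base 1.1 of the size) in one
unsigned byte, so decompressSize returns an overestimate of at most one factor
of 1.1 (about 10%), and the 255 ceiling caps representable sizes at roughly
1.1^255, about 35 GB:

    // Demo of the log-base-1.1 size encoding; the two functions are copied
    // from MapOutputTracker above so this compiles standalone.
    object SizeEncodingDemo {
      private val LOG_BASE = 1.1

      def compressSize(size: Long): Byte = {
        if (size <= 1L) {
          0
        } else {
          math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
        }
      }

      def decompressSize(compressedSize: Byte): Long = {
        if (compressedSize == 0) {
          1
        } else {
          math.pow(LOG_BASE, (compressedSize & 0xFF)).toLong
        }
      }

      def main(args: Array[String]) {
        for (size <- Seq(0L, 1L, 1000L, 1000000L, 35L * 1024 * 1024 * 1024)) {
          val decoded = decompressSize(compressSize(size))
          // decoded >= size within ~10%, except that 0 decodes to 1 and
          // sizes past ~35 GB clamp to the top bucket (255)
          println("%d bytes -> %d bytes".format(size, decoded))
        }
      }
    }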
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 9b666ed181..6f4c6bffd7 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -422,11 +422,11 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with Logging {
case smt: ShuffleMapTask =>
val stage = idToStage(smt.stageId)
- val bmAddress = event.result.asInstanceOf[BlockManagerId]
- val host = bmAddress.ip
+ val status = event.result.asInstanceOf[MapStatus]
+ val host = status.address.ip
logInfo("ShuffleMapTask finished with host " + host)
if (!deadHosts.contains(host)) { // TODO: Make sure hostnames are consistent with Mesos
- stage.addOutputLoc(smt.partition, bmAddress)
+ stage.addOutputLoc(smt.partition, status)
}
if (running.contains(stage) && pendingTasks(stage).isEmpty) {
logInfo(stage + " (" + stage.origin + ") finished; looking for newly runnable stages")
diff --git a/core/src/main/scala/spark/scheduler/MapStatus.scala b/core/src/main/scala/spark/scheduler/MapStatus.scala
new file mode 100644
index 0000000000..4532d9497f
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/MapStatus.scala
@@ -0,0 +1,27 @@
+package spark.scheduler
+
+import spark.storage.BlockManagerId
+import java.io.{ObjectOutput, ObjectInput, Externalizable}
+
+/**
+ * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
+ * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
+ * The map output sizes are compressed using MapOutputTracker.compressSize.
+ */
+private[spark] class MapStatus(var address: BlockManagerId, var compressedSizes: Array[Byte])
+ extends Externalizable {
+
+ def this() = this(null, null) // For deserialization only
+
+ def writeExternal(out: ObjectOutput) {
+ address.writeExternal(out)
+ out.writeInt(compressedSizes.length)
+ out.write(compressedSizes)
+ }
+
+ def readExternal(in: ObjectInput) {
+ address = new BlockManagerId(in)
+ compressedSizes = new Array[Byte](in.readInt())
+ in.readFully(compressedSizes)
+ }
+}
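
MapStatus writes its fields by hand via Externalizable, so an Array[MapStatus]
Java-serializes compactly. A minimal round-trip sketch of the
serializeStatuses/deserializeStatuses pair from MapOutputTracker above,
assuming this class and the BlockManagerId(in: ObjectInput) constructor added
below are on the classpath:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
      ObjectInputStream, ObjectOutputStream}
    import java.util.zip.{GZIPInputStream, GZIPOutputStream}
    import spark.scheduler.MapStatus
    import spark.storage.BlockManagerId

    object StatusRoundTrip {
      def main(args: Array[String]) {
        // Two outputs on the same host: repeated hostnames are what make
        // the GZIP'd form small.
        val statuses = Array(
          new MapStatus(new BlockManagerId("host0", 7077), Array[Byte](10, 20)),
          new MapStatus(new BlockManagerId("host0", 7077), Array[Byte](30, 40)))

        // serializeStatuses: Java serialization through a GZIP stream
        val out = new ByteArrayOutputStream
        val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
        objOut.writeObject(statuses)
        objOut.close()

        // deserializeStatuses: the reverse
        val objIn = new ObjectInputStream(
          new GZIPInputStream(new ByteArrayInputStream(out.toByteArray)))
        val back = objIn.readObject().asInstanceOf[Array[MapStatus]]
        println(back(1).address.ip + " " + back(1).compressedSizes.mkString(",")) // host0 30,40
      }
    }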
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 9ca2c9e449..3e5ba10fd9 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -74,7 +74,7 @@ private[spark] class ShuffleMapTask(
var dep: ShuffleDependency[_,_,_],
var partition: Int,
@transient var locs: Seq[String])
- extends Task[BlockManagerId](stageId)
+ extends Task[MapStatus](stageId)
with Externalizable
with Logging {
@@ -109,7 +109,7 @@ private[spark] class ShuffleMapTask(
split = in.readObject().asInstanceOf[Split]
}
- override def run(attemptId: Long): BlockManagerId = {
+ override def run(attemptId: Long): MapStatus = {
val numOutputSplits = dep.partitioner.numPartitions
val aggregator = dep.aggregator.asInstanceOf[Aggregator[Any, Any, Any]]
val partitioner = dep.partitioner
@@ -141,15 +141,18 @@ private[spark] class ShuffleMapTask(
buckets.map(_.iterator)
}
+ val compressedSizes = new Array[Byte](numOutputSplits)
+
val blockManager = SparkEnv.get.blockManager
for (i <- 0 until numOutputSplits) {
val blockId = "shuffle_" + dep.shuffleId + "_" + partition + "_" + i
// Get a Scala iterator from Java map
val iter: Iterator[(Any, Any)] = bucketIterators(i)
- blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)
+ val size = blockManager.put(blockId, iter, StorageLevel.DISK_ONLY, false)
+ compressedSizes(i) = MapOutputTracker.compressSize(size)
}
- return SparkEnv.get.blockManager.blockManagerId
+ return new MapStatus(blockManager.blockManagerId, compressedSizes)
}
override def preferredLocations: Seq[String] = locs
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index 803dd1b97d..1149c00a23 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -29,29 +29,29 @@ private[spark] class Stage(
val isShuffleMap = shuffleDep != None
val numPartitions = rdd.splits.size
- val outputLocs = Array.fill[List[BlockManagerId]](numPartitions)(Nil)
+ val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
var numAvailableOutputs = 0
private var nextAttemptId = 0
def isAvailable: Boolean = {
- if (/*parents.size == 0 &&*/ !isShuffleMap) {
+ if (!isShuffleMap) {
true
} else {
numAvailableOutputs == numPartitions
}
}
- def addOutputLoc(partition: Int, bmAddress: BlockManagerId) {
+ def addOutputLoc(partition: Int, status: MapStatus) {
val prevList = outputLocs(partition)
- outputLocs(partition) = bmAddress :: prevList
+ outputLocs(partition) = status :: prevList
if (prevList == Nil)
numAvailableOutputs += 1
}
def removeOutputLoc(partition: Int, bmAddress: BlockManagerId) {
val prevList = outputLocs(partition)
- val newList = prevList.filterNot(_ == bmAddress)
+ val newList = prevList.filterNot(_.address == bmAddress)
outputLocs(partition) = newList
if (prevList != Nil && newList == Nil) {
numAvailableOutputs -= 1
@@ -62,7 +62,7 @@ private[spark] class Stage(
var becameUnavailable = false
for (partition <- 0 until numPartitions) {
val prevList = outputLocs(partition)
- val newList = prevList.filterNot(_.ip == host)
+ val newList = prevList.filterNot(_.address.ip == host)
outputLocs(partition) = newList
if (prevList != Nil && newList == Nil) {
becameUnavailable = true
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 31debdd0fb..c4b241bf5a 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -20,7 +20,9 @@ import sun.nio.ch.DirectBuffer
private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
- def this() = this(null, 0)
+ def this() = this(null, 0) // For deserialization only
+
+ def this(in: ObjectInput) = this(in.readUTF(), in.readInt())
override def writeExternal(out: ObjectOutput) {
out.writeUTF(ip)
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 26f4ddfb49..3e6f09257a 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -58,19 +58,18 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
objOut.writeAll(values)
objOut.close()
- val finishTime = System.currentTimeMillis
+ val length = file.length()
logDebug("Block %s stored as %s file on disk in %d ms".format(
- blockId, Utils.memoryBytesToString(file.length()), (finishTime - startTime)))
+ blockId, Utils.memoryBytesToString(length), (System.currentTimeMillis - startTime)))
if (returnValues) {
// Return a byte buffer for the contents of the file
val channel = new RandomAccessFile(file, "r").getChannel()
- val length = channel.size()
val buffer = channel.map(MapMode.READ_ONLY, 0, length)
channel.close()
PutResult(length, Right(buffer))
} else {
- null
+ PutResult(length, null)
}
}