diff options
author | root <root@ip-10-226-118-223.ec2.internal> | 2012-10-07 05:02:18 +0000 |
---|---|---|
committer | root <root@ip-10-226-118-223.ec2.internal> | 2012-10-07 05:02:18 +0000 |
commit | 554b42cb24edb39aa8a9888b7f267ea742758176 (patch) | |
tree | b62255d39aaf7a3ad41c41fe8b24c60e15f3d574 /core/src/main | |
parent | a73b25826be808a1be1de8b61c4c7d2df2bcd5aa (diff) | |
download | spark-554b42cb24edb39aa8a9888b7f267ea742758176.tar.gz spark-554b42cb24edb39aa8a9888b7f267ea742758176.tar.bz2 spark-554b42cb24edb39aa8a9888b7f267ea742758176.zip |
Log more info in MapOutputTracker
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/spark/MapOutputTracker.scala | 11 |
1 files changed, 7 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala index 1b4b5ed240..42703126fb 100644 --- a/core/src/main/scala/spark/MapOutputTracker.scala +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -19,13 +19,14 @@ import spark.storage.BlockManagerId import java.util.zip.{GZIPInputStream, GZIPOutputStream} private[spark] sealed trait MapOutputTrackerMessage -private[spark] case class GetMapOutputLocations(shuffleId: Int) extends MapOutputTrackerMessage +private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String) + extends MapOutputTrackerMessage private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging { def receive = { - case GetMapOutputLocations(shuffleId: Int) => - logInfo("Asked to get map output locations for shuffle " + shuffleId) + case GetMapOutputStatuses(shuffleId: Int, requester: String) => + logInfo("Asked to get map output locations for shuffle " + shuffleId + " for " + requester) sender ! tracker.getSerializedLocations(shuffleId) case StopMapOutputTracker => @@ -145,7 +146,8 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea } // We won the race to fetch the output locs; do so logInfo("Doing the fetch; tracker actor = " + trackerActor) - val fetchedBytes = askTracker(GetMapOutputLocations(shuffleId)).asInstanceOf[Array[Byte]] + val host = System.getProperty("spark.hostname", Utils.localHostName) + val fetchedBytes = askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]] val fetchedStatuses = deserializeStatuses(fetchedBytes) logInfo("Got the output locations") @@ -215,6 +217,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea // If we got here, we failed to find the serialized locations in the cache, so we pulled // out a snapshot of the locations as "locs"; let's serialize and return that val bytes = serializeStatuses(statuses) + logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length)) // Add them into the table only if the generation hasn't changed while we were working generationLock.synchronized { if (generation == generationGotten) { |