aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorroot <root@ip-10-226-118-223.ec2.internal>2012-10-07 05:02:18 +0000
committerroot <root@ip-10-226-118-223.ec2.internal>2012-10-07 05:02:18 +0000
commit554b42cb24edb39aa8a9888b7f267ea742758176 (patch)
treeb62255d39aaf7a3ad41c41fe8b24c60e15f3d574 /core/src/main
parenta73b25826be808a1be1de8b61c4c7d2df2bcd5aa (diff)
downloadspark-554b42cb24edb39aa8a9888b7f267ea742758176.tar.gz
spark-554b42cb24edb39aa8a9888b7f267ea742758176.tar.bz2
spark-554b42cb24edb39aa8a9888b7f267ea742758176.zip
Log more info in MapOutputTracker
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/MapOutputTracker.scala11
1 files changed, 7 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 1b4b5ed240..42703126fb 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -19,13 +19,14 @@ import spark.storage.BlockManagerId
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
private[spark] sealed trait MapOutputTrackerMessage
-private[spark] case class GetMapOutputLocations(shuffleId: Int) extends MapOutputTrackerMessage
+private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
+ extends MapOutputTrackerMessage
private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
def receive = {
- case GetMapOutputLocations(shuffleId: Int) =>
- logInfo("Asked to get map output locations for shuffle " + shuffleId)
+ case GetMapOutputStatuses(shuffleId: Int, requester: String) =>
+ logInfo("Asked to get map output locations for shuffle " + shuffleId + " for " + requester)
sender ! tracker.getSerializedLocations(shuffleId)
case StopMapOutputTracker =>
@@ -145,7 +146,8 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
}
// We won the race to fetch the output locs; do so
logInfo("Doing the fetch; tracker actor = " + trackerActor)
- val fetchedBytes = askTracker(GetMapOutputLocations(shuffleId)).asInstanceOf[Array[Byte]]
+ val host = System.getProperty("spark.hostname", Utils.localHostName)
+ val fetchedBytes = askTracker(GetMapOutputStatuses(shuffleId, host)).asInstanceOf[Array[Byte]]
val fetchedStatuses = deserializeStatuses(fetchedBytes)
logInfo("Got the output locations")
@@ -215,6 +217,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
// If we got here, we failed to find the serialized locations in the cache, so we pulled
// out a snapshot of the locations as "locs"; let's serialize and return that
val bytes = serializeStatuses(statuses)
+ logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
// Add them into the table only if the generation hasn't changed while we were working
generationLock.synchronized {
if (generation == generationGotten) {