diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/lib/mesos.jar | bin | 33618 -> 36686 bytes | |||
-rw-r--r-- | core/src/main/scala/spark/CacheTracker.scala | 2 | ||||
-rw-r--r-- | core/src/main/scala/spark/MapOutputTracker.scala | 7 |
3 files changed, 5 insertions, 4 deletions
diff --git a/core/lib/mesos.jar b/core/lib/mesos.jar Binary files differindex 60d299c8af..eb01ce8a15 100644 --- a/core/lib/mesos.jar +++ b/core/lib/mesos.jar diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala index 8b5c99cf3c..7040d4e147 100644 --- a/core/src/main/scala/spark/CacheTracker.scala +++ b/core/src/main/scala/spark/CacheTracker.scala @@ -66,7 +66,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging { if (isMaster) { val tracker = new CacheTrackerActor - tracker.start + tracker.start() trackerActor = tracker } else { val host = System.getProperty("spark.master.host") diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala index d36fbc7703..48d11145f2 100644 --- a/core/src/main/scala/spark/MapOutputTracker.scala +++ b/core/src/main/scala/spark/MapOutputTracker.scala @@ -34,18 +34,18 @@ extends DaemonActor with Logging { class MapOutputTracker(isMaster: Boolean) extends Logging { var trackerActor: AbstractActor = null + + private val serverUris = new ConcurrentHashMap[Int, Array[String]] if (isMaster) { val tracker = new MapOutputTrackerActor(serverUris) - tracker.start + tracker.start() trackerActor = tracker } else { val host = System.getProperty("spark.master.host") val port = System.getProperty("spark.master.port").toInt trackerActor = RemoteActor.select(Node(host, port), 'MapOutputTracker) } - - private val serverUris = new ConcurrentHashMap[Int, Array[String]] def registerMapOutput(shuffleId: Int, numMaps: Int, mapId: Int, serverUri: String) { var array = serverUris.get(shuffleId) @@ -82,6 +82,7 @@ class MapOutputTracker(isMaster: Boolean) extends Logging { // We won the race to fetch the output locs; do so logInfo("Doing the fetch; tracker actor = " + trackerActor) val fetched = (trackerActor !? GetMapOutputLocations(shuffleId)).asInstanceOf[Array[String]] + println("Got locations: " + fetched.mkString(", ")) serverUris.put(shuffleId, fetched) fetching.synchronized { fetching -= shuffleId |