diff options
8 files changed, 30 insertions, 38 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index cdae167aef..77b8ca1cce 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -55,7 +55,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging { private val timeout = AkkaUtils.askTimeout(conf) // Set to the MapOutputTrackerActor living on the driver - var trackerActor: Either[ActorRef, ActorSelection] = _ + var trackerActor: ActorRef = _ protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]] @@ -71,17 +71,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging { // throw a SparkException if this fails. private def askTracker(message: Any): Any = { try { - /* - The difference between ActorRef and ActorSelection is well explained here: - http://doc.akka.io/docs/akka/2.2.3/project/migration-guide-2.1.x-2.2.x.html#Use_actorSelection_instead_of_actorFor - In spark a map output tracker can be either started on Driver where it is created which - is an ActorRef or it can be on executor from where it is looked up which is an - actorSelection. - */ - val future = trackerActor match { - case Left(a: ActorRef) => a.ask(message)(timeout) - case Right(b: ActorSelection) => b.ask(message)(timeout) - } + val future = trackerActor.ask(message)(timeout) Await.result(future, timeout) } catch { case e: Exception => diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 634a94f0a7..2e36ccb9a0 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -17,11 +17,10 @@ package org.apache.spark -import collection.mutable -import serializer.Serializer +import scala.collection.mutable +import scala.concurrent.Await import akka.actor._ -import akka.remote.RemoteActorRefProvider import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.metrics.MetricsSystem @@ -157,17 +156,18 @@ object SparkEnv extends Logging { conf.get("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"), conf) - def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = { + def registerOrLookup(name: String, newActor: => Actor): ActorRef = { if (isDriver) { logInfo("Registering " + name) - Left(actorSystem.actorOf(Props(newActor), name = name)) + actorSystem.actorOf(Props(newActor), name = name) } else { val driverHost: String = conf.get("spark.driver.host", "localhost") val driverPort: Int = conf.get("spark.driver.port", "7077").toInt Utils.checkHost(driverHost, "Expected hostname") - val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name) - logInfo("Connecting to " + name + ": " + url) - Right(actorSystem.actorSelection(url)) + val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name" + val timeout = AkkaUtils.lookupTimeout(conf) + logInfo(s"Connecting to $name: $url") + Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index b5afe8cd23..51a29ed8ef 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -28,8 +28,7 @@ import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util.AkkaUtils private[spark] -class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection], - conf: SparkConf) extends Logging { +class BlockManagerMaster(var driverActor : ActorRef, conf: SparkConf) extends Logging { val AKKA_RETRY_ATTEMPTS: Int = conf.get("spark.akka.num.retries", "3").toInt val AKKA_RETRY_INTERVAL_MS: Int = conf.get("spark.akka.retry.wait", "3000").toInt @@ -159,10 +158,7 @@ class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection], while (attempts < AKKA_RETRY_ATTEMPTS) { attempts += 1 try { - val future = driverActor match { - case Left(a: ActorRef) => a.ask(message)(timeout) - case Right(b: ActorSelection) => b.ask(message)(timeout) - } + val future = driverActor.ask(message)(timeout) val result = Await.result(future, timeout) if (result == null) { throw new SparkException("BlockManagerMaster returned null") diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala index dca98c6c05..729ba2c550 100644 --- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala @@ -95,7 +95,7 @@ private[spark] object ThreadingTest { val conf = new SparkConf() val serializer = new KryoSerializer(conf) val blockManagerMaster = new BlockManagerMaster( - Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf) + actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf) val blockManager = new BlockManager( "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf) val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i)) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 2ee37815de..3f009a8998 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -105,4 +105,9 @@ private[spark] object AkkaUtils { def askTimeout(conf: SparkConf): FiniteDuration = { Duration.create(conf.get("spark.akka.askTimeout", "30").toLong, "seconds") } + + /** Returns the default Spark timeout to use for Akka remote actor lookup. */ + def lookupTimeout(conf: SparkConf): FiniteDuration = { + Duration.create(conf.get("spark.akka.lookupTimeout", "30").toLong, "seconds") + } } diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 10b8b441fd..afc1beff98 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -23,6 +23,7 @@ import akka.actor._ import org.apache.spark.scheduler.MapStatus import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.AkkaUtils +import scala.concurrent.Await class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { private val conf = new SparkConf @@ -49,14 +50,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { test("master start and stop") { val actorSystem = ActorSystem("test") val tracker = new MapOutputTrackerMaster(conf) - tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))) + tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))) tracker.stop() } test("master register and fetch") { val actorSystem = ActorSystem("test") val tracker = new MapOutputTrackerMaster(conf) - tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))) + tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))) tracker.registerShuffle(10, 2) val compressedSize1000 = MapOutputTracker.compressSize(1000L) val compressedSize10000 = MapOutputTracker.compressSize(10000L) @@ -75,7 +76,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { test("master register and unregister and fetch") { val actorSystem = ActorSystem("test") val tracker = new MapOutputTrackerMaster(conf) - tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))) + tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))) tracker.registerShuffle(10, 2) val compressedSize1000 = MapOutputTracker.compressSize(1000L) val compressedSize10000 = MapOutputTracker.compressSize(10000L) @@ -101,13 +102,15 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext { System.setProperty("spark.hostPort", hostname + ":" + boundPort) val masterTracker = new MapOutputTrackerMaster(conf) - masterTracker.trackerActor = Left(actorSystem.actorOf( - Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")) + masterTracker.trackerActor = actorSystem.actorOf( + Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker") val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf) val slaveTracker = new MapOutputTracker(conf) - slaveTracker.trackerActor = Right(slaveSystem.actorSelection( - "akka.tcp://spark@localhost:" + boundPort + "/user/MapOutputTracker")) + val selection = slaveSystem.actorSelection( + s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker") + val timeout = AkkaUtils.lookupTimeout(conf) + slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout) masterTracker.registerShuffle(10, 1) masterTracker.incrementEpoch() diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index fded582640..f60ce270c7 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -56,7 +56,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT conf.set("spark.hostPort", "localhost:" + boundPort) master = new BlockManagerMaster( - Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf) + actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf) // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case oldArch = System.setProperty("os.arch", "amd64") diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index af4b31d53c..829f389460 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -23,9 +23,7 @@ import scala.collection.mutable import com.google.common.io.Files import org.apache.spark.SparkConf -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite} -import scala.util.Try -import akka.actor.{Props, ActorSelection, ActorSystem} +import org.scalatest.{BeforeAndAfterEach, FunSuite} class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach { private val testConf = new SparkConf(false) |