From a4048ff31e6f8d3e1451d8ae2d5b9edee42cfbbe Mon Sep 17 00:00:00 2001
From: "Lian, Cheng"
Date: Mon, 6 Jan 2014 09:18:17 +0800
Subject: Get rid of `Either[ActorRef, ActorSelection]'

Although we can send messages via an ActorSelection, it would be better
to identify the actor and obtain an ActorRef first, so that we can get
informed earlier if the remote actor doesn't exist, and get rid of the
annoying Either wrapper.
---
 .../main/scala/org/apache/spark/MapOutputTracker.scala    | 14 ++------------
 core/src/main/scala/org/apache/spark/SparkEnv.scala       | 16 ++++++++--------
 .../org/apache/spark/storage/BlockManagerMaster.scala     |  8 ++------
 .../scala/org/apache/spark/storage/ThreadingTest.scala    |  2 +-
 .../src/main/scala/org/apache/spark/util/AkkaUtils.scala  |  5 +++++
 .../org/apache/spark/storage/DiskBlockManagerSuite.scala  |  4 +---
 6 files changed, 19 insertions(+), 30 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index cdae167aef..77b8ca1cce 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -55,7 +55,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
   private val timeout = AkkaUtils.askTimeout(conf)
 
   // Set to the MapOutputTrackerActor living on the driver
-  var trackerActor: Either[ActorRef, ActorSelection] = _
+  var trackerActor: ActorRef = _
 
   protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
 
@@ -71,17 +71,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
   // throw a SparkException if this fails.
   private def askTracker(message: Any): Any = {
     try {
-      /*
-        The difference between ActorRef and ActorSelection is well explained here:
-        http://doc.akka.io/docs/akka/2.2.3/project/migration-guide-2.1.x-2.2.x.html#Use_actorSelection_instead_of_actorFor
-        In spark a map output tracker can be either started on Driver where it is created which
-        is an ActorRef or it can be on executor from where it is looked up which is an
-        actorSelection.
-       */
-      val future = trackerActor match {
-        case Left(a: ActorRef) => a.ask(message)(timeout)
-        case Right(b: ActorSelection) => b.ask(message)(timeout)
-      }
+      val future = trackerActor.ask(message)(timeout)
       Await.result(future, timeout)
     } catch {
       case e: Exception =>
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 634a94f0a7..2e36ccb9a0 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -17,11 +17,10 @@
 
 package org.apache.spark
 
-import collection.mutable
-import serializer.Serializer
+import scala.collection.mutable
+import scala.concurrent.Await
 
 import akka.actor._
-import akka.remote.RemoteActorRefProvider
 
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
@@ -157,17 +156,18 @@ object SparkEnv extends Logging {
       conf.get("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"), conf)
 
-    def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = {
+    def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
       if (isDriver) {
         logInfo("Registering " + name)
-        Left(actorSystem.actorOf(Props(newActor), name = name))
+        actorSystem.actorOf(Props(newActor), name = name)
       } else {
         val driverHost: String = conf.get("spark.driver.host", "localhost")
         val driverPort: Int = conf.get("spark.driver.port", "7077").toInt
         Utils.checkHost(driverHost, "Expected hostname")
-        val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
-        logInfo("Connecting to " + name + ": " + url)
-        Right(actorSystem.actorSelection(url))
+        val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
+        val timeout = AkkaUtils.lookupTimeout(conf)
+        logInfo(s"Connecting to $name: $url")
+        Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index b5afe8cd23..51a29ed8ef 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -28,8 +28,7 @@ import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.AkkaUtils
 
 private[spark]
-class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection],
-    conf: SparkConf) extends Logging {
+class BlockManagerMaster(var driverActor : ActorRef, conf: SparkConf) extends Logging {
 
   val AKKA_RETRY_ATTEMPTS: Int = conf.get("spark.akka.num.retries", "3").toInt
   val AKKA_RETRY_INTERVAL_MS: Int = conf.get("spark.akka.retry.wait", "3000").toInt
@@ -159,10 +158,7 @@ class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection],
     while (attempts < AKKA_RETRY_ATTEMPTS) {
       attempts += 1
       try {
-        val future = driverActor match {
-          case Left(a: ActorRef) => a.ask(message)(timeout)
-          case Right(b: ActorSelection) => b.ask(message)(timeout)
-        }
+        val future = driverActor.ask(message)(timeout)
         val result = Await.result(future, timeout)
         if (result == null) {
           throw new SparkException("BlockManagerMaster returned null")
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index dca98c6c05..729ba2c550 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -95,7 +95,7 @@ private[spark] object ThreadingTest {
     val conf = new SparkConf()
     val serializer = new KryoSerializer(conf)
     val blockManagerMaster = new BlockManagerMaster(
-      Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf)
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
     val blockManager = new BlockManager(
       "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf)
     val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 362cea5e3e..b4c4e1dbbc 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -91,4 +91,9 @@ private[spark] object AkkaUtils {
   def askTimeout(conf: SparkConf): FiniteDuration = {
     Duration.create(conf.get("spark.akka.askTimeout", "30").toLong, "seconds")
   }
+
+  /** Returns the default Spark timeout to use for Akka remote actor lookup. */
+  def lookupTimeout(conf: SparkConf): FiniteDuration = {
+    Duration.create(conf.get("spark.akka.lookupTimeout", "30").toLong, "seconds")
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index af4b31d53c..829f389460 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -23,9 +23,7 @@ import scala.collection.mutable
 
 import com.google.common.io.Files
 import org.apache.spark.SparkConf
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
-import scala.util.Try
-import akka.actor.{Props, ActorSelection, ActorSystem}
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
 
 class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
   private val testConf = new SparkConf(false)
--
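The core pattern this commit adopts, reduced to a self-contained sketch (not part of the patch; the actor name and timeout are illustrative stand-ins, and the Akka 2.2.x API Spark uses at this point is assumed):

    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import akka.pattern.ask
    import scala.concurrent.Await
    import scala.concurrent.duration._

    object LookupSketch extends App {
      class EchoActor extends Actor {
        def receive = { case msg => sender ! msg }
      }

      val system = ActorSystem("sketch")
      system.actorOf(Props(new EchoActor), name = "MapOutputTracker")
      val timeout = 30.seconds

      // resolveOne fails the returned future with ActorNotFound when nothing
      // lives at the path, so a wrong address surfaces at lookup time rather
      // than at the first ask, which is the fail-fast behaviour the commit
      // message describes.
      val ref: ActorRef = Await.result(
        system.actorSelection("/user/MapOutputTracker").resolveOne(timeout), timeout)

      // Callers now hold a plain ActorRef; there is no Either to unwrap.
      println(Await.result(ref.ask("ping")(timeout), timeout))  // prints "ping"

      system.shutdown()
    }

With a remote akka.tcp:// URL like the one SparkEnv builds, the same two calls implement the executor-side lookup.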
From 5c152e3e219a44f97d9df38ba00cdc4adbf4d873 Mon Sep 17 00:00:00 2001
From: "Lian, Cheng"
Date: Mon, 6 Jan 2014 10:39:05 +0800
Subject: Fixed several compilation errors in test suites

---
 .../test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 11 +++++++----
 .../scala/org/apache/spark/storage/BlockManagerSuite.scala  |  2 +-
 2 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 10b8b441fd..82dc30ecc4 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -23,6 +23,7 @@ import akka.actor._
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.AkkaUtils
+import scala.concurrent.Await
 
 class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   private val conf = new SparkConf
@@ -101,13 +102,15 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     System.setProperty("spark.hostPort", hostname + ":" + boundPort)
 
     val masterTracker = new MapOutputTrackerMaster(conf)
-    masterTracker.trackerActor = Left(actorSystem.actorOf(
-      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker"))
+    masterTracker.trackerActor = actorSystem.actorOf(
+      Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
 
     val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf)
     val slaveTracker = new MapOutputTracker(conf)
-    slaveTracker.trackerActor = Right(slaveSystem.actorSelection(
-      "akka.tcp://spark@localhost:" + boundPort + "/user/MapOutputTracker"))
+    val selection = slaveSystem.actorSelection(
+      s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+    val timeout = AkkaUtils.lookupTimeout(conf)
+    slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
 
     masterTracker.registerShuffle(10, 1)
     masterTracker.incrementEpoch()
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a0fc3445be..032c2f2f69 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -58,7 +58,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     conf.set("spark.hostPort", "localhost:" + boundPort)
 
     master = new BlockManagerMaster(
-      Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf)
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
 
     // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
     System.setProperty("os.arch", "amd64")
--
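A test-style sketch of the fail-fast behaviour the new lookup gives these suites (hypothetical, not part of the patch; it assumes the same Akka 2.2.x and ScalaTest APIs the suites above use):

    import akka.actor.{ActorNotFound, ActorSystem}
    import org.scalatest.FunSuite
    import scala.concurrent.Await
    import scala.concurrent.duration._

    class FailFastLookupSketch extends FunSuite {
      test("a bad tracker address fails at resolveOne, not at the first ask") {
        val system = ActorSystem("sketch")
        val timeout = 5.seconds
        // Before this series, asking a bogus ActorSelection only failed once
        // the ask itself timed out; resolveOne rejects the path immediately.
        intercept[ActorNotFound] {
          Await.result(
            system.actorSelection("/user/NoSuchTracker").resolveOne(timeout), timeout)
        }
        system.shutdown()
      }
    }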
From eb24684748da5dc2495fc4afe6da58edb463294b Mon Sep 17 00:00:00 2001
From: "Lian, Cheng"
Date: Mon, 6 Jan 2014 11:21:35 +0800
Subject: Fixed test suite compilation errors

---
 core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 82dc30ecc4..afc1beff98 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -50,14 +50,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   test("master start and stop") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
-    tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
+    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
     tracker.stop()
   }
 
   test("master register and fetch") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
-    tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
+    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
     tracker.registerShuffle(10, 2)
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -76,7 +76,7 @@
   test("master register and unregister and fetch") {
     val actorSystem = ActorSystem("test")
     val tracker = new MapOutputTrackerMaster(conf)
-    tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
+    tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
     tracker.registerShuffle(10, 2)
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
--
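After the series, two separate knobs remain, both defaulting to 30 seconds. A minimal sketch of the split (the two config keys are the real ones from the patch; reading them from JVM system properties here is only illustrative, since Spark actually reads them through SparkConf):

    import scala.concurrent.duration._

    object TimeoutSketch extends App {
      // spark.akka.askTimeout bounds each ask on an already-resolved ActorRef;
      // spark.akka.lookupTimeout bounds the one-time resolveOne at startup.
      def secondsProp(key: String): FiniteDuration =
        Duration.create(sys.props.getOrElse(key, "30").toLong, "seconds")

      val askTimeout = secondsProp("spark.akka.askTimeout")
      val lookupTimeout = secondsProp("spark.akka.lookupTimeout")
      println(s"ask: $askTimeout, lookup: $lookupTimeout")
    }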