author     Patrick Wendell <pwendell@gmail.com>  2014-01-07 21:56:35 -0800
committer  Patrick Wendell <pwendell@gmail.com>  2014-01-07 21:56:35 -0800
commit     f5f12dc28218f3ed89836434ab0530e88b043e47 (patch)
tree       3b49f71c84d294c6c342d899fe01d48ce5ce39c5 /core/src
parent     11891e68c32d1078ed16c65cca23e28a1f171bb7 (diff)
parent     eb24684748da5dc2495fc4afe6da58edb463294b (diff)
Merge pull request #336 from liancheng/akka-remote-lookup
Get rid of `Either[ActorRef, ActorSelection]`

In this pull request, instead of returning an `Either[ActorRef, ActorSelection]`, `registerOrLookup` blocks to resolve the remote actor and obtain an `ActorRef`, or throws an exception if the remote actor doesn't exist or the lookup times out (configured by `spark.akka.lookupTimeout`). This function is only called when a `SparkEnv` is constructed (instantiating the driver or an executor), so the blocking call is considered acceptable. The executor-side `ActorSelection`s/`ActorRef`s pointing to the driver-side `MapOutputTrackerMasterActor` and `BlockManagerMasterActor` are affected by this pull request.

`ActorSelection` is dangerous and should be used with care: it's only absolutely safe to send messages via an `ActorSelection` when the remote actor is stateless, so that the actor's incarnation is irrelevant. But as pointed out by @ScrapCodes in the comments below, the executor exits immediately once its connection to the driver is lost, so `ActorSelection`s are not harmful in this scenario. This pull request is therefore mostly a code style patch.
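The crux of the change is in `registerOrLookup` (see the `SparkEnv.scala` hunk below): on an executor, the `ActorSelection` is now resolved to a concrete `ActorRef` up front. Here is a minimal standalone sketch of that pattern, assuming the Akka 2.2.x API Spark used at the time; the `lookupActor` helper name is hypothetical, the rest mirrors the patch:

```scala
import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration

import akka.actor.{ActorRef, ActorSystem}

// Block until the actor at `url` is resolved, or fail: resolveOne's future
// fails with akka.actor.ActorNotFound if no actor lives at that path, and
// Await.result throws a TimeoutException if resolution exceeds `timeout`.
def lookupActor(actorSystem: ActorSystem, url: String, timeout: FiniteDuration): ActorRef =
  Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
```

On the driver, the other branch stays a plain `actorSystem.actorOf(Props(newActor), name = name)`, so both branches now share the `ActorRef` type and the `Either` wrapper disappears.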
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/MapOutputTracker.scala              | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala                       | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala     |  8
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala          |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala                 |  5
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala          | 17
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala      |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala  |  4
8 files changed, 30 insertions, 38 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index cdae167aef..77b8ca1cce 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -55,7 +55,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
private val timeout = AkkaUtils.askTimeout(conf)
// Set to the MapOutputTrackerActor living on the driver
- var trackerActor: Either[ActorRef, ActorSelection] = _
+ var trackerActor: ActorRef = _
protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
@@ -71,17 +71,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
// throw a SparkException if this fails.
private def askTracker(message: Any): Any = {
try {
- /*
- The difference between ActorRef and ActorSelection is well explained here:
- http://doc.akka.io/docs/akka/2.2.3/project/migration-guide-2.1.x-2.2.x.html#Use_actorSelection_instead_of_actorFor
- In spark a map output tracker can be either started on Driver where it is created which
- is an ActorRef or it can be on executor from where it is looked up which is an
- actorSelection.
- */
- val future = trackerActor match {
- case Left(a: ActorRef) => a.ask(message)(timeout)
- case Right(b: ActorSelection) => b.ask(message)(timeout)
- }
+ val future = trackerActor.ask(message)(timeout)
Await.result(future, timeout)
} catch {
case e: Exception =>
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 634a94f0a7..2e36ccb9a0 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -17,11 +17,10 @@
package org.apache.spark
-import collection.mutable
-import serializer.Serializer
+import scala.collection.mutable
+import scala.concurrent.Await
import akka.actor._
-import akka.remote.RemoteActorRefProvider
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
@@ -157,17 +156,18 @@ object SparkEnv extends Logging {
conf.get("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"),
conf)
- def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = {
+ def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
if (isDriver) {
logInfo("Registering " + name)
- Left(actorSystem.actorOf(Props(newActor), name = name))
+ actorSystem.actorOf(Props(newActor), name = name)
} else {
val driverHost: String = conf.get("spark.driver.host", "localhost")
val driverPort: Int = conf.get("spark.driver.port", "7077").toInt
Utils.checkHost(driverHost, "Expected hostname")
- val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
- logInfo("Connecting to " + name + ": " + url)
- Right(actorSystem.actorSelection(url))
+ val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
+ val timeout = AkkaUtils.lookupTimeout(conf)
+ logInfo(s"Connecting to $name: $url")
+ Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index b5afe8cd23..51a29ed8ef 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -28,8 +28,7 @@ import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.AkkaUtils
private[spark]
-class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection],
- conf: SparkConf) extends Logging {
+class BlockManagerMaster(var driverActor : ActorRef, conf: SparkConf) extends Logging {
val AKKA_RETRY_ATTEMPTS: Int = conf.get("spark.akka.num.retries", "3").toInt
val AKKA_RETRY_INTERVAL_MS: Int = conf.get("spark.akka.retry.wait", "3000").toInt
@@ -159,10 +158,7 @@ class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection],
while (attempts < AKKA_RETRY_ATTEMPTS) {
attempts += 1
try {
- val future = driverActor match {
- case Left(a: ActorRef) => a.ask(message)(timeout)
- case Right(b: ActorSelection) => b.ask(message)(timeout)
- }
+ val future = driverActor.ask(message)(timeout)
val result = Await.result(future, timeout)
if (result == null) {
throw new SparkException("BlockManagerMaster returned null")
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index dca98c6c05..729ba2c550 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -95,7 +95,7 @@ private[spark] object ThreadingTest {
val conf = new SparkConf()
val serializer = new KryoSerializer(conf)
val blockManagerMaster = new BlockManagerMaster(
- Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf)
+ actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
val blockManager = new BlockManager(
"<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 2ee37815de..3f009a8998 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -105,4 +105,9 @@ private[spark] object AkkaUtils {
def askTimeout(conf: SparkConf): FiniteDuration = {
Duration.create(conf.get("spark.akka.askTimeout", "30").toLong, "seconds")
}
+
+ /** Returns the default Spark timeout to use for Akka remote actor lookup. */
+ def lookupTimeout(conf: SparkConf): FiniteDuration = {
+ Duration.create(conf.get("spark.akka.lookupTimeout", "30").toLong, "seconds")
+ }
}
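As an aside, the key read by the new `lookupTimeout` helper above defaults to 30 seconds; a deployment with slow actor resolution could override it when building its configuration. A hypothetical example, using only the standard `SparkConf` setter:

```scala
import org.apache.spark.SparkConf

// Hypothetical override: allow remote actor lookups up to 60 seconds
// instead of the 30-second default read by AkkaUtils.lookupTimeout.
val conf = new SparkConf().set("spark.akka.lookupTimeout", "60")
```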
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 10b8b441fd..afc1beff98 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -23,6 +23,7 @@ import akka.actor._
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AkkaUtils
+import scala.concurrent.Await
class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
private val conf = new SparkConf
@@ -49,14 +50,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("master start and stop") {
val actorSystem = ActorSystem("test")
val tracker = new MapOutputTrackerMaster(conf)
- tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
+ tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
tracker.stop()
}
test("master register and fetch") {
val actorSystem = ActorSystem("test")
val tracker = new MapOutputTrackerMaster(conf)
- tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
+ tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
tracker.registerShuffle(10, 2)
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -75,7 +76,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("master register and unregister and fetch") {
val actorSystem = ActorSystem("test")
val tracker = new MapOutputTrackerMaster(conf)
- tracker.trackerActor = Left(actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker))))
+ tracker.trackerActor = actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker)))
tracker.registerShuffle(10, 2)
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
@@ -101,13 +102,15 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
System.setProperty("spark.hostPort", hostname + ":" + boundPort)
val masterTracker = new MapOutputTrackerMaster(conf)
- masterTracker.trackerActor = Left(actorSystem.actorOf(
- Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker"))
+ masterTracker.trackerActor = actorSystem.actorOf(
+ Props(new MapOutputTrackerMasterActor(masterTracker)), "MapOutputTracker")
val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0, conf = conf)
val slaveTracker = new MapOutputTracker(conf)
- slaveTracker.trackerActor = Right(slaveSystem.actorSelection(
- "akka.tcp://spark@localhost:" + boundPort + "/user/MapOutputTracker"))
+ val selection = slaveSystem.actorSelection(
+ s"akka.tcp://spark@localhost:$boundPort/user/MapOutputTracker")
+ val timeout = AkkaUtils.lookupTimeout(conf)
+ slaveTracker.trackerActor = Await.result(selection.resolveOne(timeout), timeout)
masterTracker.registerShuffle(10, 1)
masterTracker.incrementEpoch()
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index fded582640..f60ce270c7 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -56,7 +56,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
conf.set("spark.hostPort", "localhost:" + boundPort)
master = new BlockManagerMaster(
- Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf)
+ actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
oldArch = System.setProperty("os.arch", "amd64")
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index af4b31d53c..829f389460 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -23,9 +23,7 @@ import scala.collection.mutable
import com.google.common.io.Files
import org.apache.spark.SparkConf
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
-import scala.util.Try
-import akka.actor.{Props, ActorSelection, ActorSystem}
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
private val testConf = new SparkConf(false)