author    Lian, Cheng <rhythm.mail@gmail.com>  2014-01-06 09:18:17 +0800
committer Lian, Cheng <rhythm.mail@gmail.com>  2014-01-06 09:18:17 +0800
commit  a4048ff31e6f8d3e1451d8ae2d5b9edee42cfbbe
tree    1f2324b9dfaa5ce4f1188d3ab59a6686d2f5cbc8  /core/src
parent  d43ad3ef2c3d4b690ba1d053729daefb507cd23c
Get rid of `Either[ActorRef, ActorSelection]`
Although we can send messages via an ActorSelection, it is better to identify the actor and obtain an ActorRef first: the lookup fails early if the remote actor doesn't exist, and the annoying Either wrapper goes away.
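To make the motivation concrete, here is a minimal self-contained sketch of the pattern the commit moves to, written against the Akka 2.2-era API. It is not Spark code; the system name, actor path, and timeout are illustrative. Resolving the ActorSelection into an ActorRef up front means a wrong or missing actor path fails immediately instead of showing up later as unanswered messages.

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

// A trivial actor standing in for the driver-side tracker in this sketch.
class Tracker extends Actor {
  def receive = { case msg => sender ! s"got: $msg" }
}

object ResolveFirstSketch extends App {
  val system = ActorSystem("sketch")
  val timeout = 30.seconds
  system.actorOf(Props[Tracker], name = "tracker")

  // An ActorSelection is only a path; a typo in it would surface as silently
  // unanswered messages. resolveOne identifies the actor right away, and the
  // returned Future fails with ActorNotFound if nothing is listening there.
  val selection = system.actorSelection("akka://sketch/user/tracker")
  val tracker: ActorRef = Await.result(selection.resolveOne(timeout), timeout)

  // From here on callers hold a plain ActorRef -- no Either wrapper needed.
  implicit val askTimeout: Timeout = Timeout(timeout)
  println(Await.result(tracker ? "ping", timeout))

  system.shutdown()
}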
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 5
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala | 4
6 files changed, 19 insertions, 30 deletions
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index cdae167aef..77b8ca1cce 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -55,7 +55,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
private val timeout = AkkaUtils.askTimeout(conf)
// Set to the MapOutputTrackerActor living on the driver
- var trackerActor: Either[ActorRef, ActorSelection] = _
+ var trackerActor: ActorRef = _
protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
@@ -71,17 +71,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
// throw a SparkException if this fails.
private def askTracker(message: Any): Any = {
try {
- /*
- The difference between ActorRef and ActorSelection is well explained here:
- http://doc.akka.io/docs/akka/2.2.3/project/migration-guide-2.1.x-2.2.x.html#Use_actorSelection_instead_of_actorFor
- In spark a map output tracker can be either started on Driver where it is created which
- is an ActorRef or it can be on executor from where it is looked up which is an
- actorSelection.
- */
- val future = trackerActor match {
- case Left(a: ActorRef) => a.ask(message)(timeout)
- case Right(b: ActorSelection) => b.ask(message)(timeout)
- }
+ val future = trackerActor.ask(message)(timeout)
Await.result(future, timeout)
} catch {
case e: Exception =>
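Stripped of Spark specifics, the simplified askTracker boils down to one ask on the ActorRef followed by a blocking wait, with failures rethrown under a clearer message. A rough sketch of that shape; the object and method names are invented and the exception type is an assumption, not Spark's own.

import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration

import akka.actor.ActorRef
import akka.pattern.ask

object DriverAsk {
  // Ask the already-resolved driver-side actor and block for the reply,
  // wrapping any failure in a more descriptive exception.
  def askDriver(driver: ActorRef, message: Any, timeout: FiniteDuration): Any =
    try {
      val future = driver.ask(message)(timeout)
      Await.result(future, timeout)
    } catch {
      case e: Exception =>
        throw new RuntimeException("Error communicating with driver actor", e)
    }
}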
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 634a94f0a7..2e36ccb9a0 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -17,11 +17,10 @@
package org.apache.spark
-import collection.mutable
-import serializer.Serializer
+import scala.collection.mutable
+import scala.concurrent.Await
import akka.actor._
-import akka.remote.RemoteActorRefProvider
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
@@ -157,17 +156,18 @@ object SparkEnv extends Logging {
conf.get("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"),
conf)
- def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = {
+ def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
if (isDriver) {
logInfo("Registering " + name)
- Left(actorSystem.actorOf(Props(newActor), name = name))
+ actorSystem.actorOf(Props(newActor), name = name)
} else {
val driverHost: String = conf.get("spark.driver.host", "localhost")
val driverPort: Int = conf.get("spark.driver.port", "7077").toInt
Utils.checkHost(driverHost, "Expected hostname")
- val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
- logInfo("Connecting to " + name + ": " + url)
- Right(actorSystem.actorSelection(url))
+ val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"
+ val timeout = AkkaUtils.lookupTimeout(conf)
+ logInfo(s"Connecting to $name: $url")
+ Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
}
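The executor branch of registerOrLookup is the interesting half: the lookup now blocks on resolveOne, so a wrong driver address or an unregistered actor fails here rather than later. Below is a standalone sketch of just that branch; the timeout value and the error handling are assumptions for illustration, not Spark's own.

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.{ActorNotFound, ActorRef, ActorSystem}

object DriverLookup {
  // Resolve the driver-side actor's URL into an ActorRef, failing loudly if
  // nothing is registered under that name within the lookup timeout.
  def lookup(system: ActorSystem, host: String, port: Int, name: String): ActorRef = {
    val url = s"akka.tcp://spark@$host:$port/user/$name"
    val timeout = 30.seconds
    try {
      Await.result(system.actorSelection(url).resolveOne(timeout), timeout)
    } catch {
      case e: ActorNotFound =>
        throw new IllegalStateException(s"No actor named $name found at $url", e)
    }
  }
}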
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index b5afe8cd23..51a29ed8ef 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -28,8 +28,7 @@ import org.apache.spark.storage.BlockManagerMessages._
import org.apache.spark.util.AkkaUtils
private[spark]
-class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection],
- conf: SparkConf) extends Logging {
+class BlockManagerMaster(var driverActor : ActorRef, conf: SparkConf) extends Logging {
val AKKA_RETRY_ATTEMPTS: Int = conf.get("spark.akka.num.retries", "3").toInt
val AKKA_RETRY_INTERVAL_MS: Int = conf.get("spark.akka.retry.wait", "3000").toInt
@@ -159,10 +158,7 @@ class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection],
while (attempts < AKKA_RETRY_ATTEMPTS) {
attempts += 1
try {
- val future = driverActor match {
- case Left(a: ActorRef) => a.ask(message)(timeout)
- case Right(b: ActorSelection) => b.ask(message)(timeout)
- }
+ val future = driverActor.ask(message)(timeout)
val result = Await.result(future, timeout)
if (result == null) {
throw new SparkException("BlockManagerMaster returned null")
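BlockManagerMaster keeps its retry loop; only the Either match inside each attempt goes away. For reference, a hedged sketch of that retry-around-ask pattern on its own, where the names, default retry count, and interval are invented rather than taken from Spark:

import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration

import akka.actor.ActorRef
import akka.pattern.ask

object RetryingAsk {
  // Ask the driver actor up to maxAttempts times, sleeping between attempts,
  // and surface the last failure if every attempt fails or returns null.
  def askWithRetry(driver: ActorRef, message: Any, timeout: FiniteDuration,
                   maxAttempts: Int = 3, retryIntervalMs: Long = 3000): Any = {
    var attempts = 0
    var lastException: Exception = null
    while (attempts < maxAttempts) {
      attempts += 1
      try {
        val result = Await.result(driver.ask(message)(timeout), timeout)
        if (result == null) {
          throw new RuntimeException("driver actor returned null")
        }
        return result
      } catch {
        case ie: InterruptedException => throw ie
        case e: Exception =>
          lastException = e
          Thread.sleep(retryIntervalMs)
      }
    }
    throw new RuntimeException(s"Gave up after $attempts attempts", lastException)
  }
}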
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index dca98c6c05..729ba2c550 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -95,7 +95,7 @@ private[spark] object ThreadingTest {
val conf = new SparkConf()
val serializer = new KryoSerializer(conf)
val blockManagerMaster = new BlockManagerMaster(
- Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf)
+ actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf)
val blockManager = new BlockManager(
"<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 362cea5e3e..b4c4e1dbbc 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -91,4 +91,9 @@ private[spark] object AkkaUtils {
def askTimeout(conf: SparkConf): FiniteDuration = {
Duration.create(conf.get("spark.akka.askTimeout", "30").toLong, "seconds")
}
+
+ /** Returns the default Spark timeout to use for Akka remote actor lookup. */
+ def lookupTimeout(conf: SparkConf): FiniteDuration = {
+ Duration.create(conf.get("spark.akka.lookupTimeout", "30").toLong, "seconds")
+ }
}
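The new lookupTimeout mirrors askTimeout: a seconds value read from configuration with a default, handed back as a FiniteDuration that suits both resolveOne and Await.result. A tiny sketch of the same pattern against a plain Map standing in for the real configuration (the key name comes from the diff above; everything else is illustrative):

import scala.concurrent.duration.{Duration, FiniteDuration}

object TimeoutConfig {
  // Read a timeout in seconds from a simple key/value config, falling back to
  // a default of 30 seconds when the key is absent.
  def lookupTimeout(conf: Map[String, String]): FiniteDuration =
    Duration.create(conf.getOrElse("spark.akka.lookupTimeout", "30").toLong, "seconds")
}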
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index af4b31d53c..829f389460 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -23,9 +23,7 @@ import scala.collection.mutable
import com.google.common.io.Files
import org.apache.spark.SparkConf
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
-import scala.util.Try
-import akka.actor.{Props, ActorSelection, ActorSystem}
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
private val testConf = new SparkConf(false)