author     zsxwing <zsxwing@gmail.com>          2015-04-20 23:18:42 -0700
committer  Reynold Xin <rxin@databricks.com>    2015-04-20 23:18:42 -0700
commit     8136810dfad12008ac300116df7bc8448740f1ae (patch)
tree       21c7c611004ee592444dbce06005b484f3d39721 /core
parent     c736220dac51cf73181fdd7f621c960c4e7bf0c2 (diff)
[SPARK-6490][Core] Add spark.rpc.* and deprecate spark.akka.*
Deprecated `spark.akka.num.retries`, `spark.akka.retry.wait`, `spark.akka.askTimeout`, `spark.akka.lookupTimeout`, and added `spark.rpc.num.retries`, `spark.rpc.retry.wait`, `spark.rpc.askTimeout`, `spark.rpc.lookupTimeout`.

Author: zsxwing <zsxwing@gmail.com>

Closes #5595 from zsxwing/SPARK-6490 and squashes the following commits:

e0d80a9 [zsxwing] Use getTimeAsMs and getTimeAsSeconds and other minor fixes
31dbe69 [zsxwing] Add spark.rpc.* and deprecate spark.akka.*
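
As a quick illustration (a minimal sketch, not part of the patch; it relies only on the RpcUtils helpers added below and mirrors the fallback exercised by the new "akka deprecated configs" test in SparkConfSuite), a deprecated spark.akka.* key is still picked up when the value is read through the new spark.rpc.* accessors:

import org.apache.spark.SparkConf
import org.apache.spark.util.RpcUtils

object RpcConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.rpc.askTimeout", "30s")   // new key, set directly
      .set("spark.akka.num.retries", "5")   // deprecated key, honoured as a fallback

    // Both values resolve through the new spark.rpc.* helpers added in this patch.
    println(RpcUtils.askTimeout(conf))      // 30 seconds
    println(RpcUtils.numRetries(conf))      // 5, read via the deprecated alternate
  }
}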
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/Client.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 26
-rw-r--r--  core/src/main/scala/org/apache/spark/util/RpcUtils.scala | 23
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkConfSuite.scala | 24
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala | 4
15 files changed, 86 insertions(+), 53 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index e3a649d755..c1996e0875 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -431,7 +431,15 @@ private[spark] object SparkConf extends Logging {
"spark.yarn.am.waitTime" -> Seq(
AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
// Translate old value to a duration, with 10s wait time per try.
- translation = s => s"${s.toLong * 10}s"))
+ translation = s => s"${s.toLong * 10}s")),
+ "spark.rpc.numRetries" -> Seq(
+ AlternateConfig("spark.akka.num.retries", "1.4")),
+ "spark.rpc.retry.wait" -> Seq(
+ AlternateConfig("spark.akka.retry.wait", "1.4")),
+ "spark.rpc.askTimeout" -> Seq(
+ AlternateConfig("spark.akka.askTimeout", "1.4")),
+ "spark.rpc.lookupTimeout" -> Seq(
+ AlternateConfig("spark.akka.lookupTimeout", "1.4"))
)
/**
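
The entries above only register the new spark.rpc.* keys with their deprecated spark.akka.* alternates; conceptually, a lookup prefers the new key and otherwise falls back to a (possibly translated) old value. A rough sketch of that fallback idea, illustrative only and with hypothetical names, not the actual SparkConf internals:

// Alt mirrors the (key, version, translation) shape of AlternateConfig above;
// everything else here is a hypothetical illustration of the fallback lookup.
case class Alt(key: String, version: String, translation: String => String = identity)

def resolve(settings: Map[String, String], newKey: String, alts: Seq[Alt]): Option[String] =
  settings.get(newKey).orElse {
    alts.collectFirst {
      case alt if settings.contains(alt.key) => alt.translation(settings(alt.key))
    }
  }

// resolve(Map("spark.akka.askTimeout" -> "30"), "spark.rpc.askTimeout",
//         Seq(Alt("spark.akka.askTimeout", "1.4")))   // => Some("30")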
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 8d13b2a2cd..c2c3e9a9e4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -27,7 +27,7 @@ import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
-import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, Utils}
/**
* Proxy that relays messages to the driver.
@@ -36,7 +36,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
extends Actor with ActorLogReceive with Logging {
var masterActor: ActorSelection = _
- val timeout = AkkaUtils.askTimeout(conf)
+ val timeout = RpcUtils.askTimeout(conf)
override def preStart(): Unit = {
masterActor = context.actorSelection(
@@ -155,7 +155,7 @@ object Client {
if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
conf.set("spark.akka.logLifecycleEvents", "true")
}
- conf.set("spark.akka.askTimeout", "10")
+ conf.set("spark.rpc.askTimeout", "10")
conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
Logger.getRootLogger.setLevel(driverArgs.logLevel)
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 4f06d7f96c..43c8a934c3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.{ActorLogReceive, Utils, AkkaUtils}
+import org.apache.spark.util.{ActorLogReceive, RpcUtils, Utils, AkkaUtils}
/**
* Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
@@ -193,7 +193,7 @@ private[spark] class AppClient(
def stop() {
if (actor != null) {
try {
- val timeout = AkkaUtils.askTimeout(conf)
+ val timeout = RpcUtils.askTimeout(conf)
val future = actor.ask(StopAppClient)(timeout)
Await.result(future, timeout)
} catch {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c5a6b1beac..ff2eed6dee 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -47,7 +47,7 @@ import org.apache.spark.deploy.rest.StandaloneRestServer
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils}
+import org.apache.spark.util.{ActorLogReceive, AkkaUtils, RpcUtils, SignalLogger, Utils}
private[master] class Master(
host: String,
@@ -931,7 +931,7 @@ private[deploy] object Master extends Logging {
securityManager = securityMgr)
val actor = actorSystem.actorOf(
Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
- val timeout = AkkaUtils.askTimeout(conf)
+ val timeout = RpcUtils.askTimeout(conf)
val portsRequest = actor.ask(BoundPortsRequest)(timeout)
val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
(actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index bb11e0642d..aad9c87bdb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -21,7 +21,7 @@ import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.RpcUtils
/**
* Web UI server for the standalone master.
@@ -31,7 +31,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging {
val masterActorRef = master.self
- val timeout = AkkaUtils.askTimeout(master.conf)
+ val timeout = RpcUtils.askTimeout(master.conf)
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
initialize()
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index 4f19af59f4..2d6b8d4204 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -32,7 +32,7 @@ import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.{Logging, SparkConf, SPARK_VERSION => sparkVersion}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
import org.apache.spark.deploy.ClientArguments._
@@ -223,7 +223,7 @@ private[rest] class KillRequestServlet(masterActor: ActorRef, conf: SparkConf)
}
protected def handleKill(submissionId: String): KillSubmissionResponse = {
- val askTimeout = AkkaUtils.askTimeout(conf)
+ val askTimeout = RpcUtils.askTimeout(conf)
val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse](
DeployMessages.RequestKillDriver(submissionId), masterActor, askTimeout)
val k = new KillSubmissionResponse
@@ -257,7 +257,7 @@ private[rest] class StatusRequestServlet(masterActor: ActorRef, conf: SparkConf)
}
protected def handleStatus(submissionId: String): SubmissionStatusResponse = {
- val askTimeout = AkkaUtils.askTimeout(conf)
+ val askTimeout = RpcUtils.askTimeout(conf)
val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse](
DeployMessages.RequestDriverStatus(submissionId), masterActor, askTimeout)
val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }
@@ -321,7 +321,7 @@ private[rest] class SubmitRequestServlet(
responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
requestMessage match {
case submitRequest: CreateSubmissionRequest =>
- val askTimeout = AkkaUtils.askTimeout(conf)
+ val askTimeout = RpcUtils.askTimeout(conf)
val driverDescription = buildDriverDescription(submitRequest)
val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index de6423beb5..b3bb5f911d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -25,7 +25,7 @@ import org.apache.spark.deploy.worker.Worker
import org.apache.spark.deploy.worker.ui.WorkerWebUI._
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.RpcUtils
/**
* Web UI server for the standalone worker.
@@ -38,7 +38,7 @@ class WorkerWebUI(
extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
with Logging {
- private[ui] val timeout = AkkaUtils.askTimeout(worker.conf)
+ private[ui] val timeout = RpcUtils.askTimeout(worker.conf)
initialize()
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index cba038ca35..a5336b7563 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -25,7 +25,7 @@ import scala.language.postfixOps
import scala.reflect.ClassTag
import org.apache.spark.{Logging, SparkException, SecurityManager, SparkConf}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{RpcUtils, Utils}
/**
* An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to
@@ -38,7 +38,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
*/
private[spark] abstract class RpcEnv(conf: SparkConf) {
- private[spark] val defaultLookupTimeout = AkkaUtils.lookupTimeout(conf)
+ private[spark] val defaultLookupTimeout = RpcUtils.lookupTimeout(conf)
/**
* Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
@@ -282,9 +282,9 @@ trait ThreadSafeRpcEndpoint extends RpcEndpoint
private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
extends Serializable with Logging {
- private[this] val maxRetries = conf.getInt("spark.akka.num.retries", 3)
- private[this] val retryWaitMs = conf.getLong("spark.akka.retry.wait", 3000)
- private[this] val defaultAskTimeout = conf.getLong("spark.akka.askTimeout", 30) seconds
+ private[this] val maxRetries = RpcUtils.numRetries(conf)
+ private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
+ private[this] val defaultAskTimeout = RpcUtils.askTimeout(conf)
/**
* return the address for the [[RpcEndpointRef]]
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index f72566c370..1406a36a66 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -24,7 +24,7 @@ import org.apache.spark.rpc._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.ui.JettyUtils
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{RpcUtils, Utils}
import scala.util.control.NonFatal
@@ -46,7 +46,7 @@ private[spark] abstract class YarnSchedulerBackend(
private val yarnSchedulerEndpoint = rpcEnv.setupEndpoint(
YarnSchedulerBackend.ENDPOINT_NAME, new YarnSchedulerEndpoint(rpcEnv))
- private implicit val askTimeout = AkkaUtils.askTimeout(sc.conf)
+ private implicit val askTimeout = RpcUtils.askTimeout(sc.conf)
/**
* Request executors from the ApplicationMaster by specifying the total number desired.
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index ceacf04302..c798843bd5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -23,7 +23,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.RpcUtils
private[spark]
class BlockManagerMaster(
@@ -32,7 +32,7 @@ class BlockManagerMaster(
isDriver: Boolean)
extends Logging {
- val timeout = AkkaUtils.askTimeout(conf)
+ val timeout = RpcUtils.askTimeout(conf)
/** Remove a dead executor from the driver endpoint. This is only called on the driver side. */
def removeExecutor(execId: String) {
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 8e8cc7cc63..b725df3b44 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util
import scala.collection.JavaConversions.mapAsJavaMap
import scala.concurrent.Await
-import scala.concurrent.duration.{Duration, FiniteDuration}
+import scala.concurrent.duration.FiniteDuration
import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
import akka.pattern.ask
@@ -125,16 +125,6 @@ private[spark] object AkkaUtils extends Logging {
(actorSystem, boundPort)
}
- /** Returns the default Spark timeout to use for Akka ask operations. */
- def askTimeout(conf: SparkConf): FiniteDuration = {
- Duration.create(conf.getLong("spark.akka.askTimeout", 30), "seconds")
- }
-
- /** Returns the default Spark timeout to use for Akka remote actor lookup. */
- def lookupTimeout(conf: SparkConf): FiniteDuration = {
- Duration.create(conf.getLong("spark.akka.lookupTimeout", 30), "seconds")
- }
-
private val AKKA_MAX_FRAME_SIZE_IN_MB = Int.MaxValue / 1024 / 1024
/** Returns the configured max frame size for Akka messages in bytes. */
@@ -150,16 +140,6 @@ private[spark] object AkkaUtils extends Logging {
/** Space reserved for extra data in an Akka message besides serialized task or task result. */
val reservedSizeBytes = 200 * 1024
- /** Returns the configured number of times to retry connecting */
- def numRetries(conf: SparkConf): Int = {
- conf.getInt("spark.akka.num.retries", 3)
- }
-
- /** Returns the configured number of milliseconds to wait on each retry */
- def retryWaitMs(conf: SparkConf): Int = {
- conf.getInt("spark.akka.retry.wait", 3000)
- }
-
/**
* Send a message to the given actor and get its result within a default timeout, or
* throw a SparkException if this fails.
@@ -216,7 +196,7 @@ private[spark] object AkkaUtils extends Logging {
val driverPort: Int = conf.getInt("spark.driver.port", 7077)
Utils.checkHost(driverHost, "Expected hostname")
val url = address(protocol(actorSystem), driverActorSystemName, driverHost, driverPort, name)
- val timeout = AkkaUtils.lookupTimeout(conf)
+ val timeout = RpcUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
@@ -230,7 +210,7 @@ private[spark] object AkkaUtils extends Logging {
val executorActorSystemName = SparkEnv.executorActorSystemName
Utils.checkHost(host, "Expected hostname")
val url = address(protocol(actorSystem), executorActorSystemName, host, port, name)
- val timeout = AkkaUtils.lookupTimeout(conf)
+ val timeout = RpcUtils.lookupTimeout(conf)
logInfo(s"Connecting to $name: $url")
Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index 6665b17c3d..5ae793e0e8 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -17,6 +17,9 @@
package org.apache.spark.util
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
import org.apache.spark.{SparkEnv, SparkConf}
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
@@ -32,4 +35,24 @@ object RpcUtils {
Utils.checkHost(driverHost, "Expected hostname")
rpcEnv.setupEndpointRef(driverActorSystemName, RpcAddress(driverHost, driverPort), name)
}
+
+ /** Returns the configured number of times to retry connecting */
+ def numRetries(conf: SparkConf): Int = {
+ conf.getInt("spark.rpc.numRetries", 3)
+ }
+
+ /** Returns the configured number of milliseconds to wait on each retry */
+ def retryWaitMs(conf: SparkConf): Long = {
+ conf.getTimeAsMs("spark.rpc.retry.wait", "3s")
+ }
+
+ /** Returns the default Spark timeout to use for RPC ask operations. */
+ def askTimeout(conf: SparkConf): FiniteDuration = {
+ conf.getTimeAsSeconds("spark.rpc.askTimeout", "30s") seconds
+ }
+
+ /** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
+ def lookupTimeout(conf: SparkConf): FiniteDuration = {
+ conf.getTimeAsSeconds("spark.rpc.lookupTimeout", "30s") seconds
+ }
}
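
Call sites throughout this patch swap AkkaUtils.askTimeout for the helper above. A hedged sketch of the pattern (compare AppClient.stop earlier in this diff; StopAppClient here is a local stand-in for the private[deploy] message, and the actor reference is passed in):

import scala.concurrent.Await
import akka.actor.ActorRef
import akka.pattern.ask
import org.apache.spark.SparkConf
import org.apache.spark.util.RpcUtils

object AskTimeoutSketch {
  // Local stand-in for DeployMessages.StopAppClient, which is private[deploy].
  case object StopAppClient

  def stopClient(conf: SparkConf, masterActor: ActorRef): Unit = {
    val timeout = RpcUtils.askTimeout(conf)             // was AkkaUtils.askTimeout(conf)
    val future = masterActor.ask(StopAppClient)(timeout)
    Await.result(future, timeout)
  }
}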
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 6295d34be5..6ed057a7ca 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -154,7 +154,7 @@ class MapOutputTrackerSuite extends FunSuite {
test("remote fetch below akka frame size") {
val newConf = new SparkConf
newConf.set("spark.akka.frameSize", "1")
- newConf.set("spark.akka.askTimeout", "1") // Fail fast
+ newConf.set("spark.rpc.askTimeout", "1") // Fail fast
val masterTracker = new MapOutputTrackerMaster(conf)
val rpcEnv = createRpcEnv("spark")
@@ -180,7 +180,7 @@ class MapOutputTrackerSuite extends FunSuite {
test("remote fetch exceeds akka frame size") {
val newConf = new SparkConf
newConf.set("spark.akka.frameSize", "1")
- newConf.set("spark.akka.askTimeout", "1") // Fail fast
+ newConf.set("spark.rpc.askTimeout", "1") // Fail fast
val masterTracker = new MapOutputTrackerMaster(conf)
val rpcEnv = createRpcEnv("test")
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 8e6c200c4b..d7d8014a20 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -19,11 +19,13 @@ package org.apache.spark
import java.util.concurrent.{TimeUnit, Executors}
+import scala.concurrent.duration._
+import scala.language.postfixOps
import scala.util.{Try, Random}
import org.scalatest.FunSuite
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.util.{RpcUtils, ResetSystemProperties}
import com.esotericsoftware.kryo.Kryo
class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
@@ -222,6 +224,26 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro
assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") === 420)
}
+ test("akka deprecated configs") {
+ val conf = new SparkConf()
+
+ assert(!conf.contains("spark.rpc.num.retries"))
+ assert(!conf.contains("spark.rpc.retry.wait"))
+ assert(!conf.contains("spark.rpc.askTimeout"))
+ assert(!conf.contains("spark.rpc.lookupTimeout"))
+
+ conf.set("spark.akka.num.retries", "1")
+ assert(RpcUtils.numRetries(conf) === 1)
+
+ conf.set("spark.akka.retry.wait", "2")
+ assert(RpcUtils.retryWaitMs(conf) === 2L)
+
+ conf.set("spark.akka.askTimeout", "3")
+ assert(RpcUtils.askTimeout(conf) === (3 seconds))
+
+ conf.set("spark.akka.lookupTimeout", "4")
+ assert(RpcUtils.lookupTimeout(conf) === (4 seconds))
+ }
}
class Class1 {}
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index ada07ef11c..5fbda37c7c 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -155,8 +155,8 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
})
val conf = new SparkConf()
- conf.set("spark.akka.retry.wait", "0")
- conf.set("spark.akka.num.retries", "1")
+ conf.set("spark.rpc.retry.wait", "0")
+ conf.set("spark.rpc.num.retries", "1")
val anotherEnv = createRpcEnv(conf, "remote", 13345)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout")
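
The retry keys introduced in this patch feed the maxRetries and retryWaitMs fields on RpcEndpointRef (see the RpcEnv.scala hunk above). A rough sketch of the semantics those two values control, illustrative only and not the actual retrying-ask implementation:

import org.apache.spark.SparkConf
import org.apache.spark.util.RpcUtils

object RetrySketch {
  // Illustrative retry loop driven by spark.rpc.numRetries / spark.rpc.retry.wait;
  // the real logic lives on RpcEndpointRef and differs in detail.
  def withRetries[T](conf: SparkConf)(attempt: () => T): T = {
    val maxRetries = RpcUtils.numRetries(conf)   // default 3 attempts
    val waitMs = RpcUtils.retryWaitMs(conf)      // default 3s, returned in milliseconds
    var lastError: Throwable = null
    var tries = 0
    while (tries < maxRetries) {
      try {
        return attempt()
      } catch {
        case e: Exception =>
          lastError = e
          tries += 1
          if (tries < maxRetries) Thread.sleep(waitMs)
      }
    }
    throw lastError
  }
}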