Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala   4
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkConfSuite.scala         24
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala         4
3 files changed, 27 insertions, 5 deletions
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 6295d34be5..6ed057a7ca 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -154,7 +154,7 @@ class MapOutputTrackerSuite extends FunSuite {
   test("remote fetch below akka frame size") {
     val newConf = new SparkConf
     newConf.set("spark.akka.frameSize", "1")
-    newConf.set("spark.akka.askTimeout", "1") // Fail fast
+    newConf.set("spark.rpc.askTimeout", "1") // Fail fast
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     val rpcEnv = createRpcEnv("spark")
@@ -180,7 +180,7 @@ class MapOutputTrackerSuite extends FunSuite {
   test("remote fetch exceeds akka frame size") {
     val newConf = new SparkConf
     newConf.set("spark.akka.frameSize", "1")
-    newConf.set("spark.akka.askTimeout", "1") // Fail fast
+    newConf.set("spark.rpc.askTimeout", "1") // Fail fast
 
     val masterTracker = new MapOutputTrackerMaster(conf)
     val rpcEnv = createRpcEnv("test")
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 8e6c200c4b..d7d8014a20 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -19,11 +19,13 @@ package org.apache.spark
 import java.util.concurrent.{TimeUnit, Executors}
 
+import scala.concurrent.duration._
+import scala.language.postfixOps
 import scala.util.{Try, Random}
 
 import org.scalatest.FunSuite
 import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.util.{RpcUtils, ResetSystemProperties}
 import com.esotericsoftware.kryo.Kryo
 
 class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
@@ -222,6 +224,26 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
     assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") === 420)
   }
 
+  test("akka deprecated configs") {
+    val conf = new SparkConf()
+
+    assert(!conf.contains("spark.rpc.num.retries"))
+    assert(!conf.contains("spark.rpc.retry.wait"))
+    assert(!conf.contains("spark.rpc.askTimeout"))
+    assert(!conf.contains("spark.rpc.lookupTimeout"))
+
+    conf.set("spark.akka.num.retries", "1")
+    assert(RpcUtils.numRetries(conf) === 1)
+
+    conf.set("spark.akka.retry.wait", "2")
+    assert(RpcUtils.retryWaitMs(conf) === 2L)
+
+    conf.set("spark.akka.askTimeout", "3")
+    assert(RpcUtils.askTimeout(conf) === (3 seconds))
+
+    conf.set("spark.akka.lookupTimeout", "4")
+    assert(RpcUtils.lookupTimeout(conf) === (4 seconds))
+  }
 }
 
 class Class1 {}
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index ada07ef11c..5fbda37c7c 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -155,8 +155,8 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
     })
 
     val conf = new SparkConf()
-    conf.set("spark.akka.retry.wait", "0")
-    conf.set("spark.akka.num.retries", "1")
+    conf.set("spark.rpc.retry.wait", "0")
+    conf.set("spark.rpc.num.retries", "1")
     val anotherEnv = createRpcEnv(conf, "remote", 13345)
     // Use anotherEnv to find out the RpcEndpointRef
     val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout")
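
For reference, a minimal sketch (not part of the patch) of the config translation the new "akka deprecated configs" test exercises: the deprecated spark.akka.* keys are still honored when read through RpcUtils, while new code sets the spark.rpc.* keys directly. The object name RpcConfExample is made up for illustration, and the sketch assumes it compiles inside the org.apache.spark package, just as the suite does, since RpcUtils lives in org.apache.spark.util.

// Hypothetical standalone example; mirrors the assertions added to SparkConfSuite above.
package org.apache.spark

import scala.concurrent.duration._
import scala.language.postfixOps

import org.apache.spark.util.RpcUtils

object RpcConfExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()

    // Set only the old Akka-era keys...
    conf.set("spark.akka.num.retries", "1")
    conf.set("spark.akka.askTimeout", "3")

    // ...RpcUtils still resolves them, so existing deployments keep working.
    assert(RpcUtils.numRetries(conf) == 1)
    assert(RpcUtils.askTimeout(conf) == (3 seconds))

    // New code should prefer the spark.rpc.* names going forward.
    conf.set("spark.rpc.lookupTimeout", "4")
    assert(RpcUtils.lookupTimeout(conf) == (4 seconds))
  }
}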