author     zsxwing <zsxwing@gmail.com>          2015-04-20 23:18:42 -0700
committer  Reynold Xin <rxin@databricks.com>    2015-04-20 23:18:42 -0700
commit     8136810dfad12008ac300116df7bc8448740f1ae
tree       21c7c611004ee592444dbce06005b484f3d39721
parent     c736220dac51cf73181fdd7f621c960c4e7bf0c2
[SPARK-6490][Core] Add spark.rpc.* and deprecate spark.akka.*
Deprecated `spark.akka.num.retries`, `spark.akka.retry.wait`, `spark.akka.askTimeout`, and `spark.akka.lookupTimeout`, and added `spark.rpc.num.retries`, `spark.rpc.retry.wait`, `spark.rpc.askTimeout`, and `spark.rpc.lookupTimeout`.

Author: zsxwing <zsxwing@gmail.com>

Closes #5595 from zsxwing/SPARK-6490 and squashes the following commits:

e0d80a9 [zsxwing] Use getTimeAsMs and getTimeAsSeconds and other minor fixes
31dbe69 [zsxwing] Add spark.rpc.* and deprecate spark.akka.*
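A minimal sketch of how user code sets the renamed keys after this change (the values and time suffixes are illustrative only, not part of the patch):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // New spark.rpc.* keys replace the deprecated spark.akka.* ones.
      .set("spark.rpc.num.retries", "3")       // previously spark.akka.num.retries
      .set("spark.rpc.retry.wait", "3s")       // previously spark.akka.retry.wait
      .set("spark.rpc.askTimeout", "120s")     // previously spark.akka.askTimeout
      .set("spark.rpc.lookupTimeout", "120s")  // previously spark.akka.lookupTimeout

    // The old spark.akka.* keys are still honored as deprecated fallbacks,
    // as exercised by the "akka deprecated configs" test in SparkConfSuite below.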
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala  |  4
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkConfSuite.scala         | 24
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala        |  4
3 files changed, 27 insertions, 5 deletions
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 6295d34be5..6ed057a7ca 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -154,7 +154,7 @@ class MapOutputTrackerSuite extends FunSuite {
test("remote fetch below akka frame size") {
val newConf = new SparkConf
newConf.set("spark.akka.frameSize", "1")
- newConf.set("spark.akka.askTimeout", "1") // Fail fast
+ newConf.set("spark.rpc.askTimeout", "1") // Fail fast
val masterTracker = new MapOutputTrackerMaster(conf)
val rpcEnv = createRpcEnv("spark")
@@ -180,7 +180,7 @@ class MapOutputTrackerSuite extends FunSuite {
test("remote fetch exceeds akka frame size") {
val newConf = new SparkConf
newConf.set("spark.akka.frameSize", "1")
- newConf.set("spark.akka.askTimeout", "1") // Fail fast
+ newConf.set("spark.rpc.askTimeout", "1") // Fail fast
val masterTracker = new MapOutputTrackerMaster(conf)
val rpcEnv = createRpcEnv("test")
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 8e6c200c4b..d7d8014a20 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -19,11 +19,13 @@ package org.apache.spark
import java.util.concurrent.{TimeUnit, Executors}
+import scala.concurrent.duration._
+import scala.language.postfixOps
import scala.util.{Try, Random}
import org.scalatest.FunSuite
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.util.{RpcUtils, ResetSystemProperties}
import com.esotericsoftware.kryo.Kryo
class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
@@ -222,6 +224,26 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") === 420)
}
+ test("akka deprecated configs") {
+ val conf = new SparkConf()
+
+ assert(!conf.contains("spark.rpc.num.retries"))
+ assert(!conf.contains("spark.rpc.retry.wait"))
+ assert(!conf.contains("spark.rpc.askTimeout"))
+ assert(!conf.contains("spark.rpc.lookupTimeout"))
+
+ conf.set("spark.akka.num.retries", "1")
+ assert(RpcUtils.numRetries(conf) === 1)
+
+ conf.set("spark.akka.retry.wait", "2")
+ assert(RpcUtils.retryWaitMs(conf) === 2L)
+
+ conf.set("spark.akka.askTimeout", "3")
+ assert(RpcUtils.askTimeout(conf) === (3 seconds))
+
+ conf.set("spark.akka.lookupTimeout", "4")
+ assert(RpcUtils.lookupTimeout(conf) === (4 seconds))
+ }
}
class Class1 {}
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index ada07ef11c..5fbda37c7c 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -155,8 +155,8 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
})
val conf = new SparkConf()
- conf.set("spark.akka.retry.wait", "0")
- conf.set("spark.akka.num.retries", "1")
+ conf.set("spark.rpc.retry.wait", "0")
+ conf.set("spark.rpc.num.retries", "1")
val anotherEnv = createRpcEnv(conf, "remote", 13345)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout")