aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala')
-rw-r--r--core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala97
1 files changed, 94 insertions, 3 deletions
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 1f0aa759b0..6ceafe4337 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -155,16 +155,21 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
})
val conf = new SparkConf()
+ val shortProp = "spark.rpc.short.timeout"
conf.set("spark.rpc.retry.wait", "0")
conf.set("spark.rpc.numRetries", "1")
val anotherEnv = createRpcEnv(conf, "remote", 13345)
// Use anotherEnv to find out the RpcEndpointRef
val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout")
try {
- val e = intercept[Exception] {
- rpcEndpointRef.askWithRetry[String]("hello", 1 millis)
+ // Any exception thrown in askWithRetry is wrapped with a SparkException and set as the cause
+ val e = intercept[SparkException] {
+ rpcEndpointRef.askWithRetry[String]("hello", new RpcTimeout(1 millis, shortProp))
}
- assert(e.isInstanceOf[TimeoutException] || e.getCause.isInstanceOf[TimeoutException])
+ // The SparkException cause should be a RpcTimeoutException with message indicating the
+ // controlling timeout property
+ assert(e.getCause.isInstanceOf[RpcTimeoutException])
+ assert(e.getCause.getMessage.contains(shortProp))
} finally {
anotherEnv.shutdown()
anotherEnv.awaitTermination()
@@ -539,6 +544,92 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
}
+ test("construct RpcTimeout with conf property") {
+ val conf = new SparkConf
+
+ val testProp = "spark.ask.test.timeout"
+ val testDurationSeconds = 30
+ val secondaryProp = "spark.ask.secondary.timeout"
+
+ conf.set(testProp, s"${testDurationSeconds}s")
+ conf.set(secondaryProp, "100s")
+
+ // Construct RpcTimeout with a single property
+ val rt1 = RpcTimeout(conf, testProp)
+ assert( testDurationSeconds === rt1.duration.toSeconds )
+
+ // Construct RpcTimeout with prioritized list of properties
+ val rt2 = RpcTimeout(conf, Seq("spark.ask.invalid.timeout", testProp, secondaryProp), "1s")
+ assert( testDurationSeconds === rt2.duration.toSeconds )
+
+ // Construct RpcTimeout with default value,
+ val defaultProp = "spark.ask.default.timeout"
+ val defaultDurationSeconds = 1
+ val rt3 = RpcTimeout(conf, Seq(defaultProp), defaultDurationSeconds.toString + "s")
+ assert( defaultDurationSeconds === rt3.duration.toSeconds )
+ assert( rt3.timeoutProp.contains(defaultProp) )
+
+ // Try to construct RpcTimeout with an unconfigured property
+ intercept[NoSuchElementException] {
+ RpcTimeout(conf, "spark.ask.invalid.timeout")
+ }
+ }
+
+ test("ask a message timeout on Future using RpcTimeout") {
+ case class NeverReply(msg: String)
+
+ val rpcEndpointRef = env.setupEndpoint("ask-future", new RpcEndpoint {
+ override val rpcEnv = env
+
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+ case msg: String => context.reply(msg)
+ case _: NeverReply =>
+ }
+ })
+
+ val longTimeout = new RpcTimeout(1 second, "spark.rpc.long.timeout")
+ val shortTimeout = new RpcTimeout(10 millis, "spark.rpc.short.timeout")
+
+ // Ask with immediate response, should complete successfully
+ val fut1 = rpcEndpointRef.ask[String]("hello", longTimeout)
+ val reply1 = longTimeout.awaitResult(fut1)
+ assert("hello" === reply1)
+
+ // Ask with a delayed response and wait for response immediately that should timeout
+ val fut2 = rpcEndpointRef.ask[String](NeverReply("doh"), shortTimeout)
+ val reply2 =
+ intercept[RpcTimeoutException] {
+ shortTimeout.awaitResult(fut2)
+ }.getMessage
+
+ // RpcTimeout.awaitResult should have added the property to the TimeoutException message
+ assert(reply2.contains(shortTimeout.timeoutProp))
+
+ // Ask with delayed response and allow the Future to timeout before Await.result
+ val fut3 = rpcEndpointRef.ask[String](NeverReply("goodbye"), shortTimeout)
+
+ // Allow future to complete with failure using plain Await.result, this will return
+ // once the future is complete to verify addMessageIfTimeout was invoked
+ val reply3 =
+ intercept[RpcTimeoutException] {
+ Await.result(fut3, 200 millis)
+ }.getMessage
+
+ // When the future timed out, the recover callback should have used
+ // RpcTimeout.addMessageIfTimeout to add the property to the TimeoutException message
+ assert(reply3.contains(shortTimeout.timeoutProp))
+
+ // Use RpcTimeout.awaitResult to process Future, since it has already failed with
+ // RpcTimeoutException, the same RpcTimeoutException should be thrown
+ val reply4 =
+ intercept[RpcTimeoutException] {
+ shortTimeout.awaitResult(fut3)
+ }.getMessage
+
+ // Ensure description is not in message twice after addMessageIfTimeout and awaitResult
+ assert(shortTimeout.timeoutProp.r.findAllIn(reply4).length === 1)
+ }
+
}
class UnserializableClass