aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2017-02-01 21:39:21 -0800
committerShixiong Zhu <shixiong@databricks.com>2017-02-01 21:39:21 -0800
commit8303e20c45153f91e585e230caa29b728a4d8c6c (patch)
treef1cb2d2cc0d0f629308ad95b4f6860ec3d0c587e /core/src/main/scala
parentb0985764f00acea97df7399a6b337262fc97f5ee (diff)
downloadspark-8303e20c45153f91e585e230caa29b728a4d8c6c.tar.gz
spark-8303e20c45153f91e585e230caa29b728a4d8c6c.tar.bz2
spark-8303e20c45153f91e585e230caa29b728a4d8c6c.zip
[SPARK-19432][CORE] Fix an unexpected failure when connecting timeout
## What changes were proposed in this pull request? When connecting timeout, `ask` may fail with a confusing message: ``` 17/02/01 23:15:19 INFO Worker: Connecting to master ... java.lang.IllegalArgumentException: requirement failed: TransportClient has not yet been set. at scala.Predef$.require(Predef.scala:224) at org.apache.spark.rpc.netty.RpcOutboxMessage.onTimeout(Outbox.scala:70) at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$ask$1.applyOrElse(NettyRpcEnv.scala:232) at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$ask$1.applyOrElse(NettyRpcEnv.scala:231) at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:138) at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) ``` It's better to provide a meaningful message. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixiong@databricks.com> Closes #16773 from zsxwing/connect-timeout.
Diffstat (limited to 'core/src/main/scala')
-rw-r--r--core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala9
1 files changed, 6 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
index 6c090ada5a..a7b7f58376 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala
@@ -56,7 +56,7 @@ private[netty] case class RpcOutboxMessage(
content: ByteBuffer,
_onFailure: (Throwable) => Unit,
_onSuccess: (TransportClient, ByteBuffer) => Unit)
- extends OutboxMessage with RpcResponseCallback {
+ extends OutboxMessage with RpcResponseCallback with Logging {
private var client: TransportClient = _
private var requestId: Long = _
@@ -67,8 +67,11 @@ private[netty] case class RpcOutboxMessage(
}
def onTimeout(): Unit = {
- require(client != null, "TransportClient has not yet been set.")
- client.removeRpcRequest(requestId)
+ if (client != null) {
+ client.removeRpcRequest(requestId)
+ } else {
+ logError("Ask timeout before connecting successfully")
+ }
}
override def onFailure(e: Throwable): Unit = {