diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2017-02-01 21:39:21 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2017-02-01 21:39:21 -0800 |
commit | 8303e20c45153f91e585e230caa29b728a4d8c6c (patch) | |
tree | f1cb2d2cc0d0f629308ad95b4f6860ec3d0c587e /core/src/main | |
parent | b0985764f00acea97df7399a6b337262fc97f5ee (diff) | |
download | spark-8303e20c45153f91e585e230caa29b728a4d8c6c.tar.gz spark-8303e20c45153f91e585e230caa29b728a4d8c6c.tar.bz2 spark-8303e20c45153f91e585e230caa29b728a4d8c6c.zip |
[SPARK-19432][CORE] Fix an unexpected failure when connecting timeout
## What changes were proposed in this pull request?
When connecting timeout, `ask` may fail with a confusing message:
```
17/02/01 23:15:19 INFO Worker: Connecting to master ...
java.lang.IllegalArgumentException: requirement failed: TransportClient has not yet been set.
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.rpc.netty.RpcOutboxMessage.onTimeout(Outbox.scala:70)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$ask$1.applyOrElse(NettyRpcEnv.scala:232)
at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$ask$1.applyOrElse(NettyRpcEnv.scala:231)
at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:138)
at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
```
It's better to provide a meaningful message.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16773 from zsxwing/connect-timeout.
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala | 9 |
1 files changed, 6 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala index 6c090ada5a..a7b7f58376 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Outbox.scala @@ -56,7 +56,7 @@ private[netty] case class RpcOutboxMessage( content: ByteBuffer, _onFailure: (Throwable) => Unit, _onSuccess: (TransportClient, ByteBuffer) => Unit) - extends OutboxMessage with RpcResponseCallback { + extends OutboxMessage with RpcResponseCallback with Logging { private var client: TransportClient = _ private var requestId: Long = _ @@ -67,8 +67,11 @@ private[netty] case class RpcOutboxMessage( } def onTimeout(): Unit = { - require(client != null, "TransportClient has not yet been set.") - client.removeRpcRequest(requestId) + if (client != null) { + client.removeRpcRequest(requestId) + } else { + logError("Ask timeout before connecting successfully") + } } override def onFailure(e: Throwable): Unit = { |