diff options
author | jinxing <jinxing@meituan.com> | 2017-02-01 13:54:37 -0800 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2017-02-01 13:54:37 -0800 |
commit | c5fcb7f68bff055cc56e487bd48994945e7935cd (patch) | |
tree | 7c62452af7ab4238dc6149825f55876ba084a041 | |
parent | df4a27cc5cae8e251ba2a883bcc5f5ce9282f649 (diff) | |
download | spark-c5fcb7f68bff055cc56e487bd48994945e7935cd.tar.gz spark-c5fcb7f68bff055cc56e487bd48994945e7935cd.tar.bz2 spark-c5fcb7f68bff055cc56e487bd48994945e7935cd.zip |
[SPARK-19347] ReceiverSupervisorImpl can add block to ReceiverTracker multiple times because of askWithRetry.
## What changes were proposed in this pull request?
`ReceiverSupervisorImpl` on executor side reports block's meta back to `ReceiverTracker` on driver side. In current code, `askWithRetry` is used. However, for `AddBlock`, `ReceiverTracker` is not idempotent, which may result in messages are processed multiple times.
**To reproduce**:
1. Check if it is the first time receiving `AddBlock` in `ReceiverTracker`, if so sleep long enough(say 200 seconds), thus the first RPC call will be timeout in `askWithRetry`, then `AddBlock` will be resent.
2. Rebuild Spark and run following job:
```
def streamProcessing(): Unit = {
val conf = new SparkConf()
.setAppName("StreamingTest")
.setMaster(masterUrl)
val ssc = new StreamingContext(conf, Seconds(200))
val stream = ssc.socketTextStream("localhost", 1234)
stream.print()
ssc.start()
ssc.awaitTermination()
}
```
**To fix**:
It makes sense to provide a blocking version `ask` in RpcEndpointRef, as mentioned in SPARK-18113 (https://github.com/apache/spark/pull/16503#event-927953218). Because Netty RPC layer will not drop messages. `askWithRetry` is a leftover from akka days. It imposes restrictions on the caller(e.g. idempotency) and other things that people generally don't pay that much attention to when using it.
## How was this patch tested?
Test manually. The scenario described above doesn't happen with this patch.
Author: jinxing <jinxing@meituan.com>
Closes #16690 from jinxing64/SPARK-19347.
-rw-r--r-- | core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala | 38 | ||||
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala | 2 |
2 files changed, 36 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala index 994e18676e..a5778876d4 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala @@ -63,8 +63,38 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf) def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout) /** - * Send a message to the corresponding [[RpcEndpoint]] and get its result within a default - * timeout, or throw a SparkException if this fails even after the default number of retries. + * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a + * default timeout, throw an exception if this fails. + * + * Note: this is a blocking action which may cost a lot of time, so don't call it in a message + * loop of [[RpcEndpoint]]. + + * @param message the message to send + * @tparam T type of the reply message + * @return the reply message from the corresponding [[RpcEndpoint]] + */ + def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout) + + /** + * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a + * specified timeout, throw an exception if this fails. + * + * Note: this is a blocking action which may cost a lot of time, so don't call it in a message + * loop of [[RpcEndpoint]]. + * + * @param message the message to send + * @param timeout the timeout duration + * @tparam T type of the reply message + * @return the reply message from the corresponding [[RpcEndpoint]] + */ + def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = { + val future = ask[T](message, timeout) + timeout.awaitResult(future) + } + + /** + * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a + * default timeout, throw a SparkException if this fails even after the default number of retries. * The default `timeout` will be used in every trial of calling `sendWithReply`. Because this * method retries, the message handling in the receiver side should be idempotent. * @@ -75,10 +105,11 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf) * @tparam T type of the reply message * @return the reply message from the corresponding [[RpcEndpoint]] */ + @deprecated("use 'askSync' instead.", "2.2.0") def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout) /** - * Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a + * Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a * specified timeout, throw a SparkException if this fails even after the specified number of * retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method * retries, the message handling in the receiver side should be idempotent. @@ -91,6 +122,7 @@ private[spark] abstract class RpcEndpointRef(conf: SparkConf) * @tparam T type of the reply message * @return the reply message from the corresponding [[RpcEndpoint]] */ + @deprecated("use 'askSync' instead.", "2.2.0") def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = { // TODO: Consider removing multiple attempts var attempts = 0 diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index eca7c79465..722024b8a6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -159,7 +159,7 @@ private[streaming] class ReceiverSupervisorImpl( logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms") val numRecords = blockStoreResult.numRecords val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult) - trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo)) + trackerEndpoint.askSync[Boolean](AddBlock(blockInfo)) logDebug(s"Reported block $blockId") } |