aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorjinxing <jinxing@meituan.com>2017-02-01 13:54:37 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2017-02-01 13:54:37 -0800
commitc5fcb7f68bff055cc56e487bd48994945e7935cd (patch)
tree7c62452af7ab4238dc6149825f55876ba084a041 /streaming
parentdf4a27cc5cae8e251ba2a883bcc5f5ce9282f649 (diff)
downloadspark-c5fcb7f68bff055cc56e487bd48994945e7935cd.tar.gz
spark-c5fcb7f68bff055cc56e487bd48994945e7935cd.tar.bz2
spark-c5fcb7f68bff055cc56e487bd48994945e7935cd.zip
[SPARK-19347] ReceiverSupervisorImpl can add block to ReceiverTracker multiple times because of askWithRetry.
## What changes were proposed in this pull request? `ReceiverSupervisorImpl` on executor side reports block's meta back to `ReceiverTracker` on driver side. In current code, `askWithRetry` is used. However, for `AddBlock`, `ReceiverTracker` is not idempotent, which may result in messages are processed multiple times. **To reproduce**: 1. Check if it is the first time receiving `AddBlock` in `ReceiverTracker`, if so sleep long enough(say 200 seconds), thus the first RPC call will be timeout in `askWithRetry`, then `AddBlock` will be resent. 2. Rebuild Spark and run following job: ``` def streamProcessing(): Unit = { val conf = new SparkConf() .setAppName("StreamingTest") .setMaster(masterUrl) val ssc = new StreamingContext(conf, Seconds(200)) val stream = ssc.socketTextStream("localhost", 1234) stream.print() ssc.start() ssc.awaitTermination() } ``` **To fix**: It makes sense to provide a blocking version `ask` in RpcEndpointRef, as mentioned in SPARK-18113 (https://github.com/apache/spark/pull/16503#event-927953218). Because Netty RPC layer will not drop messages. `askWithRetry` is a leftover from akka days. It imposes restrictions on the caller(e.g. idempotency) and other things that people generally don't pay that much attention to when using it. ## How was this patch tested? Test manually. The scenario described above doesn't happen with this patch. Author: jinxing <jinxing@meituan.com> Closes #16690 from jinxing64/SPARK-19347.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala2
1 files changed, 1 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index eca7c79465..722024b8a6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -159,7 +159,7 @@ private[streaming] class ReceiverSupervisorImpl(
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
- trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
+ trackerEndpoint.askSync[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}