aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2014-12-24 19:49:41 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-12-24 19:50:01 -0800
commit17d6f547bf3612890f79e331b427e286c5fca730 (patch)
tree9194e785badea117f5202a32be21ea440aff3b0a /streaming
parent1a4e2ba7369b9eb1dab6cb66cdb7b21129e7faf1 (diff)
downloadspark-17d6f547bf3612890f79e331b427e286c5fca730.tar.gz
spark-17d6f547bf3612890f79e331b427e286c5fca730.tar.bz2
spark-17d6f547bf3612890f79e331b427e286c5fca730.zip
[SPARK-4873][Streaming] Use `Future.zip` instead of `Future.flatMap`(for-loop) in WriteAheadLogBasedBlockHandler
Use `Future.zip` instead of `Future.flatMap`(for-loop). `zip` implies these two Futures will run concurrently, while `flatMap` usually means one Future depends on the other one. Author: zsxwing <zsxwing@gmail.com> Closes #3721 from zsxwing/SPARK-4873 and squashes the following commits: 46a2cd9 [zsxwing] Use Future.zip instead of Future.flatMap(for-loop) (cherry picked from commit b4d0db80a0bfba7f1e045d4edb9357b4b2c0a557) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala5
1 files changed, 1 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index c0670e22a7..8b97db8dd3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -187,10 +187,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
}
// Combine the futures, wait for both to complete, and return the write ahead log segment
- val combinedFuture = for {
- _ <- storeInBlockManagerFuture
- fileSegment <- storeInWriteAheadLogFuture
- } yield fileSegment
+ val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
val segment = Await.result(combinedFuture, blockStoreTimeout)
WriteAheadLogBasedStoreResult(blockId, segment)
}