diff options
author | zsxwing <zsxwing@gmail.com> | 2014-12-24 19:49:41 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-12-24 19:49:41 -0800 |
commit | b4d0db80a0bfba7f1e045d4edb9357b4b2c0a557 (patch) | |
tree | 2ccbdd3d59494f730a42c1ba5f277726d070a3cb /streaming/src | |
parent | 29fabb1b528e60b2f65132a9ab64f2fd95b729ba (diff) | |
download | spark-b4d0db80a0bfba7f1e045d4edb9357b4b2c0a557.tar.gz spark-b4d0db80a0bfba7f1e045d4edb9357b4b2c0a557.tar.bz2 spark-b4d0db80a0bfba7f1e045d4edb9357b4b2c0a557.zip |
[SPARK-4873][Streaming] Use `Future.zip` instead of `Future.flatMap`(for-loop) in WriteAheadLogBasedBlockHandler
Use `Future.zip` instead of `Future.flatMap`(for-loop). `zip` implies these two Futures will run concurrently, while `flatMap` usually means one Future depends on the other one.
Author: zsxwing <zsxwing@gmail.com>
Closes #3721 from zsxwing/SPARK-4873 and squashes the following commits:
46a2cd9 [zsxwing] Use Future.zip instead of Future.flatMap(for-loop)
Diffstat (limited to 'streaming/src')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala | 5 |
1 files changed, 1 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index c0670e22a7..8b97db8dd3 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -187,10 +187,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler( } // Combine the futures, wait for both to complete, and return the write ahead log segment - val combinedFuture = for { - _ <- storeInBlockManagerFuture - fileSegment <- storeInWriteAheadLogFuture - } yield fileSegment + val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2) val segment = Await.result(combinedFuture, blockStoreTimeout) WriteAheadLogBasedStoreResult(blockId, segment) } |