aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala5
1 files changed, 1 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index c0670e22a7..8b97db8dd3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -187,10 +187,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
}
// Combine the futures, wait for both to complete, and return the write ahead log segment
- val combinedFuture = for {
- _ <- storeInBlockManagerFuture
- fileSegment <- storeInWriteAheadLogFuture
- } yield fileSegment
+ val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
val segment = Await.result(combinedFuture, blockStoreTimeout)
WriteAheadLogBasedStoreResult(blockId, segment)
}