diff options
Diffstat (limited to 'streaming')
3 files changed, 9 insertions, 7 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index f381fa4094..a7d870500f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.receiver -import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.language.{existentials, postfixOps} import scala.util.control.NonFatal @@ -213,7 +213,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler( // Combine the futures, wait for both to complete, and return the write ahead log record handle val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2) - val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout) + val walRecordHandle = ThreadUtils.awaitResult(combinedFuture, blockStoreTimeout) WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala index 165e81ea41..71f3304f1b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala @@ -23,14 +23,14 @@ import java.util.concurrent.LinkedBlockingQueue import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer -import scala.concurrent.{Await, Promise} +import scala.concurrent.Promise import scala.concurrent.duration._ import scala.util.control.NonFatal import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.network.util.JavaUtils -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} /** * A wrapper for a WriteAheadLog that batches records before writing data. Handles aggregation @@ -80,7 +80,8 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp } } if (putSuccessfully) { - Await.result(promise.future, WriteAheadLogUtils.getBatchingTimeout(conf).milliseconds) + ThreadUtils.awaitResult( + promise.future, WriteAheadLogUtils.getBatchingTimeout(conf).milliseconds) } else { throw new IllegalStateException("close() was called on BatchedWriteAheadLog before " + s"write request with time $time could be fulfilled.") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 8c980dee2c..24cb5afee3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -38,7 +38,7 @@ import org.scalatest.concurrent.Eventually import org.scalatest.concurrent.Eventually._ import org.scalatest.mock.MockitoSugar -import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.streaming.scheduler._ import org.apache.spark.util.{CompletionIterator, ManualClock, ThreadUtils, Utils} @@ -471,10 +471,11 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests( // the BatchedWriteAheadLog should bubble up any exceptions that may have happened during writes val batchedWal = new BatchedWriteAheadLog(wal, sparkConf) - intercept[RuntimeException] { + val e = intercept[SparkException] { val buffer = mock[ByteBuffer] batchedWal.write(buffer, 2L) } + assert(e.getCause.getMessage === "Hello!") } // we make the write requests in separate threads so that we don't block the test thread |