aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala7
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala5
3 files changed, 9 insertions, 7 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index f381fa4094..a7d870500f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.receiver
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.language.{existentials, postfixOps}
import scala.util.control.NonFatal
@@ -213,7 +213,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
// Combine the futures, wait for both to complete, and return the write ahead log record handle
val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
- val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
+ val walRecordHandle = ThreadUtils.awaitResult(combinedFuture, blockStoreTimeout)
WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
index 165e81ea41..71f3304f1b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/BatchedWriteAheadLog.scala
@@ -23,14 +23,14 @@ import java.util.concurrent.LinkedBlockingQueue
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{Await, Promise}
+import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.control.NonFatal
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
/**
* A wrapper for a WriteAheadLog that batches records before writing data. Handles aggregation
@@ -80,7 +80,8 @@ private[util] class BatchedWriteAheadLog(val wrappedLog: WriteAheadLog, conf: Sp
}
}
if (putSuccessfully) {
- Await.result(promise.future, WriteAheadLogUtils.getBatchingTimeout(conf).milliseconds)
+ ThreadUtils.awaitResult(
+ promise.future, WriteAheadLogUtils.getBatchingTimeout(conf).milliseconds)
} else {
throw new IllegalStateException("close() was called on BatchedWriteAheadLog before " +
s"write request with time $time could be fulfilled.")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 8c980dee2c..24cb5afee3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -38,7 +38,7 @@ import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.Eventually._
import org.scalatest.mock.MockitoSugar
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.streaming.scheduler._
import org.apache.spark.util.{CompletionIterator, ManualClock, ThreadUtils, Utils}
@@ -471,10 +471,11 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
// the BatchedWriteAheadLog should bubble up any exceptions that may have happened during writes
val batchedWal = new BatchedWriteAheadLog(wal, sparkConf)
- intercept[RuntimeException] {
+ val e = intercept[SparkException] {
val buffer = mock[ByteBuffer]
batchedWal.write(buffer, 2L)
}
+ assert(e.getCause.getMessage === "Hello!")
}
// we make the write requests in separate threads so that we don't block the test thread