diff options
Diffstat (limited to 'core/src/test/scala/org/apache')
10 files changed, 79 insertions, 70 deletions
diff --git a/core/src/test/scala/org/apache/spark/FutureActionSuite.scala b/core/src/test/scala/org/apache/spark/FutureActionSuite.scala index 1102aea96b..70b6309be7 100644 --- a/core/src/test/scala/org/apache/spark/FutureActionSuite.scala +++ b/core/src/test/scala/org/apache/spark/FutureActionSuite.scala @@ -17,11 +17,12 @@ package org.apache.spark -import scala.concurrent.Await import scala.concurrent.duration.Duration import org.scalatest.{BeforeAndAfter, Matchers} +import org.apache.spark.util.ThreadUtils + class FutureActionSuite extends SparkFunSuite @@ -36,7 +37,7 @@ class FutureActionSuite test("simple async action") { val rdd = sc.parallelize(1 to 10, 2) val job = rdd.countAsync() - val res = Await.result(job, Duration.Inf) + val res = ThreadUtils.awaitResult(job, Duration.Inf) res should be (10) job.jobIds.size should be (1) } @@ -44,7 +45,7 @@ class FutureActionSuite test("complex async action") { val rdd = sc.parallelize(1 to 15, 3) val job = rdd.takeAsync(10) - val res = Await.result(job, Duration.Inf) + val res = ThreadUtils.awaitResult(job, Duration.Inf) res should be (1 to 10) job.jobIds.size should be (2) } diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 713d5e58b4..4d2b3e7f3b 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -21,7 +21,6 @@ import java.util.concurrent.{ExecutorService, TimeUnit} import scala.collection.Map import scala.collection.mutable -import scala.concurrent.Await import scala.concurrent.duration._ import scala.language.postfixOps @@ -36,7 +35,7 @@ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.ManualClock +import org.apache.spark.util.{ManualClock, ThreadUtils} /** * A test suite for the heartbeating behavior between the driver and the executors. @@ -231,14 +230,14 @@ class HeartbeatReceiverSuite private def addExecutorAndVerify(executorId: String): Unit = { assert( heartbeatReceiver.addExecutor(executorId).map { f => - Await.result(f, 10.seconds) + ThreadUtils.awaitResult(f, 10.seconds) } === Some(true)) } private def removeExecutorAndVerify(executorId: String): Unit = { assert( heartbeatReceiver.removeExecutor(executorId).map { f => - Await.result(f, 10.seconds) + ThreadUtils.awaitResult(f, 10.seconds) } === Some(true)) } diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala index c347ab8dc8..a3490fc79e 100644 --- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala +++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark import java.util.concurrent.Semaphore -import scala.concurrent.Await import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.concurrent.Future @@ -28,6 +27,7 @@ import org.scalatest.BeforeAndAfter import org.scalatest.Matchers import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart} +import org.apache.spark.util.ThreadUtils /** * Test suite for cancelling running jobs. We run the cancellation tasks for single job action @@ -137,7 +137,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft sc.clearJobGroup() val jobB = sc.parallelize(1 to 100, 2).countAsync() sc.cancelJobGroup("jobA") - val e = intercept[SparkException] { Await.result(jobA, Duration.Inf) } + val e = intercept[SparkException] { ThreadUtils.awaitResult(jobA, Duration.Inf) }.getCause assert(e.getMessage contains "cancel") // Once A is cancelled, job B should finish fairly quickly. @@ -202,7 +202,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft sc.clearJobGroup() val jobB = sc.parallelize(1 to 100, 2).countAsync() sc.cancelJobGroup("jobA") - val e = intercept[SparkException] { Await.result(jobA, 5.seconds) } + val e = intercept[SparkException] { ThreadUtils.awaitResult(jobA, 5.seconds) }.getCause assert(e.getMessage contains "cancel") // Once A is cancelled, job B should finish fairly quickly. @@ -248,7 +248,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft { val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync() Future { f.cancel() } - val e = intercept[SparkException] { f.get() } + val e = intercept[SparkException] { f.get() }.getCause assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed")) } @@ -268,7 +268,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft sem.acquire() f.cancel() } - val e = intercept[SparkException] { f.get() } + val e = intercept[SparkException] { f.get() }.getCause assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed")) } } @@ -278,7 +278,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft { val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000) Future { f.cancel() } - val e = intercept[SparkException] { f.get() } + val e = intercept[SparkException] { f.get() }.getCause assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed")) } @@ -296,7 +296,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft sem.acquire() f.cancel() } - val e = intercept[SparkException] { f.get() } + val e = intercept[SparkException] { f.get() }.getCause assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed")) } } diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index 99d5b496bc..a1286523a2 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.memory import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable -import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.Duration import org.mockito.Matchers.{any, anyLong} @@ -33,6 +33,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkFunSuite import org.apache.spark.storage.{BlockId, BlockStatus, StorageLevel} import org.apache.spark.storage.memory.MemoryStore +import org.apache.spark.util.ThreadUtils /** @@ -172,15 +173,15 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft // Have both tasks request 500 bytes, then wait until both requests have been granted: val t1Result1 = Future { t1MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) } val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) } - assert(Await.result(t1Result1, futureTimeout) === 500L) - assert(Await.result(t2Result1, futureTimeout) === 500L) + assert(ThreadUtils.awaitResult(t1Result1, futureTimeout) === 500L) + assert(ThreadUtils.awaitResult(t2Result1, futureTimeout) === 500L) // Have both tasks each request 500 bytes more; both should immediately return 0 as they are // both now at 1 / N val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) } val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) } - assert(Await.result(t1Result2, 200.millis) === 0L) - assert(Await.result(t2Result2, 200.millis) === 0L) + assert(ThreadUtils.awaitResult(t1Result2, 200.millis) === 0L) + assert(ThreadUtils.awaitResult(t2Result2, 200.millis) === 0L) } test("two tasks cannot grow past 1 / N of on-heap execution memory") { @@ -192,15 +193,15 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft // Have both tasks request 250 bytes, then wait until both requests have been granted: val t1Result1 = Future { t1MemManager.acquireExecutionMemory(250L, MemoryMode.ON_HEAP, null) } val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, MemoryMode.ON_HEAP, null) } - assert(Await.result(t1Result1, futureTimeout) === 250L) - assert(Await.result(t2Result1, futureTimeout) === 250L) + assert(ThreadUtils.awaitResult(t1Result1, futureTimeout) === 250L) + assert(ThreadUtils.awaitResult(t2Result1, futureTimeout) === 250L) // Have both tasks each request 500 bytes more. // We should only grant 250 bytes to each of them on this second request val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) } val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) } - assert(Await.result(t1Result2, futureTimeout) === 250L) - assert(Await.result(t2Result2, futureTimeout) === 250L) + assert(ThreadUtils.awaitResult(t1Result2, futureTimeout) === 250L) + assert(ThreadUtils.awaitResult(t2Result2, futureTimeout) === 250L) } test("tasks can block to get at least 1 / 2N of on-heap execution memory") { @@ -211,17 +212,17 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft // t1 grabs 1000 bytes and then waits until t2 is ready to make a request. val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, MemoryMode.ON_HEAP, null) } - assert(Await.result(t1Result1, futureTimeout) === 1000L) + assert(ThreadUtils.awaitResult(t1Result1, futureTimeout) === 1000L) val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, MemoryMode.ON_HEAP, null) } // Make sure that t2 didn't grab the memory right away. This is hacky but it would be difficult // to make sure the other thread blocks for some time otherwise. Thread.sleep(300) t1MemManager.releaseExecutionMemory(250L, MemoryMode.ON_HEAP, null) // The memory freed from t1 should now be granted to t2. - assert(Await.result(t2Result1, futureTimeout) === 250L) + assert(ThreadUtils.awaitResult(t2Result1, futureTimeout) === 250L) // Further requests by t2 should be denied immediately because it now has 1 / 2N of the memory. val t2Result2 = Future { t2MemManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, null) } - assert(Await.result(t2Result2, 200.millis) === 0L) + assert(ThreadUtils.awaitResult(t2Result2, 200.millis) === 0L) } test("TaskMemoryManager.cleanUpAllAllocatedMemory") { @@ -232,18 +233,18 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft // t1 grabs 1000 bytes and then waits until t2 is ready to make a request. val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, MemoryMode.ON_HEAP, null) } - assert(Await.result(t1Result1, futureTimeout) === 1000L) + assert(ThreadUtils.awaitResult(t1Result1, futureTimeout) === 1000L) val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) } // Make sure that t2 didn't grab the memory right away. This is hacky but it would be difficult // to make sure the other thread blocks for some time otherwise. Thread.sleep(300) // t1 releases all of its memory, so t2 should be able to grab all of the memory t1MemManager.cleanUpAllAllocatedMemory() - assert(Await.result(t2Result1, futureTimeout) === 500L) + assert(ThreadUtils.awaitResult(t2Result1, futureTimeout) === 500L) val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) } - assert(Await.result(t2Result2, futureTimeout) === 500L) + assert(ThreadUtils.awaitResult(t2Result2, futureTimeout) === 500L) val t2Result3 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) } - assert(Await.result(t2Result3, 200.millis) === 0L) + assert(ThreadUtils.awaitResult(t2Result3, 200.millis) === 0L) } test("tasks should not be granted a negative amount of execution memory") { @@ -254,13 +255,13 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft val futureTimeout: Duration = 20.seconds val t1Result1 = Future { t1MemManager.acquireExecutionMemory(700L, MemoryMode.ON_HEAP, null) } - assert(Await.result(t1Result1, futureTimeout) === 700L) + assert(ThreadUtils.awaitResult(t1Result1, futureTimeout) === 700L) val t2Result1 = Future { t2MemManager.acquireExecutionMemory(300L, MemoryMode.ON_HEAP, null) } - assert(Await.result(t2Result1, futureTimeout) === 300L) + assert(ThreadUtils.awaitResult(t2Result1, futureTimeout) === 300L) val t1Result2 = Future { t1MemManager.acquireExecutionMemory(300L, MemoryMode.ON_HEAP, null) } - assert(Await.result(t1Result2, 200.millis) === 0L) + assert(ThreadUtils.awaitResult(t1Result2, 200.millis) === 0L) } test("off-heap execution allocations cannot exceed limit") { @@ -270,11 +271,11 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft val tMemManager = new TaskMemoryManager(memoryManager, 1) val result1 = Future { tMemManager.acquireExecutionMemory(1000L, MemoryMode.OFF_HEAP, null) } - assert(Await.result(result1, 200.millis) === 1000L) + assert(ThreadUtils.awaitResult(result1, 200.millis) === 1000L) assert(tMemManager.getMemoryConsumptionForThisTask === 1000L) val result2 = Future { tMemManager.acquireExecutionMemory(300L, MemoryMode.OFF_HEAP, null) } - assert(Await.result(result2, 200.millis) === 0L) + assert(ThreadUtils.awaitResult(result2, 200.millis) === 0L) assert(tMemManager.getMemoryConsumptionForThisTask === 1000L) tMemManager.releaseExecutionMemory(500L, MemoryMode.OFF_HEAP, null) diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index d18bde790b..8cb0a295b0 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -28,6 +28,7 @@ import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ import org.apache.spark._ +import org.apache.spark.util.ThreadUtils class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Timeouts { @@ -185,22 +186,23 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim test("FutureAction result, infinite wait") { val f = sc.parallelize(1 to 100, 4) .countAsync() - assert(Await.result(f, Duration.Inf) === 100) + assert(ThreadUtils.awaitResult(f, Duration.Inf) === 100) } test("FutureAction result, finite wait") { val f = sc.parallelize(1 to 100, 4) .countAsync() - assert(Await.result(f, Duration(30, "seconds")) === 100) + assert(ThreadUtils.awaitResult(f, Duration(30, "seconds")) === 100) } test("FutureAction result, timeout") { val f = sc.parallelize(1 to 100, 4) .mapPartitions(itr => { Thread.sleep(20); itr }) .countAsync() - intercept[TimeoutException] { - Await.result(f, Duration(20, "milliseconds")) + val e = intercept[SparkException] { + ThreadUtils.awaitResult(f, Duration(20, "milliseconds")) } + assert(e.getCause.isInstanceOf[TimeoutException]) } private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]): Unit = { @@ -221,7 +223,7 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim // Now allow the executors to proceed with task processing. starter.release(rdd.partitions.length) // Waiting for the result verifies that the tasks were successfully processed. - Await.result(executionContextInvoked.future, atMost = 15.seconds) + ThreadUtils.awaitResult(executionContextInvoked.future, atMost = 15.seconds) } test("SimpleFutureAction callback must not consume a thread while waiting") { diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index cebac2097f..73803ec21a 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -35,7 +35,7 @@ import org.scalatest.concurrent.Eventually._ import org.apache.spark.{SecurityManager, SparkConf, SparkEnv, SparkException, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} /** * Common tests for an RpcEnv implementation. @@ -415,7 +415,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { }) val f = endpointRef.ask[String]("Hi") - val ack = Await.result(f, 5 seconds) + val ack = ThreadUtils.awaitResult(f, 5 seconds) assert("ack" === ack) env.stop(endpointRef) @@ -435,7 +435,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "sendWithReply-remotely") try { val f = rpcEndpointRef.ask[String]("hello") - val ack = Await.result(f, 5 seconds) + val ack = ThreadUtils.awaitResult(f, 5 seconds) assert("ack" === ack) } finally { anotherEnv.shutdown() @@ -454,9 +454,9 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { val f = endpointRef.ask[String]("Hi") val e = intercept[SparkException] { - Await.result(f, 5 seconds) + ThreadUtils.awaitResult(f, 5 seconds) } - assert("Oops" === e.getMessage) + assert("Oops" === e.getCause.getMessage) env.stop(endpointRef) } @@ -476,9 +476,9 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { try { val f = rpcEndpointRef.ask[String]("hello") val e = intercept[SparkException] { - Await.result(f, 5 seconds) + ThreadUtils.awaitResult(f, 5 seconds) } - assert("Oops" === e.getMessage) + assert("Oops" === e.getCause.getMessage) } finally { anotherEnv.shutdown() anotherEnv.awaitTermination() @@ -487,6 +487,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { /** * Setup an [[RpcEndpoint]] to collect all network events. + * * @return the [[RpcEndpointRef]] and an `ConcurrentLinkedQueue` that contains network events. */ private def setupNetworkEndpoint( @@ -620,10 +621,10 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { anotherEnv.setupEndpointRef(env.address, "sendWithReply-unserializable-error") try { val f = rpcEndpointRef.ask[String]("hello") - val e = intercept[Exception] { - Await.result(f, 1 seconds) + val e = intercept[SparkException] { + ThreadUtils.awaitResult(f, 1 seconds) } - assert(e.isInstanceOf[NotSerializableException]) + assert(e.getCause.isInstanceOf[NotSerializableException]) } finally { anotherEnv.shutdown() anotherEnv.awaitTermination() @@ -754,15 +755,17 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { // RpcTimeout.awaitResult should have added the property to the TimeoutException message assert(reply2.contains(shortTimeout.timeoutProp)) - // Ask with delayed response and allow the Future to timeout before Await.result + // Ask with delayed response and allow the Future to timeout before ThreadUtils.awaitResult val fut3 = rpcEndpointRef.ask[String](NeverReply("goodbye"), shortTimeout) + // scalastyle:off awaitresult // Allow future to complete with failure using plain Await.result, this will return // once the future is complete to verify addMessageIfTimeout was invoked val reply3 = intercept[RpcTimeoutException] { Await.result(fut3, 2000 millis) }.getMessage + // scalastyle:on awaitresult // When the future timed out, the recover callback should have used // RpcTimeout.addMessageIfTimeout to add the property to the TimeoutException message diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala index 994a58836b..2d6543d328 100644 --- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.rpc.netty -import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark._ import org.apache.spark.rpc._ class NettyRpcEnvSuite extends RpcEnvSuite { @@ -34,10 +34,11 @@ class NettyRpcEnvSuite extends RpcEnvSuite { test("non-existent endpoint") { val uri = RpcEndpointAddress(env.address, "nonexist-endpoint").toString - val e = intercept[RpcEndpointNotFoundException] { + val e = intercept[SparkException] { env.setupEndpointRef(env.address, "nonexist-endpoint") } - assert(e.getMessage.contains(uri)) + assert(e.getCause.isInstanceOf[RpcEndpointNotFoundException]) + assert(e.getCause.getMessage.contains(uri)) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala index 8e509de767..83288db92b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.scheduler import java.io.File import java.util.concurrent.TimeoutException -import scala.concurrent.Await import scala.concurrent.duration._ import scala.language.postfixOps @@ -33,7 +32,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark._ import org.apache.spark.rdd.{FakeOutputCommitter, RDD} -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} /** * Unit tests for the output commit coordination functionality. @@ -159,9 +158,10 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter { 0 until rdd.partitions.size, resultHandler, () => Unit) // It's an error if the job completes successfully even though no committer was authorized, // so throw an exception if the job was allowed to complete. - intercept[TimeoutException] { - Await.result(futureAction, 5 seconds) + val e = intercept[SparkException] { + ThreadUtils.awaitResult(futureAction, 5 seconds) } + assert(e.getCause.isInstanceOf[TimeoutException]) assert(tempDir.list().size === 0) } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala index 9d1bd7ec89..9ee83b76e7 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala @@ -27,6 +27,7 @@ import org.scalatest.BeforeAndAfterEach import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkException, SparkFunSuite, TaskContext, TaskContextImpl} +import org.apache.spark.util.ThreadUtils class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach { @@ -124,8 +125,8 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach { } // After downgrading to a read lock, both threads should wake up and acquire the shared // read lock. - assert(!Await.result(lock1Future, 1.seconds)) - assert(!Await.result(lock2Future, 1.seconds)) + assert(!ThreadUtils.awaitResult(lock1Future, 1.seconds)) + assert(!ThreadUtils.awaitResult(lock2Future, 1.seconds)) assert(blockInfoManager.get("block").get.readerCount === 3) } @@ -161,7 +162,7 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach { withTaskId(winningTID) { blockInfoManager.unlock("block") } - assert(!Await.result(losingFuture, 1.seconds)) + assert(!ThreadUtils.awaitResult(losingFuture, 1.seconds)) assert(blockInfoManager.get("block").get.readerCount === 1) } @@ -262,8 +263,8 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach { withTaskId(0) { blockInfoManager.unlock("block") } - assert(Await.result(get1Future, 1.seconds).isDefined) - assert(Await.result(get2Future, 1.seconds).isDefined) + assert(ThreadUtils.awaitResult(get1Future, 1.seconds).isDefined) + assert(ThreadUtils.awaitResult(get2Future, 1.seconds).isDefined) assert(blockInfoManager.get("block").get.readerCount === 2) } @@ -288,13 +289,14 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach { blockInfoManager.unlock("block") } assert( - Await.result(Future.firstCompletedOf(Seq(write1Future, write2Future)), 1.seconds).isDefined) + ThreadUtils.awaitResult( + Future.firstCompletedOf(Seq(write1Future, write2Future)), 1.seconds).isDefined) val firstWriteWinner = if (write1Future.isCompleted) 1 else 2 withTaskId(firstWriteWinner) { blockInfoManager.unlock("block") } - assert(Await.result(write1Future, 1.seconds).isDefined) - assert(Await.result(write2Future, 1.seconds).isDefined) + assert(ThreadUtils.awaitResult(write1Future, 1.seconds).isDefined) + assert(ThreadUtils.awaitResult(write2Future, 1.seconds).isDefined) } test("removing a non-existent block throws IllegalArgumentException") { @@ -344,8 +346,8 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach { withTaskId(0) { blockInfoManager.removeBlock("block") } - assert(Await.result(getFuture, 1.seconds).isEmpty) - assert(Await.result(writeFuture, 1.seconds).isEmpty) + assert(ThreadUtils.awaitResult(getFuture, 1.seconds).isEmpty) + assert(ThreadUtils.awaitResult(writeFuture, 1.seconds).isEmpty) } test("releaseAllLocksForTask releases write locks") { diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala index 6652a41b69..ae3b3d829f 100644 --- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.util import java.util.concurrent.{CountDownLatch, TimeUnit} -import scala.concurrent.{Await, Future} +import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Random @@ -109,7 +109,7 @@ class ThreadUtilsSuite extends SparkFunSuite { val f = Future { Thread.currentThread().getName() }(ThreadUtils.sameThread) - val futureThreadName = Await.result(f, 10.seconds) + val futureThreadName = ThreadUtils.awaitResult(f, 10.seconds) assert(futureThreadName === callerThreadName) } |