Diffstat (limited to 'core/src/test')
 -rw-r--r--  core/src/test/scala/org/apache/spark/FutureActionSuite.scala                     |  7
 -rw-r--r--  core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala                |  7
 -rw-r--r--  core/src/test/scala/org/apache/spark/JobCancellationSuite.scala                  | 14
 -rw-r--r--  core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala             | 43
 -rw-r--r--  core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala              | 12
 -rw-r--r--  core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala                       | 25
 -rw-r--r--  core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala            |  7
 -rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala |  8
 -rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala         | 22
 -rw-r--r--  core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala                 |  4
 10 files changed, 79 insertions(+), 70 deletions(-)
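
Every hunk below makes the same substitution: bare scala.concurrent.Await.result calls become ThreadUtils.awaitResult, and assertions on thrown errors unwrap one layer with getCause. A minimal sketch of such a wrapper, assuming (as the updated assertions imply) that it rethrows every non-fatal failure, timeouts included, wrapped in a SparkException carrying the original error as its cause; the real helper lives in org.apache.spark.util.ThreadUtils and may differ in detail:

    import scala.concurrent.{Await, Awaitable}
    import scala.concurrent.duration.Duration
    import scala.util.control.NonFatal

    import org.apache.spark.SparkException

    // Sketch only, not the exact Spark implementation. The await still
    // blocks for at most `atMost`, but any non-fatal failure (job error,
    // timeout) is rethrown wrapped in a SparkException, so the wrapper's
    // stack trace records the awaiting thread and tests reach the
    // original error via getCause.
    def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
      try {
        Await.result(awaitable, atMost)
      } catch {
        case NonFatal(t) =>
          throw new SparkException("Exception thrown in awaitResult", t)
      }
    }
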
diff --git a/core/src/test/scala/org/apache/spark/FutureActionSuite.scala b/core/src/test/scala/org/apache/spark/FutureActionSuite.scala
index 1102aea96b..70b6309be7 100644
--- a/core/src/test/scala/org/apache/spark/FutureActionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FutureActionSuite.scala
@@ -17,11 +17,12 @@
package org.apache.spark
-import scala.concurrent.Await
import scala.concurrent.duration.Duration
import org.scalatest.{BeforeAndAfter, Matchers}
+import org.apache.spark.util.ThreadUtils
+
class FutureActionSuite
extends SparkFunSuite
@@ -36,7 +37,7 @@ class FutureActionSuite
test("simple async action") {
val rdd = sc.parallelize(1 to 10, 2)
val job = rdd.countAsync()
- val res = Await.result(job, Duration.Inf)
+ val res = ThreadUtils.awaitResult(job, Duration.Inf)
res should be (10)
job.jobIds.size should be (1)
}
@@ -44,7 +45,7 @@ class FutureActionSuite
test("complex async action") {
val rdd = sc.parallelize(1 to 15, 3)
val job = rdd.takeAsync(10)
- val res = Await.result(job, Duration.Inf)
+ val res = ThreadUtils.awaitResult(job, Duration.Inf)
res should be (1 to 10)
job.jobIds.size should be (2)
}
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 713d5e58b4..4d2b3e7f3b 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -21,7 +21,6 @@ import java.util.concurrent.{ExecutorService, TimeUnit}
import scala.collection.Map
import scala.collection.mutable
-import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -36,7 +35,7 @@ import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.ManualClock
+import org.apache.spark.util.{ManualClock, ThreadUtils}
/**
* A test suite for the heartbeating behavior between the driver and the executors.
@@ -231,14 +230,14 @@ class HeartbeatReceiverSuite
private def addExecutorAndVerify(executorId: String): Unit = {
assert(
heartbeatReceiver.addExecutor(executorId).map { f =>
- Await.result(f, 10.seconds)
+ ThreadUtils.awaitResult(f, 10.seconds)
} === Some(true))
}
private def removeExecutorAndVerify(executorId: String): Unit = {
assert(
heartbeatReceiver.removeExecutor(executorId).map { f =>
- Await.result(f, 10.seconds)
+ ThreadUtils.awaitResult(f, 10.seconds)
} === Some(true))
}
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index c347ab8dc8..a3490fc79e 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark
import java.util.concurrent.Semaphore
-import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.Future
@@ -28,6 +27,7 @@ import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
+import org.apache.spark.util.ThreadUtils
/**
* Test suite for cancelling running jobs. We run the cancellation tasks for single job action
@@ -137,7 +137,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
sc.clearJobGroup()
val jobB = sc.parallelize(1 to 100, 2).countAsync()
sc.cancelJobGroup("jobA")
- val e = intercept[SparkException] { Await.result(jobA, Duration.Inf) }
+ val e = intercept[SparkException] { ThreadUtils.awaitResult(jobA, Duration.Inf) }.getCause
assert(e.getMessage contains "cancel")
// Once A is cancelled, job B should finish fairly quickly.
@@ -202,7 +202,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
sc.clearJobGroup()
val jobB = sc.parallelize(1 to 100, 2).countAsync()
sc.cancelJobGroup("jobA")
- val e = intercept[SparkException] { Await.result(jobA, 5.seconds) }
+ val e = intercept[SparkException] { ThreadUtils.awaitResult(jobA, 5.seconds) }.getCause
assert(e.getMessage contains "cancel")
// Once A is cancelled, job B should finish fairly quickly.
@@ -248,7 +248,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
{
val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.countAsync()
Future { f.cancel() }
- val e = intercept[SparkException] { f.get() }
+ val e = intercept[SparkException] { f.get() }.getCause
assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
}
@@ -268,7 +268,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
sem.acquire()
f.cancel()
}
- val e = intercept[SparkException] { f.get() }
+ val e = intercept[SparkException] { f.get() }.getCause
assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
}
}
@@ -278,7 +278,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
{
val f = sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.takeAsync(5000)
Future { f.cancel() }
- val e = intercept[SparkException] { f.get() }
+ val e = intercept[SparkException] { f.get() }.getCause
assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
}
@@ -296,7 +296,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
sem.acquire()
f.cancel()
}
- val e = intercept[SparkException] { f.get() }
+ val e = intercept[SparkException] { f.get() }.getCause
assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
}
}
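
With the wrapper in place, every failure assertion in this suite reads the real error through getCause. For tests that might one day face more than one layer of wrapping, a small hypothetical helper (not part of this patch) makes the intent explicit:

    import scala.annotation.tailrec

    // Hypothetical convenience, not in this patch: walk to the innermost cause.
    @tailrec
    def rootCause(t: Throwable): Throwable =
      if (t.getCause == null || (t.getCause eq t)) t else rootCause(t.getCause)

rootCause(intercept[SparkException] { f.get() }) then yields the same error these tests inspect, regardless of wrapping depth.
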
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index 99d5b496bc..a1286523a2 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.memory
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
-import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import org.mockito.Matchers.{any, anyLong}
@@ -33,6 +33,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkFunSuite
import org.apache.spark.storage.{BlockId, BlockStatus, StorageLevel}
import org.apache.spark.storage.memory.MemoryStore
+import org.apache.spark.util.ThreadUtils
/**
@@ -172,15 +173,15 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
// Have both tasks request 500 bytes, then wait until both requests have been granted:
val t1Result1 = Future { t1MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
- assert(Await.result(t1Result1, futureTimeout) === 500L)
- assert(Await.result(t2Result1, futureTimeout) === 500L)
+ assert(ThreadUtils.awaitResult(t1Result1, futureTimeout) === 500L)
+ assert(ThreadUtils.awaitResult(t2Result1, futureTimeout) === 500L)
// Have both tasks each request 500 bytes more; both should immediately return 0 as they are
// both now at 1 / N
val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
- assert(Await.result(t1Result2, 200.millis) === 0L)
- assert(Await.result(t2Result2, 200.millis) === 0L)
+ assert(ThreadUtils.awaitResult(t1Result2, 200.millis) === 0L)
+ assert(ThreadUtils.awaitResult(t2Result2, 200.millis) === 0L)
}
test("two tasks cannot grow past 1 / N of on-heap execution memory") {
@@ -192,15 +193,15 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
// Have both tasks request 250 bytes, then wait until both requests have been granted:
val t1Result1 = Future { t1MemManager.acquireExecutionMemory(250L, MemoryMode.ON_HEAP, null) }
val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, MemoryMode.ON_HEAP, null) }
- assert(Await.result(t1Result1, futureTimeout) === 250L)
- assert(Await.result(t2Result1, futureTimeout) === 250L)
+ assert(ThreadUtils.awaitResult(t1Result1, futureTimeout) === 250L)
+ assert(ThreadUtils.awaitResult(t2Result1, futureTimeout) === 250L)
// Have both tasks each request 500 bytes more.
// We should only grant 250 bytes to each of them on this second request
val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
- assert(Await.result(t1Result2, futureTimeout) === 250L)
- assert(Await.result(t2Result2, futureTimeout) === 250L)
+ assert(ThreadUtils.awaitResult(t1Result2, futureTimeout) === 250L)
+ assert(ThreadUtils.awaitResult(t2Result2, futureTimeout) === 250L)
}
test("tasks can block to get at least 1 / 2N of on-heap execution memory") {
@@ -211,17 +212,17 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
// t1 grabs 1000 bytes and then waits until t2 is ready to make a request.
val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, MemoryMode.ON_HEAP, null) }
- assert(Await.result(t1Result1, futureTimeout) === 1000L)
+ assert(ThreadUtils.awaitResult(t1Result1, futureTimeout) === 1000L)
val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, MemoryMode.ON_HEAP, null) }
// Make sure that t2 didn't grab the memory right away. This is hacky but it would be difficult
// to make sure the other thread blocks for some time otherwise.
Thread.sleep(300)
t1MemManager.releaseExecutionMemory(250L, MemoryMode.ON_HEAP, null)
// The memory freed from t1 should now be granted to t2.
- assert(Await.result(t2Result1, futureTimeout) === 250L)
+ assert(ThreadUtils.awaitResult(t2Result1, futureTimeout) === 250L)
// Further requests by t2 should be denied immediately because it now has 1 / 2N of the memory.
val t2Result2 = Future { t2MemManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, null) }
- assert(Await.result(t2Result2, 200.millis) === 0L)
+ assert(ThreadUtils.awaitResult(t2Result2, 200.millis) === 0L)
}
test("TaskMemoryManager.cleanUpAllAllocatedMemory") {
@@ -232,18 +233,18 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
// t1 grabs 1000 bytes and then waits until t2 is ready to make a request.
val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, MemoryMode.ON_HEAP, null) }
- assert(Await.result(t1Result1, futureTimeout) === 1000L)
+ assert(ThreadUtils.awaitResult(t1Result1, futureTimeout) === 1000L)
val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
// Make sure that t2 didn't grab the memory right away. This is hacky but it would be difficult
// to make sure the other thread blocks for some time otherwise.
Thread.sleep(300)
// t1 releases all of its memory, so t2 should be able to grab all of the memory
t1MemManager.cleanUpAllAllocatedMemory()
- assert(Await.result(t2Result1, futureTimeout) === 500L)
+ assert(ThreadUtils.awaitResult(t2Result1, futureTimeout) === 500L)
val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
- assert(Await.result(t2Result2, futureTimeout) === 500L)
+ assert(ThreadUtils.awaitResult(t2Result2, futureTimeout) === 500L)
val t2Result3 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
- assert(Await.result(t2Result3, 200.millis) === 0L)
+ assert(ThreadUtils.awaitResult(t2Result3, 200.millis) === 0L)
}
test("tasks should not be granted a negative amount of execution memory") {
@@ -254,13 +255,13 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
val futureTimeout: Duration = 20.seconds
val t1Result1 = Future { t1MemManager.acquireExecutionMemory(700L, MemoryMode.ON_HEAP, null) }
- assert(Await.result(t1Result1, futureTimeout) === 700L)
+ assert(ThreadUtils.awaitResult(t1Result1, futureTimeout) === 700L)
val t2Result1 = Future { t2MemManager.acquireExecutionMemory(300L, MemoryMode.ON_HEAP, null) }
- assert(Await.result(t2Result1, futureTimeout) === 300L)
+ assert(ThreadUtils.awaitResult(t2Result1, futureTimeout) === 300L)
val t1Result2 = Future { t1MemManager.acquireExecutionMemory(300L, MemoryMode.ON_HEAP, null) }
- assert(Await.result(t1Result2, 200.millis) === 0L)
+ assert(ThreadUtils.awaitResult(t1Result2, 200.millis) === 0L)
}
test("off-heap execution allocations cannot exceed limit") {
@@ -270,11 +271,11 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
val tMemManager = new TaskMemoryManager(memoryManager, 1)
val result1 = Future { tMemManager.acquireExecutionMemory(1000L, MemoryMode.OFF_HEAP, null) }
- assert(Await.result(result1, 200.millis) === 1000L)
+ assert(ThreadUtils.awaitResult(result1, 200.millis) === 1000L)
assert(tMemManager.getMemoryConsumptionForThisTask === 1000L)
val result2 = Future { tMemManager.acquireExecutionMemory(300L, MemoryMode.OFF_HEAP, null) }
- assert(Await.result(result2, 200.millis) === 0L)
+ assert(ThreadUtils.awaitResult(result2, 200.millis) === 0L)
assert(tMemManager.getMemoryConsumptionForThisTask === 1000L)
tMemManager.releaseExecutionMemory(500L, MemoryMode.OFF_HEAP, null)
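
A reading aid for this suite: the 20-second futureTimeout merely bounds waits that are expected to succeed, while the short 200.millis waits are themselves assertions that a request is answered without blocking. Assuming the wrapper sketched earlier, a denied request that wrongly blocked would now fail fast as a wrapped TimeoutException instead of hanging the test:

    // In the suite's context: if acquireExecutionMemory were to block here,
    // the 200ms bound would make awaitResult throw a SparkException wrapping
    // a TimeoutException rather than returning the expected 0.
    val denied = Future { t1MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
    assert(ThreadUtils.awaitResult(denied, 200.millis) === 0L)
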
diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
index d18bde790b..8cb0a295b0 100644
--- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
import org.apache.spark._
+import org.apache.spark.util.ThreadUtils
class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Timeouts {
@@ -185,22 +186,23 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
test("FutureAction result, infinite wait") {
val f = sc.parallelize(1 to 100, 4)
.countAsync()
- assert(Await.result(f, Duration.Inf) === 100)
+ assert(ThreadUtils.awaitResult(f, Duration.Inf) === 100)
}
test("FutureAction result, finite wait") {
val f = sc.parallelize(1 to 100, 4)
.countAsync()
- assert(Await.result(f, Duration(30, "seconds")) === 100)
+ assert(ThreadUtils.awaitResult(f, Duration(30, "seconds")) === 100)
}
test("FutureAction result, timeout") {
val f = sc.parallelize(1 to 100, 4)
.mapPartitions(itr => { Thread.sleep(20); itr })
.countAsync()
- intercept[TimeoutException] {
- Await.result(f, Duration(20, "milliseconds"))
+ val e = intercept[SparkException] {
+ ThreadUtils.awaitResult(f, Duration(20, "milliseconds"))
}
+ assert(e.getCause.isInstanceOf[TimeoutException])
}
private def testAsyncAction[R](action: RDD[Int] => FutureAction[R]): Unit = {
@@ -221,7 +223,7 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim
// Now allow the executors to proceed with task processing.
starter.release(rdd.partitions.length)
// Waiting for the result verifies that the tasks were successfully processed.
- Await.result(executionContextInvoked.future, atMost = 15.seconds)
+ ThreadUtils.awaitResult(executionContextInvoked.future, atMost = 15.seconds)
}
test("SimpleFutureAction callback must not consume a thread while waiting") {
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index cebac2097f..73803ec21a 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -35,7 +35,7 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark.{SecurityManager, SparkConf, SparkEnv, SparkException, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
/**
* Common tests for an RpcEnv implementation.
@@ -415,7 +415,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
})
val f = endpointRef.ask[String]("Hi")
- val ack = Await.result(f, 5 seconds)
+ val ack = ThreadUtils.awaitResult(f, 5 seconds)
assert("ack" === ack)
env.stop(endpointRef)
@@ -435,7 +435,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "sendWithReply-remotely")
try {
val f = rpcEndpointRef.ask[String]("hello")
- val ack = Await.result(f, 5 seconds)
+ val ack = ThreadUtils.awaitResult(f, 5 seconds)
assert("ack" === ack)
} finally {
anotherEnv.shutdown()
@@ -454,9 +454,9 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val f = endpointRef.ask[String]("Hi")
val e = intercept[SparkException] {
- Await.result(f, 5 seconds)
+ ThreadUtils.awaitResult(f, 5 seconds)
}
- assert("Oops" === e.getMessage)
+ assert("Oops" === e.getCause.getMessage)
env.stop(endpointRef)
}
@@ -476,9 +476,9 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
try {
val f = rpcEndpointRef.ask[String]("hello")
val e = intercept[SparkException] {
- Await.result(f, 5 seconds)
+ ThreadUtils.awaitResult(f, 5 seconds)
}
- assert("Oops" === e.getMessage)
+ assert("Oops" === e.getCause.getMessage)
} finally {
anotherEnv.shutdown()
anotherEnv.awaitTermination()
@@ -487,6 +487,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
/**
* Setup an [[RpcEndpoint]] to collect all network events.
+ *
* @return the [[RpcEndpointRef]] and a `ConcurrentLinkedQueue` that contains network events.
*/
private def setupNetworkEndpoint(
@@ -620,10 +621,10 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
anotherEnv.setupEndpointRef(env.address, "sendWithReply-unserializable-error")
try {
val f = rpcEndpointRef.ask[String]("hello")
- val e = intercept[Exception] {
- Await.result(f, 1 seconds)
+ val e = intercept[SparkException] {
+ ThreadUtils.awaitResult(f, 1 seconds)
}
- assert(e.isInstanceOf[NotSerializableException])
+ assert(e.getCause.isInstanceOf[NotSerializableException])
} finally {
anotherEnv.shutdown()
anotherEnv.awaitTermination()
@@ -754,15 +755,17 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
// RpcTimeout.awaitResult should have added the property to the TimeoutException message
assert(reply2.contains(shortTimeout.timeoutProp))
- // Ask with delayed response and allow the Future to timeout before Await.result
+ // Ask with delayed response and allow the Future to timeout before ThreadUtils.awaitResult
val fut3 = rpcEndpointRef.ask[String](NeverReply("goodbye"), shortTimeout)
+ // scalastyle:off awaitresult
// Allow the future to complete with failure using plain Await.result; this will return
// once the future is complete, to verify that addMessageIfTimeout was invoked
val reply3 =
intercept[RpcTimeoutException] {
Await.result(fut3, 2000 millis)
}.getMessage
+ // scalastyle:on awaitresult
// When the future timed out, the recover callback should have used
// RpcTimeout.addMessageIfTimeout to add the property to the TimeoutException message
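
The scalastyle:off/on pair above implies the patch also ships a style rule (custom id "awaitresult") banning bare Await.result, with this test as the sanctioned exception: it deliberately awaits the raw future so the RpcTimeoutException produced by the recover callback surfaces undecorated by the new wrapper. A hedged sketch of the addMessageIfTimeout idea those comments name, using illustrative signatures rather than the exact Spark internals:

    import java.util.concurrent.TimeoutException

    // Sketch: a recover handler that rethrows timeouts with the controlling
    // configuration property appended, so the error itself names the knob to
    // tune. Installed as fut.recover(addMessageIfTimeout(timeoutProp)).
    def addMessageIfTimeout[T](timeoutProp: String): PartialFunction[Throwable, T] = {
      case te: TimeoutException =>
        throw new TimeoutException(s"${te.getMessage}. This timeout is controlled by $timeoutProp")
    }
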
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
index 994a58836b..2d6543d328 100644
--- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rpc.netty
-import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark._
import org.apache.spark.rpc._
class NettyRpcEnvSuite extends RpcEnvSuite {
@@ -34,10 +34,11 @@ class NettyRpcEnvSuite extends RpcEnvSuite {
test("non-existent endpoint") {
val uri = RpcEndpointAddress(env.address, "nonexist-endpoint").toString
- val e = intercept[RpcEndpointNotFoundException] {
+ val e = intercept[SparkException] {
env.setupEndpointRef(env.address, "nonexist-endpoint")
}
- assert(e.getMessage.contains(uri))
+ assert(e.getCause.isInstanceOf[RpcEndpointNotFoundException])
+ assert(e.getCause.getMessage.contains(uri))
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 8e509de767..83288db92b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler
import java.io.File
import java.util.concurrent.TimeoutException
-import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -33,7 +32,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark._
import org.apache.spark.rdd.{FakeOutputCommitter, RDD}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
/**
* Unit tests for the output commit coordination functionality.
@@ -159,9 +158,10 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
0 until rdd.partitions.size, resultHandler, () => Unit)
// It's an error if the job completes successfully even though no committer was authorized,
// so throw an exception if the job was allowed to complete.
- intercept[TimeoutException] {
- Await.result(futureAction, 5 seconds)
+ val e = intercept[SparkException] {
+ ThreadUtils.awaitResult(futureAction, 5 seconds)
}
+ assert(e.getCause.isInstanceOf[TimeoutException])
assert(tempDir.list().size === 0)
}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index 9d1bd7ec89..9ee83b76e7 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -27,6 +27,7 @@ import org.scalatest.BeforeAndAfterEach
import org.scalatest.time.SpanSugar._
import org.apache.spark.{SparkException, SparkFunSuite, TaskContext, TaskContextImpl}
+import org.apache.spark.util.ThreadUtils
class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
@@ -124,8 +125,8 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
}
// After downgrading to a read lock, both threads should wake up and acquire the shared
// read lock.
- assert(!Await.result(lock1Future, 1.seconds))
- assert(!Await.result(lock2Future, 1.seconds))
+ assert(!ThreadUtils.awaitResult(lock1Future, 1.seconds))
+ assert(!ThreadUtils.awaitResult(lock2Future, 1.seconds))
assert(blockInfoManager.get("block").get.readerCount === 3)
}
@@ -161,7 +162,7 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
withTaskId(winningTID) {
blockInfoManager.unlock("block")
}
- assert(!Await.result(losingFuture, 1.seconds))
+ assert(!ThreadUtils.awaitResult(losingFuture, 1.seconds))
assert(blockInfoManager.get("block").get.readerCount === 1)
}
@@ -262,8 +263,8 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
withTaskId(0) {
blockInfoManager.unlock("block")
}
- assert(Await.result(get1Future, 1.seconds).isDefined)
- assert(Await.result(get2Future, 1.seconds).isDefined)
+ assert(ThreadUtils.awaitResult(get1Future, 1.seconds).isDefined)
+ assert(ThreadUtils.awaitResult(get2Future, 1.seconds).isDefined)
assert(blockInfoManager.get("block").get.readerCount === 2)
}
@@ -288,13 +289,14 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
blockInfoManager.unlock("block")
}
assert(
- Await.result(Future.firstCompletedOf(Seq(write1Future, write2Future)), 1.seconds).isDefined)
+ ThreadUtils.awaitResult(
+ Future.firstCompletedOf(Seq(write1Future, write2Future)), 1.seconds).isDefined)
val firstWriteWinner = if (write1Future.isCompleted) 1 else 2
withTaskId(firstWriteWinner) {
blockInfoManager.unlock("block")
}
- assert(Await.result(write1Future, 1.seconds).isDefined)
- assert(Await.result(write2Future, 1.seconds).isDefined)
+ assert(ThreadUtils.awaitResult(write1Future, 1.seconds).isDefined)
+ assert(ThreadUtils.awaitResult(write2Future, 1.seconds).isDefined)
}
test("removing a non-existent block throws IllegalArgumentException") {
@@ -344,8 +346,8 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
withTaskId(0) {
blockInfoManager.removeBlock("block")
}
- assert(Await.result(getFuture, 1.seconds).isEmpty)
- assert(Await.result(writeFuture, 1.seconds).isEmpty)
+ assert(ThreadUtils.awaitResult(getFuture, 1.seconds).isEmpty)
+ assert(ThreadUtils.awaitResult(writeFuture, 1.seconds).isEmpty)
}
test("releaseAllLocksForTask releases write locks") {
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 6652a41b69..ae3b3d829f 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.util
import java.util.concurrent.{CountDownLatch, TimeUnit}
-import scala.concurrent.{Await, Future}
+import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Random
@@ -109,7 +109,7 @@ class ThreadUtilsSuite extends SparkFunSuite {
val f = Future {
Thread.currentThread().getName()
}(ThreadUtils.sameThread)
- val futureThreadName = Await.result(f, 10.seconds)
+ val futureThreadName = ThreadUtils.awaitResult(f, 10.seconds)
assert(futureThreadName === callerThreadName)
}
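
For context on the last assertion: ThreadUtils.sameThread is an ExecutionContext that runs work inline on the submitting thread instead of handing it to a pool, which is why the future observes the caller's thread name. A minimal sketch of such a context, assuming the straightforward implementation:

    import scala.concurrent.ExecutionContext

    // Sketch of a same-thread ExecutionContext: execute() runs the task
    // inline, so callbacks scheduled with it never switch threads.
    val sameThreadSketch: ExecutionContext = new ExecutionContext {
      override def execute(runnable: Runnable): Unit = runnable.run()
      override def reportFailure(cause: Throwable): Unit = throw cause
    }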