aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala14
1 files changed, 10 insertions, 4 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
index 99c8d13231..eb6e88cf55 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming
import java.nio.ByteBuffer
+import java.util.concurrent.Semaphore
import scala.collection.mutable.ArrayBuffer
@@ -36,6 +37,7 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
val receiver = new FakeReceiver
val executor = new FakeReceiverSupervisor(receiver)
+ val executorStarted = new Semaphore(0)
assert(executor.isAllEmpty)
@@ -43,6 +45,7 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
val executingThread = new Thread() {
override def run() {
executor.start()
+ executorStarted.release(1)
executor.awaitTermination()
}
}
@@ -57,6 +60,9 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
}
}
+ // Ensure executor is started
+ executorStarted.acquire()
+
// Verify that receiver was started
assert(receiver.onStartCalled)
assert(executor.isReceiverStarted)
@@ -186,10 +192,10 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
* An implementation of NetworkReceiver that is used for testing a receiver's life cycle.
*/
class FakeReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
- var otherThread: Thread = null
- var receiving = false
- var onStartCalled = false
- var onStopCalled = false
+ @volatile var otherThread: Thread = null
+ @volatile var receiving = false
+ @volatile var onStartCalled = false
+ @volatile var onStopCalled = false
def onStart() {
otherThread = new Thread() {