aboutsummaryrefslogtreecommitdiff
path: root/launcher
diff options
context:
space:
mode:
Diffstat (limited to 'launcher')
-rw-r--r--launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java33
1 files changed, 10 insertions, 23 deletions
diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
index dc8fbb58d8..13f72b757f 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
@@ -23,11 +23,11 @@ import java.net.InetAddress;
import java.net.Socket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
import static org.apache.spark.launcher.LauncherProtocol.*;
@@ -69,44 +69,31 @@ public class LauncherServerSuite extends BaseSuite {
Socket s = new Socket(InetAddress.getLoopbackAddress(),
LauncherServer.getServerInstance().getPort());
- final Object waitLock = new Object();
+ final Semaphore semaphore = new Semaphore(0);
handle.addListener(new SparkAppHandle.Listener() {
@Override
public void stateChanged(SparkAppHandle handle) {
- wakeUp();
+ semaphore.release();
}
-
@Override
public void infoChanged(SparkAppHandle handle) {
- wakeUp();
- }
-
- private void wakeUp() {
- synchronized (waitLock) {
- waitLock.notifyAll();
- }
+ semaphore.release();
}
});
client = new TestClient(s);
- synchronized (waitLock) {
- client.send(new Hello(handle.getSecret(), "1.4.0"));
- waitLock.wait(TimeUnit.SECONDS.toMillis(10));
- }
+ client.send(new Hello(handle.getSecret(), "1.4.0"));
+ semaphore.tryAcquire(10, TimeUnit.MILLISECONDS);
// Make sure the server matched the client to the handle.
assertNotNull(handle.getConnection());
- synchronized (waitLock) {
- client.send(new SetAppId("app-id"));
- waitLock.wait(TimeUnit.SECONDS.toMillis(10));
- }
+ client.send(new SetAppId("app-id"));
+ semaphore.tryAcquire(10, TimeUnit.MILLISECONDS);
assertEquals("app-id", handle.getAppId());
- synchronized (waitLock) {
- client.send(new SetState(SparkAppHandle.State.RUNNING));
- waitLock.wait(TimeUnit.SECONDS.toMillis(10));
- }
+ client.send(new SetState(SparkAppHandle.State.RUNNING));
+ semaphore.tryAcquire(10, TimeUnit.MILLISECONDS);
assertEquals(SparkAppHandle.State.RUNNING, handle.getState());
handle.stop();