aboutsummaryrefslogtreecommitdiff
path: root/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
blob: e3413fd6652d8f726eff58dc9571182b0d21f0b3 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.launcher;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

import static org.apache.spark.launcher.LauncherProtocol.*;

/**
 * A server that listens locally for connections from clients launched by the library. Each client
 * has a secret that it needs to send to the server to identify itself and establish the session.
 *
 * I/O is currently blocking (one thread per client). Clients have a limited time to connect back
 * to the server, otherwise the server will ignore the connection.
 *
 * === Architecture Overview ===
 *
 * The launcher server is used when Spark apps are launched as processes separate from the calling
 * app. It looks more or less like the following:
 *
 *         -----------------------                       -----------------------
 *         |      User App       |     spark-submit      |      Spark App      |
 *         |                     |  -------------------> |                     |
 *         |         ------------|                       |-------------        |
 *         |         |           |        hello          |            |        |
 *         |         | L. Server |<----------------------| L. Backend |        |
 *         |         |           |                       |            |        |
 *         |         -------------                       -----------------------
 *         |               |     |                              ^
 *         |               v     |                              |
 *         |        -------------|                              |
 *         |        |            |      <per-app channel>       |
 *         |        | App Handle |<------------------------------
 *         |        |            |
 *         -----------------------
 *
 * The server is started on demand and remains active while there are active or outstanding clients,
 * to avoid opening too many ports when multiple clients are launched. Each client is given a unique
 * secret, and has a limited amount of time to connect back
 * ({@link SparkLauncher#CHILD_CONNECTION_TIMEOUT}), at which point the server will throw away
 * that client's state. A client is only allowed to connect back to the server once.
 *
 * The launcher server listens on the localhost only, so it doesn't need access controls (aside from
 * the per-app secret) or encryption. It thus requires that the launched app has a local process
 * that communicates with the server. In cluster mode, this means that the client that launches the
 * application must remain alive for the duration of the application (or until the app handle is
 * disconnected).
 */
class LauncherServer implements Closeable {

  private static final Logger LOG = Logger.getLogger(LauncherServer.class.getName());
  private static final String THREAD_NAME_FMT = "LauncherServer-%d";
  /** Default time (milliseconds) a connected client has to send its hello message. */
  private static final long DEFAULT_CONNECT_TIMEOUT = 10000L;

  /** For creating secrets used for communication with child processes. */
  private static final SecureRandom RND = new SecureRandom();

  /** The shared server instance; null when no server is running. */
  private static volatile LauncherServer serverInstance;

  /**
   * Creates a handle for an app to be launched. This method will start a server if one hasn't been
   * started yet. The server is shared for multiple handles, and once all handles are disposed of,
   * the server is shut down.
   *
   * @throws IOException if a new server had to be started and could not be.
   */
  static synchronized ChildProcAppHandle newAppHandle() throws IOException {
    LauncherServer server = serverInstance != null ? serverInstance : new LauncherServer();
    server.ref();
    serverInstance = server;

    // Regenerate until the secret does not collide with a handle that is still waiting.
    String secret = server.createSecret();
    while (server.pending.containsKey(secret)) {
      secret = server.createSecret();
    }

    return server.newAppHandle(secret);
  }

  /** Returns the currently shared server, or null if none is running. */
  static LauncherServer getServerInstance() {
    return serverInstance;
  }

  /** Number of outstanding references keeping this server open (see ref() / unref()). */
  private final AtomicLong refCount;
  private final AtomicLong threadIds;
  /** Handles whose child has not yet sent a valid Hello, keyed by secret. */
  private final ConcurrentMap<String, ChildProcAppHandle> pending;
  /** Live client connections; every access is guarded by this list's own monitor. */
  private final List<ServerConnection> clients;
  private final ServerSocket server;
  private final Thread serverThread;
  private final ThreadFactory factory;
  private final Timer timeoutTimer;

  private volatile boolean running;

  private LauncherServer() throws IOException {
    this.refCount = new AtomicLong(0);

    ServerSocket server = new ServerSocket();
    try {
      server.setReuseAddress(true);
      // Bind to an ephemeral port on the loopback interface only.
      server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

      this.clients = new ArrayList<>();
      this.threadIds = new AtomicLong();
      this.factory = new NamedThreadFactory(THREAD_NAME_FMT);
      this.pending = new ConcurrentHashMap<>();
      this.timeoutTimer = new Timer("LauncherServer-TimeoutTimer", true);
      this.server = server;
      this.running = true;

      this.serverThread = factory.newThread(new Runnable() {
        @Override
        public void run() {
          acceptConnections();
        }
      });
      serverThread.start();
    } catch (IOException ioe) {
      abortStartup(server, ioe);
      throw ioe;
    } catch (Exception e) {
      IOException wrapped = new IOException(e);
      abortStartup(server, wrapped);
      throw wrapped;
    }
  }

  /**
   * Releases whatever a partially-constructed server acquired. Any cleanup failure is attached to
   * {@code failure} as a suppressed exception so the original cause is what the caller sees.
   *
   * @param socket the server socket created by the constructor, bound or not.
   * @param failure the exception the constructor is about to throw.
   */
  private void abortStartup(ServerSocket socket, IOException failure) {
    try {
      close();
    } catch (IOException ioe) {
      failure.addSuppressed(ioe);
    }
    try {
      // close() only sees the socket once this.server is assigned and running is set, so an early
      // failure (e.g. in bind()) would otherwise leak it. Closing a ServerSocket twice is harmless.
      socket.close();
    } catch (IOException ioe) {
      failure.addSuppressed(ioe);
    }
  }

  /**
   * Creates a new app handle. The handle will wait for an incoming connection for a configurable
   * amount of time, and if one doesn't arrive, it will transition to an error state.
   *
   * @param secret the secret the child must present in its Hello message.
   */
  ChildProcAppHandle newAppHandle(String secret) {
    ChildProcAppHandle handle = new ChildProcAppHandle(secret, this);
    ChildProcAppHandle existing = pending.putIfAbsent(secret, handle);
    CommandBuilderUtils.checkState(existing == null, "Multiple handles with the same secret.");
    return handle;
  }

  /**
   * Stops accepting connections, cancels pending timeouts, closes every client connection and
   * waits for the accept thread to exit. Calling it again after the first call only re-joins the
   * accept thread.
   */
  @Override
  public void close() throws IOException {
    synchronized (this) {
      if (running) {
        running = false;
        timeoutTimer.cancel();
        server.close();
        synchronized (clients) {
          // Work on a copy: ServerConnection.close() removes itself from the list.
          List<ServerConnection> copy = new ArrayList<>(clients);
          clients.clear();
          for (ServerConnection client : copy) {
            client.close();
          }
        }
      }
    }
    if (serverThread != null) {
      try {
        serverThread.join();
      } catch (InterruptedException ie) {
        // Don't wait any longer, but preserve the interrupt for the caller.
        Thread.currentThread().interrupt();
      }
    }
  }

  /** Registers one more user of this server. */
  void ref() {
    refCount.incrementAndGet();
  }

  /**
   * Releases one reference. When the count reaches zero the server is closed and the shared
   * instance is cleared, under the same lock used by the static newAppHandle().
   */
  void unref() {
    synchronized(LauncherServer.class) {
      if (refCount.decrementAndGet() == 0) {
        try {
          close();
        } catch (IOException ioe) {
          // no-op: best-effort shutdown; the instance is cleared below regardless.
        } finally {
          serverInstance = null;
        }
      }
    }
  }

  /** Returns the local port the server socket is bound to. */
  int getPort() {
    return server.getLocalPort();
  }

  /**
   * Removes the client handle from the pending list (in case it's still there), and unrefs
   * the server.
   */
  void unregister(ChildProcAppHandle handle) {
    pending.remove(handle.getSecret());
    unref();
  }

  /**
   * Accept loop run on the server thread. Each accepted socket gets its own connection thread and
   * a timeout task that closes the socket if no Hello arrives in time.
   */
  private void acceptConnections() {
    try {
      while (running) {
        final Socket client = server.accept();
        TimerTask timeout = new TimerTask() {
          @Override
          public void run() {
            LOG.warning("Timed out waiting for hello message from client.");
            try {
              client.close();
            } catch (IOException ioe) {
              // no-op.
            }
          }
        };
        ServerConnection clientConnection = new ServerConnection(client, timeout);
        Thread clientThread = factory.newThread(clientConnection);

        // Register the connection and arm its timeout *before* starting the client thread. If the
        // thread ran first, a fast client's Hello could cancel the task before it was scheduled,
        // and Timer.schedule() would then throw IllegalStateException and kill this loop.
        synchronized (clients) {
          clients.add(clientConnection);
        }
        long timeoutMs = getConnectionTimeout();
        // 0 is used for testing to avoid issues with clock resolution / thread scheduling,
        // and force an immediate timeout.
        if (timeoutMs > 0) {
          timeoutTimer.schedule(timeout, timeoutMs);
        } else {
          timeout.run();
        }

        clientThread.start();
      }
    } catch (IOException ioe) {
      // accept() fails with an IOException when close() shuts the socket; only log real errors.
      if (running) {
        LOG.log(Level.SEVERE, "Error in accept loop.", ioe);
      }
    }
  }

  /**
   * Returns the hello timeout in milliseconds. It is read from
   * SparkLauncher.CHILD_CONNECTION_TIMEOUT, or DEFAULT_CONNECT_TIMEOUT if that is unset.
   */
  private long getConnectionTimeout() {
    String value = SparkLauncher.launcherConfig.get(SparkLauncher.CHILD_CONNECTION_TIMEOUT);
    return (value != null) ? Long.parseLong(value) : DEFAULT_CONNECT_TIMEOUT;
  }

  /** Generates a 256-character hex secret from 128 random bytes (two hex digits per byte). */
  private String createSecret() {
    byte[] secret = new byte[128];
    RND.nextBytes(secret);

    StringBuilder sb = new StringBuilder();
    for (byte b : secret) {
      // Maps 0..127 to itself and -1..-128 to 128..255, so each byte yields a distinct value
      // that fits in two hex digits.
      int ival = b >= 0 ? b : Byte.MAX_VALUE - b;
      if (ival < 0x10) {
        sb.append("0");
      }
      sb.append(Integer.toHexString(ival));
    }
    return sb.toString();
  }

  /**
   * Server side of a single client connection. The first message must be a Hello carrying a
   * pending secret; after that, SetAppId / SetState messages are forwarded to the bound handle.
   */
  private class ServerConnection extends LauncherConnection {

    /** Hello timeout; cleared once a Hello has been processed. */
    private TimerTask timeout;
    /** Handle bound by a successful Hello; null until then. */
    private ChildProcAppHandle handle;

    ServerConnection(Socket socket, TimerTask timeout) throws IOException {
      super(socket);
      this.timeout = timeout;
    }

    /**
     * Dispatches one message from the client. Any failure is logged and closes the connection
     * instead of propagating.
     */
    @Override
    protected void handle(Message msg) throws IOException {
      try {
        if (msg instanceof Hello) {
          if (handle != null) {
            // A client may only identify itself once per connection.
            throw new IllegalStateException("Received duplicate Hello from client.");
          }
          if (timeout != null) {
            timeout.cancel();
            timeout = null;
          }
          Hello hello = (Hello) msg;
          ChildProcAppHandle pendingHandle = pending.remove(hello.secret);
          if (pendingHandle != null) {
            pendingHandle.setConnection(this);
            pendingHandle.setState(SparkAppHandle.State.CONNECTED);
            this.handle = pendingHandle;
          } else {
            throw new IllegalArgumentException("Received Hello for unknown client.");
          }
        } else {
          // Compute the name up front: inlining the conditional after '+' would bind the whole
          // concatenation into the null check and drop the message prefix.
          String msgClassName = msg != null ? msg.getClass().getName() : null;
          if (handle == null) {
            throw new IllegalArgumentException("Expected hello, got: " + msgClassName);
          }
          if (msg instanceof SetAppId) {
            SetAppId set = (SetAppId) msg;
            handle.setAppId(set.appId);
          } else if (msg instanceof SetState) {
            handle.setState(((SetState) msg).state);
          } else {
            throw new IllegalArgumentException("Invalid message: " + msgClassName);
          }
        }
      } catch (Exception e) {
        LOG.log(Level.INFO, "Error handling message from client.", e);
        if (timeout != null) {
          timeout.cancel();
        }
        close();
      } finally {
        // Drop cancelled tasks from the timer's queue.
        timeoutTimer.purge();
      }
    }

    /** Unregisters from the server, closes the socket and disconnects the bound handle, if any. */
    @Override
    public void close() throws IOException {
      synchronized (clients) {
        clients.remove(this);
      }
      super.close();
      if (handle != null) {
        handle.disconnect();
      }
    }

  }

}