From 0e2405490f2056728d1353abbac6f3ea177ae533 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Thu, 16 Feb 2017 12:32:45 +0000 Subject: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support - Move external/java8-tests tests into core, streaming, sql and remove - Remove MaxPermGen and related options - Fix some reflection / TODOs around Java 8+ methods - Update doc references to 1.7/1.8 differences - Remove Java 7/8 related build profiles - Update some plugins for better Java 8 compatibility - Fix a few Java-related warnings For the future: - Update Java 8 examples to fully use Java 8 - Update Java tests to use lambdas for simplicity - Update Java internal implementations to use lambdas ## How was this patch tested? Existing tests Author: Sean Owen Closes #16871 from srowen/SPARK-19493. --- .../spark/launcher/AbstractCommandBuilder.java | 7 +- .../apache/spark/launcher/ChildProcAppHandle.java | 10 +- .../apache/spark/launcher/CommandBuilderUtils.java | 21 ----- .../org/apache/spark/launcher/LauncherServer.java | 7 +- .../apache/spark/launcher/OutputRedirector.java | 7 +- .../org/apache/spark/launcher/SparkAppHandle.java | 3 - .../spark/launcher/SparkClassCommandBuilder.java | 68 ++++++++------ .../spark/launcher/SparkSubmitCommandBuilder.java | 101 ++++++++++++--------- .../spark/launcher/CommandBuilderUtilsSuite.java | 36 -------- .../launcher/SparkSubmitCommandBuilderSuite.java | 8 +- launcher/src/test/resources/spark-defaults.conf | 2 +- 11 files changed, 103 insertions(+), 167 deletions(-) (limited to 'launcher') diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java index 0622fef17c..bc8d6037a3 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java @@ -104,15 +104,12 @@ abstract class AbstractCommandBuilder { // Load extra JAVA_OPTS from conf/java-opts, if it exists. File javaOpts = new File(join(File.separator, getConfDir(), "java-opts")); if (javaOpts.isFile()) { - BufferedReader br = new BufferedReader(new InputStreamReader( - new FileInputStream(javaOpts), StandardCharsets.UTF_8)); - try { + try (BufferedReader br = new BufferedReader(new InputStreamReader( + new FileInputStream(javaOpts), StandardCharsets.UTF_8))) { String line; while ((line = br.readLine()) != null) { addOptionString(cmd, line); } - } finally { - br.close(); } } diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java index c0779e1c4e..12bf29d3b1 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java @@ -18,7 +18,6 @@ package org.apache.spark.launcher; import java.io.IOException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.logging.Level; @@ -103,14 +102,7 @@ class ChildProcAppHandle implements SparkAppHandle { try { childProc.exitValue(); } catch (IllegalThreadStateException e) { - // Child is still alive. Try to use Java 8's "destroyForcibly()" if available, - // fall back to the old API if it's not there. - try { - Method destroy = childProc.getClass().getMethod("destroyForcibly"); - destroy.invoke(childProc); - } catch (Exception inner) { - childProc.destroy(); - } + childProc.destroyForcibly(); } finally { childProc = null; } diff --git a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java index 250b2a882f..e14c8aa47d 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java +++ b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java @@ -312,27 +312,6 @@ class CommandBuilderUtils { return quoted.append('"').toString(); } - /** - * Adds the default perm gen size option for Spark if the VM requires it and the user hasn't - * set it. - */ - static void addPermGenSizeOpt(List cmd) { - // Don't set MaxPermSize for IBM Java, or Oracle Java 8 and later. - if (getJavaVendor() == JavaVendor.IBM) { - return; - } - if (javaMajorVersion(System.getProperty("java.version")) > 7) { - return; - } - for (String arg : cmd) { - if (arg.contains("-XX:MaxPermSize=")) { - return; - } - } - - cmd.add("-XX:MaxPermSize=256m"); - } - /** * Get the major version of the java version string supplied. This method * accepts any JEP-223-compliant strings (9-ea, 9+100), as well as legacy diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java index ae43f563e8..865d4926da 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java +++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java @@ -137,12 +137,7 @@ class LauncherServer implements Closeable { this.server = server; this.running = true; - this.serverThread = factory.newThread(new Runnable() { - @Override - public void run() { - acceptConnections(); - } - }); + this.serverThread = factory.newThread(this::acceptConnections); serverThread.start(); } catch (IOException ioe) { close(); diff --git a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java index c7959aee9f..ff8045390c 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java +++ b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java @@ -44,12 +44,7 @@ class OutputRedirector { OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) { this.active = true; this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); - this.thread = tf.newThread(new Runnable() { - @Override - public void run() { - redirect(); - } - }); + this.thread = tf.newThread(this::redirect); this.sink = Logger.getLogger(loggerName); thread.start(); } diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java index 0aa7bd197d..cefb4d1a95 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java @@ -91,9 +91,6 @@ public interface SparkAppHandle { * Tries to kill the underlying application. Implies {@link #disconnect()}. This will not send * a {@link #stop()} message to the application, so it's recommended that users first try to * stop the application cleanly and only resort to this method if that fails. - *

- * Note that if the application is running as a child process, this method fail to kill the - * process when using Java 7. This may happen if, for example, the application is deadlocked. */ void kill(); diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java index 82b593a3f7..81786841de 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java @@ -49,35 +49,44 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder { // Master, Worker, HistoryServer, ExternalShuffleService, MesosClusterDispatcher use // SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY. - if (className.equals("org.apache.spark.deploy.master.Master")) { - javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); - javaOptsKeys.add("SPARK_MASTER_OPTS"); - memKey = "SPARK_DAEMON_MEMORY"; - } else if (className.equals("org.apache.spark.deploy.worker.Worker")) { - javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); - javaOptsKeys.add("SPARK_WORKER_OPTS"); - memKey = "SPARK_DAEMON_MEMORY"; - } else if (className.equals("org.apache.spark.deploy.history.HistoryServer")) { - javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); - javaOptsKeys.add("SPARK_HISTORY_OPTS"); - memKey = "SPARK_DAEMON_MEMORY"; - } else if (className.equals("org.apache.spark.executor.CoarseGrainedExecutorBackend")) { - javaOptsKeys.add("SPARK_JAVA_OPTS"); - javaOptsKeys.add("SPARK_EXECUTOR_OPTS"); - memKey = "SPARK_EXECUTOR_MEMORY"; - } else if (className.equals("org.apache.spark.executor.MesosExecutorBackend")) { - javaOptsKeys.add("SPARK_EXECUTOR_OPTS"); - memKey = "SPARK_EXECUTOR_MEMORY"; - } else if (className.equals("org.apache.spark.deploy.mesos.MesosClusterDispatcher")) { - javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); - } else if (className.equals("org.apache.spark.deploy.ExternalShuffleService") || - className.equals("org.apache.spark.deploy.mesos.MesosExternalShuffleService")) { - javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); - javaOptsKeys.add("SPARK_SHUFFLE_OPTS"); - memKey = "SPARK_DAEMON_MEMORY"; - } else { - javaOptsKeys.add("SPARK_JAVA_OPTS"); - memKey = "SPARK_DRIVER_MEMORY"; + switch (className) { + case "org.apache.spark.deploy.master.Master": + javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); + javaOptsKeys.add("SPARK_MASTER_OPTS"); + memKey = "SPARK_DAEMON_MEMORY"; + break; + case "org.apache.spark.deploy.worker.Worker": + javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); + javaOptsKeys.add("SPARK_WORKER_OPTS"); + memKey = "SPARK_DAEMON_MEMORY"; + break; + case "org.apache.spark.deploy.history.HistoryServer": + javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); + javaOptsKeys.add("SPARK_HISTORY_OPTS"); + memKey = "SPARK_DAEMON_MEMORY"; + break; + case "org.apache.spark.executor.CoarseGrainedExecutorBackend": + javaOptsKeys.add("SPARK_JAVA_OPTS"); + javaOptsKeys.add("SPARK_EXECUTOR_OPTS"); + memKey = "SPARK_EXECUTOR_MEMORY"; + break; + case "org.apache.spark.executor.MesosExecutorBackend": + javaOptsKeys.add("SPARK_EXECUTOR_OPTS"); + memKey = "SPARK_EXECUTOR_MEMORY"; + break; + case "org.apache.spark.deploy.mesos.MesosClusterDispatcher": + javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); + break; + case "org.apache.spark.deploy.ExternalShuffleService": + case "org.apache.spark.deploy.mesos.MesosExternalShuffleService": + javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); + javaOptsKeys.add("SPARK_SHUFFLE_OPTS"); + memKey = "SPARK_DAEMON_MEMORY"; + break; + default: + javaOptsKeys.add("SPARK_JAVA_OPTS"); + memKey = "SPARK_DRIVER_MEMORY"; + break; } List cmd = buildJavaCommand(extraClassPath); @@ -94,7 +103,6 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder { String mem = firstNonEmpty(memKey != null ? System.getenv(memKey) : null, DEFAULT_MEM); cmd.add("-Xmx" + mem); - addPermGenSizeOpt(cmd); cmd.add(className); cmd.addAll(classArgs); return cmd; diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java index 29c6d82cdb..5e64fa7ed1 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java @@ -271,7 +271,6 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder { config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH)); } - addPermGenSizeOpt(cmd); cmd.add("org.apache.spark.deploy.SparkSubmit"); cmd.addAll(buildSparkSubmitArgs()); return cmd; @@ -405,49 +404,65 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder { @Override protected boolean handle(String opt, String value) { - if (opt.equals(MASTER)) { - master = value; - } else if (opt.equals(DEPLOY_MODE)) { - deployMode = value; - } else if (opt.equals(PROPERTIES_FILE)) { - propertiesFile = value; - } else if (opt.equals(DRIVER_MEMORY)) { - conf.put(SparkLauncher.DRIVER_MEMORY, value); - } else if (opt.equals(DRIVER_JAVA_OPTIONS)) { - conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value); - } else if (opt.equals(DRIVER_LIBRARY_PATH)) { - conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value); - } else if (opt.equals(DRIVER_CLASS_PATH)) { - conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value); - } else if (opt.equals(CONF)) { - String[] setConf = value.split("=", 2); - checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value); - conf.put(setConf[0], setConf[1]); - } else if (opt.equals(CLASS)) { - // The special classes require some special command line handling, since they allow - // mixing spark-submit arguments with arguments that should be propagated to the shell - // itself. Note that for this to work, the "--class" argument must come before any - // non-spark-submit arguments. - mainClass = value; - if (specialClasses.containsKey(value)) { - allowsMixedArguments = true; - appResource = specialClasses.get(value); - } - } else if (opt.equals(KILL_SUBMISSION) || opt.equals(STATUS)) { - isAppResourceReq = false; - sparkArgs.add(opt); - sparkArgs.add(value); - } else if (opt.equals(HELP) || opt.equals(USAGE_ERROR)) { - isAppResourceReq = false; - sparkArgs.add(opt); - } else if (opt.equals(VERSION)) { - isAppResourceReq = false; - sparkArgs.add(opt); - } else { - sparkArgs.add(opt); - if (value != null) { + switch (opt) { + case MASTER: + master = value; + break; + case DEPLOY_MODE: + deployMode = value; + break; + case PROPERTIES_FILE: + propertiesFile = value; + break; + case DRIVER_MEMORY: + conf.put(SparkLauncher.DRIVER_MEMORY, value); + break; + case DRIVER_JAVA_OPTIONS: + conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value); + break; + case DRIVER_LIBRARY_PATH: + conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value); + break; + case DRIVER_CLASS_PATH: + conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value); + break; + case CONF: + String[] setConf = value.split("=", 2); + checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value); + conf.put(setConf[0], setConf[1]); + break; + case CLASS: + // The special classes require some special command line handling, since they allow + // mixing spark-submit arguments with arguments that should be propagated to the shell + // itself. Note that for this to work, the "--class" argument must come before any + // non-spark-submit arguments. + mainClass = value; + if (specialClasses.containsKey(value)) { + allowsMixedArguments = true; + appResource = specialClasses.get(value); + } + break; + case KILL_SUBMISSION: + case STATUS: + isAppResourceReq = false; + sparkArgs.add(opt); sparkArgs.add(value); - } + break; + case HELP: + case USAGE_ERROR: + isAppResourceReq = false; + sparkArgs.add(opt); + break; + case VERSION: + isAppResourceReq = false; + sparkArgs.add(opt); + break; + default: + sparkArgs.add(opt); + if (value != null) { + sparkArgs.add(value); + } + break; } return true; } diff --git a/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java b/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java index caeeea5ec6..9795041233 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java @@ -99,42 +99,6 @@ public class CommandBuilderUtilsSuite { assertEquals(10, javaMajorVersion("10")); } - @Test - public void testAddPermGenSizeOpt() { - List cmd = new ArrayList<>(); - - if (javaMajorVersion(System.getProperty("java.version")) > 7) { - // Does nothing in Java 8 - addPermGenSizeOpt(cmd); - assertEquals(0, cmd.size()); - cmd.clear(); - - } else { - addPermGenSizeOpt(cmd); - assertEquals(1, cmd.size()); - assertTrue(cmd.get(0).startsWith("-XX:MaxPermSize=")); - cmd.clear(); - - cmd.add("foo"); - addPermGenSizeOpt(cmd); - assertEquals(2, cmd.size()); - assertTrue(cmd.get(1).startsWith("-XX:MaxPermSize=")); - cmd.clear(); - - cmd.add("-XX:MaxPermSize=512m"); - addPermGenSizeOpt(cmd); - assertEquals(1, cmd.size()); - assertEquals("-XX:MaxPermSize=512m", cmd.get(0)); - cmd.clear(); - - cmd.add("'-XX:MaxPermSize=512m'"); - addPermGenSizeOpt(cmd); - assertEquals(1, cmd.size()); - assertEquals("'-XX:MaxPermSize=512m'", cmd.get(0)); - cmd.clear(); - } - } - private static void testOpt(String opts, List expected) { assertEquals(String.format("test string failed to parse: [[ %s ]]", opts), expected, parseOptionString(opts)); diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java index ad2e7a70c4..d569b6688d 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java @@ -233,7 +233,7 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite { launcher.setPropertiesFile(dummyPropsFile.getAbsolutePath()); launcher.conf.put(SparkLauncher.DRIVER_MEMORY, "1g"); launcher.conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, "/driver"); - launcher.conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-Ddriver -XX:MaxPermSize=256m"); + launcher.conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-Ddriver"); launcher.conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, "/native"); } else { launcher.childEnv.put("SPARK_CONF_DIR", System.getProperty("spark.test.home") @@ -258,12 +258,6 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite { assertFalse("Memory arguments should not be set.", found); } - for (String arg : cmd) { - if (arg.startsWith("-XX:MaxPermSize=")) { - assertEquals("-XX:MaxPermSize=256m", arg); - } - } - String[] cp = findArgValue(cmd, "-cp").split(Pattern.quote(File.pathSeparator)); if (isDriver) { assertTrue("Driver classpath should contain provided entry.", contains("/driver", cp)); diff --git a/launcher/src/test/resources/spark-defaults.conf b/launcher/src/test/resources/spark-defaults.conf index 239fc57883..3a51208c7c 100644 --- a/launcher/src/test/resources/spark-defaults.conf +++ b/launcher/src/test/resources/spark-defaults.conf @@ -17,5 +17,5 @@ spark.driver.memory=1g spark.driver.extraClassPath=/driver -spark.driver.extraJavaOptions=-Ddriver -XX:MaxPermSize=256m +spark.driver.extraJavaOptions=-Ddriver spark.driver.extraLibraryPath=/native \ No newline at end of file -- cgit v1.2.3