aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2017-02-16 12:32:45 +0000
committerSean Owen <sowen@cloudera.com>2017-02-16 12:32:45 +0000
commit0e2405490f2056728d1353abbac6f3ea177ae533 (patch)
tree1a9ec960faec7abcb8d8fbac43b6a6dc633d2297 /core/src/main/scala/org
parent3871d94a695d47169720e877f77ff1e4bede43ee (diff)
downloadspark-0e2405490f2056728d1353abbac6f3ea177ae533.tar.gz
spark-0e2405490f2056728d1353abbac6f3ea177ae533.tar.bz2
spark-0e2405490f2056728d1353abbac6f3ea177ae533.zip
[SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
- Move external/java8-tests tests into core, streaming, sql and remove - Remove MaxPermGen and related options - Fix some reflection / TODOs around Java 8+ methods - Update doc references to 1.7/1.8 differences - Remove Java 7/8 related build profiles - Update some plugins for better Java 8 compatibility - Fix a few Java-related warnings For the future: - Update Java 8 examples to fully use Java 8 - Update Java tests to use lambdas for simplicity - Update Java internal implementations to use lambdas ## How was this patch tested? Existing tests Author: Sean Owen <sowen@cloudera.com> Closes #16871 from srowen/SPARK-19493.
Diffstat (limited to 'core/src/main/scala/org')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala44
3 files changed, 4 insertions, 44 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cbab7b8844..7e564061e6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -350,9 +350,6 @@ class SparkContext(config: SparkConf) extends Logging {
private def warnDeprecatedVersions(): Unit = {
val javaVersion = System.getProperty("java.version").split("[+.\\-]+", 3)
- if (javaVersion.length >= 2 && javaVersion(1).toInt == 7) {
- logWarning("Support for Java 7 is deprecated as of Spark 2.0.0")
- }
if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.10"))) {
logWarning("Support for Scala 2.10 is deprecated as of Spark 2.1.0")
}
diff --git a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
index 31b9c5edf0..3fd812e9fc 100644
--- a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
@@ -39,7 +39,6 @@ private[spark] class WorkerCommandBuilder(sparkHome: String, memoryMb: Int, comm
val cmd = buildJavaCommand(command.classPathEntries.mkString(File.pathSeparator))
cmd.add(s"-Xmx${memoryMb}M")
command.javaOpts.foreach(cmd.add)
- CommandBuilderUtils.addPermGenSizeOpt(cmd)
addOptionString(cmd, getenv("SPARK_JAVA_OPTS"))
cmd
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fe6fe6aa4f..1e6e9a223e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1882,20 +1882,17 @@ private[spark] object Utils extends Logging {
def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = {
// Politely destroy first
process.destroy()
-
- if (waitForProcess(process, timeoutMs)) {
+ if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
// Successful exit
Option(process.exitValue())
} else {
- // Java 8 added a new API which will more forcibly kill the process. Use that if available.
try {
- classOf[Process].getMethod("destroyForcibly").invoke(process)
+ process.destroyForcibly()
} catch {
- case _: NoSuchMethodException => return None // Not available; give up
case NonFatal(e) => logWarning("Exception when attempting to kill process", e)
}
// Wait, again, although this really should return almost immediately
- if (waitForProcess(process, timeoutMs)) {
+ if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
Option(process.exitValue())
} else {
logWarning("Timed out waiting to forcibly kill process")
@@ -1905,44 +1902,11 @@ private[spark] object Utils extends Logging {
}
/**
- * Wait for a process to terminate for at most the specified duration.
- *
- * @return whether the process actually terminated before the given timeout.
- */
- def waitForProcess(process: Process, timeoutMs: Long): Boolean = {
- try {
- // Use Java 8 method if available
- classOf[Process].getMethod("waitFor", java.lang.Long.TYPE, classOf[TimeUnit])
- .invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS)
- .asInstanceOf[Boolean]
- } catch {
- case _: NoSuchMethodException =>
- // Otherwise implement it manually
- var terminated = false
- val startTime = System.currentTimeMillis
- while (!terminated) {
- try {
- process.exitValue()
- terminated = true
- } catch {
- case e: IllegalThreadStateException =>
- // Process not terminated yet
- if (System.currentTimeMillis - startTime > timeoutMs) {
- return false
- }
- Thread.sleep(100)
- }
- }
- true
- }
- }
-
- /**
* Return the stderr of a process after waiting for the process to terminate.
* If the process does not terminate within the specified timeout, return None.
*/
def getStderr(process: Process, timeoutMs: Long): Option[String] = {
- val terminated = Utils.waitForProcess(process, timeoutMs)
+ val terminated = process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)
if (terminated) {
Some(Source.fromInputStream(process.getErrorStream).getLines().mkString("\n"))
} else {