author    Sean Owen <sowen@cloudera.com>  2017-02-16 12:32:45 +0000
committer Sean Owen <sowen@cloudera.com>  2017-02-16 12:32:45 +0000
commit    0e2405490f2056728d1353abbac6f3ea177ae533 (patch)
tree      1a9ec960faec7abcb8d8fbac43b6a6dc633d2297 /core/src/main
parent    3871d94a695d47169720e877f77ff1e4bede43ee (diff)
[SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
- Move external/java8-tests tests into core, streaming, sql and remove
- Remove MaxPermGen and related options
- Fix some reflection / TODOs around Java 8+ methods
- Update doc references to 1.7/1.8 differences
- Remove Java 7/8 related build profiles
- Update some plugins for better Java 8 compatibility
- Fix a few Java-related warnings

For the future:

- Update Java 8 examples to fully use Java 8
- Update Java tests to use lambdas for simplicity
- Update Java internal implementations to use lambdas

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #16871 from srowen/SPARK-19493.
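Most of the mechanical change in the diff below is replacing Java 7-style anonymous inner classes with Java 8 lambdas. A minimal sketch of that before/after pattern, on a generic example rather than code from this patch:

import java.util.Comparator;

public class LambdaMigrationSketch {
  // Java 7 style: anonymous inner class implementing the single method.
  static final Comparator<String> BY_LENGTH_JAVA7 = new Comparator<String>() {
    @Override
    public int compare(String a, String b) {
      return Integer.compare(a.length(), b.length());
    }
  };

  // Java 8 style: lambda expression with identical semantics.
  static final Comparator<String> BY_LENGTH_JAVA8 =
      (a, b) -> Integer.compare(a.length(), b.length());
}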
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/Optional.java  7
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/Function.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/Function0.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/Function2.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/Function3.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/Function4.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/MapFunction.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/PairFunction.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java  1
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java  9
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java  28
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala  3
-rw-r--r--  core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala  1
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala  44
28 files changed, 43 insertions, 71 deletions
diff --git a/core/src/main/java/org/apache/spark/api/java/Optional.java b/core/src/main/java/org/apache/spark/api/java/Optional.java
index ca7babc3f0..fd0f495ca2 100644
--- a/core/src/main/java/org/apache/spark/api/java/Optional.java
+++ b/core/src/main/java/org/apache/spark/api/java/Optional.java
@@ -18,6 +18,7 @@
package org.apache.spark.api.java;
import java.io.Serializable;
+import java.util.Objects;
import com.google.common.base.Preconditions;
@@ -52,8 +53,8 @@ import com.google.common.base.Preconditions;
* <li>{@link #isPresent()}</li>
* </ul>
*
- * <p>{@code java.util.Optional} itself is not used at this time because the
- * project does not require Java 8. Using {@code com.google.common.base.Optional}
+ * <p>{@code java.util.Optional} itself was not used because at the time, the
+ * project did not require Java 8. Using {@code com.google.common.base.Optional}
* has in the past caused serious library version conflicts with Guava that can't
* be resolved by shading. Hence this work-alike clone.</p>
*
@@ -171,7 +172,7 @@ public final class Optional<T> implements Serializable {
return false;
}
Optional<?> other = (Optional<?>) obj;
- return value == null ? other.value == null : value.equals(other.value);
+ return Objects.equals(value, other.value);
}
@Override
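For reference, java.util.Objects.equals performs exactly the null-safe comparison the removed ternary spelled out: true if both arguments are null, false if exactly one is, otherwise a.equals(b). A quick sketch:

import java.util.Objects;

public class ObjectsEqualsSketch {
  public static void main(String[] args) {
    System.out.println(Objects.equals(null, null)); // true
    System.out.println(Objects.equals(null, "x"));  // false
    System.out.println(Objects.equals("x", "x"));   // true
  }
}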
diff --git a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
index 07aebb75e8..33bedf7ebc 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
* A function that returns zero or more output records from each grouping key and its values from 2
* Datasets.
*/
+@FunctionalInterface
public interface CoGroupFunction<K, V1, V2, R> extends Serializable {
Iterator<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
}
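The @FunctionalInterface annotation, added here and to each of the function interfaces below, asks the compiler to verify that the interface declares exactly one abstract method and documents that it is a valid lambda target. A hedged sketch of what that enables for callers of Spark's Java API:

import java.util.Arrays;
import java.util.Iterator;
import org.apache.spark.api.java.function.FlatMapFunction;

public class LambdaTargetSketch {
  // With Java 8 the interface can be supplied as a lambda instead of an
  // anonymous class; this one splits each input line into words.
  static final FlatMapFunction<String, String> TOKENIZE =
      line -> Arrays.asList(line.split(" ")).iterator();
}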
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
index 576087b6f4..2f23da5bfe 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* A function that returns zero or more records of type Double from each input record.
*/
+@FunctionalInterface
public interface DoubleFlatMapFunction<T> extends Serializable {
Iterator<Double> call(T t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
index bf16f791f9..3c0291cf46 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A function that returns Doubles, and can be used to construct DoubleRDDs.
*/
+@FunctionalInterface
public interface DoubleFunction<T> extends Serializable {
double call(T t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
index 462ca3f6f6..a6f69f7cdc 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
*
* If the function returns true, the element is included in the returned Dataset.
*/
+@FunctionalInterface
public interface FilterFunction<T> extends Serializable {
boolean call(T value) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
index 2d8ea6d1a5..91d61292f1 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* A function that returns zero or more output records from each input record.
*/
+@FunctionalInterface
public interface FlatMapFunction<T, R> extends Serializable {
Iterator<R> call(T t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
index fc97b63f82..f9f2580b01 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* A function that takes two inputs and returns zero or more output records.
*/
+@FunctionalInterface
public interface FlatMapFunction2<T1, T2, R> extends Serializable {
Iterator<R> call(T1 t1, T2 t2) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
index bae574ab57..6423c5d0fc 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* A function that returns zero or more output records from each grouping key and its values.
*/
+@FunctionalInterface
public interface FlatMapGroupsFunction<K, V, R> extends Serializable {
Iterator<R> call(K key, Iterator<V> values) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java b/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
index 07e54b28fa..2e6e90818d 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
*
* Spark will invoke the call function on each element in the input Dataset.
*/
+@FunctionalInterface
public interface ForeachFunction<T> extends Serializable {
void call(T t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java b/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
index 4938a51bcd..d8f55d0ae1 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* Base interface for a function used in Dataset's foreachPartition function.
*/
+@FunctionalInterface
public interface ForeachPartitionFunction<T> extends Serializable {
void call(Iterator<T> t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function.java b/core/src/main/java/org/apache/spark/api/java/function/Function.java
index b9d9777a75..8b2bbd501c 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
* DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
* when mapping RDDs of other types.
*/
+@FunctionalInterface
public interface Function<T1, R> extends Serializable {
R call(T1 v1) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function0.java b/core/src/main/java/org/apache/spark/api/java/function/Function0.java
index c86928dd05..5c649d9de4 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function0.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function0.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A zero-argument function that returns an R.
*/
+@FunctionalInterface
public interface Function0<R> extends Serializable {
R call() throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function2.java b/core/src/main/java/org/apache/spark/api/java/function/Function2.java
index a975ce3c68..a7d9647095 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function2.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A two-argument function that takes arguments of type T1 and T2 and returns an R.
*/
+@FunctionalInterface
public interface Function2<T1, T2, R> extends Serializable {
R call(T1 v1, T2 v2) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function3.java b/core/src/main/java/org/apache/spark/api/java/function/Function3.java
index 6eecfb645a..77acd21d4e 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function3.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function3.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
*/
+@FunctionalInterface
public interface Function3<T1, T2, T3, R> extends Serializable {
R call(T1 v1, T2 v2, T3 v3) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function4.java b/core/src/main/java/org/apache/spark/api/java/function/Function4.java
index 9c35a22ca9..d530ba446b 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function4.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function4.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A four-argument function that takes arguments of type T1, T2, T3 and T4 and returns an R.
*/
+@FunctionalInterface
public interface Function4<T1, T2, T3, T4, R> extends Serializable {
R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java
index 3ae6ef4489..5efff943c8 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* Base interface for a map function used in Dataset's map function.
*/
+@FunctionalInterface
public interface MapFunction<T, U> extends Serializable {
U call(T value) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
index faa59eabc8..2c3d43afc0 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* Base interface for a map function used in GroupedDataset's mapGroup function.
*/
+@FunctionalInterface
public interface MapGroupsFunction<K, V, R> extends Serializable {
R call(K key, Iterator<V> values) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
index cf9945a215..68e8557c88 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* Base interface for function used in Dataset's mapPartitions.
*/
+@FunctionalInterface
public interface MapPartitionsFunction<T, U> extends Serializable {
Iterator<U> call(Iterator<T> input) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
index 51eed2e67b..97bd2b37a0 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
@@ -26,6 +26,7 @@ import scala.Tuple2;
* A function that returns zero or more key-value pair records from each input record. The
* key-value pairs are represented as scala.Tuple2 objects.
*/
+@FunctionalInterface
public interface PairFlatMapFunction<T, K, V> extends Serializable {
Iterator<Tuple2<K, V>> call(T t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
index 2fdfa7184a..34a7e4489a 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
@@ -25,6 +25,7 @@ import scala.Tuple2;
* A function that returns key-value pairs (Tuple2&lt;K, V&gt;), and can be used to
* construct PairRDDs.
*/
+@FunctionalInterface
public interface PairFunction<T, K, V> extends Serializable {
Tuple2<K, V> call(T t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java b/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
index ee092d0058..d9029d8538 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* Base interface for function used in Dataset's reduce.
*/
+@FunctionalInterface
public interface ReduceFunction<T> extends Serializable {
T call(T v1, T v2) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
index f30d42ee57..aff2bc6e94 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A function with no return value.
*/
+@FunctionalInterface
public interface VoidFunction<T> extends Serializable {
void call(T t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
index da9ae1c9c5..ddb616241b 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A two-argument function that takes arguments of type T1 and T2 with no return value.
*/
+@FunctionalInterface
public interface VoidFunction2<T1, T2> extends Serializable {
void call(T1 v1, T2 v2) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index dcae4a34c4..189d607fa6 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -162,14 +162,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks when the downstream operator
// does not fully consume the sorter's output (e.g. sort followed by limit).
- taskContext.addTaskCompletionListener(
- new TaskCompletionListener() {
- @Override
- public void onTaskCompletion(TaskContext context) {
- cleanupResources();
- }
- }
- );
+ taskContext.addTaskCompletionListener(context -> { cleanupResources(); });
}
/**
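TaskCompletionListener declares a single abstract method (onTaskCompletion), so it is a valid lambda target even without an explicit @FunctionalInterface annotation, which is what the rewrite above relies on. A minimal sketch of the equivalence:

import org.apache.spark.TaskContext;
import org.apache.spark.util.TaskCompletionListener;

public class ListenerLambdaSketch {
  // The lambda is shorthand for an anonymous class whose onTaskCompletion
  // body runs the cleanup; the parameter is the completing TaskContext.
  static final TaskCompletionListener LISTENER = (TaskContext context) -> {
    // cleanup work would go here
  };
}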
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
index 01aed95878..cf4dfde86c 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
@@ -27,22 +27,18 @@ final class UnsafeSorterSpillMerger {
private final PriorityQueue<UnsafeSorterIterator> priorityQueue;
UnsafeSorterSpillMerger(
- final RecordComparator recordComparator,
- final PrefixComparator prefixComparator,
- final int numSpills) {
- final Comparator<UnsafeSorterIterator> comparator = new Comparator<UnsafeSorterIterator>() {
-
- @Override
- public int compare(UnsafeSorterIterator left, UnsafeSorterIterator right) {
- final int prefixComparisonResult =
- prefixComparator.compare(left.getKeyPrefix(), right.getKeyPrefix());
- if (prefixComparisonResult == 0) {
- return recordComparator.compare(
- left.getBaseObject(), left.getBaseOffset(),
- right.getBaseObject(), right.getBaseOffset());
- } else {
- return prefixComparisonResult;
- }
+ RecordComparator recordComparator,
+ PrefixComparator prefixComparator,
+ int numSpills) {
+ Comparator<UnsafeSorterIterator> comparator = (left, right) -> {
+ int prefixComparisonResult =
+ prefixComparator.compare(left.getKeyPrefix(), right.getKeyPrefix());
+ if (prefixComparisonResult == 0) {
+ return recordComparator.compare(
+ left.getBaseObject(), left.getBaseOffset(),
+ right.getBaseObject(), right.getBaseOffset());
+ } else {
+ return prefixComparisonResult;
}
};
priorityQueue = new PriorityQueue<>(numSpills, comparator);
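The lambda above keeps the hand-written two-level comparison because the record comparator needs all four base-object/offset arguments; for simpler element types, Java 8's Comparator combinators can express the same prefix-then-tiebreak shape. An illustrative sketch on a stand-in type, not a drop-in for the code above:

import java.util.Comparator;
import java.util.PriorityQueue;

public class TwoLevelOrderingSketch {
  static final class Rec {
    final long prefix;     // stands in for the sort-key prefix
    final String payload;  // stands in for the full record
    Rec(long prefix, String payload) { this.prefix = prefix; this.payload = payload; }
  }

  // Compare prefixes first; fall back to the payload only on a prefix tie.
  static final Comparator<Rec> ORDER =
      Comparator.<Rec>comparingLong(r -> r.prefix)
                .thenComparing((Rec r) -> r.payload);

  // Same construction as the merger: a priority queue ordered by ORDER.
  static final PriorityQueue<Rec> QUEUE = new PriorityQueue<>(16, ORDER);
}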
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cbab7b8844..7e564061e6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -350,9 +350,6 @@ class SparkContext(config: SparkConf) extends Logging {
private def warnDeprecatedVersions(): Unit = {
val javaVersion = System.getProperty("java.version").split("[+.\\-]+", 3)
- if (javaVersion.length >= 2 && javaVersion(1).toInt == 7) {
- logWarning("Support for Java 7 is deprecated as of Spark 2.0.0")
- }
if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.10"))) {
logWarning("Support for Scala 2.10 is deprecated as of Spark 2.1.0")
}
diff --git a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
index 31b9c5edf0..3fd812e9fc 100644
--- a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
@@ -39,7 +39,6 @@ private[spark] class WorkerCommandBuilder(sparkHome: String, memoryMb: Int, comm
val cmd = buildJavaCommand(command.classPathEntries.mkString(File.pathSeparator))
cmd.add(s"-Xmx${memoryMb}M")
command.javaOpts.foreach(cmd.add)
- CommandBuilderUtils.addPermGenSizeOpt(cmd)
addOptionString(cmd, getenv("SPARK_JAVA_OPTS"))
cmd
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fe6fe6aa4f..1e6e9a223e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1882,20 +1882,17 @@ private[spark] object Utils extends Logging {
def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = {
// Politely destroy first
process.destroy()
-
- if (waitForProcess(process, timeoutMs)) {
+ if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
// Successful exit
Option(process.exitValue())
} else {
- // Java 8 added a new API which will more forcibly kill the process. Use that if available.
try {
- classOf[Process].getMethod("destroyForcibly").invoke(process)
+ process.destroyForcibly()
} catch {
- case _: NoSuchMethodException => return None // Not available; give up
case NonFatal(e) => logWarning("Exception when attempting to kill process", e)
}
// Wait, again, although this really should return almost immediately
- if (waitForProcess(process, timeoutMs)) {
+ if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
Option(process.exitValue())
} else {
logWarning("Timed out waiting to forcibly kill process")
@@ -1905,44 +1902,11 @@ private[spark] object Utils extends Logging {
}
/**
- * Wait for a process to terminate for at most the specified duration.
- *
- * @return whether the process actually terminated before the given timeout.
- */
- def waitForProcess(process: Process, timeoutMs: Long): Boolean = {
- try {
- // Use Java 8 method if available
- classOf[Process].getMethod("waitFor", java.lang.Long.TYPE, classOf[TimeUnit])
- .invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS)
- .asInstanceOf[Boolean]
- } catch {
- case _: NoSuchMethodException =>
- // Otherwise implement it manually
- var terminated = false
- val startTime = System.currentTimeMillis
- while (!terminated) {
- try {
- process.exitValue()
- terminated = true
- } catch {
- case e: IllegalThreadStateException =>
- // Process not terminated yet
- if (System.currentTimeMillis - startTime > timeoutMs) {
- return false
- }
- Thread.sleep(100)
- }
- }
- true
- }
- }
-
- /**
* Return the stderr of a process after waiting for the process to terminate.
* If the process does not terminate within the specified timeout, return None.
*/
def getStderr(process: Process, timeoutMs: Long): Option[String] = {
- val terminated = Utils.waitForProcess(process, timeoutMs)
+ val terminated = process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)
if (terminated) {
Some(Source.fromInputStream(process.getErrorStream).getLines().mkString("\n"))
} else {
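The Utils.scala changes above lean on two Process methods added in Java 8, which let the patch drop both the reflection call and the hand-rolled polling loop: waitFor(long, TimeUnit) for a timed wait and destroyForcibly() for a hard kill. A minimal Java sketch of the resulting graceful-then-forcible pattern (the method name and Integer-or-null return convention here are illustrative, not Spark's):

import java.util.concurrent.TimeUnit;

public class ProcessTerminationSketch {
  static Integer terminate(Process process, long timeoutMs) throws InterruptedException {
    process.destroy();                                        // polite request first
    if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {  // Java 8: timed wait
      return process.exitValue();
    }
    process.destroyForcibly();                                // Java 8: hard kill
    if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
      return process.exitValue();
    }
    return null;  // still running even after the forcible kill
  }
}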