author     Sean Owen <sowen@cloudera.com>  2017-02-16 12:32:45 +0000
committer  Sean Owen <sowen@cloudera.com>  2017-02-16 12:32:45 +0000
commit     0e2405490f2056728d1353abbac6f3ea177ae533 (patch)
tree       1a9ec960faec7abcb8d8fbac43b6a6dc633d2297
parent     3871d94a695d47169720e877f77ff1e4bede43ee (diff)
[SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
- Move external/java8-tests tests into core, streaming, sql and remove
- Remove MaxPermGen and related options
- Fix some reflection / TODOs around Java 8+ methods
- Update doc references to 1.7/1.8 differences
- Remove Java 7/8 related build profiles
- Update some plugins for better Java 8 compatibility
- Fix a few Java-related warnings

For the future:

- Update Java 8 examples to fully use Java 8
- Update Java tests to use lambdas for simplicity
- Update Java internal implementations to use lambdas

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #16871 from srowen/SPARK-19493.
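With the Java function interfaces in `org.apache.spark.api.java.function` now marked `@FunctionalInterface` (see the diff below), Java callers can pass lambdas where anonymous inner classes were previously required. A minimal, illustrative sketch of that usage (not part of this patch; the local master and app name are placeholders):

```java
import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class Java8LambdaSketch {
  public static void main(String[] args) {
    // JavaSparkContext implements Closeable, so try-with-resources works here.
    try (JavaSparkContext sc = new JavaSparkContext("local[2]", "java8-lambda-sketch")) {
      JavaRDD<Integer> nums = sc.parallelize(Arrays.asList(1, 2, 3, 4));

      // org.apache.spark.api.java.function.Function as a lambda
      // instead of an anonymous inner class
      JavaRDD<Integer> squares = nums.map(x -> x * x);

      // Same for the filter predicate (Function<Integer, Boolean>)
      long evens = nums.filter(x -> x % 2 == 0).count();

      System.out.println(squares.collect() + ", evens = " + evens);
    }
  }
}
```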
-rw-r--r--  assembly/pom.xml | 1
-rwxr-xr-x  build/mvn | 8
-rwxr-xr-x  build/sbt-launch-lib.bash | 2
-rw-r--r--  common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java | 111
-rw-r--r--  common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java | 16
-rw-r--r--  common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java | 3
-rw-r--r--  common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java | 27
-rw-r--r--  common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java | 2
-rw-r--r--  common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java | 8
-rw-r--r--  common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java | 7
-rw-r--r--  common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java | 21
-rw-r--r--  common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java | 9
-rw-r--r--  common/sketch/pom.xml | 2
-rw-r--r--  common/unsafe/pom.xml | 2
-rw-r--r--  common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java | 9
-rw-r--r--  common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java | 88
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/Optional.java | 7
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/Function.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/Function0.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/Function2.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/Function3.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/Function4.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/MapFunction.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/PairFunction.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java | 1
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java | 9
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java | 28
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 44
-rw-r--r--  core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java (renamed from external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java) | 2
-rw-r--r--  core/src/test/java/test/org/apache/spark/JavaAPISuite.java (renamed from core/src/test/java/org/apache/spark/JavaAPISuite.java) | 8
-rw-r--r--  core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 6
-rw-r--r--  dev/appveyor-install-dependencies.ps1 | 2
-rwxr-xr-x  dev/create-release/release-build.sh | 1
-rwxr-xr-x  dev/make-distribution.sh | 2
-rwxr-xr-x  dev/mima | 1
-rwxr-xr-x  dev/run-tests.py | 3
-rwxr-xr-x  dev/test-dependencies.sh | 2
-rw-r--r--  docs/building-spark.md | 32
-rw-r--r--  docs/index.md | 6
-rw-r--r--  docs/mllib-linear-methods.md | 2
-rw-r--r--  docs/mllib-statistics.md | 7
-rw-r--r--  docs/programming-guide.md | 11
-rw-r--r--  docs/quick-start.md | 9
-rw-r--r--  docs/streaming-custom-receivers.md | 10
-rw-r--r--  docs/streaming-kafka-0-10-integration.md | 62
-rw-r--r--  docs/streaming-kafka-0-8-integration.md | 41
-rw-r--r--  docs/streaming-programming-guide.md | 219
-rw-r--r--  docs/structured-streaming-programming-guide.md | 38
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java | 4
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java | 2
-rw-r--r--  external/java8-tests/README.md | 22
-rw-r--r--  external/java8-tests/pom.xml | 132
-rw-r--r--  external/java8-tests/src/test/resources/log4j.properties | 27
-rw-r--r--  external/java8-tests/src/test/scala/test/org/apache/spark/java8/JDK8ScalaSuite.scala | 30
-rw-r--r--  external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 3
-rw-r--r--  external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala | 7
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java | 7
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java | 10
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java | 21
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java | 7
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java | 7
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java | 3
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java | 68
-rw-r--r--  launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java | 101
-rw-r--r--  launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java | 36
-rw-r--r--  launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java | 8
-rw-r--r--  launcher/src/test/resources/spark-defaults.conf | 2
-rw-r--r--  pom.xml | 171
-rw-r--r--  project/SparkBuild.scala | 41
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 1
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 2
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala | 12
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala | 8
-rw-r--r--  sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java | 4
-rw-r--r--  sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 11
-rw-r--r--  sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java (renamed from external/java8-tests/src/test/java/test/org/apache/spark/java8/sql/Java8DatasetAggregatorSuite.java) | 3
-rw-r--r--  sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java | 2
-rw-r--r--  sql/hive/pom.xml | 3
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala | 10
-rw-r--r--  streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java (renamed from external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java) | 11
-rw-r--r--  streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java (renamed from streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java) | 10
101 files changed, 513 insertions, 1186 deletions
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 53f18796e6..9d8607d913 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -187,6 +187,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
<executions>
<execution>
<id>dist</id>
diff --git a/build/mvn b/build/mvn
index 866bad892c..1e393c331d 100755
--- a/build/mvn
+++ b/build/mvn
@@ -22,7 +22,7 @@ _DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
# Preserve the calling directory
_CALLING_DIR="$(pwd)"
# Options used during compilation
-_COMPILE_JVM_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
+_COMPILE_JVM_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
# Installs any application tarball given a URL, the expected tarball name,
# and, optionally, a checkable binary path to determine if the binary has
@@ -141,13 +141,9 @@ cd "${_CALLING_DIR}"
# Now that zinc is ensured to be installed, check its status and, if its
# not running or just installed, start it
if [ -n "${ZINC_INSTALL_FLAG}" -o -z "`"${ZINC_BIN}" -status -port ${ZINC_PORT}`" ]; then
- ZINC_JAVA_HOME=
- if [ -n "$JAVA_7_HOME" ]; then
- ZINC_JAVA_HOME="env JAVA_HOME=$JAVA_7_HOME"
- fi
export ZINC_OPTS=${ZINC_OPTS:-"$_COMPILE_JVM_OPTS"}
"${ZINC_BIN}" -shutdown -port ${ZINC_PORT}
- $ZINC_JAVA_HOME "${ZINC_BIN}" -start -port ${ZINC_PORT} \
+ "${ZINC_BIN}" -start -port ${ZINC_PORT} \
-scala-compiler "${SCALA_COMPILER}" \
-scala-library "${SCALA_LIBRARY}" &>/dev/null
fi
diff --git a/build/sbt-launch-lib.bash b/build/sbt-launch-lib.bash
index 615f848394..4732669ee6 100755
--- a/build/sbt-launch-lib.bash
+++ b/build/sbt-launch-lib.bash
@@ -117,7 +117,7 @@ get_mem_opts () {
(( $perm < 4096 )) || perm=4096
local codecache=$(( $perm / 2 ))
- echo "-Xms${mem}m -Xmx${mem}m -XX:MaxPermSize=${perm}m -XX:ReservedCodeCacheSize=${codecache}m"
+ echo "-Xms${mem}m -Xmx${mem}m -XX:ReservedCodeCacheSize=${codecache}m"
}
require_arg () {
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 7e7d78d42a..a6f527c118 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -32,8 +32,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.SettableFuture;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -133,40 +131,36 @@ public class TransportClient implements Closeable {
*/
public void fetchChunk(
long streamId,
- final int chunkIndex,
- final ChunkReceivedCallback callback) {
- final long startTime = System.currentTimeMillis();
+ int chunkIndex,
+ ChunkReceivedCallback callback) {
+ long startTime = System.currentTimeMillis();
if (logger.isDebugEnabled()) {
logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel));
}
- final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
+ StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
handler.addFetchRequest(streamChunkId, callback);
- channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- long timeTaken = System.currentTimeMillis() - startTime;
- if (logger.isTraceEnabled()) {
- logger.trace("Sending request {} to {} took {} ms", streamChunkId,
- getRemoteAddress(channel), timeTaken);
- }
- } else {
- String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
- getRemoteAddress(channel), future.cause());
- logger.error(errorMsg, future.cause());
- handler.removeFetchRequest(streamChunkId);
- channel.close();
- try {
- callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
- } catch (Exception e) {
- logger.error("Uncaught exception in RPC response callback handler!", e);
- }
- }
+ channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> {
+ if (future.isSuccess()) {
+ long timeTaken = System.currentTimeMillis() - startTime;
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending request {} to {} took {} ms", streamChunkId,
+ getRemoteAddress(channel), timeTaken);
}
- });
+ } else {
+ String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId,
+ getRemoteAddress(channel), future.cause());
+ logger.error(errorMsg, future.cause());
+ handler.removeFetchRequest(streamChunkId);
+ channel.close();
+ try {
+ callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause()));
+ } catch (Exception e) {
+ logger.error("Uncaught exception in RPC response callback handler!", e);
+ }
+ }
+ });
}
/**
@@ -175,8 +169,8 @@ public class TransportClient implements Closeable {
* @param streamId The stream to fetch.
* @param callback Object to call with the stream data.
*/
- public void stream(final String streamId, final StreamCallback callback) {
- final long startTime = System.currentTimeMillis();
+ public void stream(String streamId, StreamCallback callback) {
+ long startTime = System.currentTimeMillis();
if (logger.isDebugEnabled()) {
logger.debug("Sending stream request for {} to {}", streamId, getRemoteAddress(channel));
}
@@ -186,29 +180,25 @@ public class TransportClient implements Closeable {
// when responses arrive.
synchronized (this) {
handler.addStreamCallback(callback);
- channel.writeAndFlush(new StreamRequest(streamId)).addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- long timeTaken = System.currentTimeMillis() - startTime;
- if (logger.isTraceEnabled()) {
- logger.trace("Sending request for {} to {} took {} ms", streamId,
- getRemoteAddress(channel), timeTaken);
- }
- } else {
- String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
- getRemoteAddress(channel), future.cause());
- logger.error(errorMsg, future.cause());
- channel.close();
- try {
- callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
- } catch (Exception e) {
- logger.error("Uncaught exception in RPC response callback handler!", e);
- }
- }
+ channel.writeAndFlush(new StreamRequest(streamId)).addListener(future -> {
+ if (future.isSuccess()) {
+ long timeTaken = System.currentTimeMillis() - startTime;
+ if (logger.isTraceEnabled()) {
+ logger.trace("Sending request for {} to {} took {} ms", streamId,
+ getRemoteAddress(channel), timeTaken);
}
- });
+ } else {
+ String errorMsg = String.format("Failed to send request for %s to %s: %s", streamId,
+ getRemoteAddress(channel), future.cause());
+ logger.error(errorMsg, future.cause());
+ channel.close();
+ try {
+ callback.onFailure(streamId, new IOException(errorMsg, future.cause()));
+ } catch (Exception e) {
+ logger.error("Uncaught exception in RPC response callback handler!", e);
+ }
+ }
+ });
}
}
@@ -220,19 +210,17 @@ public class TransportClient implements Closeable {
* @param callback Callback to handle the RPC's reply.
* @return The RPC's id.
*/
- public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) {
- final long startTime = System.currentTimeMillis();
+ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
+ long startTime = System.currentTimeMillis();
if (logger.isTraceEnabled()) {
logger.trace("Sending RPC to {}", getRemoteAddress(channel));
}
- final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
+ long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
handler.addRpcRequest(requestId, callback);
- channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
+ channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
+ .addListener(future -> {
if (future.isSuccess()) {
long timeTaken = System.currentTimeMillis() - startTime;
if (logger.isTraceEnabled()) {
@@ -251,8 +239,7 @@ public class TransportClient implements Closeable {
logger.error("Uncaught exception in RPC response callback handler!", e);
}
}
- }
- });
+ });
return requestId;
}
diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
index 980525dbf0..799f4540aa 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthClientBootstrap.java
@@ -20,12 +20,7 @@ package org.apache.spark.network.crypto;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
-import java.security.Key;
-import javax.crypto.KeyGenerator;
-import javax.crypto.Mac;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -37,7 +32,6 @@ import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientBootstrap;
import org.apache.spark.network.sasl.SaslClientBootstrap;
import org.apache.spark.network.sasl.SecretKeyHolder;
-import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;
/**
@@ -103,20 +97,18 @@ public class AuthClientBootstrap implements TransportClientBootstrap {
private void doSparkAuth(TransportClient client, Channel channel)
throws GeneralSecurityException, IOException {
- AuthEngine engine = new AuthEngine(authUser, secretKeyHolder.getSecretKey(authUser), conf);
- try {
+ String secretKey = secretKeyHolder.getSecretKey(authUser);
+ try (AuthEngine engine = new AuthEngine(authUser, secretKey, conf)) {
ClientChallenge challenge = engine.challenge();
ByteBuf challengeData = Unpooled.buffer(challenge.encodedLength());
challenge.encode(challengeData);
- ByteBuffer responseData = client.sendRpcSync(challengeData.nioBuffer(),
- conf.authRTTimeoutMs());
+ ByteBuffer responseData =
+ client.sendRpcSync(challengeData.nioBuffer(), conf.authRTTimeoutMs());
ServerResponse response = ServerResponse.decodeMessage(responseData);
engine.validate(response);
engine.sessionCipher().addToChannel(channel);
- } finally {
- engine.close();
}
}
diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
index 991d8ba95f..0a5c029940 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/AuthRpcHandler.java
@@ -17,9 +17,7 @@
package org.apache.spark.network.crypto;
-import java.io.IOException;
import java.nio.ByteBuffer;
-import javax.security.sasl.Sasl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
@@ -35,7 +33,6 @@ import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.sasl.SaslRpcHandler;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
-import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.TransportConf;
/**
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index 900e8eb255..8193bc1376 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -22,8 +22,6 @@ import java.nio.ByteBuffer;
import com.google.common.base.Throwables;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -189,21 +187,16 @@ public class TransportRequestHandler extends MessageHandler<RequestMessage> {
* Responds to a single message with some Encodable object. If a failure occurs while sending,
* it will be logged and the channel closed.
*/
- private void respond(final Encodable result) {
- final SocketAddress remoteAddress = channel.remoteAddress();
- channel.writeAndFlush(result).addListener(
- new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- logger.trace("Sent result {} to client {}", result, remoteAddress);
- } else {
- logger.error(String.format("Error sending result %s to %s; closing connection",
- result, remoteAddress), future.cause());
- channel.close();
- }
- }
+ private void respond(Encodable result) {
+ SocketAddress remoteAddress = channel.remoteAddress();
+ channel.writeAndFlush(result).addListener(future -> {
+ if (future.isSuccess()) {
+ logger.trace("Sent result {} to client {}", result, remoteAddress);
+ } else {
+ logger.error(String.format("Error sending result %s to %s; closing connection",
+ result, remoteAddress), future.cause());
+ channel.close();
}
- );
+ });
}
}
diff --git a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java
index 9a186f2113..a3519fe4a4 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/crypto/AuthEngineSuite.java
@@ -18,10 +18,8 @@
package org.apache.spark.network.crypto;
import java.util.Arrays;
-import java.util.Map;
import static java.nio.charset.StandardCharsets.UTF_8;
-import com.google.common.collect.ImmutableMap;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index 6e02430a8e..6daf9609d7 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -190,12 +190,8 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis);
allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis);
allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
- allMetrics.put("registeredExecutorsSize", new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return blockManager.getRegisteredExecutorsSize();
- }
- });
+ allMetrics.put("registeredExecutorsSize",
+ (Gauge<Integer>) () -> blockManager.getRegisteredExecutorsSize());
}
@Override
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 25e9abde70..62d58aba4c 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -205,12 +205,7 @@ public class ExternalShuffleBlockResolver {
logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
// Execute the actual deletion in a different thread, as it may take some time.
- directoryCleaner.execute(new Runnable() {
- @Override
- public void run() {
- deleteExecutorDirs(executor.localDirs);
- }
- });
+ directoryCleaner.execute(() -> deleteExecutorDirs(executor.localDirs));
}
}
}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 8c0c400966..2c5827bf7d 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -82,23 +82,19 @@ public class ExternalShuffleClient extends ShuffleClient {
@Override
public void fetchBlocks(
- final String host,
- final int port,
- final String execId,
+ String host,
+ int port,
+ String execId,
String[] blockIds,
BlockFetchingListener listener) {
checkInit();
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
try {
RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
- new RetryingBlockFetcher.BlockFetchStarter() {
- @Override
- public void createAndStart(String[] blockIds, BlockFetchingListener listener)
- throws IOException, InterruptedException {
+ (blockIds1, listener1) -> {
TransportClient client = clientFactory.createClient(host, port);
- new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();
- }
- };
+ new OneForOneBlockFetcher(client, appId, execId, blockIds1, listener1).start();
+ };
int maxRetries = conf.maxIORetries();
if (maxRetries > 0) {
@@ -131,12 +127,9 @@ public class ExternalShuffleClient extends ShuffleClient {
String execId,
ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException {
checkInit();
- TransportClient client = clientFactory.createUnmanagedClient(host, port);
- try {
+ try (TransportClient client = clientFactory.createUnmanagedClient(host, port)) {
ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer();
client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
- } finally {
- client.close();
}
}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
index 5be855048e..f309dda8af 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
@@ -164,12 +164,9 @@ public class RetryingBlockFetcher {
logger.info("Retrying fetch ({}/{}) for {} outstanding blocks after {} ms",
retryCount, maxRetries, outstandingBlocksIds.size(), retryWaitTime);
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
- fetchAllOutstanding();
- }
+ executorService.submit(() -> {
+ Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS);
+ fetchAllOutstanding();
});
}
diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml
index bcd26d4352..1356c4723b 100644
--- a/common/sketch/pom.xml
+++ b/common/sketch/pom.xml
@@ -61,6 +61,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
+ <version>3.2.2</version>
<configuration>
<javacArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
@@ -71,6 +72,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
<configuration>
<compilerArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml
index dc19f4ad5f..f03a4da5e7 100644
--- a/common/unsafe/pom.xml
+++ b/common/unsafe/pom.xml
@@ -98,6 +98,7 @@
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
+ <version>3.2.2</version>
<configuration>
<javacArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
@@ -108,6 +109,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
<configuration>
<compilerArgs combine.children="append">
<!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 671b8c7475..f13c24ae5e 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -162,14 +162,9 @@ public final class Platform {
constructor.setAccessible(true);
Field cleanerField = cls.getDeclaredField("cleaner");
cleanerField.setAccessible(true);
- final long memory = allocateMemory(size);
+ long memory = allocateMemory(size);
ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size);
- Cleaner cleaner = Cleaner.create(buffer, new Runnable() {
- @Override
- public void run() {
- freeMemory(memory);
- }
- });
+ Cleaner cleaner = Cleaner.create(buffer, () -> freeMemory(memory));
cleanerField.set(buffer, cleaner);
return buffer;
} catch (Exception e) {
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
index fd6e95c3e0..621f2c6bf3 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
@@ -178,48 +178,52 @@ public final class CalendarInterval implements Serializable {
"Interval string does not match day-time format of 'd h:m:s.n': " + s);
} else {
try {
- if (unit.equals("year")) {
- int year = (int) toLongWithRange("year", m.group(1),
- Integer.MIN_VALUE / 12, Integer.MAX_VALUE / 12);
- result = new CalendarInterval(year * 12, 0L);
-
- } else if (unit.equals("month")) {
- int month = (int) toLongWithRange("month", m.group(1),
- Integer.MIN_VALUE, Integer.MAX_VALUE);
- result = new CalendarInterval(month, 0L);
-
- } else if (unit.equals("week")) {
- long week = toLongWithRange("week", m.group(1),
- Long.MIN_VALUE / MICROS_PER_WEEK, Long.MAX_VALUE / MICROS_PER_WEEK);
- result = new CalendarInterval(0, week * MICROS_PER_WEEK);
-
- } else if (unit.equals("day")) {
- long day = toLongWithRange("day", m.group(1),
- Long.MIN_VALUE / MICROS_PER_DAY, Long.MAX_VALUE / MICROS_PER_DAY);
- result = new CalendarInterval(0, day * MICROS_PER_DAY);
-
- } else if (unit.equals("hour")) {
- long hour = toLongWithRange("hour", m.group(1),
- Long.MIN_VALUE / MICROS_PER_HOUR, Long.MAX_VALUE / MICROS_PER_HOUR);
- result = new CalendarInterval(0, hour * MICROS_PER_HOUR);
-
- } else if (unit.equals("minute")) {
- long minute = toLongWithRange("minute", m.group(1),
- Long.MIN_VALUE / MICROS_PER_MINUTE, Long.MAX_VALUE / MICROS_PER_MINUTE);
- result = new CalendarInterval(0, minute * MICROS_PER_MINUTE);
-
- } else if (unit.equals("second")) {
- long micros = parseSecondNano(m.group(1));
- result = new CalendarInterval(0, micros);
-
- } else if (unit.equals("millisecond")) {
- long millisecond = toLongWithRange("millisecond", m.group(1),
- Long.MIN_VALUE / MICROS_PER_MILLI, Long.MAX_VALUE / MICROS_PER_MILLI);
- result = new CalendarInterval(0, millisecond * MICROS_PER_MILLI);
-
- } else if (unit.equals("microsecond")) {
- long micros = Long.parseLong(m.group(1));
- result = new CalendarInterval(0, micros);
+ switch (unit) {
+ case "year":
+ int year = (int) toLongWithRange("year", m.group(1),
+ Integer.MIN_VALUE / 12, Integer.MAX_VALUE / 12);
+ result = new CalendarInterval(year * 12, 0L);
+ break;
+ case "month":
+ int month = (int) toLongWithRange("month", m.group(1),
+ Integer.MIN_VALUE, Integer.MAX_VALUE);
+ result = new CalendarInterval(month, 0L);
+ break;
+ case "week":
+ long week = toLongWithRange("week", m.group(1),
+ Long.MIN_VALUE / MICROS_PER_WEEK, Long.MAX_VALUE / MICROS_PER_WEEK);
+ result = new CalendarInterval(0, week * MICROS_PER_WEEK);
+ break;
+ case "day":
+ long day = toLongWithRange("day", m.group(1),
+ Long.MIN_VALUE / MICROS_PER_DAY, Long.MAX_VALUE / MICROS_PER_DAY);
+ result = new CalendarInterval(0, day * MICROS_PER_DAY);
+ break;
+ case "hour":
+ long hour = toLongWithRange("hour", m.group(1),
+ Long.MIN_VALUE / MICROS_PER_HOUR, Long.MAX_VALUE / MICROS_PER_HOUR);
+ result = new CalendarInterval(0, hour * MICROS_PER_HOUR);
+ break;
+ case "minute":
+ long minute = toLongWithRange("minute", m.group(1),
+ Long.MIN_VALUE / MICROS_PER_MINUTE, Long.MAX_VALUE / MICROS_PER_MINUTE);
+ result = new CalendarInterval(0, minute * MICROS_PER_MINUTE);
+ break;
+ case "second": {
+ long micros = parseSecondNano(m.group(1));
+ result = new CalendarInterval(0, micros);
+ break;
+ }
+ case "millisecond":
+ long millisecond = toLongWithRange("millisecond", m.group(1),
+ Long.MIN_VALUE / MICROS_PER_MILLI, Long.MAX_VALUE / MICROS_PER_MILLI);
+ result = new CalendarInterval(0, millisecond * MICROS_PER_MILLI);
+ break;
+ case "microsecond": {
+ long micros = Long.parseLong(m.group(1));
+ result = new CalendarInterval(0, micros);
+ break;
+ }
}
} catch (Exception e) {
throw new IllegalArgumentException("Error parsing interval string: " + e.getMessage(), e);
diff --git a/core/src/main/java/org/apache/spark/api/java/Optional.java b/core/src/main/java/org/apache/spark/api/java/Optional.java
index ca7babc3f0..fd0f495ca2 100644
--- a/core/src/main/java/org/apache/spark/api/java/Optional.java
+++ b/core/src/main/java/org/apache/spark/api/java/Optional.java
@@ -18,6 +18,7 @@
package org.apache.spark.api.java;
import java.io.Serializable;
+import java.util.Objects;
import com.google.common.base.Preconditions;
@@ -52,8 +53,8 @@ import com.google.common.base.Preconditions;
* <li>{@link #isPresent()}</li>
* </ul>
*
- * <p>{@code java.util.Optional} itself is not used at this time because the
- * project does not require Java 8. Using {@code com.google.common.base.Optional}
+ * <p>{@code java.util.Optional} itself was not used because at the time, the
+ * project did not require Java 8. Using {@code com.google.common.base.Optional}
* has in the past caused serious library version conflicts with Guava that can't
* be resolved by shading. Hence this work-alike clone.</p>
*
@@ -171,7 +172,7 @@ public final class Optional<T> implements Serializable {
return false;
}
Optional<?> other = (Optional<?>) obj;
- return value == null ? other.value == null : value.equals(other.value);
+ return Objects.equals(value, other.value);
}
@Override
diff --git a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
index 07aebb75e8..33bedf7ebc 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
* A function that returns zero or more output records from each grouping key and its values from 2
* Datasets.
*/
+@FunctionalInterface
public interface CoGroupFunction<K, V1, V2, R> extends Serializable {
Iterator<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
index 576087b6f4..2f23da5bfe 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* A function that returns zero or more records of type Double from each input record.
*/
+@FunctionalInterface
public interface DoubleFlatMapFunction<T> extends Serializable {
Iterator<Double> call(T t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
index bf16f791f9..3c0291cf46 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A function that returns Doubles, and can be used to construct DoubleRDDs.
*/
+@FunctionalInterface
public interface DoubleFunction<T> extends Serializable {
double call(T t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
index 462ca3f6f6..a6f69f7cdc 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FilterFunction.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
*
* If the function returns true, the element is included in the returned Dataset.
*/
+@FunctionalInterface
public interface FilterFunction<T> extends Serializable {
boolean call(T value) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
index 2d8ea6d1a5..91d61292f1 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* A function that returns zero or more output records from each input record.
*/
+@FunctionalInterface
public interface FlatMapFunction<T, R> extends Serializable {
Iterator<R> call(T t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
index fc97b63f82..f9f2580b01 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* A function that takes two inputs and returns zero or more output records.
*/
+@FunctionalInterface
public interface FlatMapFunction2<T1, T2, R> extends Serializable {
Iterator<R> call(T1 t1, T2 t2) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
index bae574ab57..6423c5d0fc 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* A function that returns zero or more output records from each grouping key and its values.
*/
+@FunctionalInterface
public interface FlatMapGroupsFunction<K, V, R> extends Serializable {
Iterator<R> call(K key, Iterator<V> values) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java b/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
index 07e54b28fa..2e6e90818d 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/ForeachFunction.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
*
* Spark will invoke the call function on each element in the input Dataset.
*/
+@FunctionalInterface
public interface ForeachFunction<T> extends Serializable {
void call(T t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java b/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
index 4938a51bcd..d8f55d0ae1 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/ForeachPartitionFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* Base interface for a function used in Dataset's foreachPartition function.
*/
+@FunctionalInterface
public interface ForeachPartitionFunction<T> extends Serializable {
void call(Iterator<T> t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function.java b/core/src/main/java/org/apache/spark/api/java/function/Function.java
index b9d9777a75..8b2bbd501c 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
* DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
* when mapping RDDs of other types.
*/
+@FunctionalInterface
public interface Function<T1, R> extends Serializable {
R call(T1 v1) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function0.java b/core/src/main/java/org/apache/spark/api/java/function/Function0.java
index c86928dd05..5c649d9de4 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function0.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function0.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A zero-argument function that returns an R.
*/
+@FunctionalInterface
public interface Function0<R> extends Serializable {
R call() throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function2.java b/core/src/main/java/org/apache/spark/api/java/function/Function2.java
index a975ce3c68..a7d9647095 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function2.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A two-argument function that takes arguments of type T1 and T2 and returns an R.
*/
+@FunctionalInterface
public interface Function2<T1, T2, R> extends Serializable {
R call(T1 v1, T2 v2) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function3.java b/core/src/main/java/org/apache/spark/api/java/function/Function3.java
index 6eecfb645a..77acd21d4e 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function3.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function3.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
*/
+@FunctionalInterface
public interface Function3<T1, T2, T3, R> extends Serializable {
R call(T1 v1, T2 v2, T3 v3) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function4.java b/core/src/main/java/org/apache/spark/api/java/function/Function4.java
index 9c35a22ca9..d530ba446b 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function4.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function4.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A four-argument function that takes arguments of type T1, T2, T3 and T4 and returns an R.
*/
+@FunctionalInterface
public interface Function4<T1, T2, T3, T4, R> extends Serializable {
R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java
index 3ae6ef4489..5efff943c8 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/MapFunction.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* Base interface for a map function used in Dataset's map function.
*/
+@FunctionalInterface
public interface MapFunction<T, U> extends Serializable {
U call(T value) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
index faa59eabc8..2c3d43afc0 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/MapGroupsFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* Base interface for a map function used in GroupedDataset's mapGroup function.
*/
+@FunctionalInterface
public interface MapGroupsFunction<K, V, R> extends Serializable {
R call(K key, Iterator<V> values) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
index cf9945a215..68e8557c88 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
/**
* Base interface for function used in Dataset's mapPartitions.
*/
+@FunctionalInterface
public interface MapPartitionsFunction<T, U> extends Serializable {
Iterator<U> call(Iterator<T> input) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
index 51eed2e67b..97bd2b37a0 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java
@@ -26,6 +26,7 @@ import scala.Tuple2;
* A function that returns zero or more key-value pair records from each input record. The
* key-value pairs are represented as scala.Tuple2 objects.
*/
+@FunctionalInterface
public interface PairFlatMapFunction<T, K, V> extends Serializable {
Iterator<Tuple2<K, V>> call(T t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
index 2fdfa7184a..34a7e4489a 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java
@@ -25,6 +25,7 @@ import scala.Tuple2;
* A function that returns key-value pairs (Tuple2&lt;K, V&gt;), and can be used to
* construct PairRDDs.
*/
+@FunctionalInterface
public interface PairFunction<T, K, V> extends Serializable {
Tuple2<K, V> call(T t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java b/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
index ee092d0058..d9029d8538 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/ReduceFunction.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* Base interface for function used in Dataset's reduce.
*/
+@FunctionalInterface
public interface ReduceFunction<T> extends Serializable {
T call(T v1, T v2) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
index f30d42ee57..aff2bc6e94 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A function with no return value.
*/
+@FunctionalInterface
public interface VoidFunction<T> extends Serializable {
void call(T t) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
index da9ae1c9c5..ddb616241b 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction2.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
/**
* A two-argument function that takes arguments of type T1 and T2 with no return value.
*/
+@FunctionalInterface
public interface VoidFunction2<T1, T2> extends Serializable {
void call(T1 v1, T2 v2) throws Exception;
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index dcae4a34c4..189d607fa6 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -162,14 +162,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks in when the downstream operator
// does not fully consume the sorter's output (e.g. sort followed by limit).
- taskContext.addTaskCompletionListener(
- new TaskCompletionListener() {
- @Override
- public void onTaskCompletion(TaskContext context) {
- cleanupResources();
- }
- }
- );
+ taskContext.addTaskCompletionListener(context -> { cleanupResources(); });
}
/**
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
index 01aed95878..cf4dfde86c 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
@@ -27,22 +27,18 @@ final class UnsafeSorterSpillMerger {
private final PriorityQueue<UnsafeSorterIterator> priorityQueue;
UnsafeSorterSpillMerger(
- final RecordComparator recordComparator,
- final PrefixComparator prefixComparator,
- final int numSpills) {
- final Comparator<UnsafeSorterIterator> comparator = new Comparator<UnsafeSorterIterator>() {
-
- @Override
- public int compare(UnsafeSorterIterator left, UnsafeSorterIterator right) {
- final int prefixComparisonResult =
- prefixComparator.compare(left.getKeyPrefix(), right.getKeyPrefix());
- if (prefixComparisonResult == 0) {
- return recordComparator.compare(
- left.getBaseObject(), left.getBaseOffset(),
- right.getBaseObject(), right.getBaseOffset());
- } else {
- return prefixComparisonResult;
- }
+ RecordComparator recordComparator,
+ PrefixComparator prefixComparator,
+ int numSpills) {
+ Comparator<UnsafeSorterIterator> comparator = (left, right) -> {
+ int prefixComparisonResult =
+ prefixComparator.compare(left.getKeyPrefix(), right.getKeyPrefix());
+ if (prefixComparisonResult == 0) {
+ return recordComparator.compare(
+ left.getBaseObject(), left.getBaseOffset(),
+ right.getBaseObject(), right.getBaseOffset());
+ } else {
+ return prefixComparisonResult;
}
};
priorityQueue = new PriorityQueue<>(numSpills, comparator);
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cbab7b8844..7e564061e6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -350,9 +350,6 @@ class SparkContext(config: SparkConf) extends Logging {
private def warnDeprecatedVersions(): Unit = {
val javaVersion = System.getProperty("java.version").split("[+.\\-]+", 3)
- if (javaVersion.length >= 2 && javaVersion(1).toInt == 7) {
- logWarning("Support for Java 7 is deprecated as of Spark 2.0.0")
- }
if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.10"))) {
logWarning("Support for Scala 2.10 is deprecated as of Spark 2.1.0")
}
diff --git a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
index 31b9c5edf0..3fd812e9fc 100644
--- a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala
@@ -39,7 +39,6 @@ private[spark] class WorkerCommandBuilder(sparkHome: String, memoryMb: Int, comm
val cmd = buildJavaCommand(command.classPathEntries.mkString(File.pathSeparator))
cmd.add(s"-Xmx${memoryMb}M")
command.javaOpts.foreach(cmd.add)
- CommandBuilderUtils.addPermGenSizeOpt(cmd)
addOptionString(cmd, getenv("SPARK_JAVA_OPTS"))
cmd
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fe6fe6aa4f..1e6e9a223e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1882,20 +1882,17 @@ private[spark] object Utils extends Logging {
def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = {
// Politely destroy first
process.destroy()
-
- if (waitForProcess(process, timeoutMs)) {
+ if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
// Successful exit
Option(process.exitValue())
} else {
- // Java 8 added a new API which will more forcibly kill the process. Use that if available.
try {
- classOf[Process].getMethod("destroyForcibly").invoke(process)
+ process.destroyForcibly()
} catch {
- case _: NoSuchMethodException => return None // Not available; give up
case NonFatal(e) => logWarning("Exception when attempting to kill process", e)
}
// Wait, again, although this really should return almost immediately
- if (waitForProcess(process, timeoutMs)) {
+ if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
Option(process.exitValue())
} else {
logWarning("Timed out waiting to forcibly kill process")
@@ -1905,44 +1902,11 @@ private[spark] object Utils extends Logging {
}
/**
- * Wait for a process to terminate for at most the specified duration.
- *
- * @return whether the process actually terminated before the given timeout.
- */
- def waitForProcess(process: Process, timeoutMs: Long): Boolean = {
- try {
- // Use Java 8 method if available
- classOf[Process].getMethod("waitFor", java.lang.Long.TYPE, classOf[TimeUnit])
- .invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS)
- .asInstanceOf[Boolean]
- } catch {
- case _: NoSuchMethodException =>
- // Otherwise implement it manually
- var terminated = false
- val startTime = System.currentTimeMillis
- while (!terminated) {
- try {
- process.exitValue()
- terminated = true
- } catch {
- case e: IllegalThreadStateException =>
- // Process not terminated yet
- if (System.currentTimeMillis - startTime > timeoutMs) {
- return false
- }
- Thread.sleep(100)
- }
- }
- true
- }
- }
-
- /**
* Return the stderr of a process after waiting for the process to terminate.
* If the process does not terminate within the specified timeout, return None.
*/
def getStderr(process: Process, timeoutMs: Long): Option[String] = {
- val terminated = Utils.waitForProcess(process, timeoutMs)
+ val terminated = process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)
if (terminated) {
Some(Source.fromInputStream(process.getErrorStream).getLines().mkString("\n"))
} else {
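
For reference, the rewritten terminateProcess/getStderr above lean on two java.lang.Process methods that are standard since Java 8: waitFor(long, TimeUnit) and destroyForcibly(). A minimal standalone sketch of the same polite-then-forcible pattern (a hypothetical helper, not Spark code):

    import java.util.Optional;
    import java.util.concurrent.TimeUnit;

    class ProcessTermination {
      // Hypothetical helper mirroring the pattern used above: ask nicely, wait, then escalate.
      static Optional<Integer> terminate(Process process, long timeoutMs) throws InterruptedException {
        process.destroy();                                        // polite request (SIGTERM on Unix)
        if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {  // timed wait, built in since Java 8
          return Optional.of(process.exitValue());
        }
        process.destroyForcibly();                                // forcible kill (SIGKILL on Unix)
        if (process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
          return Optional.of(process.exitValue());
        }
        return Optional.empty();                                  // still running; caller decides what to do
      }
    }
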
diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
index fa3a66e73c..e22ad89c1d 100644
--- a/external/java8-tests/src/test/java/test/org/apache/spark/java8/Java8RDDAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/Java8RDDAPISuite.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package test.org.apache.spark.java8;
+package test.org.apache.spark;
import java.io.File;
import java.io.Serializable;
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
index 7bebe0612f..80aab100ac 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark;
+package test.org.apache.spark;
import java.io.*;
import java.nio.channels.FileChannel;
@@ -34,6 +34,12 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
+import org.apache.spark.Accumulator;
+import org.apache.spark.AccumulatorParam;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
import scala.Tuple2;
import scala.Tuple3;
import scala.Tuple4;
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 6027310a96..43f77e68c1 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -919,7 +919,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(pidExists(pid))
val terminated = Utils.terminateProcess(process, 5000)
assert(terminated.isDefined)
- Utils.waitForProcess(process, 5000)
+ process.waitFor(5, TimeUnit.SECONDS)
val durationMs = System.currentTimeMillis() - startTimeMs
assert(durationMs < 5000)
assert(!pidExists(pid))
@@ -932,7 +932,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
var majorVersion = versionParts(0).toInt
if (majorVersion == 1) majorVersion = versionParts(1).toInt
if (majorVersion >= 8) {
- // Java8 added a way to forcibly terminate a process. We'll make sure that works by
+ // We'll make sure that forcibly terminating a process works by
// creating a very misbehaving process. It ignores SIGTERM and has been SIGSTOPed. On
// older versions of java, this will *not* terminate.
val file = File.createTempFile("temp-file-name", ".tmp")
@@ -953,7 +953,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
val start = System.currentTimeMillis()
val terminated = Utils.terminateProcess(process, 5000)
assert(terminated.isDefined)
- Utils.waitForProcess(process, 5000)
+ process.waitFor(5, TimeUnit.SECONDS)
val duration = System.currentTimeMillis() - start
assert(duration < 6000) // add a little extra time to allow a force kill to finish
assert(!pidExists(pid))
diff --git a/dev/appveyor-install-dependencies.ps1 b/dev/appveyor-install-dependencies.ps1
index 1350095a94..1c34f1bbc1 100644
--- a/dev/appveyor-install-dependencies.ps1
+++ b/dev/appveyor-install-dependencies.ps1
@@ -90,7 +90,7 @@ Invoke-Expression "7z.exe x maven.zip"
# add maven to environment variables
$env:Path += ";$tools\apache-maven-$mavenVer\bin"
$env:M2_HOME = "$tools\apache-maven-$mavenVer"
-$env:MAVEN_OPTS = "-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
+$env:MAVEN_OPTS = "-Xmx2g -XX:ReservedCodeCacheSize=512m"
Pop-Location
diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh
index d616f80c54..e1db997a7d 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -267,7 +267,6 @@ if [[ "$1" == "docs" ]]; then
echo "Building Spark docs"
dest_dir="$REMOTE_PARENT_DIR/${DEST_DIR_NAME}-docs"
cd docs
- # Compile docs with Java 7 to use nicer format
# TODO: Make configurable to add this: PRODUCTION=1
PRODUCTION=1 RELEASE_VERSION="$SPARK_VERSION" jekyll build
echo "Copying release documentation to $dest_dir"
diff --git a/dev/make-distribution.sh b/dev/make-distribution.sh
index dc8dfb9344..22cdfd467c 100755
--- a/dev/make-distribution.sh
+++ b/dev/make-distribution.sh
@@ -146,7 +146,7 @@ fi
# Build uber fat JAR
cd "$SPARK_HOME"
-export MAVEN_OPTS="${MAVEN_OPTS:--Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m}"
+export MAVEN_OPTS="${MAVEN_OPTS:--Xmx2g -XX:ReservedCodeCacheSize=512m}"
# Store the command as an array because $MVN variable might have spaces in it.
# Normal quoting tricks don't work.
diff --git a/dev/mima b/dev/mima
index 11c4af2980..eca78ad109 100755
--- a/dev/mima
+++ b/dev/mima
@@ -31,7 +31,6 @@ OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export
rm -f .generated-mima*
java \
- -XX:MaxPermSize=1g \
-Xmx2g \
-cp "$TOOLS_CLASSPATH:$OLD_DEPS_CLASSPATH" \
org.apache.spark.tools.GenerateMIMAIgnore
diff --git a/dev/run-tests.py b/dev/run-tests.py
index 0e7f5ffd8d..04035b33e6 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -492,9 +492,6 @@ def main():
java_version = determine_java_version(java_exe)
- if java_version.minor < 8:
- print("[warn] Java 8 tests will not run because JDK version is < 1.8.")
-
# install SparkR
if which("R"):
run_cmd([os.path.join(SPARK_HOME, "R", "install-dev.sh")])
diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh
index eb43f229c2..2906a81f61 100755
--- a/dev/test-dependencies.sh
+++ b/dev/test-dependencies.sh
@@ -46,7 +46,7 @@ OLD_VERSION=$($MVN -q \
-Dexec.executable="echo" \
-Dexec.args='${project.version}' \
--non-recursive \
- org.codehaus.mojo:exec-maven-plugin:1.3.1:exec)
+ org.codehaus.mojo:exec-maven-plugin:1.5.0:exec)
if [ $? != 0 ]; then
echo -e "Error while getting version string from Maven:\n$OLD_VERSION"
exit 1
diff --git a/docs/building-spark.md b/docs/building-spark.md
index 690c656bad..56b892696e 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -12,8 +12,8 @@ redirect_from: "building-with-maven.html"
## Apache Maven
The Maven-based build is the build of reference for Apache Spark.
-Building Spark using Maven requires Maven 3.3.9 or newer and Java 7+.
-Note that support for Java 7 is deprecated as of Spark 2.0.0 and may be removed in Spark 2.2.0.
+Building Spark using Maven requires Maven 3.3.9 or newer and Java 8+.
+Note that support for Java 7 was removed as of Spark 2.2.0.
### Setting up Maven's Memory Usage
@@ -21,28 +21,18 @@ You'll need to configure Maven to use more memory than usual by setting `MAVEN_O
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
-When compiling with Java 7, you will need to add the additional option "-XX:MaxPermSize=512M" to MAVEN_OPTS.
-
+(The `ReservedCodeCacheSize` setting is optional but recommended.)
If you don't add these parameters to `MAVEN_OPTS`, you may see errors and warnings like the following:
[INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes...
- [ERROR] PermGen space -> [Help 1]
-
- [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/me/Development/spark/core/target/scala-{{site.SCALA_BINARY_VERSION}}/classes...
[ERROR] Java heap space -> [Help 1]
- [INFO] Compiling 233 Scala sources and 41 Java sources to /Users/me/Development/spark/sql/core/target/scala-{site.SCALA_BINARY_VERSION}/classes...
- OpenJDK 64-Bit Server VM warning: CodeCache is full. Compiler has been disabled.
- OpenJDK 64-Bit Server VM warning: Try increasing the code cache size using -XX:ReservedCodeCacheSize=
-
You can fix these problems by setting the `MAVEN_OPTS` variable as discussed before.
**Note:**
* If using `build/mvn` with no `MAVEN_OPTS` set, the script will automatically add the above options to the `MAVEN_OPTS` environment variable.
-* The `test` phase of the Spark build will automatically add these options to `MAVEN_OPTS`, even when not using `build/mvn`.
-* You may see warnings like "ignoring option MaxPermSize=1g; support was removed in 8.0" when building or running tests with Java 8 and `build/mvn`. These warnings are harmless.
-
+* The `test` phase of the Spark build will automatically add these options to `MAVEN_OPTS`, even when not using `build/mvn`.
### build/mvn
@@ -224,20 +214,6 @@ To run test suites of a specific sub project as follows:
./build/sbt core/test
-## Running Java 8 Test Suites
-
-Running only Java 8 tests and nothing else.
-
- ./build/mvn install -DskipTests
- ./build/mvn -pl :java8-tests_2.11 test
-
-or
-
- ./build/sbt java8-tests/test
-
-Java 8 tests are automatically enabled when a Java 8 JDK is detected.
-If you have JDK 8 installed but it is not the system default, you can set JAVA_HOME to point to JDK 8 before running the tests.
-
## PySpark pip installable
If you are building Spark for use in a Python environment and you wish to pip install it, you will first need to build the Spark JARs as described above. Then you can construct an sdist package suitable for setup.py and pip installable package.
diff --git a/docs/index.md b/docs/index.md
index 023e06ada3..19a9d3bfc6 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -26,11 +26,13 @@ Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). It's easy
locally on one machine --- all you need is to have `java` installed on your system `PATH`,
or the `JAVA_HOME` environment variable pointing to a Java installation.
-Spark runs on Java 7+, Python 2.6+/3.4+ and R 3.1+. For the Scala API, Spark {{site.SPARK_VERSION}}
+Spark runs on Java 8+, Python 2.6+/3.4+ and R 3.1+. For the Scala API, Spark {{site.SPARK_VERSION}}
uses Scala {{site.SCALA_BINARY_VERSION}}. You will need to use a compatible Scala version
({{site.SCALA_BINARY_VERSION}}.x).
-Note that support for Java 7 and Python 2.6 are deprecated as of Spark 2.0.0, and support for
+Note that support for Java 7 was removed as of Spark 2.2.0.
+
+Note that support for Python 2.6 is deprecated as of Spark 2.0.0, and support for
Scala 2.10 and versions of Hadoop before 2.6 are deprecated as of Spark 2.1.0, and may be
removed in Spark 2.2.0.
diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md
index 3085539b40..034e89e250 100644
--- a/docs/mllib-linear-methods.md
+++ b/docs/mllib-linear-methods.md
@@ -222,7 +222,7 @@ svmAlg.optimizer()
.setNumIterations(200)
.setRegParam(0.1)
.setUpdater(new L1Updater());
-final SVMModel modelL1 = svmAlg.run(training.rdd());
+SVMModel modelL1 = svmAlg.run(training.rdd());
{% endhighlight %}
In order to run the above application, follow the instructions
diff --git a/docs/mllib-statistics.md b/docs/mllib-statistics.md
index 430c069045..c29400af85 100644
--- a/docs/mllib-statistics.md
+++ b/docs/mllib-statistics.md
@@ -317,12 +317,7 @@ JavaSparkContext jsc = ...
// standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
JavaDoubleRDD u = normalJavaRDD(jsc, 1000000L, 10);
// Apply a transform to get a random double RDD following `N(1, 4)`.
-JavaDoubleRDD v = u.map(
- new Function<Double, Double>() {
- public Double call(Double x) {
- return 1.0 + 2.0 * x;
- }
- });
+JavaDoubleRDD v = u.mapToDouble(x -> 1.0 + 2.0 * x);
{% endhighlight %}
</div>
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index db8b048fce..6740dbe001 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -54,12 +54,12 @@ import org.apache.spark.SparkConf
<div data-lang="java" markdown="1">
-Spark {{site.SPARK_VERSION}} works with Java 7 and higher. If you are using Java 8, Spark supports
+Spark {{site.SPARK_VERSION}} supports
[lambda expressions](http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html)
for concisely writing functions, otherwise you can use the classes in the
[org.apache.spark.api.java.function](api/java/index.html?org/apache/spark/api/java/function/package-summary.html) package.
-Note that support for Java 7 is deprecated as of Spark 2.0.0 and may be removed in Spark 2.2.0.
+Note that support for Java 7 was removed in Spark 2.2.0.
To write a Spark application in Java, you need to add a dependency on Spark. Spark is available through Maven Central at:
@@ -295,11 +295,6 @@ JavaRDD<Integer> distData = sc.parallelize(data);
Once created, the distributed dataset (`distData`) can be operated on in parallel. For example, we might call `distData.reduce((a, b) -> a + b)` to add up the elements of the list.
We describe operations on distributed datasets later on.
-**Note:** *In this guide, we'll often use the concise Java 8 lambda syntax to specify Java functions, but
-in older versions of Java you can implement the interfaces in the
-[org.apache.spark.api.java.function](api/java/index.html?org/apache/spark/api/java/function/package-summary.html) package.
-We describe [passing functions to Spark](#passing-functions-to-spark) in more detail below.*
-
</div>
<div data-lang="python" markdown="1">
@@ -658,7 +653,7 @@ There are two ways to create such functions:
* Implement the Function interfaces in your own class, either as an anonymous inner class or a named one,
and pass an instance of it to Spark.
-* In Java 8, use [lambda expressions](http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html)
+* Use [lambda expressions](http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html)
to concisely define an implementation.
While much of this guide uses lambda syntax for conciseness, it is easy to use all the same APIs
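
As a side-by-side illustration of the two styles described above (an explicit Function implementation versus the equivalent lambda), here is a hedged sketch assuming an existing JavaSparkContext `sc` and the org.apache.spark.api.java.function.Function import:

    JavaRDD<String> lines = sc.textFile("data.txt");

    // Style 1: implement the Function interface as an anonymous inner class
    JavaRDD<String> longLines = lines.filter(new Function<String, Boolean>() {
      @Override
      public Boolean call(String s) {
        return s.length() > 80;
      }
    });

    // Style 2: the equivalent lambda expression
    JavaRDD<String> longLinesLambda = lines.filter(s -> s.length() > 80);
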
diff --git a/docs/quick-start.md b/docs/quick-start.md
index 0836c602fe..04ac278762 100644
--- a/docs/quick-start.md
+++ b/docs/quick-start.md
@@ -320,13 +320,8 @@ public class SimpleApp {
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logData = sc.textFile(logFile).cache();
- long numAs = logData.filter(new Function<String, Boolean>() {
- public Boolean call(String s) { return s.contains("a"); }
- }).count();
-
- long numBs = logData.filter(new Function<String, Boolean>() {
- public Boolean call(String s) { return s.contains("b"); }
- }).count();
+ long numAs = logData.filter(s -> s.contains("a")).count();
+ long numBs = logData.filter(s -> s.contains("b")).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
diff --git a/docs/streaming-custom-receivers.md b/docs/streaming-custom-receivers.md
index 117996db9d..d4ddcb16bd 100644
--- a/docs/streaming-custom-receivers.md
+++ b/docs/streaming-custom-receivers.md
@@ -113,15 +113,13 @@ public class JavaCustomReceiver extends Receiver<String> {
port = port_;
}
+ @Override
public void onStart() {
// Start the thread that receives data over a connection
- new Thread() {
- @Override public void run() {
- receive();
- }
- }.start();
+ new Thread(this::receive).start();
}
+ @Override
public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
@@ -189,7 +187,7 @@ The full source code is in the example [CustomReceiver.scala]({{site.SPARK_GITHU
{% highlight java %}
// Assuming ssc is the JavaStreamingContext
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
-JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { ... });
+JavaDStream<String> words = lines.flatMap(s -> ...);
...
{% endhighlight %}
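
The `new Thread(this::receive)` form in the receiver above works because Runnable is a functional interface, so a no-argument method reference (or lambda) can be passed wherever a Runnable is expected. A minimal, self-contained illustration with placeholder names:

    class ReceiverLike {
      private void receive() {
        System.out.println("receiving on " + Thread.currentThread().getName());
      }

      void onStart() {
        new Thread(this::receive).start();   // method reference used as a Runnable
        new Thread(() -> receive()).start(); // equivalent lambda form
      }
    }
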
diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md
index 6ef54ac210..e383701316 100644
--- a/docs/streaming-kafka-0-10-integration.md
+++ b/docs/streaming-kafka-0-10-integration.md
@@ -68,20 +68,14 @@ kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topicA", "topicB");
-final JavaInputDStream<ConsumerRecord<String, String>> stream =
+JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
-stream.mapToPair(
- new PairFunction<ConsumerRecord<String, String>, String, String>() {
- @Override
- public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
- return new Tuple2<>(record.key(), record.value());
- }
- })
+stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
{% endhighlight %}
</div>
</div>
@@ -162,19 +156,13 @@ stream.foreachRDD { rdd =>
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
-stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
- @Override
- public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
- final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
- rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
- @Override
- public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
- OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
- System.out.println(
- o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
- }
- });
- }
+stream.foreachRDD(rdd -> {
+ OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+ rdd.foreachPartition(consumerRecords -> {
+ OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
+ System.out.println(
+ o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
+ });
});
{% endhighlight %}
</div>
@@ -205,14 +193,11 @@ As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if calle
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
-stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
- @Override
- public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
- OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+stream.foreachRDD(rdd -> {
+ OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
- // some time later, after outputs have completed
- ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
- }
+ // some time later, after outputs have completed
+ ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
{% endhighlight %}
</div>
@@ -268,21 +253,18 @@ JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirec
ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);
-stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
- @Override
- public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
- OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
-
- Object results = yourCalculation(rdd);
+stream.foreachRDD(rdd -> {
+ OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+
+ Object results = yourCalculation(rdd);
- // begin your transaction
+ // begin your transaction
- // update results
- // update offsets where the end of existing offsets matches the beginning of this batch of offsets
- // assert that offsets were updated correctly
+ // update results
+ // update offsets where the end of existing offsets matches the beginning of this batch of offsets
+ // assert that offsets were updated correctly
- // end your transaction
- }
+ // end your transaction
});
{% endhighlight %}
</div>
diff --git a/docs/streaming-kafka-0-8-integration.md b/docs/streaming-kafka-0-8-integration.md
index 58b17aa4ce..24a3e4cdbb 100644
--- a/docs/streaming-kafka-0-8-integration.md
+++ b/docs/streaming-kafka-0-8-integration.md
@@ -155,33 +155,22 @@ Next, we discuss how to use this approach in your streaming application.
</div>
<div data-lang="java" markdown="1">
// Hold a reference to the current offset ranges, so it can be used downstream
- final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
-
- directKafkaStream.transformToPair(
- new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
- @Override
- public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
- OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
- offsetRanges.set(offsets);
- return rdd;
- }
- }
- ).map(
+ AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
+
+ directKafkaStream.transformToPair(rdd -> {
+ OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+ offsetRanges.set(offsets);
+ return rdd;
+ }).map(
...
- ).foreachRDD(
- new Function<JavaPairRDD<String, String>, Void>() {
- @Override
- public Void call(JavaPairRDD<String, String> rdd) throws IOException {
- for (OffsetRange o : offsetRanges.get()) {
- System.out.println(
- o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
- );
- }
- ...
- return null;
- }
- }
- );
+ ).foreachRDD(rdd -> {
+ for (OffsetRange o : offsetRanges.get()) {
+ System.out.println(
+ o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
+ );
+ }
+ ...
+ });
</div>
<div data-lang="python" markdown="1">
offsetRanges = []
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index a878971608..abd4ac9653 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -163,12 +163,7 @@ space into words.
{% highlight java %}
// Split each line into words
-JavaDStream<String> words = lines.flatMap(
- new FlatMapFunction<String, String>() {
- @Override public Iterator<String> call(String x) {
- return Arrays.asList(x.split(" ")).iterator();
- }
- });
+JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
{% endhighlight %}
`flatMap` is a DStream operation that creates a new DStream by
@@ -183,18 +178,8 @@ Next, we want to count these words.
{% highlight java %}
// Count each word in each batch
-JavaPairDStream<String, Integer> pairs = words.mapToPair(
- new PairFunction<String, String, Integer>() {
- @Override public Tuple2<String, Integer> call(String s) {
- return new Tuple2<>(s, 1);
- }
- });
-JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
- new Function2<Integer, Integer, Integer>() {
- @Override public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
- });
+JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
+JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();
@@ -836,11 +821,9 @@ the `(word, 1)` pairs) and the `runningCount` having the previous count.
{% highlight java %}
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
- new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
- @Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
- Integer newSum = ... // add the new values with the previous running count to get the new count
- return Optional.of(newSum);
- }
+ (values, state) -> {
+ Integer newSum = ... // add the new values with the previous running count to get the new count
+ return Optional.of(newSum);
};
{% endhighlight %}
@@ -915,15 +898,12 @@ val cleanedDStream = wordCounts.transform { rdd =>
{% highlight java %}
import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
-final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
+JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
-JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
- new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
- @Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
- rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
- ...
- }
- });
+JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> {
+ rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
+ ...
+});
{% endhighlight %}
</div>
@@ -986,15 +966,8 @@ val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Se
<div data-lang="java" markdown="1">
{% highlight java %}
-// Reduce function adding two integers, defined separately for clarity
-Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer, Integer>() {
- @Override public Integer call(Integer i1, Integer i2) {
- return i1 + i2;
- }
-};
-
// Reduce last 30 seconds of data, every 10 seconds
-JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(10));
+JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));
{% endhighlight %}
</div>
@@ -1141,14 +1114,7 @@ val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
{% highlight java %}
JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
-JavaPairDStream<String, String> joinedStream = windowedStream.transform(
- new Function<JavaRDD<Tuple2<String, String>>, JavaRDD<Tuple2<String, String>>>() {
- @Override
- public JavaRDD<Tuple2<String, String>> call(JavaRDD<Tuple2<String, String>> rdd) {
- return rdd.join(dataset);
- }
- }
-);
+JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
@@ -1248,17 +1214,11 @@ dstream.foreachRDD { rdd =>
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
-dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
- @Override
- public void call(JavaRDD<String> rdd) {
- final Connection connection = createNewConnection(); // executed at the driver
- rdd.foreach(new VoidFunction<String>() {
- @Override
- public void call(String record) {
- connection.send(record); // executed at the worker
- }
- });
- }
+dstream.foreachRDD(rdd -> {
+ Connection connection = createNewConnection(); // executed at the driver
+ rdd.foreach(record -> {
+ connection.send(record); // executed at the worker
+ });
});
{% endhighlight %}
</div>
@@ -1297,18 +1257,12 @@ dstream.foreachRDD { rdd =>
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
-dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
- @Override
- public void call(JavaRDD<String> rdd) {
- rdd.foreach(new VoidFunction<String>() {
- @Override
- public void call(String record) {
- Connection connection = createNewConnection();
- connection.send(record);
- connection.close();
- }
- });
- }
+dstream.foreachRDD(rdd -> {
+ rdd.foreach(record -> {
+ Connection connection = createNewConnection();
+ connection.send(record);
+ connection.close();
+ });
});
{% endhighlight %}
</div>
@@ -1344,20 +1298,14 @@ dstream.foreachRDD { rdd =>
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
-dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
- @Override
- public void call(JavaRDD<String> rdd) {
- rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
- @Override
- public void call(Iterator<String> partitionOfRecords) {
- Connection connection = createNewConnection();
- while (partitionOfRecords.hasNext()) {
- connection.send(partitionOfRecords.next());
- }
- connection.close();
- }
- });
- }
+dstream.foreachRDD(rdd -> {
+ rdd.foreachPartition(partitionOfRecords -> {
+ Connection connection = createNewConnection();
+ while (partitionOfRecords.hasNext()) {
+ connection.send(partitionOfRecords.next());
+ }
+ connection.close();
+ });
});
{% endhighlight %}
</div>
@@ -1396,21 +1344,15 @@ dstream.foreachRDD { rdd =>
<div data-lang="java" markdown="1">
{% highlight java %}
-dstream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
- @Override
- public void call(JavaRDD<String> rdd) {
- rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
- @Override
- public void call(Iterator<String> partitionOfRecords) {
- // ConnectionPool is a static, lazily initialized pool of connections
- Connection connection = ConnectionPool.getConnection();
- while (partitionOfRecords.hasNext()) {
- connection.send(partitionOfRecords.next());
- }
- ConnectionPool.returnConnection(connection); // return to the pool for future reuse
- }
- });
- }
+dstream.foreachRDD(rdd -> {
+ rdd.foreachPartition(partitionOfRecords -> {
+ // ConnectionPool is a static, lazily initialized pool of connections
+ Connection connection = ConnectionPool.getConnection();
+ while (partitionOfRecords.hasNext()) {
+ connection.send(partitionOfRecords.next());
+ }
+ ConnectionPool.returnConnection(connection); // return to the pool for future reuse
+ });
});
{% endhighlight %}
</div>
@@ -1495,35 +1437,26 @@ public class JavaRow implements java.io.Serializable {
JavaDStream<String> words = ...
-words.foreachRDD(
- new Function2<JavaRDD<String>, Time, Void>() {
- @Override
- public Void call(JavaRDD<String> rdd, Time time) {
-
- // Get the singleton instance of SparkSession
- SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();
+words.foreachRDD((rdd, time) -> {
+ // Get the singleton instance of SparkSession
+ SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();
- // Convert RDD[String] to RDD[case class] to DataFrame
- JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
- public JavaRow call(String word) {
- JavaRow record = new JavaRow();
- record.setWord(word);
- return record;
- }
- });
- DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);
+ // Convert RDD[String] to RDD[case class] to DataFrame
+ JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
+ JavaRow record = new JavaRow();
+ record.setWord(word);
+ return record;
+ });
+ DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);
- // Creates a temporary view using the DataFrame
- wordsDataFrame.createOrReplaceTempView("words");
+ // Creates a temporary view using the DataFrame
+ wordsDataFrame.createOrReplaceTempView("words");
- // Do word count on table using SQL and print it
- DataFrame wordCountsDataFrame =
- spark.sql("select word, count(*) as total from words group by word");
- wordCountsDataFrame.show();
- return null;
- }
- }
-);
+ // Do word count on table using SQL and print it
+ DataFrame wordCountsDataFrame =
+ spark.sql("select word, count(*) as total from words group by word");
+ wordCountsDataFrame.show();
+});
{% endhighlight %}
See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java).
@@ -1883,27 +1816,21 @@ class JavaDroppedWordsCounter {
}
}
-wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
- @Override
- public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
- // Get or register the blacklist Broadcast
- final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
- // Get or register the droppedWordsCounter Accumulator
- final LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
- // Use blacklist to drop words and use droppedWordsCounter to count them
- String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
- @Override
- public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
- if (blacklist.value().contains(wordCount._1())) {
- droppedWordsCounter.add(wordCount._2());
- return false;
- } else {
- return true;
- }
- }
- }).collect().toString();
- String output = "Counts at time " + time + " " + counts;
- }
+wordCounts.foreachRDD((rdd, time) -> {
+ // Get or register the blacklist Broadcast
+ Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
+ // Get or register the droppedWordsCounter Accumulator
+ LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
+ // Use blacklist to drop words and use droppedWordsCounter to count them
+ String counts = rdd.filter(wordCount -> {
+ if (blacklist.value().contains(wordCount._1())) {
+ droppedWordsCounter.add(wordCount._2());
+ return false;
+ } else {
+ return true;
+ }
+ }).collect().toString();
+ String output = "Counts at time " + time + " " + counts;
}
{% endhighlight %}
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index b816072cb8..ad3b2fb26d 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -103,13 +103,7 @@ Dataset<Row> lines = spark
// Split the lines into words
Dataset<String> words = lines
.as(Encoders.STRING())
- .flatMap(
- new FlatMapFunction<String, String>() {
- @Override
- public Iterator<String> call(String x) {
- return Arrays.asList(x.split(" ")).iterator();
- }
- }, Encoders.STRING());
+ .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());
// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();
@@ -517,7 +511,7 @@ val csvDF = spark
SparkSession spark = ...
// Read text from socket
-Dataset[Row] socketDF = spark
+Dataset<Row> socketDF = spark
.readStream()
.format("socket")
.option("host", "localhost")
@@ -530,7 +524,7 @@ socketDF.printSchema();
// Read all the csv files written atomically in a directory
StructType userSchema = new StructType().add("name", "string").add("age", "integer");
-Dataset[Row] csvDF = spark
+Dataset<Row> csvDF = spark
.readStream()
.option("sep", ";")
.schema(userSchema) // Specify schema of the csv files
@@ -625,33 +619,15 @@ Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); //
// Select the devices which have signal more than 10
df.select("device").where("signal > 10"); // using untyped APIs
-ds.filter(new FilterFunction<DeviceData>() { // using typed APIs
- @Override
- public boolean call(DeviceData value) throws Exception {
- return value.getSignal() > 10;
- }
-}).map(new MapFunction<DeviceData, String>() {
- @Override
- public String call(DeviceData value) throws Exception {
- return value.getDevice();
- }
-}, Encoders.STRING());
+ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
+ .map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());
// Running count of the number of updates for each device type
df.groupBy("deviceType").count(); // using untyped API
// Running average signal for each device type
-ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API
- @Override
- public String call(DeviceData value) throws Exception {
- return value.getDeviceType();
- }
-}, Encoders.STRING()).agg(typed.avg(new MapFunction<DeviceData, Double>() {
- @Override
- public Double call(DeviceData value) throws Exception {
- return value.getSignal();
- }
-}));
+ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING())
+ .agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));
{% endhighlight %}
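
The casts to FilterFunction and MapFunction in the rewritten snippet above are not decorative: Dataset.filter and Dataset.map are overloaded to accept both Scala and Java function types, and with an implicitly typed lambda javac cannot choose an overload on its own, so the cast names the intended Java functional interface. A hedged fragment showing the shape, assuming `ds` is the Dataset<DeviceData> from the example and the usual MapFunction/Encoders imports:

    // The cast selects the Java MapFunction overload; without it the call is ambiguous.
    Dataset<String> deviceNames = ds.map(
        (MapFunction<DeviceData, String>) value -> value.getDevice(),
        Encoders.STRING());
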
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
index f42fd3317b..004e9b12f6 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
@@ -69,9 +69,9 @@ public class JavaTokenizerExample {
.setOutputCol("words")
.setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false);
- spark.udf().register("countTokens", new UDF1<WrappedArray, Integer>() {
+ spark.udf().register("countTokens", new UDF1<WrappedArray<String>, Integer>() {
@Override
- public Integer call(WrappedArray words) {
+ public Integer call(WrappedArray<String> words) {
return words.size();
}
}, DataTypes.IntegerType);
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
index 1860594e8e..b687fae5a1 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -224,7 +224,7 @@ public class JavaSQLDataSourceExample {
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD =
new JavaSparkContext(spark.sparkContext()).parallelize(jsonData);
- Dataset anotherPeople = spark.read().json(anotherPeopleRDD);
+ Dataset<Row> anotherPeople = spark.read().json(anotherPeopleRDD);
anotherPeople.show();
// +---------------+----+
// | address|name|
diff --git a/external/java8-tests/README.md b/external/java8-tests/README.md
deleted file mode 100644
index aa87901695..0000000000
--- a/external/java8-tests/README.md
+++ /dev/null
@@ -1,22 +0,0 @@
-# Java 8 Test Suites
-
-These tests require having Java 8 installed and are isolated from the main Spark build.
-If Java 8 is not your system's default Java version, you will need to point Spark's build
-to your Java location. The set-up depends a bit on the build system:
-
-* Sbt users can either set JAVA_HOME to the location of a Java 8 JDK or explicitly pass
- `-java-home` to the sbt launch script. If a Java 8 JDK is detected sbt will automatically
- include the Java 8 test project.
-
- `$ JAVA_HOME=/opt/jdk1.8.0/ build/sbt clean java8-tests/test
-
-* For Maven users,
-
- Maven users can also refer to their Java 8 directory using JAVA_HOME.
-
- `$ JAVA_HOME=/opt/jdk1.8.0/ mvn clean install -DskipTests`
- `$ JAVA_HOME=/opt/jdk1.8.0/ mvn -pl :java8-tests_2.11 test`
-
- Note that the above command can only be run from project root directory since this module
- depends on core and the test-jars of core and streaming. This means an install step is
- required to make the test dependencies visible to the Java 8 sub-project.
diff --git a/external/java8-tests/pom.xml b/external/java8-tests/pom.xml
deleted file mode 100644
index 8fc46d7af2..0000000000
--- a/external/java8-tests/pom.xml
+++ /dev/null
@@ -1,132 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-~ Licensed to the Apache Software Foundation (ASF) under one or more
-~ contributor license agreements. See the NOTICE file distributed with
-~ this work for additional information regarding copyright ownership.
-~ The ASF licenses this file to You under the Apache License, Version 2.0
-~ (the "License"); you may not use this file except in compliance with
-~ the License. You may obtain a copy of the License at
-~
-~ http://www.apache.org/licenses/LICENSE-2.0
-~
-~ Unless required by applicable law or agreed to in writing, software
-~ distributed under the License is distributed on an "AS IS" BASIS,
-~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-~ See the License for the specific language governing permissions and
-~ limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-parent_2.11</artifactId>
- <version>2.2.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
-
- <artifactId>java8-tests_2.11</artifactId>
- <packaging>pom</packaging>
- <name>Spark Project Java 8 Tests</name>
-
- <properties>
- <sbt.project.name>java8-tests</sbt.project.name>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-tags_${scala.binary.version}</artifactId>
- </dependency>
-
- <!--
- This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
- them will yield errors.
- -->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-tags_${scala.binary.version}</artifactId>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-deploy-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-install-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <forceJavacCompilerUse>true</forceJavacCompilerUse>
- <source>1.8</source>
- <target>1.8</target>
- <compilerVersion>1.8</compilerVersion>
- </configuration>
- </plugin>
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <configuration>
- <useZincServer>${useZincForJdk8}</useZincServer>
- <javacArgs>
- <javacArg>-source</javacArg>
- <javacArg>1.8</javacArg>
- <javacArg>-target</javacArg>
- <javacArg>1.8</javacArg>
- <javacArg>-Xlint:all,-serial,-path</javacArg>
- </javacArgs>
- </configuration>
- </plugin>
- </plugins>
- </build>
-</project>
diff --git a/external/java8-tests/src/test/resources/log4j.properties b/external/java8-tests/src/test/resources/log4j.properties
deleted file mode 100644
index 3706a6e361..0000000000
--- a/external/java8-tests/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set everything to be logged to the file target/unit-tests.log
-log4j.rootCategory=INFO, file
-log4j.appender.file=org.apache.log4j.FileAppender
-log4j.appender.file.append=true
-log4j.appender.file.file=target/unit-tests.log
-log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark_project.jetty=WARN
diff --git a/external/java8-tests/src/test/scala/test/org/apache/spark/java8/JDK8ScalaSuite.scala b/external/java8-tests/src/test/scala/test/org/apache/spark/java8/JDK8ScalaSuite.scala
deleted file mode 100644
index c4042e47e8..0000000000
--- a/external/java8-tests/src/test/scala/test/org/apache/spark/java8/JDK8ScalaSuite.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package test.org.apache.spark.java8
-
-import org.apache.spark.SharedSparkContext
-import org.apache.spark.SparkFunSuite
-
-/**
- * Test cases where JDK8-compiled Scala user code is used with Spark.
- */
-class JDK8ScalaSuite extends SparkFunSuite with SharedSparkContext {
- test("basic RDD closure test (SPARK-6152)") {
- sc.parallelize(1 to 1000).map(x => x * x).count()
- }
-}
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 02b23111af..9c5dceca2d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -259,7 +259,7 @@ private[kafka010] class KafkaSource(
val preferredLoc = if (numExecutors > 0) {
// This allows cached KafkaConsumers in the executors to be re-used to read the same
// partition in every batch.
- Some(sortedExecutors(floorMod(tp.hashCode, numExecutors)))
+ Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
} else None
KafkaSourceRDDOffsetRange(tp, fromOffset, untilOffset, preferredLoc)
}.filter { range =>
@@ -347,5 +347,4 @@ private[kafka010] object KafkaSource {
if (a.host == b.host) { a.executorId > b.executorId } else { a.host > b.host }
}
- def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
}
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index bf8adbe42f..4c6e2ce87e 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -145,11 +145,6 @@ private[spark] class KafkaRDD[K, V](
a.host > b.host
}
- /**
- * Non-negative modulus, from java 8 math
- */
- private def floorMod(a: Int, b: Int): Int = ((a % b) + b) % b
-
override def getPreferredLocations(thePart: Partition): Seq[String] = {
// The intention is best-effort consistent executor for a given topicpartition,
// so that caching consumers can be effective.
@@ -164,7 +159,7 @@ private[spark] class KafkaRDD[K, V](
Seq()
} else {
// execs is sorted, tp.hashCode depends only on topic and partition, so consistent index
- val index = this.floorMod(tp.hashCode, execs.length)
+ val index = Math.floorMod(tp.hashCode, execs.length)
val chosen = execs(index)
Seq(chosen.toString)
}
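
Math.floorMod, used above in place of the removed hand-rolled floorMod helpers, always returns a result with the sign of the divisor, whereas % keeps the sign of the dividend. That distinction matters here because hashCode values can be negative and the result is used as an array index. For example:

    int hash = -7;
    int buckets = 3;
    System.out.println(hash % buckets);               // -1: unusable as an index
    System.out.println(Math.floorMod(hash, buckets)); //  2: always in [0, buckets)
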
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
index 0622fef17c..bc8d6037a3 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -104,15 +104,12 @@ abstract class AbstractCommandBuilder {
// Load extra JAVA_OPTS from conf/java-opts, if it exists.
File javaOpts = new File(join(File.separator, getConfDir(), "java-opts"));
if (javaOpts.isFile()) {
- BufferedReader br = new BufferedReader(new InputStreamReader(
- new FileInputStream(javaOpts), StandardCharsets.UTF_8));
- try {
+ try (BufferedReader br = new BufferedReader(new InputStreamReader(
+ new FileInputStream(javaOpts), StandardCharsets.UTF_8))) {
String line;
while ((line = br.readLine()) != null) {
addOptionString(cmd, line);
}
- } finally {
- br.close();
}
}
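
The try-with-resources form introduced above closes the reader automatically whether the loop finishes or readLine throws, replacing the manual try/finally. A self-contained sketch of the same idiom (the file path is just a placeholder):

    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    class ReadJavaOpts {
      public static void main(String[] args) throws IOException {
        // br is closed automatically when the try block exits, normally or with an exception.
        try (BufferedReader br = new BufferedReader(new InputStreamReader(
            new FileInputStream("conf/java-opts"), StandardCharsets.UTF_8))) {
          String line;
          while ((line = br.readLine()) != null) {
            System.out.println(line);
          }
        }
      }
    }
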
diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
index c0779e1c4e..12bf29d3b1 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -18,7 +18,6 @@
package org.apache.spark.launcher;
import java.io.IOException;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
@@ -103,14 +102,7 @@ class ChildProcAppHandle implements SparkAppHandle {
try {
childProc.exitValue();
} catch (IllegalThreadStateException e) {
- // Child is still alive. Try to use Java 8's "destroyForcibly()" if available,
- // fall back to the old API if it's not there.
- try {
- Method destroy = childProc.getClass().getMethod("destroyForcibly");
- destroy.invoke(childProc);
- } catch (Exception inner) {
- childProc.destroy();
- }
+ childProc.destroyForcibly();
} finally {
childProc = null;
}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
index 250b2a882f..e14c8aa47d 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
@@ -313,27 +313,6 @@ class CommandBuilderUtils {
}
/**
- * Adds the default perm gen size option for Spark if the VM requires it and the user hasn't
- * set it.
- */
- static void addPermGenSizeOpt(List<String> cmd) {
- // Don't set MaxPermSize for IBM Java, or Oracle Java 8 and later.
- if (getJavaVendor() == JavaVendor.IBM) {
- return;
- }
- if (javaMajorVersion(System.getProperty("java.version")) > 7) {
- return;
- }
- for (String arg : cmd) {
- if (arg.contains("-XX:MaxPermSize=")) {
- return;
- }
- }
-
- cmd.add("-XX:MaxPermSize=256m");
- }
-
- /**
* Get the major version of the java version string supplied. This method
* accepts any JEP-223-compliant strings (9-ea, 9+100), as well as legacy
* version strings such as 1.7.0_79
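
The javaMajorVersion helper mentioned in the surviving javadoc has to handle both legacy strings such as "1.7.0_79" and JEP-223 strings such as "9-ea" or "9+100". A hedged sketch of one way to do that parsing, offered as an illustration only and not the actual implementation in CommandBuilderUtils:

    // Returns 7 for "1.7.0_79", 8 for "1.8.0_121", 9 for "9-ea" or "9+100".
    static int parseMajorVersion(String version) {
      String[] parts = version.split("[+.\\-]+");
      int major = Integer.parseInt(parts[0]);
      // Legacy versions start with "1."; the real major version is the second component.
      return major == 1 ? Integer.parseInt(parts[1]) : major;
    }
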
diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
index ae43f563e8..865d4926da 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -137,12 +137,7 @@ class LauncherServer implements Closeable {
this.server = server;
this.running = true;
- this.serverThread = factory.newThread(new Runnable() {
- @Override
- public void run() {
- acceptConnections();
- }
- });
+ this.serverThread = factory.newThread(this::acceptConnections);
serverThread.start();
} catch (IOException ioe) {
close();
diff --git a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
index c7959aee9f..ff8045390c 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java
@@ -44,12 +44,7 @@ class OutputRedirector {
OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
this.active = true;
this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
- this.thread = tf.newThread(new Runnable() {
- @Override
- public void run() {
- redirect();
- }
- });
+ this.thread = tf.newThread(this::redirect);
this.sink = Logger.getLogger(loggerName);
thread.start();
}
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
index 0aa7bd197d..cefb4d1a95 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
@@ -91,9 +91,6 @@ public interface SparkAppHandle {
* Tries to kill the underlying application. Implies {@link #disconnect()}. This will not send
* a {@link #stop()} message to the application, so it's recommended that users first try to
* stop the application cleanly and only resort to this method if that fails.
- * <p>
- * Note that if the application is running as a child process, this method fail to kill the
- * process when using Java 7. This may happen if, for example, the application is deadlocked.
*/
void kill();
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
index 82b593a3f7..81786841de 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -49,35 +49,44 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder {
// Master, Worker, HistoryServer, ExternalShuffleService, MesosClusterDispatcher use
// SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
- if (className.equals("org.apache.spark.deploy.master.Master")) {
- javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
- javaOptsKeys.add("SPARK_MASTER_OPTS");
- memKey = "SPARK_DAEMON_MEMORY";
- } else if (className.equals("org.apache.spark.deploy.worker.Worker")) {
- javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
- javaOptsKeys.add("SPARK_WORKER_OPTS");
- memKey = "SPARK_DAEMON_MEMORY";
- } else if (className.equals("org.apache.spark.deploy.history.HistoryServer")) {
- javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
- javaOptsKeys.add("SPARK_HISTORY_OPTS");
- memKey = "SPARK_DAEMON_MEMORY";
- } else if (className.equals("org.apache.spark.executor.CoarseGrainedExecutorBackend")) {
- javaOptsKeys.add("SPARK_JAVA_OPTS");
- javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
- memKey = "SPARK_EXECUTOR_MEMORY";
- } else if (className.equals("org.apache.spark.executor.MesosExecutorBackend")) {
- javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
- memKey = "SPARK_EXECUTOR_MEMORY";
- } else if (className.equals("org.apache.spark.deploy.mesos.MesosClusterDispatcher")) {
- javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
- } else if (className.equals("org.apache.spark.deploy.ExternalShuffleService") ||
- className.equals("org.apache.spark.deploy.mesos.MesosExternalShuffleService")) {
- javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
- javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
- memKey = "SPARK_DAEMON_MEMORY";
- } else {
- javaOptsKeys.add("SPARK_JAVA_OPTS");
- memKey = "SPARK_DRIVER_MEMORY";
+ switch (className) {
+ case "org.apache.spark.deploy.master.Master":
+ javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+ javaOptsKeys.add("SPARK_MASTER_OPTS");
+ memKey = "SPARK_DAEMON_MEMORY";
+ break;
+ case "org.apache.spark.deploy.worker.Worker":
+ javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+ javaOptsKeys.add("SPARK_WORKER_OPTS");
+ memKey = "SPARK_DAEMON_MEMORY";
+ break;
+ case "org.apache.spark.deploy.history.HistoryServer":
+ javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+ javaOptsKeys.add("SPARK_HISTORY_OPTS");
+ memKey = "SPARK_DAEMON_MEMORY";
+ break;
+ case "org.apache.spark.executor.CoarseGrainedExecutorBackend":
+ javaOptsKeys.add("SPARK_JAVA_OPTS");
+ javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
+ memKey = "SPARK_EXECUTOR_MEMORY";
+ break;
+ case "org.apache.spark.executor.MesosExecutorBackend":
+ javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
+ memKey = "SPARK_EXECUTOR_MEMORY";
+ break;
+ case "org.apache.spark.deploy.mesos.MesosClusterDispatcher":
+ javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+ break;
+ case "org.apache.spark.deploy.ExternalShuffleService":
+ case "org.apache.spark.deploy.mesos.MesosExternalShuffleService":
+ javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+ javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
+ memKey = "SPARK_DAEMON_MEMORY";
+ break;
+ default:
+ javaOptsKeys.add("SPARK_JAVA_OPTS");
+ memKey = "SPARK_DRIVER_MEMORY";
+ break;
}
List<String> cmd = buildJavaCommand(extraClassPath);
@@ -94,7 +103,6 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder {
String mem = firstNonEmpty(memKey != null ? System.getenv(memKey) : null, DEFAULT_MEM);
cmd.add("-Xmx" + mem);
- addPermGenSizeOpt(cmd);
cmd.add(className);
cmd.addAll(classArgs);
return cmd;
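
The hunk above replaces an if/else chain with a switch over the class name (legal since Java 7's strings-in-switch), and adjacent case labels now stand in for the old `||` conditions. A standalone sketch of the same mapping pattern, with a trimmed-down set of hypothetical entries:

    import java.util.ArrayList;
    import java.util.List;

    public class DaemonOptsDemo {
      // Simplified mirror of the mapping above: class name -> env var keys to consult.
      static List<String> optsKeysFor(String className) {
        List<String> keys = new ArrayList<>();
        switch (className) {
          case "org.apache.spark.deploy.master.Master":
            keys.add("SPARK_DAEMON_JAVA_OPTS");
            keys.add("SPARK_MASTER_OPTS");
            break;
          case "org.apache.spark.deploy.ExternalShuffleService":
          case "org.apache.spark.deploy.mesos.MesosExternalShuffleService":
            // Adjacent labels share a body, replacing the old `||` condition.
            keys.add("SPARK_DAEMON_JAVA_OPTS");
            keys.add("SPARK_SHUFFLE_OPTS");
            break;
          default:
            keys.add("SPARK_JAVA_OPTS");
            break;
        }
        return keys;
      }

      public static void main(String[] args) {
        System.out.println(optsKeysFor("org.apache.spark.deploy.master.Master"));
      }
    }
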
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index 29c6d82cdb..5e64fa7ed1 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -271,7 +271,6 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
}
- addPermGenSizeOpt(cmd);
cmd.add("org.apache.spark.deploy.SparkSubmit");
cmd.addAll(buildSparkSubmitArgs());
return cmd;
@@ -405,49 +404,65 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
@Override
protected boolean handle(String opt, String value) {
- if (opt.equals(MASTER)) {
- master = value;
- } else if (opt.equals(DEPLOY_MODE)) {
- deployMode = value;
- } else if (opt.equals(PROPERTIES_FILE)) {
- propertiesFile = value;
- } else if (opt.equals(DRIVER_MEMORY)) {
- conf.put(SparkLauncher.DRIVER_MEMORY, value);
- } else if (opt.equals(DRIVER_JAVA_OPTIONS)) {
- conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
- } else if (opt.equals(DRIVER_LIBRARY_PATH)) {
- conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
- } else if (opt.equals(DRIVER_CLASS_PATH)) {
- conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
- } else if (opt.equals(CONF)) {
- String[] setConf = value.split("=", 2);
- checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
- conf.put(setConf[0], setConf[1]);
- } else if (opt.equals(CLASS)) {
- // The special classes require some special command line handling, since they allow
- // mixing spark-submit arguments with arguments that should be propagated to the shell
- // itself. Note that for this to work, the "--class" argument must come before any
- // non-spark-submit arguments.
- mainClass = value;
- if (specialClasses.containsKey(value)) {
- allowsMixedArguments = true;
- appResource = specialClasses.get(value);
- }
- } else if (opt.equals(KILL_SUBMISSION) || opt.equals(STATUS)) {
- isAppResourceReq = false;
- sparkArgs.add(opt);
- sparkArgs.add(value);
- } else if (opt.equals(HELP) || opt.equals(USAGE_ERROR)) {
- isAppResourceReq = false;
- sparkArgs.add(opt);
- } else if (opt.equals(VERSION)) {
- isAppResourceReq = false;
- sparkArgs.add(opt);
- } else {
- sparkArgs.add(opt);
- if (value != null) {
+ switch (opt) {
+ case MASTER:
+ master = value;
+ break;
+ case DEPLOY_MODE:
+ deployMode = value;
+ break;
+ case PROPERTIES_FILE:
+ propertiesFile = value;
+ break;
+ case DRIVER_MEMORY:
+ conf.put(SparkLauncher.DRIVER_MEMORY, value);
+ break;
+ case DRIVER_JAVA_OPTIONS:
+ conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
+ break;
+ case DRIVER_LIBRARY_PATH:
+ conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
+ break;
+ case DRIVER_CLASS_PATH:
+ conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
+ break;
+ case CONF:
+ String[] setConf = value.split("=", 2);
+ checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
+ conf.put(setConf[0], setConf[1]);
+ break;
+ case CLASS:
+ // The special classes require some special command line handling, since they allow
+ // mixing spark-submit arguments with arguments that should be propagated to the shell
+ // itself. Note that for this to work, the "--class" argument must come before any
+ // non-spark-submit arguments.
+ mainClass = value;
+ if (specialClasses.containsKey(value)) {
+ allowsMixedArguments = true;
+ appResource = specialClasses.get(value);
+ }
+ break;
+ case KILL_SUBMISSION:
+ case STATUS:
+ isAppResourceReq = false;
+ sparkArgs.add(opt);
sparkArgs.add(value);
- }
+ break;
+ case HELP:
+ case USAGE_ERROR:
+ isAppResourceReq = false;
+ sparkArgs.add(opt);
+ break;
+ case VERSION:
+ isAppResourceReq = false;
+ sparkArgs.add(opt);
+ break;
+ default:
+ sparkArgs.add(opt);
+ if (value != null) {
+ sparkArgs.add(value);
+ }
+ break;
}
return true;
}
diff --git a/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java b/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java
index caeeea5ec6..9795041233 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java
@@ -99,42 +99,6 @@ public class CommandBuilderUtilsSuite {
assertEquals(10, javaMajorVersion("10"));
}
- @Test
- public void testAddPermGenSizeOpt() {
- List<String> cmd = new ArrayList<>();
-
- if (javaMajorVersion(System.getProperty("java.version")) > 7) {
- // Does nothing in Java 8
- addPermGenSizeOpt(cmd);
- assertEquals(0, cmd.size());
- cmd.clear();
-
- } else {
- addPermGenSizeOpt(cmd);
- assertEquals(1, cmd.size());
- assertTrue(cmd.get(0).startsWith("-XX:MaxPermSize="));
- cmd.clear();
-
- cmd.add("foo");
- addPermGenSizeOpt(cmd);
- assertEquals(2, cmd.size());
- assertTrue(cmd.get(1).startsWith("-XX:MaxPermSize="));
- cmd.clear();
-
- cmd.add("-XX:MaxPermSize=512m");
- addPermGenSizeOpt(cmd);
- assertEquals(1, cmd.size());
- assertEquals("-XX:MaxPermSize=512m", cmd.get(0));
- cmd.clear();
-
- cmd.add("'-XX:MaxPermSize=512m'");
- addPermGenSizeOpt(cmd);
- assertEquals(1, cmd.size());
- assertEquals("'-XX:MaxPermSize=512m'", cmd.get(0));
- cmd.clear();
- }
- }
-
private static void testOpt(String opts, List<String> expected) {
assertEquals(String.format("test string failed to parse: [[ %s ]]", opts),
expected, parseOptionString(opts));
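
The remaining assertions rely on a javaMajorVersion helper that understands both the legacy "1.8.0_92" style and the newer "9"/"10" style of version strings. A rough sketch of such a parser, consistent with the assertion above but not copied from Spark's implementation:

    public class JavaVersionDemo {
      // "1.8.0_92" -> 8, "9-ea" -> 9, "10" -> 10.
      static int javaMajorVersion(String version) {
        String[] parts = version.split("[+.\\-]");
        int first = Integer.parseInt(parts[0]);
        // Pre-JDK 9 strings start with "1.", so the major version is the second field.
        return first == 1 ? Integer.parseInt(parts[1]) : first;
      }

      public static void main(String[] args) {
        System.out.println(javaMajorVersion(System.getProperty("java.version")));
      }
    }
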
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
index ad2e7a70c4..d569b6688d 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
@@ -233,7 +233,7 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite {
launcher.setPropertiesFile(dummyPropsFile.getAbsolutePath());
launcher.conf.put(SparkLauncher.DRIVER_MEMORY, "1g");
launcher.conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, "/driver");
- launcher.conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-Ddriver -XX:MaxPermSize=256m");
+ launcher.conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-Ddriver");
launcher.conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, "/native");
} else {
launcher.childEnv.put("SPARK_CONF_DIR", System.getProperty("spark.test.home")
@@ -258,12 +258,6 @@ public class SparkSubmitCommandBuilderSuite extends BaseSuite {
assertFalse("Memory arguments should not be set.", found);
}
- for (String arg : cmd) {
- if (arg.startsWith("-XX:MaxPermSize=")) {
- assertEquals("-XX:MaxPermSize=256m", arg);
- }
- }
-
String[] cp = findArgValue(cmd, "-cp").split(Pattern.quote(File.pathSeparator));
if (isDriver) {
assertTrue("Driver classpath should contain provided entry.", contains("/driver", cp));
diff --git a/launcher/src/test/resources/spark-defaults.conf b/launcher/src/test/resources/spark-defaults.conf
index 239fc57883..3a51208c7c 100644
--- a/launcher/src/test/resources/spark-defaults.conf
+++ b/launcher/src/test/resources/spark-defaults.conf
@@ -17,5 +17,5 @@
spark.driver.memory=1g
spark.driver.extraClassPath=/driver
-spark.driver.extraJavaOptions=-Ddriver -XX:MaxPermSize=256m
+spark.driver.extraJavaOptions=-Ddriver
spark.driver.extraLibraryPath=/native
\ No newline at end of file
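
The -XX:MaxPermSize flag is dropped from the test defaults because Java 8 removed the permanent generation in favor of Metaspace, so the option is simply ignored (with a startup warning). A small, illustrative check for stale PermGen flags on a running JVM, using only the standard management API:

    import java.lang.management.ManagementFactory;
    import java.util.List;

    public class StalePermGenFlagCheck {
      public static void main(String[] args) {
        // On Java 8+ the permanent generation is gone (replaced by Metaspace), so
        // -XX:MaxPermSize / -XX:PermSize are ignored; this just reports leftovers.
        List<String> jvmArgs = ManagementFactory.getRuntimeMXBean().getInputArguments();
        jvmArgs.stream()
            .filter(a -> a.contains("PermSize"))
            .forEach(a -> System.out.println("Stale JVM flag (ignored on Java 8+): " + a));
      }
    }
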
diff --git a/pom.xml b/pom.xml
index ac61a57a61..60e4c7269e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,7 +117,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <java.version>1.7</java.version>
+ <java.version>1.8</java.version>
<maven.version>3.3.9</maven.version>
<sbt.project.name>spark</sbt.project.name>
<slf4j.version>1.7.16</slf4j.version>
@@ -186,9 +186,6 @@
<test.java.home>${java.home}</test.java.home>
<test.exclude.tags></test.exclude.tags>
- <!-- When using different JDKs for the build, we can't use Zinc for the jdk8 part. -->
- <useZincForJdk8>true</useZincForJdk8>
-
<!-- Package to use when relocating shaded classes. -->
<spark.shade.packageName>org.spark_project</spark.shade.packageName>
@@ -219,8 +216,6 @@
-->
<spark.test.home>${session.executionRootDirectory}</spark.test.home>
- <PermGen>64m</PermGen>
- <MaxPermGen>512m</MaxPermGen>
<CodeCacheSize>512m</CodeCacheSize>
</properties>
<repositories>
@@ -1920,7 +1915,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
- <version>1.12</version>
+ <version>3.0.0</version>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
@@ -1967,8 +1962,6 @@
<jvmArgs>
<jvmArg>-Xms1024m</jvmArg>
<jvmArg>-Xmx1024m</jvmArg>
- <jvmArg>-XX:PermSize=${PermGen}</jvmArg>
- <jvmArg>-XX:MaxPermSize=${MaxPermGen}</jvmArg>
<jvmArg>-XX:ReservedCodeCacheSize=${CodeCacheSize}</jvmArg>
</jvmArgs>
<javacArgs>
@@ -1983,7 +1976,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
- <version>3.6.0</version>
+ <version>3.6.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
@@ -2014,7 +2007,7 @@
<include>**/*Suite.java</include>
</includes>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
- <argLine>-Xmx3g -Xss4096k -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
+ <argLine>-Xmx3g -Xss4096k -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
<environmentVariables>
<!--
Setting SPARK_DIST_CLASSPATH is a simple way to make sure any child processes
@@ -2063,7 +2056,7 @@
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>SparkTestSuite.txt</filereports>
- <argLine>-ea -Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
+ <argLine>-ea -Xmx3g -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
<stderr/>
<environmentVariables>
<!--
@@ -2149,6 +2142,41 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.4</version>
+ <configuration>
+ <additionalparam>-Xdoclint:all -Xdoclint:-missing</additionalparam>
+ <tags>
+ <tag>
+ <name>example</name>
+ <placement>a</placement>
+ <head>Example:</head>
+ </tag>
+ <tag>
+ <name>note</name>
+ <placement>a</placement>
+ <head>Note:</head>
+ </tag>
+ <tag>
+ <name>group</name>
+ <placement>X</placement>
+ </tag>
+ <tag>
+ <name>tparam</name>
+ <placement>X</placement>
+ </tag>
+ <tag>
+ <name>constructor</name>
+ <placement>X</placement>
+ </tag>
+ <tag>
+ <name>todo</name>
+ <placement>X</placement>
+ </tag>
+ <tag>
+ <name>groupname</name>
+ <placement>X</placement>
+ </tag>
+ </tags>
+ </configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
@@ -2163,7 +2191,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <version>2.4.3</version>
+ <version>3.0.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -2178,6 +2206,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
+ <version>3.0.0</version>
<executions>
<execution>
<id>default-cli</id>
@@ -2252,7 +2281,6 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
- <version>2.10</version>
<executions>
<execution>
<id>generate-test-classpath</id>
@@ -2474,67 +2502,6 @@
</profile>
<profile>
- <id>java8-tests</id>
- <activation>
- <jdk>[1.8,)</jdk>
- </activation>
- <modules>
- <module>external/java8-tests</module>
- </modules>
- </profile>
-
- <profile>
- <id>doclint-java8-disable</id>
- <activation>
- <jdk>[1.8,)</jdk>
- </activation>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <configuration>
- <additionalparam>-Xdoclint:all -Xdoclint:-missing</additionalparam>
- <tags>
- <tag>
- <name>example</name>
- <placement>a</placement>
- <head>Example:</head>
- </tag>
- <tag>
- <name>note</name>
- <placement>a</placement>
- <head>Note:</head>
- </tag>
- <tag>
- <name>group</name>
- <placement>X</placement>
- </tag>
- <tag>
- <name>tparam</name>
- <placement>X</placement>
- </tag>
- <tag>
- <name>constructor</name>
- <placement>X</placement>
- </tag>
- <tag>
- <name>todo</name>
- <placement>X</placement>
- </tag>
- <tag>
- <name>groupname</name>
- <placement>X</placement>
- </tag>
- </tags>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
-
- <profile>
<id>docker-integration-tests</id>
<modules>
<module>external/docker-integration-tests</module>
@@ -2630,60 +2597,6 @@
</profile>
<profile>
- <id>java7</id>
- <activation>
- <property><name>env.JAVA_7_HOME</name></property>
- </activation>
- <properties>
- <useZincForJdk8>false</useZincForJdk8>
- </properties>
- <build>
- <pluginManagement>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <compilerArgs combine.children="append">
- <arg>-bootclasspath</arg>
- <arg>${env.JAVA_7_HOME}/jre/lib/rt.jar${path.separator}${env.JAVA_7_HOME}/jre/lib/jce.jar</arg>
- </compilerArgs>
- <verbose>true</verbose>
- </configuration>
- </plugin>
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <!-- Note: -javabootclasspath is set on a per-execution basis rather than as a
- plugin-wide configuration because doc-jar generation will break if it's
- set; see SPARK-15839 for more details -->
- <executions>
- <execution>
- <id>scala-compile-first</id>
- <configuration>
- <args combine.children="append">
- <arg>-javabootclasspath</arg>
- <arg>${env.JAVA_7_HOME}/jre/lib/rt.jar${path.separator}${env.JAVA_7_HOME}/jre/lib/jce.jar</arg>
- </args>
- </configuration>
- </execution>
- <execution>
- <id>scala-test-compile-first</id>
- <configuration>
- <args combine.children="append">
- <arg>-javabootclasspath</arg>
- <arg>${env.JAVA_7_HOME}/jre/lib/rt.jar${path.separator}${env.JAVA_7_HOME}/jre/lib/jce.jar</arg>
- </args>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </pluginManagement>
- </build>
- </profile>
-
- <profile>
<id>scala-2.11</id>
<activation>
<property><name>!scala-2.10</name></property>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index bcc00fa3e9..b48879faa4 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -56,9 +56,9 @@ object BuildCommons {
"tags", "sketch"
).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects
- val optionallyEnabledProjects@Seq(mesos, yarn, java8Tests, sparkGangliaLgpl,
+ val optionallyEnabledProjects@Seq(mesos, yarn, sparkGangliaLgpl,
streamingKinesisAsl, dockerIntegrationTests) =
- Seq("mesos", "yarn", "java8-tests", "ganglia-lgpl", "streaming-kinesis-asl",
+ Seq("mesos", "yarn", "ganglia-lgpl", "streaming-kinesis-asl",
"docker-integration-tests").map(ProjectRef(buildLocation, _))
val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) =
@@ -233,8 +233,8 @@ object SparkBuild extends PomBuild {
if (major >= 8) Seq("-Xdoclint:all", "-Xdoclint:-missing") else Seq.empty
},
- javacJVMVersion := "1.7",
- scalacJVMVersion := "1.7",
+ javacJVMVersion := "1.8",
+ scalacJVMVersion := "1.8",
javacOptions in Compile ++= Seq(
"-encoding", "UTF-8",
@@ -245,24 +245,12 @@ object SparkBuild extends PomBuild {
// additional discussion and explanation.
javacOptions in (Compile, compile) ++= Seq(
"-target", javacJVMVersion.value
- ) ++ sys.env.get("JAVA_7_HOME").toSeq.flatMap { jdk7 =>
- if (javacJVMVersion.value == "1.7") {
- Seq("-bootclasspath", s"$jdk7/jre/lib/rt.jar${File.pathSeparator}$jdk7/jre/lib/jce.jar")
- } else {
- Nil
- }
- },
+ ),
scalacOptions in Compile ++= Seq(
s"-target:jvm-${scalacJVMVersion.value}",
"-sourcepath", (baseDirectory in ThisBuild).value.getAbsolutePath // Required for relative source links in scaladoc
- ) ++ sys.env.get("JAVA_7_HOME").toSeq.flatMap { jdk7 =>
- if (javacJVMVersion.value == "1.7") {
- Seq("-javabootclasspath", s"$jdk7/jre/lib/rt.jar${File.pathSeparator}$jdk7/jre/lib/jce.jar")
- } else {
- Nil
- }
- },
+ ),
// Implements -Xfatal-warnings, ignoring deprecation warnings.
// Code snippet taken from https://issues.scala-lang.org/browse/SI-8410.
@@ -363,8 +351,6 @@ object SparkBuild extends PomBuild {
enable(Flume.settings)(streamingFlumeSink)
- enable(Java8TestSettings.settings)(java8Tests)
-
// SPARK-14738 - Remove docker tests from main Spark build
// enable(DockerIntegrationTests.settings)(dockerIntegrationTests)
@@ -387,7 +373,7 @@ object SparkBuild extends PomBuild {
fork := true,
outputStrategy in run := Some (StdoutOutput),
- javaOptions ++= Seq("-Xmx2G", "-XX:MaxPermSize=256m"),
+ javaOptions += "-Xmx2g",
sparkShell := {
(runMain in Compile).toTask(" org.apache.spark.repl.Main -usejavacp").value
@@ -531,7 +517,6 @@ object SQL {
object Hive {
lazy val settings = Seq(
- javaOptions += "-XX:MaxPermSize=256m",
// Specially disable assertions since some Hive tests fail them
javaOptions in Test := (javaOptions in Test).value.filterNot(_ == "-ea"),
// Supporting all SerDes requires us to depend on deprecated APIs, so we turn off the warnings
@@ -765,16 +750,6 @@ object CopyDependencies {
}
-object Java8TestSettings {
- import BuildCommons._
-
- lazy val settings = Seq(
- javacJVMVersion := "1.8",
- // Targeting Java 8 bytecode is only supported in Scala 2.11.4 and higher:
- scalacJVMVersion := (if (System.getProperty("scala-2.10") == "true") "1.7" else "1.8")
- )
-}
-
object TestSettings {
import BuildCommons._
@@ -812,7 +787,7 @@ object TestSettings {
javaOptions in Test ++= System.getProperties.asScala.filter(_._1.startsWith("spark"))
.map { case (k,v) => s"-D$k=$v" }.toSeq,
javaOptions in Test += "-ea",
- javaOptions in Test ++= "-Xmx3g -Xss4096k -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g"
+ javaOptions in Test ++= "-Xmx3g -Xss4096k"
.split(" ").toSeq,
javaOptions += "-Xmx3g",
// Exclude tags defined in a system property
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index f4f4518480..a00234c2b4 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -911,7 +911,6 @@ private[spark] class Client(
// For log4j configuration to reference
javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
- YarnCommandBuilderUtils.addPermGenSizeOpt(javaOpts)
val userClass =
if (isClusterMode) {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index b55b4b147b..ee85c043b8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
-import org.apache.spark.launcher.YarnCommandBuilderUtils
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.Utils
@@ -190,7 +189,6 @@ private[yarn] class ExecutorRunnable(
// For log4j configuration to reference
javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)
- YarnCommandBuilderUtils.addPermGenSizeOpt(javaOpts)
val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
val absPath =
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala
index 6c3556a2ee..0c3d080cca 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/launcher/YarnCommandBuilderUtils.scala
@@ -38,16 +38,4 @@ private[spark] object YarnCommandBuilderUtils {
CommandBuilderUtils.findJarsDir(sparkHome, scalaVer, true)
}
- /**
- * Adds the perm gen configuration to the list of java options if needed and not yet added.
- *
- * Note that this method adds the option based on the local JVM version; if the node where
- * the container is running has a different Java version, there's a risk that the option will
- * not be added (e.g. if the AM is running Java 8 but the container's node is set up to use
- * Java 7).
- */
- def addPermGenSizeOpt(args: ListBuffer[String]): Unit = {
- CommandBuilderUtils.addPermGenSizeOpt(args.asJava)
- }
-
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index 465fb83669..089c84d5f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -134,12 +134,8 @@ final class Decimal extends Ordered[Decimal] with Serializable {
* Set this Decimal to the given BigInteger value. Will have precision 38 and scale 0.
*/
def set(bigintval: BigInteger): Decimal = {
- // TODO: Remove this once we migrate to java8 and use longValueExact() instead.
- require(
- bigintval.compareTo(LONG_MAX_BIG_INT) <= 0 && bigintval.compareTo(LONG_MIN_BIG_INT) >= 0,
- s"BigInteger $bigintval too large for decimal")
this.decimalVal = null
- this.longVal = bigintval.longValue()
+ this.longVal = bigintval.longValueExact()
this._precision = DecimalType.MAX_PRECISION
this._scale = 0
this
@@ -178,7 +174,7 @@ final class Decimal extends Ordered[Decimal] with Serializable {
def toUnscaledLong: Long = {
if (decimalVal.ne(null)) {
- decimalVal.underlying().unscaledValue().longValue()
+ decimalVal.underlying().unscaledValue().longValueExact()
} else {
longVal
}
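
BigInteger#longValueExact() (Java 8+) throws ArithmeticException when the value does not fit in a long, whereas longValue() silently truncates; that is what makes the hand-rolled require() range check above redundant. A minimal demonstration:

    import java.math.BigInteger;

    public class LongValueExactDemo {
      public static void main(String[] args) {
        BigInteger tooBig = BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE);

        // longValue() silently wraps around on overflow...
        System.out.println(tooBig.longValue());        // -9223372036854775808

        // ...while longValueExact() fails fast, matching the old require() check.
        try {
          tooBig.longValueExact();
        } catch (ArithmeticException e) {
          System.out.println("overflow detected: " + e.getMessage());
        }
      }
    }
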
diff --git a/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java b/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java
index 2570c8d02a..d44af7ef48 100644
--- a/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java
+++ b/sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java
@@ -22,13 +22,13 @@ import java.util.Iterator;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.KeyedState;
/**
* ::Experimental::
* Base interface for a map function used in
- * {@link org.apache.spark.sql.KeyValueGroupedDataset#flatMapGroupsWithState(FlatMapGroupsWithStateFunction, Encoder, Encoder)}.
+ * {@link org.apache.spark.sql.KeyValueGroupedDataset#flatMapGroupsWithState(
+ * FlatMapGroupsWithStateFunction, org.apache.spark.sql.Encoder, org.apache.spark.sql.Encoder)}.
* @since 2.1.1
*/
@Experimental
diff --git a/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java b/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java
index 614d3925e0..75986d1706 100644
--- a/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java
+++ b/sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java
@@ -22,13 +22,13 @@ import java.util.Iterator;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.KeyedState;
/**
* ::Experimental::
* Base interface for a map function used in
- * {@link org.apache.spark.sql.KeyValueGroupedDataset#mapGroupsWithState(MapGroupsWithStateFunction, Encoder, Encoder)}
+ * {@link org.apache.spark.sql.KeyValueGroupedDataset#mapGroupsWithState(
+ * MapGroupsWithStateFunction, org.apache.spark.sql.Encoder, org.apache.spark.sql.Encoder)}
* @since 2.1.1
*/
@Experimental
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e62cd9f7bf..38a24cc8ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -103,7 +103,7 @@ private[sql] object Dataset {
* the following creates a new Dataset by applying a filter on the existing one:
* {{{
* val names = people.map(_.name) // in Scala; names is a Dataset[String]
- * Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING)); // in Java 8
+ * Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING));
* }}}
*
* Dataset operations can also be untyped, through various domain-specific-language (DSL)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 94e689a4d5..3a548c251f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -98,7 +98,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
* // Create Integer values grouped by String key from a Dataset<Tuple2<String, Integer>>
* Dataset<Tuple2<String, Integer>> ds = ...;
* KeyValueGroupedDataset<String, Integer> grouped =
- * ds.groupByKey(t -> t._1, Encoders.STRING()).mapValues(t -> t._2, Encoders.INT()); // Java 8
+ * ds.groupByKey(t -> t._1, Encoders.STRING()).mapValues(t -> t._2, Encoders.INT());
* }}}
*
* @since 2.1.0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index ea465e2c83..dbe55090ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -190,17 +190,6 @@ class SQLContext private[sql](val sparkSession: SparkSession)
* The following example registers a UDF in Java:
* {{{
* sqlContext.udf().register("myUDF",
- * new UDF2<Integer, String, String>() {
- * @Override
- * public String call(Integer arg1, String arg2) {
- * return arg2 + arg1;
- * }
- * }, DataTypes.StringType);
- * }}}
- *
- * Or, to use Java 8 lambda syntax:
- * {{{
- * sqlContext.udf().register("myUDF",
* (Integer arg1, String arg2) -> arg2 + arg1,
* DataTypes.StringType);
* }}}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index e1fdb2f287..1975a56caf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -164,17 +164,6 @@ class SparkSession private(
* The following example registers a UDF in Java:
* {{{
* sparkSession.udf().register("myUDF",
- * new UDF2<Integer, String, String>() {
- * @Override
- * public String call(Integer arg1, String arg2) {
- * return arg2 + arg1;
- * }
- * }, DataTypes.StringType);
- * }}}
- *
- * Or, to use Java 8 lambda syntax:
- * {{{
- * sparkSession.udf().register("myUDF",
* (Integer arg1, String arg2) -> arg2 + arg1,
* DataTypes.StringType);
* }}}
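
The javadoc now keeps only the lambda form of UDF registration. A self-contained sketch of that pattern; the UDF name, session settings and query are illustrative only:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.api.java.UDF2;
    import org.apache.spark.sql.types.DataTypes;

    public class LambdaUdfExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[2]")
            .appName("LambdaUdfExample")
            .getOrCreate();

        // The explicit UDF2 cast keeps overload resolution unambiguous for the lambda.
        spark.udf().register("myUDF",
            (UDF2<Integer, String, String>) (arg1, arg2) -> arg2 + arg1,
            DataTypes.StringType);

        Dataset<Row> result = spark.sql("SELECT myUDF(2, 'value') AS out");
        result.show();   // prints "value2"

        spark.stop();
      }
    }
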
diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/sql/Java8DatasetAggregatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java
index 10d25fa445..8b8a403e2b 100644
--- a/external/java8-tests/src/test/java/test/org/apache/spark/java8/sql/Java8DatasetAggregatorSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package test.org.apache.spark.java8.sql;
+package test.org.apache.spark.sql;
import java.util.Arrays;
@@ -26,7 +26,6 @@ import scala.Tuple2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.KeyValueGroupedDataset;
import org.apache.spark.sql.expressions.javalang.typed;
-import test.org.apache.spark.sql.JavaDatasetAggregatorSuiteBase;
/**
* Suite that replicates tests in JavaDatasetAggregatorSuite using lambda syntax.
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 5ef4e887de..a94a37cb21 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -228,7 +228,7 @@ public class JavaDatasetSuite implements Serializable {
Dataset<String> mapped2 = grouped.mapGroupsWithState(
new MapGroupsWithStateFunction<Integer, String, Long, String>() {
@Override
- public String call(Integer key, Iterator<String> values, KeyedState<Long> s) throws Exception {
+ public String call(Integer key, Iterator<String> values, KeyedState<Long> s) {
StringBuilder sb = new StringBuilder(key.toString());
while (values.hasNext()) {
sb.append(values.next());
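
The commit message lists rewriting such anonymous classes as lambdas as follow-up work. A sketch of the lambda form of the MapGroupsWithStateFunction shown above; the direct call() with a null state is only there to keep the snippet runnable without a streaming query:

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
    import org.apache.spark.sql.KeyedState;

    public class LambdaStateFunctionSketch {
      public static void main(String[] args) throws Exception {
        // Lambda equivalent of the anonymous class in the test above.
        MapGroupsWithStateFunction<Integer, String, Long, String> fn =
            (key, values, state) -> {
              StringBuilder sb = new StringBuilder(key.toString());
              while (values.hasNext()) {
                sb.append(values.next());
              }
              return sb.toString();
            };

        Iterator<String> values = Arrays.asList("a", "b").iterator();
        // State handling is omitted; passing null only exercises the lambda itself.
        System.out.println(fn.call(1, values, (KeyedState<Long>) null));  // prints "1ab"
      }
    }
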
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 9aedaf234e..0f249d7d59 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -190,6 +190,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
+ <version>3.0.0</version>
<executions>
<execution>
<id>add-scala-test-sources</id>
@@ -219,7 +220,7 @@
<artifactId>scalatest-maven-plugin</artifactId>
<configuration>
<!-- Specially disable assertions since some Hive tests fail them -->
- <argLine>-da -Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
+ <argLine>-da -Xmx3g -XX:ReservedCodeCacheSize=${CodeCacheSize}</argLine>
</configuration>
</plugin>
<plugin>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
index e7c165c5f8..d786a610f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
@@ -137,21 +137,13 @@ case class ScriptTransformationExec(
throw writerThread.exception.get
}
- // Checks if the proc is still alive (in case the command that ran was bad)
- // The ideal way to do this is to use Java 8's Process#isAlive()
- // but it cannot be used because Spark still supports Java 7.
- // Following is a workaround used to check if a process is alive in Java 7
- // TODO: Once builds are switched to Java 8, this can be changed
- try {
+ if (!proc.isAlive) {
val exitCode = proc.exitValue()
if (exitCode != 0) {
logError(stderrBuffer.toString) // log the stderr circular buffer
throw new SparkException(s"Subprocess exited with status $exitCode. " +
s"Error: ${stderrBuffer.toString}", cause)
}
- } catch {
- case _: IllegalThreadStateException =>
- // This means that the process is still alive. Move ahead
}
}
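
Process#isAlive() (Java 8+) replaces the Java 7 workaround of calling exitValue() and treating IllegalThreadStateException as "still running". A small comparison of the two checks; the sleep command is illustrative and assumes a Unix-like environment:

    public class ProcessIsAliveDemo {
      public static void main(String[] args) throws Exception {
        // "sleep 2" is only an illustrative child process; any long-running command works.
        Process proc = new ProcessBuilder("sleep", "2").start();

        // Java 7 workaround: exitValue() throws IllegalThreadStateException while running.
        boolean aliveTheOldWay;
        try {
          proc.exitValue();
          aliveTheOldWay = false;
        } catch (IllegalThreadStateException e) {
          aliveTheOldWay = true;
        }

        // Java 8: ask directly, then read the exit code only once the process is done.
        System.out.println("isAlive: " + proc.isAlive() + " (old check: " + aliveTheOldWay + ")");
        proc.waitFor();
        System.out.println("exit code: " + proc.exitValue());
      }
    }
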
diff --git a/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
index 338ca54ab8..646cb97066 100644
--- a/external/java8-tests/src/test/java/test/org/apache/spark/java8/dstream/Java8APISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
@@ -15,11 +15,17 @@
* limitations under the License.
*/
-package test.org.apache.spark.java8.dstream;
+package test.org.apache.spark.streaming;
import java.io.Serializable;
import java.util.*;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Durations;
+import org.apache.spark.streaming.JavaTestUtils;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.StateSpec;
+import org.apache.spark.streaming.Time;
import scala.Tuple2;
import com.google.common.collect.Lists;
@@ -32,7 +38,6 @@ import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
@@ -139,7 +144,7 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y,
- (x, y) -> x - y, new Duration(2000), new Duration(1000));
+ (x, y) -> x - y, new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reducedWindowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
index 648a5abe0b..8d24104d78 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
@@ -15,13 +15,21 @@
* limitations under the License.
*/
-package org.apache.spark.streaming;
+package test.org.apache.spark.streaming;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.JavaCheckpointTestUtils;
+import org.apache.spark.streaming.JavaTestUtils;
+import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.Seconds;
+import org.apache.spark.streaming.StreamingContextState;
+import org.apache.spark.streaming.StreamingContextSuite;
+import org.apache.spark.streaming.Time;
import scala.Tuple2;
import org.apache.hadoop.conf.Configuration;