-rw-r--r--  core/pom.xml | 17
-rw-r--r--  core/src/main/scala/org/apache/spark/ContextCleaner.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/HttpFileServer.scala | 91
-rw-r--r--  core/src/main/scala/org/apache/spark/MapOutputTracker.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/SSLOptions.scala | 43
-rw-r--r--  core/src/main/scala/org/apache/spark/SecurityManager.scala | 19
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala | 32
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala | 49
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/Client.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 139
-rw-r--r--  core/src/main/scala/org/apache/spark/util/RpcUtils.scala | 12
-rw-r--r--  core/src/test/scala/org/apache/spark/FileServerSuite.scala | 264
-rw-r--r--  core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/LocalSparkContext.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 12
-rw-r--r--  core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala | 12
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala | 8
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 17
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala | 26
-rw-r--r--  dev/deps/spark-deps-hadoop-2.2 | 5
-rw-r--r--  dev/deps/spark-deps-hadoop-2.3 | 5
-rw-r--r--  dev/deps/spark-deps-hadoop-2.4 | 5
-rw-r--r--  dev/deps/spark-deps-hadoop-2.6 | 5
-rw-r--r--  dev/deps/spark-deps-hadoop-2.7 | 5
-rw-r--r--  docs/cluster-overview.md | 2
-rw-r--r--  docs/configuration.md | 65
-rw-r--r--  docs/security.md | 30
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala | 2
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala | 2
-rw-r--r--  pom.xml | 5
-rw-r--r--  project/MimaExcludes.scala | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala | 1
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 2
43 files changed, 123 insertions, 831 deletions
diff --git a/core/pom.xml b/core/pom.xml
index 2071a58de9..0ab170e028 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -186,19 +186,6 @@
<artifactId>commons-net</artifactId>
</dependency>
<dependency>
- <groupId>${akka.group}</groupId>
- <artifactId>akka-remote_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>${akka.group}</groupId>
- <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>${akka.group}</groupId>
- <artifactId>akka-testkit_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
@@ -225,6 +212,10 @@
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
</dependency>
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 4628093b91..5a42299a0b 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -86,8 +86,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
* is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
*
* Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
- * workaround for the issue, which is ultimately caused by the way the BlockManager actors
- * issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
+ * workaround for the issue, which is ultimately caused by the way the BlockManager endpoints
+ * issue inter-dependent blocking RPC messages to each other at high frequencies. This happens,
* for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
* longer in scope.
*/
@@ -101,7 +101,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
* exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this
* parameter by default disables blocking on shuffle cleanups. Note that this does not affect
* the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround,
- * until the real Akka issue (referred to in the comment above `blockOnCleanupTasks`) is
+ * until the real RPC issue (referred to in the comment above `blockOnCleanupTasks`) is
* resolved.
*/
private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
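A minimal sketch (plain SparkConf; the values are the defaults the comments above describe) of the two cleaner settings this hunk refers to:

    import org.apache.spark.SparkConf

    // Blocking stays on for RDD/broadcast cleanup (the SPARK-3015 workaround) and off for
    // shuffle cleanup (the SPARK-3139 workaround); both keys are read by ContextCleaner.
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.cleaner.referenceTracking.blocking", "true")
      .set("spark.cleaner.referenceTracking.blocking.shuffle", "false")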
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
deleted file mode 100644
index 46f9f9e9af..0000000000
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.File
-
-import com.google.common.io.Files
-
-import org.apache.spark.util.Utils
-
-private[spark] class HttpFileServer(
- conf: SparkConf,
- securityManager: SecurityManager,
- requestedPort: Int = 0)
- extends Logging {
-
- var baseDir : File = null
- var fileDir : File = null
- var jarDir : File = null
- var httpServer : HttpServer = null
- var serverUri : String = null
-
- def initialize() {
- baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd")
- fileDir = new File(baseDir, "files")
- jarDir = new File(baseDir, "jars")
- fileDir.mkdir()
- jarDir.mkdir()
- logInfo("HTTP File server directory is " + baseDir)
- httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
- httpServer.start()
- serverUri = httpServer.uri
- logDebug("HTTP file server started at: " + serverUri)
- }
-
- def stop() {
- httpServer.stop()
-
- // If we only stop sc, but the driver process still run as a services then we need to delete
- // the tmp dir, if not, it will create too many tmp dirs
- try {
- Utils.deleteRecursively(baseDir)
- } catch {
- case e: Exception =>
- logWarning(s"Exception while deleting Spark temp dir: ${baseDir.getAbsolutePath}", e)
- }
- }
-
- def addFile(file: File) : String = {
- addFileToDir(file, fileDir)
- serverUri + "/files/" + Utils.encodeFileNameToURIRawPath(file.getName)
- }
-
- def addJar(file: File) : String = {
- addFileToDir(file, jarDir)
- serverUri + "/jars/" + Utils.encodeFileNameToURIRawPath(file.getName)
- }
-
- def addDirectory(path: String, resourceBase: String): String = {
- httpServer.addDirectory(path, resourceBase)
- serverUri + path
- }
-
- def addFileToDir(file: File, dir: File) : String = {
- // Check whether the file is a directory. If it is, throw a more meaningful exception.
- // If we don't catch this, Guava throws a very confusing error message:
- // java.io.FileNotFoundException: [file] (No such file or directory)
- // even though the directory ([file]) exists.
- if (file.isDirectory) {
- throw new IllegalArgumentException(s"$file cannot be a directory.")
- }
- Files.copy(file, new File(dir, file.getName))
- dir + "/" + Utils.encodeFileNameToURIRawPath(file.getName)
- }
-
-}
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 1b59beb8d6..eb2fdecc83 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -40,7 +40,7 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
private[spark] class MapOutputTrackerMasterEndpoint(
override val rpcEnv: RpcEnv, tracker: MapOutputTrackerMaster, conf: SparkConf)
extends RpcEndpoint with Logging {
- val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+ val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case GetMapOutputStatuses(shuffleId: Int) =>
@@ -48,9 +48,10 @@ private[spark] class MapOutputTrackerMasterEndpoint(
logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
val serializedSize = mapOutputStatuses.length
- if (serializedSize > maxAkkaFrameSize) {
+ if (serializedSize > maxRpcMessageSize) {
+
val msg = s"Map output statuses were $serializedSize bytes which " +
- s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes)."
+ s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes)."
/* For SPARK-1244 we'll opt for just logging an error and then sending it to the sender.
* A bigger refactoring (SPARK-1239) will ultimately remove this entire code path. */
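A minimal sketch of the size guard the endpoint now applies; the helper name and the inlined arithmetic are illustrative, not part of the patch:

    import org.apache.spark.SparkConf

    // Replies larger than spark.rpc.message.maxSize (in MB, default 128) become an error
    // message instead of being sent back in a single RPC reply.
    def checkReplySize(conf: SparkConf, serializedSize: Int): Either[String, Int] = {
      val maxBytes = conf.getInt("spark.rpc.message.maxSize", 128) * 1024 * 1024
      if (serializedSize > maxBytes) {
        Left(s"Map output statuses were $serializedSize bytes which exceeds " +
          s"spark.rpc.message.maxSize ($maxBytes bytes).")
      } else {
        Right(serializedSize)
      }
    }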
diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
index 261265f0b4..d755f07965 100644
--- a/core/src/main/scala/org/apache/spark/SSLOptions.scala
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -21,9 +21,6 @@ import java.io.File
import java.security.NoSuchAlgorithmException
import javax.net.ssl.SSLContext
-import scala.collection.JavaConverters._
-
-import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.eclipse.jetty.util.ssl.SslContextFactory
/**
@@ -31,8 +28,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory
* generate specific objects to configure SSL for different communication protocols.
*
* SSLOptions is intended to provide the maximum common set of SSL settings, which are supported
- * by the protocol, which it can generate the configuration for. Since Akka doesn't support client
- * authentication with SSL, SSLOptions cannot support it either.
+ * by the protocol, which it can generate the configuration for.
*
* @param enabled enables or disables SSL; if it is set to false, the rest of the
* settings are disregarded
@@ -88,43 +84,6 @@ private[spark] case class SSLOptions(
}
}
- /**
- * Creates an Akka configuration object which contains all the SSL settings represented by this
- * object. It can be used then to compose the ultimate Akka configuration.
- */
- def createAkkaConfig: Option[Config] = {
- if (enabled) {
- if (keyStoreType.isDefined) {
- logWarning("Akka configuration does not support key store type.");
- }
- if (trustStoreType.isDefined) {
- logWarning("Akka configuration does not support trust store type.");
- }
-
- Some(ConfigFactory.empty()
- .withValue("akka.remote.netty.tcp.security.key-store",
- ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
- .withValue("akka.remote.netty.tcp.security.key-store-password",
- ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse("")))
- .withValue("akka.remote.netty.tcp.security.trust-store",
- ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse("")))
- .withValue("akka.remote.netty.tcp.security.trust-store-password",
- ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse("")))
- .withValue("akka.remote.netty.tcp.security.key-password",
- ConfigValueFactory.fromAnyRef(keyPassword.getOrElse("")))
- .withValue("akka.remote.netty.tcp.security.random-number-generator",
- ConfigValueFactory.fromAnyRef(""))
- .withValue("akka.remote.netty.tcp.security.protocol",
- ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
- .withValue("akka.remote.netty.tcp.security.enabled-algorithms",
- ConfigValueFactory.fromIterable(supportedAlgorithms.asJava))
- .withValue("akka.remote.netty.tcp.enable-ssl",
- ConfigValueFactory.fromAnyRef(true)))
- } else {
- None
- }
- }
-
/*
* The supportedAlgorithms set is a subset of the enabledAlgorithms that
* are supported by the current Java security provider for this protocol.
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index c5aec05c03..0675957e16 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -67,17 +67,6 @@ import org.apache.spark.util.Utils
* At this point spark has multiple communication protocols that need to be secured and
* different underlying mechanisms are used depending on the protocol:
*
- * - Akka -> The only option here is to use the Akka Remote secure-cookie functionality.
- * Akka remoting allows you to specify a secure cookie that will be exchanged
- * and ensured to be identical in the connection handshake between the client
- * and the server. If they are not identical then the client will be refused
- * to connect to the server. There is no control of the underlying
- * authentication mechanism so its not clear if the password is passed in
- * plaintext or uses DIGEST-MD5 or some other mechanism.
- *
- * Akka also has an option to turn on SSL, this option is currently supported (see
- * the details below).
- *
* - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
* for the HttpServer. Jetty supports multiple authentication mechanisms -
* Basic, Digest, Form, Spengo, etc. It also supports multiple different login
@@ -168,16 +157,16 @@ import org.apache.spark.util.Utils
* denote the global configuration for all the supported protocols. In order to override the global
* configuration for the particular protocol, the properties must be overwritten in the
* protocol-specific namespace. Use `spark.ssl.yyy.xxx` settings to overwrite the global
- * configuration for particular protocol denoted by `yyy`. Currently `yyy` can be either `akka` for
- * Akka based connections or `fs` for broadcast and file server.
+ * configuration for a particular protocol denoted by `yyy`. Currently `yyy` can only be `fs` for
+ * broadcast and file server.
*
* Refer to [[org.apache.spark.SSLOptions]] documentation for the list of
* options that can be specified.
*
* SecurityManager initializes SSLOptions objects for different protocols separately. SSLOptions
* object parses Spark configuration at a given namespace and builds the common representation
- * of SSL settings. SSLOptions is then used to provide protocol-specific configuration like
- * TypeSafe configuration for Akka or SSLContextFactory for Jetty.
+ * of SSL settings. SSLOptions is then used to provide protocol-specific SSLContextFactory for
+ * Jetty.
*
* SSL must be configured on each node and configured for each component involved in
* communication using the particular protocol. In YARN clusters, the key-store can be prepared on
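A minimal sketch of the namespaced SSL settings the updated scaladoc describes: `spark.ssl.*` is the global default and `spark.ssl.fs.*` overrides it for the file server, now the only per-protocol namespace (paths and passwords below are placeholders):

    import org.apache.spark.SparkConf

    val conf = new SparkConf(loadDefaults = false)
      .set("spark.ssl.enabled", "true")
      .set("spark.ssl.keyStore", "/path/to/keystore")          // global defaults
      .set("spark.ssl.keyStorePassword", "placeholder")
      .set("spark.ssl.trustStore", "/path/to/truststore")
      .set("spark.ssl.trustStorePassword", "placeholder")
      .set("spark.ssl.fs.protocol", "TLSv1.2")                 // override for `fs` only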
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 340e1f7824..36e240e618 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -344,17 +344,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
.map{case (k, v) => (k.substring(prefix.length), v)}
}
- /** Get all akka conf variables set on this SparkConf */
- def getAkkaConf: Seq[(String, String)] =
- /* This is currently undocumented. If we want to make this public we should consider
- * nesting options under the spark namespace to avoid conflicts with user akka options.
- * Otherwise users configuring their own akka code via system properties could mess up
- * spark's akka options.
- *
- * E.g. spark.akka.option.x.y.x = "value"
- */
- getAll.filter { case (k, _) => isAkkaConf(k) }
-
/**
* Returns the Spark application id, valid in the Driver after TaskScheduler registration and
* from the start in the Executor.
@@ -600,7 +589,9 @@ private[spark] object SparkConf extends Logging {
"spark.yarn.max.executor.failures" -> Seq(
AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
"spark.memory.offHeap.enabled" -> Seq(
- AlternateConfig("spark.unsafe.offHeap", "1.6"))
+ AlternateConfig("spark.unsafe.offHeap", "1.6")),
+ "spark.rpc.message.maxSize" -> Seq(
+ AlternateConfig("spark.akka.frameSize", "1.6"))
)
/**
@@ -616,20 +607,12 @@ private[spark] object SparkConf extends Logging {
}
/**
- * Return whether the given config is an akka config (e.g. akka.actor.provider).
- * Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
- */
- def isAkkaConf(name: String): Boolean = name.startsWith("akka.")
-
- /**
* Return whether the given config should be passed to an executor on start-up.
*
- * Certain akka and authentication configs are required from the executor when it connects to
+ * Certain authentication configs are required from the executor when it connects to
* the scheduler, while the rest of the spark configs can be inherited from the driver later.
*/
def isExecutorStartupConf(name: String): Boolean = {
- isAkkaConf(name) ||
- name.startsWith("spark.akka") ||
(name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
name.startsWith("spark.ssl") ||
name.startsWith("spark.rpc") ||
@@ -664,12 +647,19 @@ private[spark] object SparkConf extends Logging {
logWarning(
s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
s"may be removed in the future. ${cfg.deprecationMessage}")
+ return
}
allAlternatives.get(key).foreach { case (newKey, cfg) =>
logWarning(
s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
s"and may be removed in the future. Please use the new key '$newKey' instead.")
+ return
+ }
+ if (key.startsWith("spark.akka") || key.startsWith("spark.ssl.akka")) {
+ logWarning(
+ s"The configuration key $key is not supported any more " +
+ s"because Spark doesn't use Akka since 2.0")
}
}
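A minimal sketch of how the new alternate-config entry behaves from the user's side; the expected output relies on the documented fallback from the new key to the deprecated one and is an assumption of this example:

    import org.apache.spark.SparkConf

    val conf = new SparkConf(loadDefaults = false)
    conf.set("spark.akka.frameSize", "64")                 // logs a deprecation warning (1.6)
    println(conf.get("spark.rpc.message.maxSize", "128"))  // expected to print "64"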
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index ec43be0e2f..9461afdc54 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -23,7 +23,6 @@ import java.net.Socket
import scala.collection.mutable
import scala.util.Properties
-import akka.actor.ActorSystem
import com.google.common.collect.MapMaker
import org.apache.spark.annotation.DeveloperApi
@@ -39,12 +38,12 @@ import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinato
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage._
-import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
+import org.apache.spark.util.{RpcUtils, Utils}
/**
* :: DeveloperApi ::
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
- * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
+ * including the serializer, RpcEnv, block manager, map output tracker, etc. Currently
* Spark code finds the SparkEnv through a global variable, so all the threads can access the same
* SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
*
@@ -55,7 +54,6 @@ import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
class SparkEnv (
val executorId: String,
private[spark] val rpcEnv: RpcEnv,
- _actorSystem: ActorSystem, // TODO Remove actorSystem
val serializer: Serializer,
val closureSerializer: Serializer,
val cacheManager: CacheManager,
@@ -71,10 +69,6 @@ class SparkEnv (
val outputCommitCoordinator: OutputCommitCoordinator,
val conf: SparkConf) extends Logging {
- // TODO Remove actorSystem
- @deprecated("Actor system is no longer supported as of 1.4.0", "1.4.0")
- val actorSystem: ActorSystem = _actorSystem
-
private[spark] var isStopped = false
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
@@ -96,13 +90,8 @@ class SparkEnv (
blockManager.master.stop()
metricsSystem.stop()
outputCommitCoordinator.stop()
- actorSystem.shutdown()
rpcEnv.shutdown()
-
- // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
- // down, but let's call it anyway in case it gets fixed in a later release
- // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
- // actorSystem.awaitTermination()
+ rpcEnv.awaitTermination()
// Note that blockTransferService is stopped by BlockManager since it is started by it.
@@ -152,8 +141,8 @@ class SparkEnv (
object SparkEnv extends Logging {
@volatile private var env: SparkEnv = _
- private[spark] val driverActorSystemName = "sparkDriver"
- private[spark] val executorActorSystemName = "sparkExecutor"
+ private[spark] val driverSystemName = "sparkDriver"
+ private[spark] val executorSystemName = "sparkExecutor"
def set(e: SparkEnv) {
env = e
@@ -202,7 +191,7 @@ object SparkEnv extends Logging {
/**
* Create a SparkEnv for an executor.
- * In coarse-grained mode, the executor provides an actor system that is already instantiated.
+ * In coarse-grained mode, the executor provides an RpcEnv that is already instantiated.
*/
private[spark] def createExecutorEnv(
conf: SparkConf,
@@ -245,28 +234,11 @@ object SparkEnv extends Logging {
val securityManager = new SecurityManager(conf)
- val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
- // Create the ActorSystem for Akka and get the port it binds to.
- val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
+ val systemName = if (isDriver) driverSystemName else executorSystemName
+ val rpcEnv = RpcEnv.create(systemName, hostname, port, conf, securityManager,
clientMode = !isDriver)
- val actorSystem: ActorSystem = {
- val actorSystemPort =
- if (port == 0 || rpcEnv.address == null) {
- port
- } else {
- rpcEnv.address.port + 1
- }
- // Create a ActorSystem for legacy codes
- AkkaUtils.createActorSystem(
- actorSystemName + "ActorSystem",
- hostname,
- actorSystemPort,
- conf,
- securityManager
- )._1
- }
- // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
+ // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
// In the non-driver case, the RPC env's address may be null since it may not be listening
// for incoming connections.
if (isDriver) {
@@ -325,7 +297,7 @@ object SparkEnv extends Logging {
new MapOutputTrackerWorker(conf)
}
- // Have to assign trackerActor after initialization as MapOutputTrackerActor
+ // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
// requires the MapOutputTracker itself
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(
@@ -398,7 +370,6 @@ object SparkEnv extends Logging {
val envInstance = new SparkEnv(
executorId,
rpcEnv,
- actorSystem,
serializer,
closureSerializer,
cacheManager,
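A minimal sketch of standing up the single RpcEnv that now backs SparkEnv in place of the Akka ActorSystem; `RpcEnv.create` and the shutdown calls are the ones used in the hunks above, port 0 means pick any free port, and since RpcEnv is private[spark] this only compiles inside the Spark code base:

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.rpc.RpcEnv

    val conf = new SparkConf(loadDefaults = false)
    val rpcEnv = RpcEnv.create("sparkDriver", "localhost", 0, conf,
      new SecurityManager(conf), clientMode = false)
    println(rpcEnv.address)      // the host:port the env actually bound to
    rpcEnv.shutdown()
    rpcEnv.awaitTermination()    // replaces the old actorSystem.awaitTermination()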
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 63a20ab41a..dcef03ef3e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -219,11 +219,7 @@ object Client {
val conf = new SparkConf()
val driverArgs = new ClientArguments(args)
- if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
- conf.set("spark.akka.logLifecycleEvents", "true")
- }
conf.set("spark.rpc.askTimeout", "10")
- conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
Logger.getRootLogger.setLevel(driverArgs.logLevel)
val rpcEnv =
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index a1e8da1ec8..a6749f7e38 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -183,7 +183,7 @@ object SparkSubmit {
}
// In standalone cluster mode, there are two submission gateways:
- // (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
+ // (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 58bd9ca3d1..e3a6c4c07a 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -37,7 +37,6 @@ private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
- hostPort: String,
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv)
@@ -55,8 +54,7 @@ private[spark] class CoarseGrainedExecutorBackend(
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
- ref.ask[RegisterExecutorResponse](
- RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
+ ref.ask[RegisterExecutorResponse](RegisterExecutor(executorId, self, cores, extractLogUrls))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) => Utils.tryLogNonFatalError {
@@ -184,14 +182,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, isLocal = false)
- // SparkEnv will set spark.executor.port if the rpc env is listening for incoming
- // connections (e.g., if it's using akka). Otherwise, the executor is running in
- // client mode only, and does not accept incoming connections.
- val sparkHostPort = env.conf.getOption("spark.executor.port").map { port =>
- hostname + ":" + port
- }.orNull
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
- env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
+ env.rpcEnv, driverUrl, executorId, cores, userClassPath, env))
workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 75d7e34d60..030ae41db4 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -40,7 +40,7 @@ import org.apache.spark.util._
* Spark executor, backed by a threadpool to run tasks.
*
* This can be used with Mesos, YARN, and the standalone scheduler.
- * An internal RPC interface (at the moment Akka) is used for communication with the driver,
+ * An internal RPC interface is used for communication with the driver,
* except in the case of Mesos fine-grained mode.
*/
private[spark] class Executor(
@@ -97,9 +97,9 @@ private[spark] class Executor(
// Set the classloader for serializer
env.serializer.setDefaultClassLoader(replClassLoader)
- // Akka's message frame size. If task result is bigger than this, we use the block manager
+ // Max RPC message size. If task result is bigger than this, we use the block manager
// to send the result back.
- private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+ private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
// Limit of bytes for total size of results (default is 1GB)
private val maxResultSize = Utils.getMaxResultSize(conf)
@@ -263,7 +263,7 @@ private[spark] class Executor(
s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
s"dropping it.")
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
- } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
+ } else if (resultSize >= maxRpcMessageSize) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
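A minimal sketch (names are illustrative) of the three-way decision in the hunk above: oversized results are dropped, results at or above the max RPC message size go through the block manager as an indirect result, and only small results ride in the RPC reply itself:

    sealed trait ResultRoute
    case object Dropped extends ResultRoute           // > spark.driver.maxResultSize
    case object ViaBlockManager extends ResultRoute   // >= spark.rpc.message.maxSize
    case object DirectReply extends ResultRoute       // fits in a single RPC message

    def routeTaskResult(resultSize: Long, maxResultSize: Long, maxRpcMessageSize: Int): ResultRoute =
      if (maxResultSize > 0 && resultSize > maxResultSize) Dropped
      else if (resultSize >= maxRpcMessageSize) ViaBlockManager
      else DirectReply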
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index f3d0d85476..29e469c3f5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -48,7 +48,6 @@ private[spark] object CoarseGrainedClusterMessages {
case class RegisterExecutor(
executorId: String,
executorRef: RpcEndpointRef,
- hostPort: String,
cores: Int,
logUrls: Map[String, String])
extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b808993aa6..f69a3d371e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,7 +27,7 @@ import org.apache.spark.rpc._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.ENDPOINT_NAME
-import org.apache.spark.util.{AkkaUtils, SerializableBuffer, ThreadUtils, Utils}
+import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils}
/**
* A scheduler backend that waits for coarse-grained executors to connect.
@@ -46,7 +46,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Total number of executors that are currently registered
var totalRegisteredExecutors = new AtomicInteger(0)
val conf = scheduler.sc.conf
- private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+ private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
// Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, that is double between 0 and 1.
var minRegisteredRatio =
@@ -134,7 +134,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
+ case RegisterExecutor(executorId, executorRef, cores, logUrls) =>
if (executorDataMap.contains(executorId)) {
context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
} else {
@@ -224,14 +224,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
val serializedTask = ser.serialize(task)
- if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
+ if (serializedTask.limit >= maxRpcMessageSize) {
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
- "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
- "spark.akka.frameSize or using broadcast variables for large values."
- msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
- AkkaUtils.reservedSizeBytes)
+ "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
+ "spark.rpc.message.maxSize or using broadcast variables for large values."
+ msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 0a6f2c01c1..a298cf5ef9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -49,7 +49,7 @@ private[spark] class SimrSchedulerBackend(
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
logInfo("Writing to HDFS file: " + driverFilePath)
- logInfo("Writing Akka address: " + driverUrl)
+ logInfo("Writing Driver address: " + driverUrl)
logInfo("Writing Spark UI Address: " + appUIAddress)
// Create temporary file to prevent race condition where executors get empty driverUrl file
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
deleted file mode 100644
index 3f4ac9b2f1..0000000000
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.collection.JavaConverters._
-
-import akka.actor.{ActorSystem, ExtendedActorSystem}
-import com.typesafe.config.ConfigFactory
-import org.apache.log4j.{Level, Logger}
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
-
-/**
- * Various utility classes for working with Akka.
- */
-private[spark] object AkkaUtils extends Logging {
-
- /**
- * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
- * ActorSystem itself and its port (which is hard to get from Akka).
- *
- * Note: the `name` parameter is important, as even if a client sends a message to right
- * host + port, if the system name is incorrect, Akka will drop the message.
- *
- * If indestructible is set to true, the Actor System will continue running in the event
- * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
- */
- def createActorSystem(
- name: String,
- host: String,
- port: Int,
- conf: SparkConf,
- securityManager: SecurityManager): (ActorSystem, Int) = {
- val startService: Int => (ActorSystem, Int) = { actualPort =>
- doCreateActorSystem(name, host, actualPort, conf, securityManager)
- }
- Utils.startServiceOnPort(port, startService, conf, name)
- }
-
- private def doCreateActorSystem(
- name: String,
- host: String,
- port: Int,
- conf: SparkConf,
- securityManager: SecurityManager): (ActorSystem, Int) = {
-
- val akkaThreads = conf.getInt("spark.akka.threads", 4)
- val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
- val akkaTimeoutS = conf.getTimeAsSeconds("spark.akka.timeout",
- conf.get("spark.network.timeout", "120s"))
- val akkaFrameSize = maxFrameSizeBytes(conf)
- val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
- val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
- if (!akkaLogLifecycleEvents) {
- // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
- // See: https://www.assembla.com/spaces/akka/tickets/3787#/
- Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
- }
-
- val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"
-
- val akkaHeartBeatPausesS = conf.getTimeAsSeconds("spark.akka.heartbeat.pauses", "6000s")
- val akkaHeartBeatIntervalS = conf.getTimeAsSeconds("spark.akka.heartbeat.interval", "1000s")
-
- val secretKey = securityManager.getSecretKey()
- val isAuthOn = securityManager.isAuthenticationEnabled()
- if (isAuthOn && secretKey == null) {
- throw new Exception("Secret key is null with authentication on")
- }
- val requireCookie = if (isAuthOn) "on" else "off"
- val secureCookie = if (isAuthOn) secretKey else ""
- logDebug(s"In createActorSystem, requireCookie is: $requireCookie")
-
- val akkaSslConfig = securityManager.getSSLOptions("akka").createAkkaConfig
- .getOrElse(ConfigFactory.empty())
-
- val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap.asJava)
- .withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString(
- s"""
- |akka.daemonic = on
- |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
- |akka.stdout-loglevel = "ERROR"
- |akka.jvm-exit-on-fatal-error = off
- |akka.remote.require-cookie = "$requireCookie"
- |akka.remote.secure-cookie = "$secureCookie"
- |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatIntervalS s
- |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPausesS s
- |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
- |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
- |akka.remote.netty.tcp.hostname = "$host"
- |akka.remote.netty.tcp.port = $port
- |akka.remote.netty.tcp.tcp-nodelay = on
- |akka.remote.netty.tcp.connection-timeout = $akkaTimeoutS s
- |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
- |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
- |akka.actor.default-dispatcher.throughput = $akkaBatchSize
- |akka.log-config-on-start = $logAkkaConfig
- |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
- |akka.log-dead-letters = $lifecycleEvents
- |akka.log-dead-letters-during-shutdown = $lifecycleEvents
- """.stripMargin))
-
- val actorSystem = ActorSystem(name, akkaConf)
- val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
- val boundPort = provider.getDefaultAddress.port.get
- (actorSystem, boundPort)
- }
-
- private val AKKA_MAX_FRAME_SIZE_IN_MB = Int.MaxValue / 1024 / 1024
-
- /** Returns the configured max frame size for Akka messages in bytes. */
- def maxFrameSizeBytes(conf: SparkConf): Int = {
- val frameSizeInMB = conf.getInt("spark.akka.frameSize", 128)
- if (frameSizeInMB > AKKA_MAX_FRAME_SIZE_IN_MB) {
- throw new IllegalArgumentException(
- s"spark.akka.frameSize should not be greater than $AKKA_MAX_FRAME_SIZE_IN_MB MB")
- }
- frameSizeInMB * 1024 * 1024
- }
-
- /** Space reserved for extra data in an Akka message besides serialized task or task result. */
- val reservedSizeBytes = 200 * 1024
-
-}
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index b68936f5c9..2bb8de568e 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -53,4 +53,16 @@ private[spark] object RpcUtils {
def lookupRpcTimeout(conf: SparkConf): RpcTimeout = {
RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s")
}
+
+ private val MAX_MESSAGE_SIZE_IN_MB = Int.MaxValue / 1024 / 1024
+
+ /** Returns the configured max message size for messages in bytes. */
+ def maxMessageSizeBytes(conf: SparkConf): Int = {
+ val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128)
+ if (maxSizeInMB > MAX_MESSAGE_SIZE_IN_MB) {
+ throw new IllegalArgumentException(
+ s"spark.rpc.message.maxSize should not be greater than $MAX_MESSAGE_SIZE_IN_MB MB")
+ }
+ maxSizeInMB * 1024 * 1024
+ }
}
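A short usage sketch of the new helper (RpcUtils is private[spark], so this runs only inside the Spark code base); the cap exists because the result is an Int byte count, so anything above Int.MaxValue / 1024 / 1024, i.e. 2047 MB, is rejected:

    import org.apache.spark.SparkConf
    import org.apache.spark.util.RpcUtils

    // 64 MB -> 67108864 bytes; omitting the key falls back to the 128 MB default.
    val conf = new SparkConf(loadDefaults = false).set("spark.rpc.message.maxSize", "64")
    println(RpcUtils.maxMessageSizeBytes(conf))   // 67108864

    // Values above 2047 throw IllegalArgumentException, e.g.:
    // RpcUtils.maxMessageSizeBytes(conf.set("spark.rpc.message.maxSize", "2048"))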
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
deleted file mode 100644
index bc7059b77f..0000000000
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io._
-import java.net.URI
-import java.util.jar.{JarEntry, JarOutputStream}
-import javax.net.ssl.SSLException
-
-import com.google.common.io.{ByteStreams, Files}
-import org.apache.commons.lang3.RandomUtils
-
-import org.apache.spark.util.Utils
-
-class FileServerSuite extends SparkFunSuite with LocalSparkContext {
-
- import SSLSampleConfigs._
-
- @transient var tmpDir: File = _
- @transient var tmpFile: File = _
- @transient var tmpJarUrl: String = _
-
- def newConf: SparkConf = new SparkConf(loadDefaults = false).set("spark.authenticate", "false")
-
- override def beforeEach() {
- super.beforeEach()
- resetSparkContext()
- }
-
- override def beforeAll() {
- super.beforeAll()
-
- tmpDir = Utils.createTempDir()
- val testTempDir = new File(tmpDir, "test")
- testTempDir.mkdir()
-
- val textFile = new File(testTempDir, "FileServerSuite.txt")
- val pw = new PrintWriter(textFile)
- // scalastyle:off println
- pw.println("100")
- // scalastyle:on println
- pw.close()
-
- val jarFile = new File(testTempDir, "test.jar")
- val jarStream = new FileOutputStream(jarFile)
- val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
-
- val jarEntry = new JarEntry(textFile.getName)
- jar.putNextEntry(jarEntry)
-
- val in = new FileInputStream(textFile)
- ByteStreams.copy(in, jar)
-
- in.close()
- jar.close()
- jarStream.close()
-
- tmpFile = textFile
- tmpJarUrl = jarFile.toURI.toURL.toString
- }
-
- override def afterAll() {
- try {
- Utils.deleteRecursively(tmpDir)
- } finally {
- super.afterAll()
- }
- }
-
- test("Distributing files locally") {
- sc = new SparkContext("local[4]", "test", newConf)
- sc.addFile(tmpFile.toString)
- val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
- val result = sc.parallelize(testData).reduceByKey {
- val path = SparkFiles.get("FileServerSuite.txt")
- val in = new BufferedReader(new FileReader(path))
- val fileVal = in.readLine().toInt
- in.close()
- _ * fileVal + _ * fileVal
- }.collect()
- assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
- }
-
- test("Distributing files locally security On") {
- val sparkConf = new SparkConf(false)
- sparkConf.set("spark.authenticate", "true")
- sparkConf.set("spark.authenticate.secret", "good")
- sc = new SparkContext("local[4]", "test", sparkConf)
-
- sc.addFile(tmpFile.toString)
- assert(sc.env.securityManager.isAuthenticationEnabled() === true)
- val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
- val result = sc.parallelize(testData).reduceByKey {
- val path = SparkFiles.get("FileServerSuite.txt")
- val in = new BufferedReader(new FileReader(path))
- val fileVal = in.readLine().toInt
- in.close()
- _ * fileVal + _ * fileVal
- }.collect()
- assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
- }
-
- test("Distributing files locally using URL as input") {
- // addFile("file:///....")
- sc = new SparkContext("local[4]", "test", newConf)
- sc.addFile(new File(tmpFile.toString).toURI.toString)
- val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
- val result = sc.parallelize(testData).reduceByKey {
- val path = SparkFiles.get("FileServerSuite.txt")
- val in = new BufferedReader(new FileReader(path))
- val fileVal = in.readLine().toInt
- in.close()
- _ * fileVal + _ * fileVal
- }.collect()
- assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
- }
-
- test ("Dynamically adding JARS locally") {
- sc = new SparkContext("local[4]", "test", newConf)
- sc.addJar(tmpJarUrl)
- val testData = Array((1, 1))
- sc.parallelize(testData).foreach { x =>
- if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
- throw new SparkException("jar not added")
- }
- }
- }
-
- test("Distributing files on a standalone cluster") {
- sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
- sc.addFile(tmpFile.toString)
- val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
- val result = sc.parallelize(testData).reduceByKey {
- val path = SparkFiles.get("FileServerSuite.txt")
- val in = new BufferedReader(new FileReader(path))
- val fileVal = in.readLine().toInt
- in.close()
- _ * fileVal + _ * fileVal
- }.collect()
- assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
- }
-
- test ("Dynamically adding JARS on a standalone cluster") {
- sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
- sc.addJar(tmpJarUrl)
- val testData = Array((1, 1))
- sc.parallelize(testData).foreach { x =>
- if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
- throw new SparkException("jar not added")
- }
- }
- }
-
- test ("Dynamically adding JARS on a standalone cluster using local: URL") {
- sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
- sc.addJar(tmpJarUrl.replace("file", "local"))
- val testData = Array((1, 1))
- sc.parallelize(testData).foreach { x =>
- if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
- throw new SparkException("jar not added")
- }
- }
- }
-
- test ("HttpFileServer should work with SSL") {
- val sparkConf = sparkSSLConfig()
- val sm = new SecurityManager(sparkConf)
- val server = new HttpFileServer(sparkConf, sm, 0)
- try {
- server.initialize()
-
- fileTransferTest(server, sm)
- } finally {
- server.stop()
- }
- }
-
- test ("HttpFileServer should work with SSL and good credentials") {
- val sparkConf = sparkSSLConfig()
- sparkConf.set("spark.authenticate", "true")
- sparkConf.set("spark.authenticate.secret", "good")
-
- val sm = new SecurityManager(sparkConf)
- val server = new HttpFileServer(sparkConf, sm, 0)
- try {
- server.initialize()
-
- fileTransferTest(server, sm)
- } finally {
- server.stop()
- }
- }
-
- test ("HttpFileServer should not work with valid SSL and bad credentials") {
- val sparkConf = sparkSSLConfig()
- sparkConf.set("spark.authenticate", "true")
- sparkConf.set("spark.authenticate.secret", "bad")
-
- val sm = new SecurityManager(sparkConf)
- val server = new HttpFileServer(sparkConf, sm, 0)
- try {
- server.initialize()
-
- intercept[IOException] {
- fileTransferTest(server)
- }
- } finally {
- server.stop()
- }
- }
-
- test ("HttpFileServer should not work with SSL when the server is untrusted") {
- val sparkConf = sparkSSLConfigUntrusted()
- val sm = new SecurityManager(sparkConf)
- val server = new HttpFileServer(sparkConf, sm, 0)
- try {
- server.initialize()
-
- intercept[SSLException] {
- fileTransferTest(server)
- }
- } finally {
- server.stop()
- }
- }
-
- def fileTransferTest(server: HttpFileServer, sm: SecurityManager = null): Unit = {
- val randomContent = RandomUtils.nextBytes(100)
- val file = File.createTempFile("FileServerSuite", "sslTests", tmpDir)
- Files.write(randomContent, file)
- server.addFile(file)
-
- val uri = new URI(server.serverUri + "/files/" + file.getName)
-
- val connection = if (sm != null && sm.isAuthenticationEnabled()) {
- Utils.constructURIForAuthentication(uri, sm).toURL.openConnection()
- } else {
- uri.toURL.openConnection()
- }
-
- if (sm != null) {
- Utils.setupSecureURLConnection(connection, sm)
- }
-
- val buf = ByteStreams.toByteArray(connection.getInputStream)
- assert(buf === randomContent)
- }
-
-}
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 18e5350840..c7f629a14b 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -175,9 +175,9 @@ class HeartbeatReceiverSuite
val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
- RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "dummy:4040", 0, Map.empty))
+ RegisterExecutor(executorId1, dummyExecutorEndpointRef1, 0, Map.empty))
fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
- RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty))
+ RegisterExecutor(executorId2, dummyExecutorEndpointRef2, 0, Map.empty))
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index e1a0bf7c93..24ec99c7e5 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -52,7 +52,7 @@ object LocalSparkContext {
if (sc != null) {
sc.stop()
}
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ // To avoid RPC rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
}
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 3819c0a8f3..6546def596 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -154,9 +154,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
slaveRpcEnv.shutdown()
}
- test("remote fetch below akka frame size") {
+ test("remote fetch below max RPC message size") {
val newConf = new SparkConf
- newConf.set("spark.akka.frameSize", "1")
+ newConf.set("spark.rpc.message.maxSize", "1")
newConf.set("spark.rpc.askTimeout", "1") // Fail fast
val masterTracker = new MapOutputTrackerMaster(conf)
@@ -164,7 +164,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf)
rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
- // Frame size should be ~123B, and no exception should be thrown
+ // Message size should be ~123B, and no exception should be thrown
masterTracker.registerShuffle(10, 1)
masterTracker.registerMapOutput(10, 0, MapStatus(
BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
@@ -179,9 +179,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
rpcEnv.shutdown()
}
- test("remote fetch exceeds akka frame size") {
+ test("remote fetch exceeds max RPC message size") {
val newConf = new SparkConf
- newConf.set("spark.akka.frameSize", "1")
+ newConf.set("spark.rpc.message.maxSize", "1")
newConf.set("spark.rpc.askTimeout", "1") // Fail fast
val masterTracker = new MapOutputTrackerMaster(conf)
@@ -189,7 +189,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf)
rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
- // Frame size should be ~1.1MB, and MapOutputTrackerMasterEndpoint should throw exception.
+ // Message size should be ~1.1MB, and MapOutputTrackerMasterEndpoint should throw exception.
// Note that the size is hand-selected here because map output statuses are compressed before
// being sent.
masterTracker.registerShuffle(20, 100)
diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
index 7603cef773..8bdb237c28 100644
--- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -181,10 +181,8 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
"SSL_DHE_RSA_WITH_AES_128_CBC_SHA256")
val securityManager = new SecurityManager(conf)
- val akkaSSLOptions = securityManager.getSSLOptions("akka")
assert(securityManager.fileServerSSLOptions.enabled === true)
- assert(akkaSSLOptions.enabled === true)
assert(securityManager.sslSocketFactory.isDefined === true)
assert(securityManager.hostnameVerifier.isDefined === true)
@@ -198,16 +196,6 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
assert(securityManager.fileServerSSLOptions.keyPassword === Some("password"))
assert(securityManager.fileServerSSLOptions.protocol === Some("TLSv1.2"))
assert(securityManager.fileServerSSLOptions.enabledAlgorithms === expectedAlgorithms)
-
- assert(akkaSSLOptions.trustStore.isDefined === true)
- assert(akkaSSLOptions.trustStore.get.getName === "truststore")
- assert(akkaSSLOptions.keyStore.isDefined === true)
- assert(akkaSSLOptions.keyStore.get.getName === "keystore")
- assert(akkaSSLOptions.trustStorePassword === Some("password"))
- assert(akkaSSLOptions.keyStorePassword === Some("password"))
- assert(akkaSSLOptions.keyPassword === Some("password"))
- assert(akkaSSLOptions.protocol === Some("TLSv1.2"))
- assert(akkaSSLOptions.enabledAlgorithms === expectedAlgorithms)
}
test("ssl off setup") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
index 190e4dd728..9c13c15281 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
@@ -69,7 +69,7 @@ private[deploy] object DeployTestUtils {
"publicAddress",
new File("sparkHome"),
new File("workDir"),
- "akka://worker",
+ "spark://worker",
new SparkConf,
Seq("localDir"),
ExecutorState.RUNNING)
@@ -84,7 +84,7 @@ private[deploy] object DeployTestUtils {
new File("sparkHome"),
createDriverDesc(),
null,
- "akka://worker",
+ "spark://worker",
new SecurityManager(conf))
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index ab3d4cafeb..fdada0777f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -544,7 +544,7 @@ class StandaloneDynamicAllocationSuite
val endpointRef = mock(classOf[RpcEndpointRef])
val mockAddress = mock(classOf[RpcAddress])
when(endpointRef.address).thenReturn(mockAddress)
- val message = RegisterExecutor(id, endpointRef, s"localhost:$port", 10, Map.empty)
+ val message = RegisterExecutor(id, endpointRef, 10, Map.empty)
val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
backend.driverEndpoint.askWithRetry[CoarseGrainedClusterMessage](message)
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index bd8b0655f4..2a1696be36 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -34,7 +34,7 @@ class DriverRunnerTest extends SparkFunSuite {
val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
val conf = new SparkConf()
new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
- driverDescription, null, "akka://1.2.3.4/worker/", new SecurityManager(conf))
+ driverDescription, null, "spark://1.2.3.4/worker/", new SecurityManager(conf))
}
private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 64e486d791..6f4eda8b47 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -626,9 +626,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val e = intercept[Exception] {
Await.result(f, 1 seconds)
}
- assert(e.isInstanceOf[TimeoutException] || // For Akka
- e.isInstanceOf[NotSerializableException] // For Netty
- )
+ assert(e.isInstanceOf[NotSerializableException])
} finally {
anotherEnv.shutdown()
anotherEnv.awaitTermination()
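
The assertion above narrows to NotSerializableException because the Netty-based RpcEnv serializes replies eagerly with Java serialization, so an unserializable reply fails at send time rather than timing out the way the Akka path did. A self-contained sketch of that failure mode (class and object names are made up for illustration):

    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

    class UnserializableReply(val lock: Object) // deliberately not Serializable

    object SerializationCheck {
      def main(args: Array[String]): Unit = {
        val out = new ObjectOutputStream(new ByteArrayOutputStream())
        try {
          out.writeObject(new UnserializableReply(new Object))
        } catch {
          case e: NotSerializableException =>
            println(s"fails at serialization time, not with a timeout: $e")
        }
      }
    }
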
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 70f40fb26c..04cccc67e3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -18,16 +18,16 @@
package org.apache.spark.scheduler
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
-import org.apache.spark.util.{AkkaUtils, SerializableBuffer}
+import org.apache.spark.util.{RpcUtils, SerializableBuffer}
class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext {
- test("serialized task larger than akka frame size") {
+ test("serialized task larger than max RPC message size") {
val conf = new SparkConf
- conf.set("spark.akka.frameSize", "1")
+ conf.set("spark.rpc.message.maxSize", "1")
conf.set("spark.default.parallelism", "1")
sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
- val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
+ val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
val larger = sc.parallelize(Seq(buffer))
val thrown = intercept[SparkException] {
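
RpcUtils.maxMessageSizeBytes replaces AkkaUtils.maxFrameSizeBytes here; its implementation is not part of this hunk, so the helper below is only an assumed approximation of what it does (read spark.rpc.message.maxSize in MB, default 128, and convert to bytes). It is enough to see why the test's 2 * frameSize buffer must overflow a 1 MB limit:

    import org.apache.spark.SparkConf

    object MaxMessageSize {
      // Approximation only; the real helper may perform additional validation.
      def maxMessageSizeBytes(conf: SparkConf): Int =
        conf.getInt("spark.rpc.message.maxSize", 128) * 1024 * 1024

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().set("spark.rpc.message.maxSize", "1")
        println(maxMessageSizeBytes(conf)) // 1048576 bytes for the 1 MB limit used in the test
      }
    }
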
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index c87158d89f..58d217ffef 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.Matchers
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
with ResetSystemProperties {
@@ -284,19 +284,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}
test("onTaskGettingResult() called when result fetched remotely") {
- val conf = new SparkConf().set("spark.akka.frameSize", "1")
+ val conf = new SparkConf().set("spark.rpc.message.maxSize", "1")
sc = new SparkContext("local", "SparkListenerSuite", conf)
val listener = new SaveTaskEvents
sc.addSparkListener(listener)
- // Make a task whose result is larger than the akka frame size
- val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
- assert(akkaFrameSize === 1024 * 1024)
+ // Make a task whose result is larger than the RPC message size
+ val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+ assert(maxRpcMessageSize === 1024 * 1024)
val result = sc.parallelize(Seq(1), 1)
- .map { x => 1.to(akkaFrameSize).toArray }
+ .map { x => 1.to(maxRpcMessageSize).toArray }
.reduce { case (x, y) => x }
- assert(result === 1.to(akkaFrameSize).toArray)
+ assert(result === 1.to(maxRpcMessageSize).toArray)
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
val TASK_INDEX = 0
@@ -310,7 +309,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val listener = new SaveTaskEvents
sc.addSparkListener(listener)
- // Make a task whose result is larger than the akka frame size
+ // Make a task whose result is larger than the RPC message size
val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x }
assert(result === 2)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index bc72c3685e..cc2557c2f1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark._
import org.apache.spark.storage.TaskResultBlockId
import org.apache.spark.TestUtils.JavaSourceFromString
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
+import org.apache.spark.util.{MutableURLClassLoader, RpcUtils, Utils}
/**
* Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter.
@@ -77,22 +77,22 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
*/
class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
- // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
+ // Set the RPC message size to be as small as possible (it must be an integer, so 1 is as small
// as we can make it) so the tests don't take too long.
- def conf: SparkConf = new SparkConf().set("spark.akka.frameSize", "1")
+ def conf: SparkConf = new SparkConf().set("spark.rpc.message.maxSize", "1")
- test("handling results smaller than Akka frame size") {
+ test("handling results smaller than max RPC message size") {
sc = new SparkContext("local", "test", conf)
val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
assert(result === 2)
}
- test("handling results larger than Akka frame size") {
+ test("handling results larger than max RPC message size") {
sc = new SparkContext("local", "test", conf)
- val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
- val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
- assert(result === 1.to(akkaFrameSize).toArray)
+ val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+ val result =
+ sc.parallelize(Seq(1), 1).map(x => 1.to(maxRpcMessageSize).toArray).reduce((x, y) => x)
+ assert(result === 1.to(maxRpcMessageSize).toArray)
val RESULT_BLOCK_ID = TaskResultBlockId(0)
assert(sc.env.blockManager.master.getLocations(RESULT_BLOCK_ID).size === 0,
@@ -114,11 +114,11 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
}
val resultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
scheduler.taskResultGetter = resultGetter
- val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
- val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
+ val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+ val result =
+ sc.parallelize(Seq(1), 1).map(x => 1.to(maxRpcMessageSize).toArray).reduce((x, y) => x)
assert(resultGetter.removeBlockSuccessfully)
- assert(result === 1.to(akkaFrameSize).toArray)
+ assert(result === 1.to(maxRpcMessageSize).toArray)
// Make sure two tasks were run (one failed one, and a second retried one).
assert(scheduler.nextTaskId.get() === 2)
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index 0760529b57..4d9937c5cb 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -2,9 +2,6 @@ JavaEWAH-0.3.2.jar
RoaringBitmap-0.5.11.jar
ST4-4.0.4.jar
activation-1.1.jar
-akka-actor_2.10-2.3.11.jar
-akka-remote_2.10-2.3.11.jar
-akka-slf4j_2.10-2.3.11.jar
antlr-runtime-3.5.2.jar
aopalliance-1.0.jar
apache-log4j-extras-1.2.17.jar
@@ -44,7 +41,6 @@ commons-math3-3.4.1.jar
commons-net-2.2.jar
commons-pool-1.5.4.jar
compress-lzf-1.0.3.jar
-config-1.2.1.jar
core-1.1.2.jar
curator-client-2.4.0.jar
curator-framework-2.4.0.jar
@@ -179,7 +175,6 @@ stax-api-1.0-2.jar
stax-api-1.0.1.jar
stream-2.7.0.jar
super-csv-2.2.0.jar
-uncommons-maths-1.2.2a.jar
univocity-parsers-1.5.6.jar
unused-1.0.0.jar
xbean-asm5-shaded-4.4.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 191f2a0e4e..fd659ee20d 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -2,9 +2,6 @@ JavaEWAH-0.3.2.jar
RoaringBitmap-0.5.11.jar
ST4-4.0.4.jar
activation-1.1.1.jar
-akka-actor_2.10-2.3.11.jar
-akka-remote_2.10-2.3.11.jar
-akka-slf4j_2.10-2.3.11.jar
antlr-runtime-3.5.2.jar
aopalliance-1.0.jar
apache-log4j-extras-1.2.17.jar
@@ -45,7 +42,6 @@ commons-math3-3.4.1.jar
commons-net-2.2.jar
commons-pool-1.5.4.jar
compress-lzf-1.0.3.jar
-config-1.2.1.jar
core-1.1.2.jar
curator-client-2.4.0.jar
curator-framework-2.4.0.jar
@@ -170,7 +166,6 @@ stax-api-1.0-2.jar
stax-api-1.0.1.jar
stream-2.7.0.jar
super-csv-2.2.0.jar
-uncommons-maths-1.2.2a.jar
univocity-parsers-1.5.6.jar
unused-1.0.0.jar
xbean-asm5-shaded-4.4.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 9134e997c8..afae3deb9a 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -2,9 +2,6 @@ JavaEWAH-0.3.2.jar
RoaringBitmap-0.5.11.jar
ST4-4.0.4.jar
activation-1.1.1.jar
-akka-actor_2.10-2.3.11.jar
-akka-remote_2.10-2.3.11.jar
-akka-slf4j_2.10-2.3.11.jar
antlr-runtime-3.5.2.jar
aopalliance-1.0.jar
apache-log4j-extras-1.2.17.jar
@@ -45,7 +42,6 @@ commons-math3-3.4.1.jar
commons-net-2.2.jar
commons-pool-1.5.4.jar
compress-lzf-1.0.3.jar
-config-1.2.1.jar
core-1.1.2.jar
curator-client-2.4.0.jar
curator-framework-2.4.0.jar
@@ -171,7 +167,6 @@ stax-api-1.0-2.jar
stax-api-1.0.1.jar
stream-2.7.0.jar
super-csv-2.2.0.jar
-uncommons-maths-1.2.2a.jar
univocity-parsers-1.5.6.jar
unused-1.0.0.jar
xbean-asm5-shaded-4.4.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 8c45832873..5a6460136a 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -2,9 +2,6 @@ JavaEWAH-0.3.2.jar
RoaringBitmap-0.5.11.jar
ST4-4.0.4.jar
activation-1.1.1.jar
-akka-actor_2.10-2.3.11.jar
-akka-remote_2.10-2.3.11.jar
-akka-slf4j_2.10-2.3.11.jar
antlr-runtime-3.5.2.jar
aopalliance-1.0.jar
apache-log4j-extras-1.2.17.jar
@@ -49,7 +46,6 @@ commons-math3-3.4.1.jar
commons-net-2.2.jar
commons-pool-1.5.4.jar
compress-lzf-1.0.3.jar
-config-1.2.1.jar
core-1.1.2.jar
curator-client-2.6.0.jar
curator-framework-2.6.0.jar
@@ -177,7 +173,6 @@ stax-api-1.0-2.jar
stax-api-1.0.1.jar
stream-2.7.0.jar
super-csv-2.2.0.jar
-uncommons-maths-1.2.2a.jar
univocity-parsers-1.5.6.jar
unused-1.0.0.jar
xbean-asm5-shaded-4.4.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 1d34854819..70083e7f3d 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -2,9 +2,6 @@ JavaEWAH-0.3.2.jar
RoaringBitmap-0.5.11.jar
ST4-4.0.4.jar
activation-1.1.1.jar
-akka-actor_2.10-2.3.11.jar
-akka-remote_2.10-2.3.11.jar
-akka-slf4j_2.10-2.3.11.jar
antlr-runtime-3.5.2.jar
aopalliance-1.0.jar
apache-log4j-extras-1.2.17.jar
@@ -49,7 +46,6 @@ commons-math3-3.4.1.jar
commons-net-2.2.jar
commons-pool-1.5.4.jar
compress-lzf-1.0.3.jar
-config-1.2.1.jar
core-1.1.2.jar
curator-client-2.6.0.jar
curator-framework-2.6.0.jar
@@ -178,7 +174,6 @@ stax-api-1.0-2.jar
stax-api-1.0.1.jar
stream-2.7.0.jar
super-csv-2.2.0.jar
-uncommons-maths-1.2.2a.jar
univocity-parsers-1.5.6.jar
unused-1.0.0.jar
xbean-asm5-shaded-4.4.jar
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index 2810112f52..814e4406cf 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -35,7 +35,7 @@ There are several useful things to note about this architecture:
processes, and these communicate with each other, it is relatively easy to run it even on a
cluster manager that also supports other applications (e.g. Mesos/YARN).
3. The driver program must listen for and accept incoming connections from its executors throughout
- its lifetime (e.g., see [spark.driver.port and spark.fileserver.port in the network config
+ its lifetime (e.g., see [spark.driver.port in the network config
section](configuration.html#networking)). As such, the driver program must be network
addressable from the worker nodes.
4. Because the driver schedules tasks on the cluster, it should be run close to the worker
diff --git a/docs/configuration.md b/docs/configuration.md
index acaeb83008..d2a2f10524 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -944,52 +944,12 @@ Apart from these, the following properties are also available, and may be useful
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
- <td><code>spark.akka.frameSize</code></td>
+ <td><code>spark.rpc.message.maxSize</code></td>
<td>128</td>
<td>
Maximum message size (in MB) to allow in "control plane" communication; generally only applies to map
output size information sent between executors and the driver. Increase this if you are running
- jobs with many thousands of map and reduce tasks and see messages about the frame size.
- </td>
-</tr>
-<tr>
- <td><code>spark.akka.heartbeat.interval</code></td>
- <td>1000s</td>
- <td>
- This is set to a larger value to disable the transport failure detector that comes built in to
- Akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger
- interval value reduces network overhead and a smaller value ( ~ 1 s) might be more
- informative for Akka's failure detector. Tune this in combination of <code>spark.akka.heartbeat.pauses</code>
- if you need to. A likely positive use case for using failure detector would be: a sensistive
- failure detector can help evict rogue executors quickly. However this is usually not the case
- as GC pauses and network lags are expected in a real Spark cluster. Apart from that enabling
- this leads to a lot of exchanges of heart beats between nodes leading to flooding the network
- with those.
- </td>
-</tr>
-<tr>
- <td><code>spark.akka.heartbeat.pauses</code></td>
- <td>6000s</td>
- <td>
- This is set to a larger value to disable the transport failure detector that comes built in to Akka.
- It can be enabled again, if you plan to use this feature (Not recommended). Acceptable heart
- beat pause for Akka. This can be used to control sensitivity to GC pauses. Tune
- this along with <code>spark.akka.heartbeat.interval</code> if you need to.
- </td>
-</tr>
-<tr>
- <td><code>spark.akka.threads</code></td>
- <td>4</td>
- <td>
- Number of actor threads to use for communication. Can be useful to increase on large clusters
- when the driver has a lot of CPU cores.
- </td>
-</tr>
-<tr>
- <td><code>spark.akka.timeout</code></td>
- <td>100s</td>
- <td>
- Communication timeout between Spark nodes.
+ jobs with many thousands of map and reduce tasks and see messages about the RPC message size.
</td>
</tr>
<tr>
@@ -1016,27 +976,11 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td><code>spark.executor.port</code></td>
- <td>(random)</td>
- <td>
- Port for the executor to listen on. This is used for communicating with the driver.
- This is only relevant when using the Akka RPC backend.
- </td>
-</tr>
-<tr>
- <td><code>spark.fileserver.port</code></td>
- <td>(random)</td>
- <td>
- Port for the driver's HTTP file server to listen on.
- This is only relevant when using the Akka RPC backend.
- </td>
-</tr>
-<tr>
<td><code>spark.network.timeout</code></td>
<td>120s</td>
<td>
Default timeout for all network interactions. This config will be used in place of
- <code>spark.core.connection.ack.wait.timeout</code>, <code>spark.akka.timeout</code>,
+ <code>spark.core.connection.ack.wait.timeout</code>,
<code>spark.storage.blockManagerSlaveTimeoutMs</code>,
<code>spark.shuffle.io.connectionTimeout</code>, <code>spark.rpc.askTimeout</code> or
<code>spark.rpc.lookupTimeout</code> if they are not configured.
@@ -1418,8 +1362,7 @@ Apart from these, the following properties are also available, and may be useful
<p>Use <code>spark.ssl.YYY.XXX</code> settings to overwrite the global configuration for
particular protocol denoted by <code>YYY</code>. Currently <code>YYY</code> can be
- either <code>akka</code> for Akka based connections or <code>fs</code> for file
- server.</p>
+ only <code>fs</code> for file server.</p>
</td>
</tr>
<tr>
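
To summarize the table changes above in code form: spark.akka.frameSize becomes spark.rpc.message.maxSize, the spark.akka.heartbeat.*, spark.akka.threads and spark.akka.timeout rows disappear, and spark.network.timeout remains the umbrella timeout. A brief sketch with illustrative values (these are just the renamed keys, not recommended settings):

    import org.apache.spark.SparkConf

    object PostAkkaConf {
      val conf: SparkConf = new SparkConf()
        .set("spark.rpc.message.maxSize", "128") // MB; formerly spark.akka.frameSize
        .set("spark.network.timeout", "120s")    // fallback for the remaining per-subsystem timeouts
        .set("spark.rpc.askTimeout", "120s")     // RPC ask timeout, unchanged by this patch
    }
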
diff --git a/docs/security.md b/docs/security.md
index a4cc0f42b2..32c33d2857 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -27,8 +27,7 @@ If your applications are using event logging, the directory where the event logs
## Encryption
-Spark supports SSL for Akka and HTTP protocols. SASL encryption is supported for the block transfer
-service.
+Spark supports SSL for HTTP protocols. SASL encryption is supported for the block transfer service.
Encryption is not yet supported for data stored by Spark in temporary local storage, such as shuffle
files, cached data, and other application files. If encrypting this data is desired, a workaround is
@@ -49,10 +48,6 @@ component-specific configuration namespaces used to override the default setting
<th>Component</th>
</tr>
<tr>
- <td><code>spark.ssl.akka</code></td>
- <td>Akka communication channels</td>
- </tr>
- <tr>
<td><code>spark.ssl.fs</code></td>
<td>HTTP file server and broadcast server</td>
</tr>
@@ -137,7 +132,7 @@ configure those ports.
<td>7077</td>
<td>Submit job to cluster /<br> Join cluster</td>
<td><code>SPARK_MASTER_PORT</code></td>
- <td>Akka-based. Set to "0" to choose a port randomly. Standalone mode only.</td>
+ <td>Set to "0" to choose a port randomly. Standalone mode only.</td>
</tr>
<tr>
<td>Standalone Master</td>
@@ -145,7 +140,7 @@ configure those ports.
<td>(random)</td>
<td>Schedule executors</td>
<td><code>SPARK_WORKER_PORT</code></td>
- <td>Akka-based. Set to "0" to choose a port randomly. Standalone mode only.</td>
+ <td>Set to "0" to choose a port randomly. Standalone mode only.</td>
</tr>
</table>
@@ -178,24 +173,7 @@ configure those ports.
<td>(random)</td>
<td>Connect to application /<br> Notify executor state changes</td>
<td><code>spark.driver.port</code></td>
- <td>Akka-based. Set to "0" to choose a port randomly.</td>
- </tr>
- <tr>
- <td>Driver</td>
- <td>Executor</td>
- <td>(random)</td>
- <td>Schedule tasks</td>
- <td><code>spark.executor.port</code></td>
- <td>Akka-based. Set to "0" to choose a port randomly. Only used if Akka RPC backend is
- configured.</td>
- </tr>
- <tr>
- <td>Executor</td>
- <td>Driver</td>
- <td>(random)</td>
- <td>File server for files and jars</td>
- <td><code>spark.fileserver.port</code></td>
- <td>Jetty-based. Only used if Akka RPC backend is configured.</td>
+ <td>Set to "0" to choose a port randomly.</td>
</tr>
<tr>
<td>Executor / Driver</td>
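
With the akka namespace gone, spark.ssl.fs is the only component-specific override left for the global spark.ssl.* settings. A hedged sketch of what such a configuration might look like (paths, passwords and the cipher suite are placeholders):

    import org.apache.spark.SparkConf

    object FileServerSslConf {
      val conf: SparkConf = new SparkConf()
        .set("spark.ssl.enabled", "true")
        .set("spark.ssl.keyStore", "/path/to/keystore")
        .set("spark.ssl.keyStorePassword", "password")
        .set("spark.ssl.protocol", "TLSv1.2")
        // "fs" is now the only YYY accepted in spark.ssl.YYY.XXX overrides:
        .set("spark.ssl.fs.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA")
    }
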
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala
index 5778fd1d09..ca7385128d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/VectorTransformer.scala
@@ -47,7 +47,7 @@ trait VectorTransformer extends Serializable {
*/
@Since("1.1.0")
def transform(data: RDD[Vector]): RDD[Vector] = {
- // Later in #1498 , all RDD objects are sent via broadcasting instead of akka.
+ // Later in #1498, all RDD objects are sent via broadcasting instead of RPC.
// So it should be no longer necessary to explicitly broadcast `this` object.
data.map(x => this.transform(x))
}
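
The reworded comment above refers to models being distributed through broadcast variables rather than shipped with each task over the control-plane RPC. A small self-contained example of that pattern (the weights array is a stand-in for a real model object; names are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    object BroadcastSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("broadcast-sketch").setMaster("local[2]"))
        val weights = Array(0.1, 0.2, 0.3)    // stand-in for a model
        val bcWeights = sc.broadcast(weights) // shipped once per executor, not once per task
        val data = sc.parallelize(Seq(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0)))
        val scored = data.map(v => v.zip(bcWeights.value).map { case (x, w) => x * w }.sum)
        println(scored.collect().mkString(", "))
        sc.stop()
      }
    }
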
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
index 9b2d023bbf..95d874b843 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
@@ -29,7 +29,7 @@ trait LocalClusterSparkContext extends BeforeAndAfterAll { self: Suite =>
val conf = new SparkConf()
.setMaster("local-cluster[2, 1, 1024]")
.setAppName("test-cluster")
- .set("spark.akka.frameSize", "1") // set to 1MB to detect direct serialization of data
+ .set("spark.rpc.message.maxSize", "1") // set to 1MB to detect direct serialization of data
sc = new SparkContext(conf)
}
diff --git a/pom.xml b/pom.xml
index 43f08efaae..f08642f606 100644
--- a/pom.xml
+++ b/pom.xml
@@ -570,6 +570,11 @@
<version>4.0.29.Final</version>
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.8.0.Final</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>${derby.version}</version>
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 905fb4cd90..c65fae482c 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -162,7 +162,9 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream$default$4"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream$default$3"),
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.StreamingContext.actorStream"),
- ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.actorStream")
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.api.java.JavaStreamingContext.actorStream"),
+ ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.streaming.zeromq.ZeroMQReceiver"),
+ ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.streaming.receiver.ActorReceiver$Supervisor")
) ++ Seq(
// SPARK-12847 Remove StreamingListenerBus and post all Streaming events to the same thread as Spark events
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.util.AsynchronousListenerBus$"),
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 081f5a1c93..898db85190 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -41,7 +41,6 @@ class ReceivedBlockTrackerSuite
extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
val hadoopConf = new Configuration()
- val akkaTimeout = 10 seconds
val streamId = 1
var allReceivedBlockTrackers = new ArrayBuffer[ReceivedBlockTracker]()
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 31fa53e24b..21ac04dc76 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -166,7 +166,7 @@ class ExecutorRunnable(
// Certain configs need to be passed here because they are needed before the Executor
// registers with the Scheduler and transfers the spark configs. Since the Executor backend
- // uses Akka to connect to the scheduler, the akka settings are needed as well as the
+ // uses RPC to connect to the scheduler, the RPC settings are needed as well as the
// authentication settings.
sparkConf.getAll
.filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }
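
The hunk above only rewords the comment: the surrounding code still forwards, as JVM system properties, the subset of the configuration an executor needs before it has registered with the driver (security and RPC settings). SparkConf.isExecutorStartupConf is internal and not shown in this diff, so the predicate below is a rough, assumed approximation for illustration only:

    import org.apache.spark.SparkConf

    object StartupConfFilter {
      // Assumed approximation of SparkConf.isExecutorStartupConf: keep only settings the
      // executor needs before it can register (authentication, SSL, RPC, driver port).
      def isStartupConf(name: String): Boolean =
        name.startsWith("spark.auth") || name.startsWith("spark.ssl") ||
          name.startsWith("spark.rpc") || name == "spark.driver.port"

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false)
          .set("spark.rpc.askTimeout", "120s")
          .set("spark.executor.memory", "2g")
        val javaOpts = conf.getAll.collect { case (k, v) if isStartupConf(k) => s"-D$k=$v" }
        println(javaOpts.mkString(" ")) // only the spark.rpc.* entry passes the filter
      }
    }
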