Diffstat (limited to 'core/src/main/scala/org/apache')
 core/src/main/scala/org/apache/spark/ContextCleaner.scala                                  |   6
 core/src/main/scala/org/apache/spark/HttpFileServer.scala                                  |  91
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala                                |   7
 core/src/main/scala/org/apache/spark/SSLOptions.scala                                      |  43
 core/src/main/scala/org/apache/spark/SecurityManager.scala                                 |  19
 core/src/main/scala/org/apache/spark/SparkConf.scala                                       |  32
 core/src/main/scala/org/apache/spark/SparkEnv.scala                                        |  49
 core/src/main/scala/org/apache/spark/deploy/Client.scala                                   |   4
 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala                              |   2
 core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala           |  12
 core/src/main/scala/org/apache/spark/executor/Executor.scala                               |   8
 core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala   |   1
 core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala |  15
 core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala          |   2
 core/src/main/scala/org/apache/spark/util/AkkaUtils.scala                                  | 139
 core/src/main/scala/org/apache/spark/util/RpcUtils.scala                                   |  12
 16 files changed, 60 insertions(+), 382 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 4628093b91..5a42299a0b 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -86,8 +86,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
* is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
*
* Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
- * workaround for the issue, which is ultimately caused by the way the BlockManager actors
- * issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
+ * workaround for the issue, which is ultimately caused by the way the BlockManager endpoints
+ * issue inter-dependent blocking RPC messages to each other at high frequencies. This happens,
* for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
* longer in scope.
*/
@@ -101,7 +101,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
* exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this
* parameter by default disables blocking on shuffle cleanups. Note that this does not affect
* the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround,
- * until the real Akka issue (referred to in the comment above `blockOnCleanupTasks`) is
+ * until the real RPC issue (referred to in the comment above `blockOnCleanupTasks`) is
* resolved.
*/
private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
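For reference, a minimal sketch (not part of the patch) of how the two cleaner flags discussed in these comments can be set; the key names follow the `spark.cleaner.referenceTracking.blocking*` pattern referenced above, with the defaults the comments describe:

    import org.apache.spark.SparkConf

    // Sketch only: the blocking-cleanup switches referred to in the comments above.
    val cleanerConf = new SparkConf()
      .set("spark.cleaner.referenceTracking.blocking", "true")          // block on RDD/broadcast cleanup (default)
      .set("spark.cleaner.referenceTracking.blocking.shuffle", "false") // don't block on shuffle cleanup (default)
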
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
deleted file mode 100644
index 46f9f9e9af..0000000000
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.File
-
-import com.google.common.io.Files
-
-import org.apache.spark.util.Utils
-
-private[spark] class HttpFileServer(
- conf: SparkConf,
- securityManager: SecurityManager,
- requestedPort: Int = 0)
- extends Logging {
-
- var baseDir : File = null
- var fileDir : File = null
- var jarDir : File = null
- var httpServer : HttpServer = null
- var serverUri : String = null
-
- def initialize() {
- baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd")
- fileDir = new File(baseDir, "files")
- jarDir = new File(baseDir, "jars")
- fileDir.mkdir()
- jarDir.mkdir()
- logInfo("HTTP File server directory is " + baseDir)
- httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
- httpServer.start()
- serverUri = httpServer.uri
- logDebug("HTTP file server started at: " + serverUri)
- }
-
- def stop() {
- httpServer.stop()
-
- // If we only stop sc, but the driver process still run as a services then we need to delete
- // the tmp dir, if not, it will create too many tmp dirs
- try {
- Utils.deleteRecursively(baseDir)
- } catch {
- case e: Exception =>
- logWarning(s"Exception while deleting Spark temp dir: ${baseDir.getAbsolutePath}", e)
- }
- }
-
- def addFile(file: File) : String = {
- addFileToDir(file, fileDir)
- serverUri + "/files/" + Utils.encodeFileNameToURIRawPath(file.getName)
- }
-
- def addJar(file: File) : String = {
- addFileToDir(file, jarDir)
- serverUri + "/jars/" + Utils.encodeFileNameToURIRawPath(file.getName)
- }
-
- def addDirectory(path: String, resourceBase: String): String = {
- httpServer.addDirectory(path, resourceBase)
- serverUri + path
- }
-
- def addFileToDir(file: File, dir: File) : String = {
- // Check whether the file is a directory. If it is, throw a more meaningful exception.
- // If we don't catch this, Guava throws a very confusing error message:
- // java.io.FileNotFoundException: [file] (No such file or directory)
- // even though the directory ([file]) exists.
- if (file.isDirectory) {
- throw new IllegalArgumentException(s"$file cannot be a directory.")
- }
- Files.copy(file, new File(dir, file.getName))
- dir + "/" + Utils.encodeFileNameToURIRawPath(file.getName)
- }
-
-}
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 1b59beb8d6..eb2fdecc83 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -40,7 +40,7 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
private[spark] class MapOutputTrackerMasterEndpoint(
override val rpcEnv: RpcEnv, tracker: MapOutputTrackerMaster, conf: SparkConf)
extends RpcEndpoint with Logging {
- val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+ val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case GetMapOutputStatuses(shuffleId: Int) =>
@@ -48,9 +48,10 @@ private[spark] class MapOutputTrackerMasterEndpoint(
logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
val serializedSize = mapOutputStatuses.length
- if (serializedSize > maxAkkaFrameSize) {
+ if (serializedSize > maxRpcMessageSize) {
val msg = s"Map output statuses were $serializedSize bytes which " +
- s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes)."
+ s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes)."
/* For SPARK-1244 we'll opt for just logging an error and then sending it to the sender.
* A bigger refactoring (SPARK-1239) will ultimately remove this entire code path. */
diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
index 261265f0b4..d755f07965 100644
--- a/core/src/main/scala/org/apache/spark/SSLOptions.scala
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -21,9 +21,6 @@ import java.io.File
import java.security.NoSuchAlgorithmException
import javax.net.ssl.SSLContext
-import scala.collection.JavaConverters._
-
-import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.eclipse.jetty.util.ssl.SslContextFactory
/**
@@ -31,8 +28,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory
* generate specific objects to configure SSL for different communication protocols.
*
* SSLOptions is intended to provide the maximum common set of SSL settings, which are supported
- * by the protocol, which it can generate the configuration for. Since Akka doesn't support client
- * authentication with SSL, SSLOptions cannot support it either.
+ * by the protocol, which it can generate the configuration for.
*
* @param enabled enables or disables SSL; if it is set to false, the rest of the
* settings are disregarded
@@ -88,43 +84,6 @@ private[spark] case class SSLOptions(
}
}
- /**
- * Creates an Akka configuration object which contains all the SSL settings represented by this
- * object. It can be used then to compose the ultimate Akka configuration.
- */
- def createAkkaConfig: Option[Config] = {
- if (enabled) {
- if (keyStoreType.isDefined) {
- logWarning("Akka configuration does not support key store type.");
- }
- if (trustStoreType.isDefined) {
- logWarning("Akka configuration does not support trust store type.");
- }
-
- Some(ConfigFactory.empty()
- .withValue("akka.remote.netty.tcp.security.key-store",
- ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
- .withValue("akka.remote.netty.tcp.security.key-store-password",
- ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse("")))
- .withValue("akka.remote.netty.tcp.security.trust-store",
- ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse("")))
- .withValue("akka.remote.netty.tcp.security.trust-store-password",
- ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse("")))
- .withValue("akka.remote.netty.tcp.security.key-password",
- ConfigValueFactory.fromAnyRef(keyPassword.getOrElse("")))
- .withValue("akka.remote.netty.tcp.security.random-number-generator",
- ConfigValueFactory.fromAnyRef(""))
- .withValue("akka.remote.netty.tcp.security.protocol",
- ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
- .withValue("akka.remote.netty.tcp.security.enabled-algorithms",
- ConfigValueFactory.fromIterable(supportedAlgorithms.asJava))
- .withValue("akka.remote.netty.tcp.enable-ssl",
- ConfigValueFactory.fromAnyRef(true)))
- } else {
- None
- }
- }
-
/*
* The supportedAlgorithms set is a subset of the enabledAlgorithms that
* are supported by the current Java security provider for this protocol.
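With the Akka path removed, the protocol-specific artifact SSLOptions still produces is a Jetty SslContextFactory. As a rough, illustrative sketch (not the actual SSLOptions method, which lies outside this hunk), such a factory can be assembled from the same kinds of fields listed in the parameter documentation above:

    import java.io.File
    import org.eclipse.jetty.util.ssl.SslContextFactory

    // Illustrative only: field names mirror the SSLOptions parameters documented above.
    def buildJettyFactory(
        keyStore: Option[File],
        keyStorePassword: Option[String],
        keyPassword: Option[String],
        protocol: Option[String]): SslContextFactory = {
      val factory = new SslContextFactory()
      keyStore.foreach(f => factory.setKeyStorePath(f.getAbsolutePath))
      keyStorePassword.foreach(p => factory.setKeyStorePassword(p))
      keyPassword.foreach(p => factory.setKeyManagerPassword(p))
      protocol.foreach(p => factory.setProtocol(p))
      factory
    }
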
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index c5aec05c03..0675957e16 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -67,17 +67,6 @@ import org.apache.spark.util.Utils
* At this point spark has multiple communication protocols that need to be secured and
* different underlying mechanisms are used depending on the protocol:
*
- * - Akka -> The only option here is to use the Akka Remote secure-cookie functionality.
- * Akka remoting allows you to specify a secure cookie that will be exchanged
- * and ensured to be identical in the connection handshake between the client
- * and the server. If they are not identical then the client will be refused
- * to connect to the server. There is no control of the underlying
- * authentication mechanism so its not clear if the password is passed in
- * plaintext or uses DIGEST-MD5 or some other mechanism.
- *
- * Akka also has an option to turn on SSL, this option is currently supported (see
- * the details below).
- *
* - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
* for the HttpServer. Jetty supports multiple authentication mechanisms -
* Basic, Digest, Form, Spengo, etc. It also supports multiple different login
@@ -168,16 +157,16 @@ import org.apache.spark.util.Utils
* denote the global configuration for all the supported protocols. In order to override the global
* configuration for the particular protocol, the properties must be overwritten in the
* protocol-specific namespace. Use `spark.ssl.yyy.xxx` settings to overwrite the global
- * configuration for particular protocol denoted by `yyy`. Currently `yyy` can be either `akka` for
- * Akka based connections or `fs` for broadcast and file server.
+ * configuration for a particular protocol denoted by `yyy`. Currently `yyy` can only be `fs` for
+ * broadcast and file server.
*
* Refer to [[org.apache.spark.SSLOptions]] documentation for the list of
* options that can be specified.
*
* SecurityManager initializes SSLOptions objects for different protocols separately. SSLOptions
* object parses Spark configuration at a given namespace and builds the common representation
- * of SSL settings. SSLOptions is then used to provide protocol-specific configuration like
- * TypeSafe configuration for Akka or SSLContextFactory for Jetty.
+ * of SSL settings. SSLOptions is then used to provide protocol-specific SSLContextFactory for
+ * Jetty.
*
* SSL must be configured on each node and configured for each component involved in
* communication using the particular protocol. In YARN clusters, the key-store can be prepared on
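A hedged example of the namespacing scheme described above: `spark.ssl.*` keys set the global SSL configuration and `spark.ssl.fs.*` keys override it for the broadcast/file-server protocol (the specific keys below are illustrative):

    import org.apache.spark.SparkConf

    // Global SSL settings, plus an override scoped to the `fs` protocol namespace.
    val sslConf = new SparkConf()
      .set("spark.ssl.enabled", "true")
      .set("spark.ssl.keyStore", "/path/to/global.keystore")
      .set("spark.ssl.fs.keyStore", "/path/to/fs.keystore") // overrides the global key store for `fs` only
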
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 340e1f7824..36e240e618 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -344,17 +344,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
.map{case (k, v) => (k.substring(prefix.length), v)}
}
- /** Get all akka conf variables set on this SparkConf */
- def getAkkaConf: Seq[(String, String)] =
- /* This is currently undocumented. If we want to make this public we should consider
- * nesting options under the spark namespace to avoid conflicts with user akka options.
- * Otherwise users configuring their own akka code via system properties could mess up
- * spark's akka options.
- *
- * E.g. spark.akka.option.x.y.x = "value"
- */
- getAll.filter { case (k, _) => isAkkaConf(k) }
-
/**
* Returns the Spark application id, valid in the Driver after TaskScheduler registration and
* from the start in the Executor.
@@ -600,7 +589,9 @@ private[spark] object SparkConf extends Logging {
"spark.yarn.max.executor.failures" -> Seq(
AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
"spark.memory.offHeap.enabled" -> Seq(
- AlternateConfig("spark.unsafe.offHeap", "1.6"))
+ AlternateConfig("spark.unsafe.offHeap", "1.6")),
+ "spark.rpc.message.maxSize" -> Seq(
+ AlternateConfig("spark.akka.frameSize", "1.6"))
)
/**
@@ -616,20 +607,12 @@ private[spark] object SparkConf extends Logging {
}
/**
- * Return whether the given config is an akka config (e.g. akka.actor.provider).
- * Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
- */
- def isAkkaConf(name: String): Boolean = name.startsWith("akka.")
-
- /**
* Return whether the given config should be passed to an executor on start-up.
*
- * Certain akka and authentication configs are required from the executor when it connects to
+ * Certain authentication configs are required from the executor when it connects to
* the scheduler, while the rest of the spark configs can be inherited from the driver later.
*/
def isExecutorStartupConf(name: String): Boolean = {
- isAkkaConf(name) ||
- name.startsWith("spark.akka") ||
(name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
name.startsWith("spark.ssl") ||
name.startsWith("spark.rpc") ||
@@ -664,12 +647,19 @@ private[spark] object SparkConf extends Logging {
logWarning(
s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
s"may be removed in the future. ${cfg.deprecationMessage}")
+ return
}
allAlternatives.get(key).foreach { case (newKey, cfg) =>
logWarning(
s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
s"and may be removed in the future. Please use the new key '$newKey' instead.")
+ return
+ }
+ if (key.startsWith("spark.akka") || key.startsWith("spark.ssl.akka")) {
+ logWarning(
+ s"The configuration key $key is not supported any more " +
+ s"because Spark doesn't use Akka since 2.0")
}
}
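To illustrate the alternate-config entry added above: setting the old `spark.akka.frameSize` key still works; it logs the deprecation warning and is read back through `spark.rpc.message.maxSize`. A sketch, assuming the usual SparkConf fallback behaviour for AlternateConfig:

    import org.apache.spark.SparkConf

    // The deprecated key is translated; reads of the new key fall back to it.
    val conf = new SparkConf().set("spark.akka.frameSize", "256")  // logs a deprecation warning
    val maxSizeMB = conf.getInt("spark.rpc.message.maxSize", 128)  // 256, resolved via AlternateConfig
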
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index ec43be0e2f..9461afdc54 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -23,7 +23,6 @@ import java.net.Socket
import scala.collection.mutable
import scala.util.Properties
-import akka.actor.ActorSystem
import com.google.common.collect.MapMaker
import org.apache.spark.annotation.DeveloperApi
@@ -39,12 +38,12 @@ import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinato
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage._
-import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
+import org.apache.spark.util.{RpcUtils, Utils}
/**
* :: DeveloperApi ::
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
- * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
+ * including the serializer, RpcEnv, block manager, map output tracker, etc. Currently
* Spark code finds the SparkEnv through a global variable, so all the threads can access the same
* SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
*
@@ -55,7 +54,6 @@ import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
class SparkEnv (
val executorId: String,
private[spark] val rpcEnv: RpcEnv,
- _actorSystem: ActorSystem, // TODO Remove actorSystem
val serializer: Serializer,
val closureSerializer: Serializer,
val cacheManager: CacheManager,
@@ -71,10 +69,6 @@ class SparkEnv (
val outputCommitCoordinator: OutputCommitCoordinator,
val conf: SparkConf) extends Logging {
- // TODO Remove actorSystem
- @deprecated("Actor system is no longer supported as of 1.4.0", "1.4.0")
- val actorSystem: ActorSystem = _actorSystem
-
private[spark] var isStopped = false
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
@@ -96,13 +90,8 @@ class SparkEnv (
blockManager.master.stop()
metricsSystem.stop()
outputCommitCoordinator.stop()
- actorSystem.shutdown()
rpcEnv.shutdown()
-
- // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
- // down, but let's call it anyway in case it gets fixed in a later release
- // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
- // actorSystem.awaitTermination()
+ rpcEnv.awaitTermination()
// Note that blockTransferService is stopped by BlockManager since it is started by it.
@@ -152,8 +141,8 @@ class SparkEnv (
object SparkEnv extends Logging {
@volatile private var env: SparkEnv = _
- private[spark] val driverActorSystemName = "sparkDriver"
- private[spark] val executorActorSystemName = "sparkExecutor"
+ private[spark] val driverSystemName = "sparkDriver"
+ private[spark] val executorSystemName = "sparkExecutor"
def set(e: SparkEnv) {
env = e
@@ -202,7 +191,7 @@ object SparkEnv extends Logging {
/**
* Create a SparkEnv for an executor.
- * In coarse-grained mode, the executor provides an actor system that is already instantiated.
+ * In coarse-grained mode, the executor provides an RpcEnv that is already instantiated.
*/
private[spark] def createExecutorEnv(
conf: SparkConf,
@@ -245,28 +234,11 @@ object SparkEnv extends Logging {
val securityManager = new SecurityManager(conf)
- val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
- // Create the ActorSystem for Akka and get the port it binds to.
- val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
+ val systemName = if (isDriver) driverSystemName else executorSystemName
+ val rpcEnv = RpcEnv.create(systemName, hostname, port, conf, securityManager,
clientMode = !isDriver)
- val actorSystem: ActorSystem = {
- val actorSystemPort =
- if (port == 0 || rpcEnv.address == null) {
- port
- } else {
- rpcEnv.address.port + 1
- }
- // Create a ActorSystem for legacy codes
- AkkaUtils.createActorSystem(
- actorSystemName + "ActorSystem",
- hostname,
- actorSystemPort,
- conf,
- securityManager
- )._1
- }
- // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
+ // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
// In the non-driver case, the RPC env's address may be null since it may not be listening
// for incoming connections.
if (isDriver) {
@@ -325,7 +297,7 @@ object SparkEnv extends Logging {
new MapOutputTrackerWorker(conf)
}
- // Have to assign trackerActor after initialization as MapOutputTrackerActor
+ // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
// requires the MapOutputTracker itself
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(
@@ -398,7 +370,6 @@ object SparkEnv extends Logging {
val envInstance = new SparkEnv(
executorId,
rpcEnv,
- actorSystem,
serializer,
closureSerializer,
cacheManager,
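The RpcEnv now plays the role the ActorSystem used to. A minimal sketch of the creation-and-registration pattern visible in this file (the endpoint below is a placeholder; RpcEnv and SecurityManager are private[spark] APIs, so the sketch assumes code in an org.apache.spark subpackage):

    package org.apache.spark.sketch

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}

    object RpcEnvSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        val securityManager = new SecurityManager(conf)
        // Same call shape as above; port 0 lets the RpcEnv pick a free port.
        val rpcEnv = RpcEnv.create("sparkDriver", "localhost", 0, conf, securityManager,
          clientMode = false)

        // A placeholder endpoint, registered the way SparkEnv registers its trackers.
        class NoOpEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
          override def receive: PartialFunction[Any, Unit] = { case _ => }
        }
        rpcEnv.setupEndpoint("no-op", new NoOpEndpoint(rpcEnv))

        rpcEnv.shutdown()
        rpcEnv.awaitTermination()
      }
    }
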
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 63a20ab41a..dcef03ef3e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -219,11 +219,7 @@ object Client {
val conf = new SparkConf()
val driverArgs = new ClientArguments(args)
- if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
- conf.set("spark.akka.logLifecycleEvents", "true")
- }
conf.set("spark.rpc.askTimeout", "10")
- conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
Logger.getRootLogger.setLevel(driverArgs.logLevel)
val rpcEnv =
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index a1e8da1ec8..a6749f7e38 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -183,7 +183,7 @@ object SparkSubmit {
}
// In standalone cluster mode, there are two submission gateways:
- // (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
+ // (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 58bd9ca3d1..e3a6c4c07a 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -37,7 +37,6 @@ private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
- hostPort: String,
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv)
@@ -55,8 +54,7 @@ private[spark] class CoarseGrainedExecutorBackend(
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
- ref.ask[RegisterExecutorResponse](
- RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
+ ref.ask[RegisterExecutorResponse](RegisterExecutor(executorId, self, cores, extractLogUrls))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) => Utils.tryLogNonFatalError {
@@ -184,14 +182,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, isLocal = false)
- // SparkEnv will set spark.executor.port if the rpc env is listening for incoming
- // connections (e.g., if it's using akka). Otherwise, the executor is running in
- // client mode only, and does not accept incoming connections.
- val sparkHostPort = env.conf.getOption("spark.executor.port").map { port =>
- hostname + ":" + port
- }.orNull
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
- env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
+ env.rpcEnv, driverUrl, executorId, cores, userClassPath, env))
workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 75d7e34d60..030ae41db4 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -40,7 +40,7 @@ import org.apache.spark.util._
* Spark executor, backed by a threadpool to run tasks.
*
* This can be used with Mesos, YARN, and the standalone scheduler.
- * An internal RPC interface (at the moment Akka) is used for communication with the driver,
+ * An internal RPC interface is used for communication with the driver,
* except in the case of Mesos fine-grained mode.
*/
private[spark] class Executor(
@@ -97,9 +97,9 @@ private[spark] class Executor(
// Set the classloader for serializer
env.serializer.setDefaultClassLoader(replClassLoader)
- // Akka's message frame size. If task result is bigger than this, we use the block manager
+ // Max RPC message size. If task result is bigger than this, we use the block manager
// to send the result back.
- private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+ private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
// Limit of bytes for total size of results (default is 1GB)
private val maxResultSize = Utils.getMaxResultSize(conf)
@@ -263,7 +263,7 @@ private[spark] class Executor(
s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
s"dropping it.")
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
- } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
+ } else if (resultSize >= maxRpcMessageSize) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
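As a compact restatement of the routing rule these hunks implement (a sketch with illustrative names, not Spark's actual types): results over the driver's max result size are dropped, results at or over the RPC limit go through the block manager, and everything else is returned directly in the RPC reply.

    // Sketch of the decision implemented above; names are illustrative.
    sealed trait ResultRoute
    case object DirectInReply extends ResultRoute    // fits in a single RPC message
    case object ViaBlockManager extends ResultRoute  // >= spark.rpc.message.maxSize
    case object DroppedTooLarge extends ResultRoute  // > spark.driver.maxResultSize

    def routeFor(resultSize: Long, maxResultSize: Long, maxRpcMessageSize: Int): ResultRoute =
      if (maxResultSize > 0 && resultSize > maxResultSize) DroppedTooLarge
      else if (resultSize >= maxRpcMessageSize) ViaBlockManager
      else DirectInReply
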
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index f3d0d85476..29e469c3f5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -48,7 +48,6 @@ private[spark] object CoarseGrainedClusterMessages {
case class RegisterExecutor(
executorId: String,
executorRef: RpcEndpointRef,
- hostPort: String,
cores: Int,
logUrls: Map[String, String])
extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b808993aa6..f69a3d371e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,7 +27,7 @@ import org.apache.spark.rpc._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.ENDPOINT_NAME
-import org.apache.spark.util.{AkkaUtils, SerializableBuffer, ThreadUtils, Utils}
+import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils}
/**
* A scheduler backend that waits for coarse-grained executors to connect.
@@ -46,7 +46,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Total number of executors that are currently registered
var totalRegisteredExecutors = new AtomicInteger(0)
val conf = scheduler.sc.conf
- private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+ private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
// Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, that is double between 0 and 1.
var minRegisteredRatio =
@@ -134,7 +134,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
+ case RegisterExecutor(executorId, executorRef, cores, logUrls) =>
if (executorDataMap.contains(executorId)) {
context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
} else {
@@ -224,14 +224,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
val serializedTask = ser.serialize(task)
- if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
+ if (serializedTask.limit >= maxRpcMessageSize) {
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
- "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
- "spark.akka.frameSize or using broadcast variables for large values."
- msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
- AkkaUtils.reservedSizeBytes)
+ "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
+ "spark.rpc.message.maxSize or using broadcast variables for large values."
+ msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
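The new error message recommends broadcast variables for large values; a small hedged example of that pattern, assuming an existing SparkContext `sc`, an RDD[Int] `rdd`, and a hypothetical `loadLookupTable()` helper:

    // Instead of capturing a large local structure in each task's closure...
    val bigLookup: Map[Int, String] = loadLookupTable()   // hypothetical helper
    val bcLookup = sc.broadcast(bigLookup)                // shipped once per executor, not per task
    rdd.map(k => bcLookup.value.getOrElse(k, "missing"))
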
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 0a6f2c01c1..a298cf5ef9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -49,7 +49,7 @@ private[spark] class SimrSchedulerBackend(
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
logInfo("Writing to HDFS file: " + driverFilePath)
- logInfo("Writing Akka address: " + driverUrl)
+ logInfo("Writing Driver address: " + driverUrl)
logInfo("Writing Spark UI Address: " + appUIAddress)
// Create temporary file to prevent race condition where executors get empty driverUrl file
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
deleted file mode 100644
index 3f4ac9b2f1..0000000000
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.collection.JavaConverters._
-
-import akka.actor.{ActorSystem, ExtendedActorSystem}
-import com.typesafe.config.ConfigFactory
-import org.apache.log4j.{Level, Logger}
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
-
-/**
- * Various utility classes for working with Akka.
- */
-private[spark] object AkkaUtils extends Logging {
-
- /**
- * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
- * ActorSystem itself and its port (which is hard to get from Akka).
- *
- * Note: the `name` parameter is important, as even if a client sends a message to right
- * host + port, if the system name is incorrect, Akka will drop the message.
- *
- * If indestructible is set to true, the Actor System will continue running in the event
- * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
- */
- def createActorSystem(
- name: String,
- host: String,
- port: Int,
- conf: SparkConf,
- securityManager: SecurityManager): (ActorSystem, Int) = {
- val startService: Int => (ActorSystem, Int) = { actualPort =>
- doCreateActorSystem(name, host, actualPort, conf, securityManager)
- }
- Utils.startServiceOnPort(port, startService, conf, name)
- }
-
- private def doCreateActorSystem(
- name: String,
- host: String,
- port: Int,
- conf: SparkConf,
- securityManager: SecurityManager): (ActorSystem, Int) = {
-
- val akkaThreads = conf.getInt("spark.akka.threads", 4)
- val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
- val akkaTimeoutS = conf.getTimeAsSeconds("spark.akka.timeout",
- conf.get("spark.network.timeout", "120s"))
- val akkaFrameSize = maxFrameSizeBytes(conf)
- val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
- val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
- if (!akkaLogLifecycleEvents) {
- // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
- // See: https://www.assembla.com/spaces/akka/tickets/3787#/
- Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
- }
-
- val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"
-
- val akkaHeartBeatPausesS = conf.getTimeAsSeconds("spark.akka.heartbeat.pauses", "6000s")
- val akkaHeartBeatIntervalS = conf.getTimeAsSeconds("spark.akka.heartbeat.interval", "1000s")
-
- val secretKey = securityManager.getSecretKey()
- val isAuthOn = securityManager.isAuthenticationEnabled()
- if (isAuthOn && secretKey == null) {
- throw new Exception("Secret key is null with authentication on")
- }
- val requireCookie = if (isAuthOn) "on" else "off"
- val secureCookie = if (isAuthOn) secretKey else ""
- logDebug(s"In createActorSystem, requireCookie is: $requireCookie")
-
- val akkaSslConfig = securityManager.getSSLOptions("akka").createAkkaConfig
- .getOrElse(ConfigFactory.empty())
-
- val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap.asJava)
- .withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString(
- s"""
- |akka.daemonic = on
- |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
- |akka.stdout-loglevel = "ERROR"
- |akka.jvm-exit-on-fatal-error = off
- |akka.remote.require-cookie = "$requireCookie"
- |akka.remote.secure-cookie = "$secureCookie"
- |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatIntervalS s
- |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPausesS s
- |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
- |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
- |akka.remote.netty.tcp.hostname = "$host"
- |akka.remote.netty.tcp.port = $port
- |akka.remote.netty.tcp.tcp-nodelay = on
- |akka.remote.netty.tcp.connection-timeout = $akkaTimeoutS s
- |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
- |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
- |akka.actor.default-dispatcher.throughput = $akkaBatchSize
- |akka.log-config-on-start = $logAkkaConfig
- |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
- |akka.log-dead-letters = $lifecycleEvents
- |akka.log-dead-letters-during-shutdown = $lifecycleEvents
- """.stripMargin))
-
- val actorSystem = ActorSystem(name, akkaConf)
- val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
- val boundPort = provider.getDefaultAddress.port.get
- (actorSystem, boundPort)
- }
-
- private val AKKA_MAX_FRAME_SIZE_IN_MB = Int.MaxValue / 1024 / 1024
-
- /** Returns the configured max frame size for Akka messages in bytes. */
- def maxFrameSizeBytes(conf: SparkConf): Int = {
- val frameSizeInMB = conf.getInt("spark.akka.frameSize", 128)
- if (frameSizeInMB > AKKA_MAX_FRAME_SIZE_IN_MB) {
- throw new IllegalArgumentException(
- s"spark.akka.frameSize should not be greater than $AKKA_MAX_FRAME_SIZE_IN_MB MB")
- }
- frameSizeInMB * 1024 * 1024
- }
-
- /** Space reserved for extra data in an Akka message besides serialized task or task result. */
- val reservedSizeBytes = 200 * 1024
-
-}
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index b68936f5c9..2bb8de568e 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -53,4 +53,16 @@ private[spark] object RpcUtils {
def lookupRpcTimeout(conf: SparkConf): RpcTimeout = {
RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s")
}
+
+ private val MAX_MESSAGE_SIZE_IN_MB = Int.MaxValue / 1024 / 1024
+
+ /** Returns the configured max message size for messages in bytes. */
+ def maxMessageSizeBytes(conf: SparkConf): Int = {
+ val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128)
+ if (maxSizeInMB > MAX_MESSAGE_SIZE_IN_MB) {
+ throw new IllegalArgumentException(
+ s"spark.rpc.message.maxSize should not be greater than $MAX_MESSAGE_SIZE_IN_MB MB")
+ }
+ maxSizeInMB * 1024 * 1024
+ }
}
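Finally, a usage sketch for the new helper above (RpcUtils is private[spark], so this assumes code in an org.apache.spark subpackage): the limit is configured in megabytes and returned in bytes, with 128 MB as the default.

    package org.apache.spark.sketch

    import org.apache.spark.SparkConf
    import org.apache.spark.util.RpcUtils

    object MaxMessageSizeSketch {
      def main(args: Array[String]): Unit = {
        // The limit is read in MB and returned in bytes; oversized values throw.
        val conf = new SparkConf().set("spark.rpc.message.maxSize", "256")
        println(RpcUtils.maxMessageSizeBytes(conf))  // 268435456 (256 * 1024 * 1024)
      }
    }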