aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-01-22 21:20:04 -0800
committerReynold Xin <rxin@databricks.com>2016-01-22 21:20:04 -0800
commitbc1babd63da4ee56e6d371eb24805a5d714e8295 (patch)
tree8aec6a20e3d23574f53d818752df61a28c64d635 /core
parentd8fefab4d8149f0638282570c75271ef35c65cff (diff)
downloadspark-bc1babd63da4ee56e6d371eb24805a5d714e8295.tar.gz
spark-bc1babd63da4ee56e6d371eb24805a5d714e8295.tar.bz2
spark-bc1babd63da4ee56e6d371eb24805a5d714e8295.zip
[SPARK-7997][CORE] Remove Akka from Spark Core and Streaming
- Remove Akka dependency from core. Note: the streaming-akka project still uses Akka. - Remove HttpFileServer - Remove Akka configs from SparkConf and SSLOptions - Rename `spark.akka.frameSize` to `spark.rpc.message.maxSize`. I think it's still worth to keep this config because using `DirectTaskResult` or `IndirectTaskResult` depends on it. - Update comments and docs Author: Shixiong Zhu <shixiong@databricks.com> Closes #10854 from zsxwing/remove-akka.
Diffstat (limited to 'core')
-rw-r--r--core/pom.xml17
-rw-r--r--core/src/main/scala/org/apache/spark/ContextCleaner.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/HttpFileServer.scala91
-rw-r--r--core/src/main/scala/org/apache/spark/MapOutputTracker.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/SSLOptions.scala43
-rw-r--r--core/src/main/scala/org/apache/spark/SecurityManager.scala19
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala32
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala49
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/Client.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala139
-rw-r--r--core/src/main/scala/org/apache/spark/util/RpcUtils.scala12
-rw-r--r--core/src/test/scala/org/apache/spark/FileServerSuite.scala264
-rw-r--r--core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/LocalSparkContext.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala12
-rw-r--r--core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala12
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala17
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala26
29 files changed, 103 insertions, 713 deletions
diff --git a/core/pom.xml b/core/pom.xml
index 2071a58de9..0ab170e028 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -186,19 +186,6 @@
<artifactId>commons-net</artifactId>
</dependency>
<dependency>
- <groupId>${akka.group}</groupId>
- <artifactId>akka-remote_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>${akka.group}</groupId>
- <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>${akka.group}</groupId>
- <artifactId>akka-testkit_${scala.binary.version}</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
@@ -225,6 +212,10 @@
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.clearspring.analytics</groupId>
<artifactId>stream</artifactId>
</dependency>
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 4628093b91..5a42299a0b 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -86,8 +86,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
* is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
*
* Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
- * workaround for the issue, which is ultimately caused by the way the BlockManager actors
- * issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
+ * workaround for the issue, which is ultimately caused by the way the BlockManager endpoints
+ * issue inter-dependent blocking RPC messages to each other at high frequencies. This happens,
* for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
* longer in scope.
*/
@@ -101,7 +101,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
* exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this
* parameter by default disables blocking on shuffle cleanups. Note that this does not affect
* the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround,
- * until the real Akka issue (referred to in the comment above `blockOnCleanupTasks`) is
+ * until the real RPC issue (referred to in the comment above `blockOnCleanupTasks`) is
* resolved.
*/
private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
deleted file mode 100644
index 46f9f9e9af..0000000000
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.File
-
-import com.google.common.io.Files
-
-import org.apache.spark.util.Utils
-
-private[spark] class HttpFileServer(
- conf: SparkConf,
- securityManager: SecurityManager,
- requestedPort: Int = 0)
- extends Logging {
-
- var baseDir : File = null
- var fileDir : File = null
- var jarDir : File = null
- var httpServer : HttpServer = null
- var serverUri : String = null
-
- def initialize() {
- baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd")
- fileDir = new File(baseDir, "files")
- jarDir = new File(baseDir, "jars")
- fileDir.mkdir()
- jarDir.mkdir()
- logInfo("HTTP File server directory is " + baseDir)
- httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server")
- httpServer.start()
- serverUri = httpServer.uri
- logDebug("HTTP file server started at: " + serverUri)
- }
-
- def stop() {
- httpServer.stop()
-
- // If we only stop sc, but the driver process still run as a services then we need to delete
- // the tmp dir, if not, it will create too many tmp dirs
- try {
- Utils.deleteRecursively(baseDir)
- } catch {
- case e: Exception =>
- logWarning(s"Exception while deleting Spark temp dir: ${baseDir.getAbsolutePath}", e)
- }
- }
-
- def addFile(file: File) : String = {
- addFileToDir(file, fileDir)
- serverUri + "/files/" + Utils.encodeFileNameToURIRawPath(file.getName)
- }
-
- def addJar(file: File) : String = {
- addFileToDir(file, jarDir)
- serverUri + "/jars/" + Utils.encodeFileNameToURIRawPath(file.getName)
- }
-
- def addDirectory(path: String, resourceBase: String): String = {
- httpServer.addDirectory(path, resourceBase)
- serverUri + path
- }
-
- def addFileToDir(file: File, dir: File) : String = {
- // Check whether the file is a directory. If it is, throw a more meaningful exception.
- // If we don't catch this, Guava throws a very confusing error message:
- // java.io.FileNotFoundException: [file] (No such file or directory)
- // even though the directory ([file]) exists.
- if (file.isDirectory) {
- throw new IllegalArgumentException(s"$file cannot be a directory.")
- }
- Files.copy(file, new File(dir, file.getName))
- dir + "/" + Utils.encodeFileNameToURIRawPath(file.getName)
- }
-
-}
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 1b59beb8d6..eb2fdecc83 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -40,7 +40,7 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
private[spark] class MapOutputTrackerMasterEndpoint(
override val rpcEnv: RpcEnv, tracker: MapOutputTrackerMaster, conf: SparkConf)
extends RpcEndpoint with Logging {
- val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+ val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case GetMapOutputStatuses(shuffleId: Int) =>
@@ -48,9 +48,10 @@ private[spark] class MapOutputTrackerMasterEndpoint(
logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)
val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId)
val serializedSize = mapOutputStatuses.length
- if (serializedSize > maxAkkaFrameSize) {
+ if (serializedSize > maxRpcMessageSize) {
+
val msg = s"Map output statuses were $serializedSize bytes which " +
- s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes)."
+ s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes)."
/* For SPARK-1244 we'll opt for just logging an error and then sending it to the sender.
* A bigger refactoring (SPARK-1239) will ultimately remove this entire code path. */
diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
index 261265f0b4..d755f07965 100644
--- a/core/src/main/scala/org/apache/spark/SSLOptions.scala
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -21,9 +21,6 @@ import java.io.File
import java.security.NoSuchAlgorithmException
import javax.net.ssl.SSLContext
-import scala.collection.JavaConverters._
-
-import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.eclipse.jetty.util.ssl.SslContextFactory
/**
@@ -31,8 +28,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory
* generate specific objects to configure SSL for different communication protocols.
*
* SSLOptions is intended to provide the maximum common set of SSL settings, which are supported
- * by the protocol, which it can generate the configuration for. Since Akka doesn't support client
- * authentication with SSL, SSLOptions cannot support it either.
+ * by the protocol, which it can generate the configuration for.
*
* @param enabled enables or disables SSL; if it is set to false, the rest of the
* settings are disregarded
@@ -88,43 +84,6 @@ private[spark] case class SSLOptions(
}
}
- /**
- * Creates an Akka configuration object which contains all the SSL settings represented by this
- * object. It can be used then to compose the ultimate Akka configuration.
- */
- def createAkkaConfig: Option[Config] = {
- if (enabled) {
- if (keyStoreType.isDefined) {
- logWarning("Akka configuration does not support key store type.");
- }
- if (trustStoreType.isDefined) {
- logWarning("Akka configuration does not support trust store type.");
- }
-
- Some(ConfigFactory.empty()
- .withValue("akka.remote.netty.tcp.security.key-store",
- ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
- .withValue("akka.remote.netty.tcp.security.key-store-password",
- ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse("")))
- .withValue("akka.remote.netty.tcp.security.trust-store",
- ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse("")))
- .withValue("akka.remote.netty.tcp.security.trust-store-password",
- ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse("")))
- .withValue("akka.remote.netty.tcp.security.key-password",
- ConfigValueFactory.fromAnyRef(keyPassword.getOrElse("")))
- .withValue("akka.remote.netty.tcp.security.random-number-generator",
- ConfigValueFactory.fromAnyRef(""))
- .withValue("akka.remote.netty.tcp.security.protocol",
- ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
- .withValue("akka.remote.netty.tcp.security.enabled-algorithms",
- ConfigValueFactory.fromIterable(supportedAlgorithms.asJava))
- .withValue("akka.remote.netty.tcp.enable-ssl",
- ConfigValueFactory.fromAnyRef(true)))
- } else {
- None
- }
- }
-
/*
* The supportedAlgorithms set is a subset of the enabledAlgorithms that
* are supported by the current Java security provider for this protocol.
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index c5aec05c03..0675957e16 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -67,17 +67,6 @@ import org.apache.spark.util.Utils
* At this point spark has multiple communication protocols that need to be secured and
* different underlying mechanisms are used depending on the protocol:
*
- * - Akka -> The only option here is to use the Akka Remote secure-cookie functionality.
- * Akka remoting allows you to specify a secure cookie that will be exchanged
- * and ensured to be identical in the connection handshake between the client
- * and the server. If they are not identical then the client will be refused
- * to connect to the server. There is no control of the underlying
- * authentication mechanism so its not clear if the password is passed in
- * plaintext or uses DIGEST-MD5 or some other mechanism.
- *
- * Akka also has an option to turn on SSL, this option is currently supported (see
- * the details below).
- *
* - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
* for the HttpServer. Jetty supports multiple authentication mechanisms -
* Basic, Digest, Form, Spengo, etc. It also supports multiple different login
@@ -168,16 +157,16 @@ import org.apache.spark.util.Utils
* denote the global configuration for all the supported protocols. In order to override the global
* configuration for the particular protocol, the properties must be overwritten in the
* protocol-specific namespace. Use `spark.ssl.yyy.xxx` settings to overwrite the global
- * configuration for particular protocol denoted by `yyy`. Currently `yyy` can be either `akka` for
- * Akka based connections or `fs` for broadcast and file server.
+ * configuration for particular protocol denoted by `yyy`. Currently `yyy` can be only`fs` for
+ * broadcast and file server.
*
* Refer to [[org.apache.spark.SSLOptions]] documentation for the list of
* options that can be specified.
*
* SecurityManager initializes SSLOptions objects for different protocols separately. SSLOptions
* object parses Spark configuration at a given namespace and builds the common representation
- * of SSL settings. SSLOptions is then used to provide protocol-specific configuration like
- * TypeSafe configuration for Akka or SSLContextFactory for Jetty.
+ * of SSL settings. SSLOptions is then used to provide protocol-specific SSLContextFactory for
+ * Jetty.
*
* SSL must be configured on each node and configured for each component involved in
* communication using the particular protocol. In YARN clusters, the key-store can be prepared on
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 340e1f7824..36e240e618 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -344,17 +344,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
.map{case (k, v) => (k.substring(prefix.length), v)}
}
- /** Get all akka conf variables set on this SparkConf */
- def getAkkaConf: Seq[(String, String)] =
- /* This is currently undocumented. If we want to make this public we should consider
- * nesting options under the spark namespace to avoid conflicts with user akka options.
- * Otherwise users configuring their own akka code via system properties could mess up
- * spark's akka options.
- *
- * E.g. spark.akka.option.x.y.x = "value"
- */
- getAll.filter { case (k, _) => isAkkaConf(k) }
-
/**
* Returns the Spark application id, valid in the Driver after TaskScheduler registration and
* from the start in the Executor.
@@ -600,7 +589,9 @@ private[spark] object SparkConf extends Logging {
"spark.yarn.max.executor.failures" -> Seq(
AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
"spark.memory.offHeap.enabled" -> Seq(
- AlternateConfig("spark.unsafe.offHeap", "1.6"))
+ AlternateConfig("spark.unsafe.offHeap", "1.6")),
+ "spark.rpc.message.maxSize" -> Seq(
+ AlternateConfig("spark.akka.frameSize", "1.6"))
)
/**
@@ -616,20 +607,12 @@ private[spark] object SparkConf extends Logging {
}
/**
- * Return whether the given config is an akka config (e.g. akka.actor.provider).
- * Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
- */
- def isAkkaConf(name: String): Boolean = name.startsWith("akka.")
-
- /**
* Return whether the given config should be passed to an executor on start-up.
*
- * Certain akka and authentication configs are required from the executor when it connects to
+ * Certain authentication configs are required from the executor when it connects to
* the scheduler, while the rest of the spark configs can be inherited from the driver later.
*/
def isExecutorStartupConf(name: String): Boolean = {
- isAkkaConf(name) ||
- name.startsWith("spark.akka") ||
(name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) ||
name.startsWith("spark.ssl") ||
name.startsWith("spark.rpc") ||
@@ -664,12 +647,19 @@ private[spark] object SparkConf extends Logging {
logWarning(
s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
s"may be removed in the future. ${cfg.deprecationMessage}")
+ return
}
allAlternatives.get(key).foreach { case (newKey, cfg) =>
logWarning(
s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
s"and may be removed in the future. Please use the new key '$newKey' instead.")
+ return
+ }
+ if (key.startsWith("spark.akka") || key.startsWith("spark.ssl.akka")) {
+ logWarning(
+ s"The configuration key $key is not supported any more " +
+ s"because Spark doesn't use Akka since 2.0")
}
}
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index ec43be0e2f..9461afdc54 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -23,7 +23,6 @@ import java.net.Socket
import scala.collection.mutable
import scala.util.Properties
-import akka.actor.ActorSystem
import com.google.common.collect.MapMaker
import org.apache.spark.annotation.DeveloperApi
@@ -39,12 +38,12 @@ import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinato
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage._
-import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
+import org.apache.spark.util.{RpcUtils, Utils}
/**
* :: DeveloperApi ::
* Holds all the runtime environment objects for a running Spark instance (either master or worker),
- * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
+ * including the serializer, RpcEnv, block manager, map output tracker, etc. Currently
* Spark code finds the SparkEnv through a global variable, so all the threads can access the same
* SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext).
*
@@ -55,7 +54,6 @@ import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils}
class SparkEnv (
val executorId: String,
private[spark] val rpcEnv: RpcEnv,
- _actorSystem: ActorSystem, // TODO Remove actorSystem
val serializer: Serializer,
val closureSerializer: Serializer,
val cacheManager: CacheManager,
@@ -71,10 +69,6 @@ class SparkEnv (
val outputCommitCoordinator: OutputCommitCoordinator,
val conf: SparkConf) extends Logging {
- // TODO Remove actorSystem
- @deprecated("Actor system is no longer supported as of 1.4.0", "1.4.0")
- val actorSystem: ActorSystem = _actorSystem
-
private[spark] var isStopped = false
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
@@ -96,13 +90,8 @@ class SparkEnv (
blockManager.master.stop()
metricsSystem.stop()
outputCommitCoordinator.stop()
- actorSystem.shutdown()
rpcEnv.shutdown()
-
- // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
- // down, but let's call it anyway in case it gets fixed in a later release
- // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
- // actorSystem.awaitTermination()
+ rpcEnv.awaitTermination()
// Note that blockTransferService is stopped by BlockManager since it is started by it.
@@ -152,8 +141,8 @@ class SparkEnv (
object SparkEnv extends Logging {
@volatile private var env: SparkEnv = _
- private[spark] val driverActorSystemName = "sparkDriver"
- private[spark] val executorActorSystemName = "sparkExecutor"
+ private[spark] val driverSystemName = "sparkDriver"
+ private[spark] val executorSystemName = "sparkExecutor"
def set(e: SparkEnv) {
env = e
@@ -202,7 +191,7 @@ object SparkEnv extends Logging {
/**
* Create a SparkEnv for an executor.
- * In coarse-grained mode, the executor provides an actor system that is already instantiated.
+ * In coarse-grained mode, the executor provides an RpcEnv that is already instantiated.
*/
private[spark] def createExecutorEnv(
conf: SparkConf,
@@ -245,28 +234,11 @@ object SparkEnv extends Logging {
val securityManager = new SecurityManager(conf)
- val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
- // Create the ActorSystem for Akka and get the port it binds to.
- val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
+ val systemName = if (isDriver) driverSystemName else executorSystemName
+ val rpcEnv = RpcEnv.create(systemName, hostname, port, conf, securityManager,
clientMode = !isDriver)
- val actorSystem: ActorSystem = {
- val actorSystemPort =
- if (port == 0 || rpcEnv.address == null) {
- port
- } else {
- rpcEnv.address.port + 1
- }
- // Create a ActorSystem for legacy codes
- AkkaUtils.createActorSystem(
- actorSystemName + "ActorSystem",
- hostname,
- actorSystemPort,
- conf,
- securityManager
- )._1
- }
- // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
+ // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
// In the non-driver case, the RPC env's address may be null since it may not be listening
// for incoming connections.
if (isDriver) {
@@ -325,7 +297,7 @@ object SparkEnv extends Logging {
new MapOutputTrackerWorker(conf)
}
- // Have to assign trackerActor after initialization as MapOutputTrackerActor
+ // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
// requires the MapOutputTracker itself
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(
@@ -398,7 +370,6 @@ object SparkEnv extends Logging {
val envInstance = new SparkEnv(
executorId,
rpcEnv,
- actorSystem,
serializer,
closureSerializer,
cacheManager,
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index 63a20ab41a..dcef03ef3e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -219,11 +219,7 @@ object Client {
val conf = new SparkConf()
val driverArgs = new ClientArguments(args)
- if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
- conf.set("spark.akka.logLifecycleEvents", "true")
- }
conf.set("spark.rpc.askTimeout", "10")
- conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
Logger.getRootLogger.setLevel(driverArgs.logLevel)
val rpcEnv =
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index a1e8da1ec8..a6749f7e38 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -183,7 +183,7 @@ object SparkSubmit {
}
// In standalone cluster mode, there are two submission gateways:
- // (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
+ // (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
// (2) The new REST-based gateway introduced in Spark 1.3
// The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
// to use the legacy gateway if the master endpoint turns out to be not a REST server.
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 58bd9ca3d1..e3a6c4c07a 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -37,7 +37,6 @@ private[spark] class CoarseGrainedExecutorBackend(
override val rpcEnv: RpcEnv,
driverUrl: String,
executorId: String,
- hostPort: String,
cores: Int,
userClassPath: Seq[URL],
env: SparkEnv)
@@ -55,8 +54,7 @@ private[spark] class CoarseGrainedExecutorBackend(
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
- ref.ask[RegisterExecutorResponse](
- RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
+ ref.ask[RegisterExecutorResponse](RegisterExecutor(executorId, self, cores, extractLogUrls))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) => Utils.tryLogNonFatalError {
@@ -184,14 +182,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, isLocal = false)
- // SparkEnv will set spark.executor.port if the rpc env is listening for incoming
- // connections (e.g., if it's using akka). Otherwise, the executor is running in
- // client mode only, and does not accept incoming connections.
- val sparkHostPort = env.conf.getOption("spark.executor.port").map { port =>
- hostname + ":" + port
- }.orNull
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
- env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
+ env.rpcEnv, driverUrl, executorId, cores, userClassPath, env))
workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 75d7e34d60..030ae41db4 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -40,7 +40,7 @@ import org.apache.spark.util._
* Spark executor, backed by a threadpool to run tasks.
*
* This can be used with Mesos, YARN, and the standalone scheduler.
- * An internal RPC interface (at the moment Akka) is used for communication with the driver,
+ * An internal RPC interface is used for communication with the driver,
* except in the case of Mesos fine-grained mode.
*/
private[spark] class Executor(
@@ -97,9 +97,9 @@ private[spark] class Executor(
// Set the classloader for serializer
env.serializer.setDefaultClassLoader(replClassLoader)
- // Akka's message frame size. If task result is bigger than this, we use the block manager
+ // Max RPC message size. If task result is bigger than this, we use the block manager
// to send the result back.
- private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+ private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
// Limit of bytes for total size of results (default is 1GB)
private val maxResultSize = Utils.getMaxResultSize(conf)
@@ -263,7 +263,7 @@ private[spark] class Executor(
s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
s"dropping it.")
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
- } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
+ } else if (resultSize >= maxRpcMessageSize) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index f3d0d85476..29e469c3f5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -48,7 +48,6 @@ private[spark] object CoarseGrainedClusterMessages {
case class RegisterExecutor(
executorId: String,
executorRef: RpcEndpointRef,
- hostPort: String,
cores: Int,
logUrls: Map[String, String])
extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b808993aa6..f69a3d371e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -27,7 +27,7 @@ import org.apache.spark.rpc._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.ENDPOINT_NAME
-import org.apache.spark.util.{AkkaUtils, SerializableBuffer, ThreadUtils, Utils}
+import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils}
/**
* A scheduler backend that waits for coarse-grained executors to connect.
@@ -46,7 +46,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Total number of executors that are currently registered
var totalRegisteredExecutors = new AtomicInteger(0)
val conf = scheduler.sc.conf
- private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+ private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
// Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, that is double between 0 and 1.
var minRegisteredRatio =
@@ -134,7 +134,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
- case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
+ case RegisterExecutor(executorId, executorRef, cores, logUrls) =>
if (executorDataMap.contains(executorId)) {
context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
} else {
@@ -224,14 +224,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
val serializedTask = ser.serialize(task)
- if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
+ if (serializedTask.limit >= maxRpcMessageSize) {
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
- "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
- "spark.akka.frameSize or using broadcast variables for large values."
- msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
- AkkaUtils.reservedSizeBytes)
+ "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
+ "spark.rpc.message.maxSize or using broadcast variables for large values."
+ msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 0a6f2c01c1..a298cf5ef9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -49,7 +49,7 @@ private[spark] class SimrSchedulerBackend(
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
logInfo("Writing to HDFS file: " + driverFilePath)
- logInfo("Writing Akka address: " + driverUrl)
+ logInfo("Writing Driver address: " + driverUrl)
logInfo("Writing Spark UI Address: " + appUIAddress)
// Create temporary file to prevent race condition where executors get empty driverUrl file
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
deleted file mode 100644
index 3f4ac9b2f1..0000000000
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.collection.JavaConverters._
-
-import akka.actor.{ActorSystem, ExtendedActorSystem}
-import com.typesafe.config.ConfigFactory
-import org.apache.log4j.{Level, Logger}
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
-
-/**
- * Various utility classes for working with Akka.
- */
-private[spark] object AkkaUtils extends Logging {
-
- /**
- * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
- * ActorSystem itself and its port (which is hard to get from Akka).
- *
- * Note: the `name` parameter is important, as even if a client sends a message to right
- * host + port, if the system name is incorrect, Akka will drop the message.
- *
- * If indestructible is set to true, the Actor System will continue running in the event
- * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]].
- */
- def createActorSystem(
- name: String,
- host: String,
- port: Int,
- conf: SparkConf,
- securityManager: SecurityManager): (ActorSystem, Int) = {
- val startService: Int => (ActorSystem, Int) = { actualPort =>
- doCreateActorSystem(name, host, actualPort, conf, securityManager)
- }
- Utils.startServiceOnPort(port, startService, conf, name)
- }
-
- private def doCreateActorSystem(
- name: String,
- host: String,
- port: Int,
- conf: SparkConf,
- securityManager: SecurityManager): (ActorSystem, Int) = {
-
- val akkaThreads = conf.getInt("spark.akka.threads", 4)
- val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
- val akkaTimeoutS = conf.getTimeAsSeconds("spark.akka.timeout",
- conf.get("spark.network.timeout", "120s"))
- val akkaFrameSize = maxFrameSizeBytes(conf)
- val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
- val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
- if (!akkaLogLifecycleEvents) {
- // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent.
- // See: https://www.assembla.com/spaces/akka/tickets/3787#/
- Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
- }
-
- val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"
-
- val akkaHeartBeatPausesS = conf.getTimeAsSeconds("spark.akka.heartbeat.pauses", "6000s")
- val akkaHeartBeatIntervalS = conf.getTimeAsSeconds("spark.akka.heartbeat.interval", "1000s")
-
- val secretKey = securityManager.getSecretKey()
- val isAuthOn = securityManager.isAuthenticationEnabled()
- if (isAuthOn && secretKey == null) {
- throw new Exception("Secret key is null with authentication on")
- }
- val requireCookie = if (isAuthOn) "on" else "off"
- val secureCookie = if (isAuthOn) secretKey else ""
- logDebug(s"In createActorSystem, requireCookie is: $requireCookie")
-
- val akkaSslConfig = securityManager.getSSLOptions("akka").createAkkaConfig
- .getOrElse(ConfigFactory.empty())
-
- val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap.asJava)
- .withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString(
- s"""
- |akka.daemonic = on
- |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
- |akka.stdout-loglevel = "ERROR"
- |akka.jvm-exit-on-fatal-error = off
- |akka.remote.require-cookie = "$requireCookie"
- |akka.remote.secure-cookie = "$secureCookie"
- |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatIntervalS s
- |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPausesS s
- |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
- |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
- |akka.remote.netty.tcp.hostname = "$host"
- |akka.remote.netty.tcp.port = $port
- |akka.remote.netty.tcp.tcp-nodelay = on
- |akka.remote.netty.tcp.connection-timeout = $akkaTimeoutS s
- |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
- |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
- |akka.actor.default-dispatcher.throughput = $akkaBatchSize
- |akka.log-config-on-start = $logAkkaConfig
- |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
- |akka.log-dead-letters = $lifecycleEvents
- |akka.log-dead-letters-during-shutdown = $lifecycleEvents
- """.stripMargin))
-
- val actorSystem = ActorSystem(name, akkaConf)
- val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
- val boundPort = provider.getDefaultAddress.port.get
- (actorSystem, boundPort)
- }
-
- private val AKKA_MAX_FRAME_SIZE_IN_MB = Int.MaxValue / 1024 / 1024
-
- /** Returns the configured max frame size for Akka messages in bytes. */
- def maxFrameSizeBytes(conf: SparkConf): Int = {
- val frameSizeInMB = conf.getInt("spark.akka.frameSize", 128)
- if (frameSizeInMB > AKKA_MAX_FRAME_SIZE_IN_MB) {
- throw new IllegalArgumentException(
- s"spark.akka.frameSize should not be greater than $AKKA_MAX_FRAME_SIZE_IN_MB MB")
- }
- frameSizeInMB * 1024 * 1024
- }
-
- /** Space reserved for extra data in an Akka message besides serialized task or task result. */
- val reservedSizeBytes = 200 * 1024
-
-}
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index b68936f5c9..2bb8de568e 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -53,4 +53,16 @@ private[spark] object RpcUtils {
def lookupRpcTimeout(conf: SparkConf): RpcTimeout = {
RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s")
}
+
+ private val MAX_MESSAGE_SIZE_IN_MB = Int.MaxValue / 1024 / 1024
+
+ /** Returns the configured max message size for messages in bytes. */
+ def maxMessageSizeBytes(conf: SparkConf): Int = {
+ val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128)
+ if (maxSizeInMB > MAX_MESSAGE_SIZE_IN_MB) {
+ throw new IllegalArgumentException(
+ s"spark.rpc.message.maxSize should not be greater than $MAX_MESSAGE_SIZE_IN_MB MB")
+ }
+ maxSizeInMB * 1024 * 1024
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
deleted file mode 100644
index bc7059b77f..0000000000
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io._
-import java.net.URI
-import java.util.jar.{JarEntry, JarOutputStream}
-import javax.net.ssl.SSLException
-
-import com.google.common.io.{ByteStreams, Files}
-import org.apache.commons.lang3.RandomUtils
-
-import org.apache.spark.util.Utils
-
-class FileServerSuite extends SparkFunSuite with LocalSparkContext {
-
- import SSLSampleConfigs._
-
- @transient var tmpDir: File = _
- @transient var tmpFile: File = _
- @transient var tmpJarUrl: String = _
-
- def newConf: SparkConf = new SparkConf(loadDefaults = false).set("spark.authenticate", "false")
-
- override def beforeEach() {
- super.beforeEach()
- resetSparkContext()
- }
-
- override def beforeAll() {
- super.beforeAll()
-
- tmpDir = Utils.createTempDir()
- val testTempDir = new File(tmpDir, "test")
- testTempDir.mkdir()
-
- val textFile = new File(testTempDir, "FileServerSuite.txt")
- val pw = new PrintWriter(textFile)
- // scalastyle:off println
- pw.println("100")
- // scalastyle:on println
- pw.close()
-
- val jarFile = new File(testTempDir, "test.jar")
- val jarStream = new FileOutputStream(jarFile)
- val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
-
- val jarEntry = new JarEntry(textFile.getName)
- jar.putNextEntry(jarEntry)
-
- val in = new FileInputStream(textFile)
- ByteStreams.copy(in, jar)
-
- in.close()
- jar.close()
- jarStream.close()
-
- tmpFile = textFile
- tmpJarUrl = jarFile.toURI.toURL.toString
- }
-
- override def afterAll() {
- try {
- Utils.deleteRecursively(tmpDir)
- } finally {
- super.afterAll()
- }
- }
-
- test("Distributing files locally") {
- sc = new SparkContext("local[4]", "test", newConf)
- sc.addFile(tmpFile.toString)
- val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
- val result = sc.parallelize(testData).reduceByKey {
- val path = SparkFiles.get("FileServerSuite.txt")
- val in = new BufferedReader(new FileReader(path))
- val fileVal = in.readLine().toInt
- in.close()
- _ * fileVal + _ * fileVal
- }.collect()
- assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
- }
-
- test("Distributing files locally security On") {
- val sparkConf = new SparkConf(false)
- sparkConf.set("spark.authenticate", "true")
- sparkConf.set("spark.authenticate.secret", "good")
- sc = new SparkContext("local[4]", "test", sparkConf)
-
- sc.addFile(tmpFile.toString)
- assert(sc.env.securityManager.isAuthenticationEnabled() === true)
- val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
- val result = sc.parallelize(testData).reduceByKey {
- val path = SparkFiles.get("FileServerSuite.txt")
- val in = new BufferedReader(new FileReader(path))
- val fileVal = in.readLine().toInt
- in.close()
- _ * fileVal + _ * fileVal
- }.collect()
- assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
- }
-
- test("Distributing files locally using URL as input") {
- // addFile("file:///....")
- sc = new SparkContext("local[4]", "test", newConf)
- sc.addFile(new File(tmpFile.toString).toURI.toString)
- val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
- val result = sc.parallelize(testData).reduceByKey {
- val path = SparkFiles.get("FileServerSuite.txt")
- val in = new BufferedReader(new FileReader(path))
- val fileVal = in.readLine().toInt
- in.close()
- _ * fileVal + _ * fileVal
- }.collect()
- assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
- }
-
- test ("Dynamically adding JARS locally") {
- sc = new SparkContext("local[4]", "test", newConf)
- sc.addJar(tmpJarUrl)
- val testData = Array((1, 1))
- sc.parallelize(testData).foreach { x =>
- if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
- throw new SparkException("jar not added")
- }
- }
- }
-
- test("Distributing files on a standalone cluster") {
- sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
- sc.addFile(tmpFile.toString)
- val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
- val result = sc.parallelize(testData).reduceByKey {
- val path = SparkFiles.get("FileServerSuite.txt")
- val in = new BufferedReader(new FileReader(path))
- val fileVal = in.readLine().toInt
- in.close()
- _ * fileVal + _ * fileVal
- }.collect()
- assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
- }
-
- test ("Dynamically adding JARS on a standalone cluster") {
- sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
- sc.addJar(tmpJarUrl)
- val testData = Array((1, 1))
- sc.parallelize(testData).foreach { x =>
- if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
- throw new SparkException("jar not added")
- }
- }
- }
-
- test ("Dynamically adding JARS on a standalone cluster using local: URL") {
- sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
- sc.addJar(tmpJarUrl.replace("file", "local"))
- val testData = Array((1, 1))
- sc.parallelize(testData).foreach { x =>
- if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
- throw new SparkException("jar not added")
- }
- }
- }
-
- test ("HttpFileServer should work with SSL") {
- val sparkConf = sparkSSLConfig()
- val sm = new SecurityManager(sparkConf)
- val server = new HttpFileServer(sparkConf, sm, 0)
- try {
- server.initialize()
-
- fileTransferTest(server, sm)
- } finally {
- server.stop()
- }
- }
-
- test ("HttpFileServer should work with SSL and good credentials") {
- val sparkConf = sparkSSLConfig()
- sparkConf.set("spark.authenticate", "true")
- sparkConf.set("spark.authenticate.secret", "good")
-
- val sm = new SecurityManager(sparkConf)
- val server = new HttpFileServer(sparkConf, sm, 0)
- try {
- server.initialize()
-
- fileTransferTest(server, sm)
- } finally {
- server.stop()
- }
- }
-
- test ("HttpFileServer should not work with valid SSL and bad credentials") {
- val sparkConf = sparkSSLConfig()
- sparkConf.set("spark.authenticate", "true")
- sparkConf.set("spark.authenticate.secret", "bad")
-
- val sm = new SecurityManager(sparkConf)
- val server = new HttpFileServer(sparkConf, sm, 0)
- try {
- server.initialize()
-
- intercept[IOException] {
- fileTransferTest(server)
- }
- } finally {
- server.stop()
- }
- }
-
- test ("HttpFileServer should not work with SSL when the server is untrusted") {
- val sparkConf = sparkSSLConfigUntrusted()
- val sm = new SecurityManager(sparkConf)
- val server = new HttpFileServer(sparkConf, sm, 0)
- try {
- server.initialize()
-
- intercept[SSLException] {
- fileTransferTest(server)
- }
- } finally {
- server.stop()
- }
- }
-
- def fileTransferTest(server: HttpFileServer, sm: SecurityManager = null): Unit = {
- val randomContent = RandomUtils.nextBytes(100)
- val file = File.createTempFile("FileServerSuite", "sslTests", tmpDir)
- Files.write(randomContent, file)
- server.addFile(file)
-
- val uri = new URI(server.serverUri + "/files/" + file.getName)
-
- val connection = if (sm != null && sm.isAuthenticationEnabled()) {
- Utils.constructURIForAuthentication(uri, sm).toURL.openConnection()
- } else {
- uri.toURL.openConnection()
- }
-
- if (sm != null) {
- Utils.setupSecureURLConnection(connection, sm)
- }
-
- val buf = ByteStreams.toByteArray(connection.getInputStream)
- assert(buf === randomContent)
- }
-
-}
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 18e5350840..c7f629a14b 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -175,9 +175,9 @@ class HeartbeatReceiverSuite
val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
- RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "dummy:4040", 0, Map.empty))
+ RegisterExecutor(executorId1, dummyExecutorEndpointRef1, 0, Map.empty))
fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
- RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty))
+ RegisterExecutor(executorId2, dummyExecutorEndpointRef2, 0, Map.empty))
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index e1a0bf7c93..24ec99c7e5 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -52,7 +52,7 @@ object LocalSparkContext {
if (sc != null) {
sc.stop()
}
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ // To avoid RPC rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
}
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 3819c0a8f3..6546def596 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -154,9 +154,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
slaveRpcEnv.shutdown()
}
- test("remote fetch below akka frame size") {
+ test("remote fetch below max RPC message size") {
val newConf = new SparkConf
- newConf.set("spark.akka.frameSize", "1")
+ newConf.set("spark.rpc.message.maxSize", "1")
newConf.set("spark.rpc.askTimeout", "1") // Fail fast
val masterTracker = new MapOutputTrackerMaster(conf)
@@ -164,7 +164,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf)
rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
- // Frame size should be ~123B, and no exception should be thrown
+ // Message size should be ~123B, and no exception should be thrown
masterTracker.registerShuffle(10, 1)
masterTracker.registerMapOutput(10, 0, MapStatus(
BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
@@ -179,9 +179,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
rpcEnv.shutdown()
}
- test("remote fetch exceeds akka frame size") {
+ test("remote fetch exceeds max RPC message size") {
val newConf = new SparkConf
- newConf.set("spark.akka.frameSize", "1")
+ newConf.set("spark.rpc.message.maxSize", "1")
newConf.set("spark.rpc.askTimeout", "1") // Fail fast
val masterTracker = new MapOutputTrackerMaster(conf)
@@ -189,7 +189,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf)
rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
- // Frame size should be ~1.1MB, and MapOutputTrackerMasterEndpoint should throw exception.
+ // Message size should be ~1.1MB, and MapOutputTrackerMasterEndpoint should throw exception.
// Note that the size is hand-selected here because map output statuses are compressed before
// being sent.
masterTracker.registerShuffle(20, 100)
diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
index 7603cef773..8bdb237c28 100644
--- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -181,10 +181,8 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
"SSL_DHE_RSA_WITH_AES_128_CBC_SHA256")
val securityManager = new SecurityManager(conf)
- val akkaSSLOptions = securityManager.getSSLOptions("akka")
assert(securityManager.fileServerSSLOptions.enabled === true)
- assert(akkaSSLOptions.enabled === true)
assert(securityManager.sslSocketFactory.isDefined === true)
assert(securityManager.hostnameVerifier.isDefined === true)
@@ -198,16 +196,6 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
assert(securityManager.fileServerSSLOptions.keyPassword === Some("password"))
assert(securityManager.fileServerSSLOptions.protocol === Some("TLSv1.2"))
assert(securityManager.fileServerSSLOptions.enabledAlgorithms === expectedAlgorithms)
-
- assert(akkaSSLOptions.trustStore.isDefined === true)
- assert(akkaSSLOptions.trustStore.get.getName === "truststore")
- assert(akkaSSLOptions.keyStore.isDefined === true)
- assert(akkaSSLOptions.keyStore.get.getName === "keystore")
- assert(akkaSSLOptions.trustStorePassword === Some("password"))
- assert(akkaSSLOptions.keyStorePassword === Some("password"))
- assert(akkaSSLOptions.keyPassword === Some("password"))
- assert(akkaSSLOptions.protocol === Some("TLSv1.2"))
- assert(akkaSSLOptions.enabledAlgorithms === expectedAlgorithms)
}
test("ssl off setup") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
index 190e4dd728..9c13c15281 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
@@ -69,7 +69,7 @@ private[deploy] object DeployTestUtils {
"publicAddress",
new File("sparkHome"),
new File("workDir"),
- "akka://worker",
+ "spark://worker",
new SparkConf,
Seq("localDir"),
ExecutorState.RUNNING)
@@ -84,7 +84,7 @@ private[deploy] object DeployTestUtils {
new File("sparkHome"),
createDriverDesc(),
null,
- "akka://worker",
+ "spark://worker",
new SecurityManager(conf))
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index ab3d4cafeb..fdada0777f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -544,7 +544,7 @@ class StandaloneDynamicAllocationSuite
val endpointRef = mock(classOf[RpcEndpointRef])
val mockAddress = mock(classOf[RpcAddress])
when(endpointRef.address).thenReturn(mockAddress)
- val message = RegisterExecutor(id, endpointRef, s"localhost:$port", 10, Map.empty)
+ val message = RegisterExecutor(id, endpointRef, 10, Map.empty)
val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
backend.driverEndpoint.askWithRetry[CoarseGrainedClusterMessage](message)
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index bd8b0655f4..2a1696be36 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -34,7 +34,7 @@ class DriverRunnerTest extends SparkFunSuite {
val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
val conf = new SparkConf()
new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
- driverDescription, null, "akka://1.2.3.4/worker/", new SecurityManager(conf))
+ driverDescription, null, "spark://1.2.3.4/worker/", new SecurityManager(conf))
}
private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 64e486d791..6f4eda8b47 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -626,9 +626,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val e = intercept[Exception] {
Await.result(f, 1 seconds)
}
- assert(e.isInstanceOf[TimeoutException] || // For Akka
- e.isInstanceOf[NotSerializableException] // For Netty
- )
+ assert(e.isInstanceOf[NotSerializableException])
} finally {
anotherEnv.shutdown()
anotherEnv.awaitTermination()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 70f40fb26c..04cccc67e3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -18,16 +18,16 @@
package org.apache.spark.scheduler
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
-import org.apache.spark.util.{AkkaUtils, SerializableBuffer}
+import org.apache.spark.util.{RpcUtils, SerializableBuffer}
class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext {
- test("serialized task larger than akka frame size") {
+ test("serialized task larger than max RPC message size") {
val conf = new SparkConf
- conf.set("spark.akka.frameSize", "1")
+ conf.set("spark.rpc.message.maxSize", "1")
conf.set("spark.default.parallelism", "1")
sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
- val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
+ val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
val larger = sc.parallelize(Seq(buffer))
val thrown = intercept[SparkException] {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index c87158d89f..58d217ffef 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.Matchers
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
with ResetSystemProperties {
@@ -284,19 +284,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}
test("onTaskGettingResult() called when result fetched remotely") {
- val conf = new SparkConf().set("spark.akka.frameSize", "1")
+ val conf = new SparkConf().set("spark.rpc.message.maxSize", "1")
sc = new SparkContext("local", "SparkListenerSuite", conf)
val listener = new SaveTaskEvents
sc.addSparkListener(listener)
- // Make a task whose result is larger than the akka frame size
- val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
- assert(akkaFrameSize === 1024 * 1024)
+ // Make a task whose result is larger than the RPC message size
+ val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+ assert(maxRpcMessageSize === 1024 * 1024)
val result = sc.parallelize(Seq(1), 1)
- .map { x => 1.to(akkaFrameSize).toArray }
+ .map { x => 1.to(maxRpcMessageSize).toArray }
.reduce { case (x, y) => x }
- assert(result === 1.to(akkaFrameSize).toArray)
+ assert(result === 1.to(maxRpcMessageSize).toArray)
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
val TASK_INDEX = 0
@@ -310,7 +309,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val listener = new SaveTaskEvents
sc.addSparkListener(listener)
- // Make a task whose result is larger than the akka frame size
+ // Make a task whose result is larger than the RPC message size
val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x }
assert(result === 2)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index bc72c3685e..cc2557c2f1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark._
import org.apache.spark.storage.TaskResultBlockId
import org.apache.spark.TestUtils.JavaSourceFromString
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
+import org.apache.spark.util.{MutableURLClassLoader, RpcUtils, Utils}
/**
* Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter.
@@ -77,22 +77,22 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
*/
class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
- // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
+ // Set the RPC message size to be as small as possible (it must be an integer, so 1 is as small
// as we can make it) so the tests don't take too long.
- def conf: SparkConf = new SparkConf().set("spark.akka.frameSize", "1")
+ def conf: SparkConf = new SparkConf().set("spark.rpc.message.maxSize", "1")
- test("handling results smaller than Akka frame size") {
+ test("handling results smaller than max RPC message size") {
sc = new SparkContext("local", "test", conf)
val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
assert(result === 2)
}
- test("handling results larger than Akka frame size") {
+ test("handling results larger than max RPC message size") {
sc = new SparkContext("local", "test", conf)
- val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
- val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
- assert(result === 1.to(akkaFrameSize).toArray)
+ val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+ val result =
+ sc.parallelize(Seq(1), 1).map(x => 1.to(maxRpcMessageSize).toArray).reduce((x, y) => x)
+ assert(result === 1.to(maxRpcMessageSize).toArray)
val RESULT_BLOCK_ID = TaskResultBlockId(0)
assert(sc.env.blockManager.master.getLocations(RESULT_BLOCK_ID).size === 0,
@@ -114,11 +114,11 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
}
val resultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
scheduler.taskResultGetter = resultGetter
- val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
- val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
+ val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+ val result =
+ sc.parallelize(Seq(1), 1).map(x => 1.to(maxRpcMessageSize).toArray).reduce((x, y) => x)
assert(resultGetter.removeBlockSuccessfully)
- assert(result === 1.to(akkaFrameSize).toArray)
+ assert(result === 1.to(maxRpcMessageSize).toArray)
// Make sure two tasks were run (one failed one, and a second retried one).
assert(scheduler.nextTaskId.get() === 2)