From bc1babd63da4ee56e6d371eb24805a5d714e8295 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Fri, 22 Jan 2016 21:20:04 -0800 Subject: [SPARK-7997][CORE] Remove Akka from Spark Core and Streaming - Remove Akka dependency from core. Note: the streaming-akka project still uses Akka. - Remove HttpFileServer - Remove Akka configs from SparkConf and SSLOptions - Rename `spark.akka.frameSize` to `spark.rpc.message.maxSize`. I think it's still worth to keep this config because using `DirectTaskResult` or `IndirectTaskResult` depends on it. - Update comments and docs Author: Shixiong Zhu Closes #10854 from zsxwing/remove-akka. --- core/pom.xml | 17 +- .../scala/org/apache/spark/ContextCleaner.scala | 6 +- .../scala/org/apache/spark/HttpFileServer.scala | 91 ------- .../scala/org/apache/spark/MapOutputTracker.scala | 7 +- .../main/scala/org/apache/spark/SSLOptions.scala | 43 +--- .../scala/org/apache/spark/SecurityManager.scala | 19 +- .../main/scala/org/apache/spark/SparkConf.scala | 32 +-- .../src/main/scala/org/apache/spark/SparkEnv.scala | 49 +--- .../scala/org/apache/spark/deploy/Client.scala | 4 - .../org/apache/spark/deploy/SparkSubmit.scala | 2 +- .../executor/CoarseGrainedExecutorBackend.scala | 12 +- .../scala/org/apache/spark/executor/Executor.scala | 8 +- .../cluster/CoarseGrainedClusterMessage.scala | 1 - .../cluster/CoarseGrainedSchedulerBackend.scala | 15 +- .../scheduler/cluster/SimrSchedulerBackend.scala | 2 +- .../scala/org/apache/spark/util/AkkaUtils.scala | 139 ----------- .../scala/org/apache/spark/util/RpcUtils.scala | 12 + .../scala/org/apache/spark/FileServerSuite.scala | 264 --------------------- .../org/apache/spark/HeartbeatReceiverSuite.scala | 4 +- .../scala/org/apache/spark/LocalSparkContext.scala | 2 +- .../org/apache/spark/MapOutputTrackerSuite.scala | 12 +- .../org/apache/spark/SecurityManagerSuite.scala | 12 - .../org/apache/spark/deploy/DeployTestUtils.scala | 4 +- .../deploy/StandaloneDynamicAllocationSuite.scala | 2 +- .../spark/deploy/worker/DriverRunnerTest.scala | 2 +- .../scala/org/apache/spark/rpc/RpcEnvSuite.scala | 4 +- .../CoarseGrainedSchedulerBackendSuite.scala | 8 +- .../spark/scheduler/SparkListenerSuite.scala | 17 +- .../spark/scheduler/TaskResultGetterSuite.scala | 26 +- 29 files changed, 103 insertions(+), 713 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/HttpFileServer.scala delete mode 100644 core/src/main/scala/org/apache/spark/util/AkkaUtils.scala delete mode 100644 core/src/test/scala/org/apache/spark/FileServerSuite.scala (limited to 'core') diff --git a/core/pom.xml b/core/pom.xml index 2071a58de9..0ab170e028 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -185,19 +185,6 @@ commons-net commons-net - - ${akka.group} - akka-remote_${scala.binary.version} - - - ${akka.group} - akka-slf4j_${scala.binary.version} - - - ${akka.group} - akka-testkit_${scala.binary.version} - test - org.scala-lang scala-library @@ -224,6 +211,10 @@ io.netty netty-all + + io.netty + netty + com.clearspring.analytics stream diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 4628093b91..5a42299a0b 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -86,8 +86,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { * is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter). * * Due to SPARK-3015, this is set to true by default. 
This is intended to be only a temporary - * workaround for the issue, which is ultimately caused by the way the BlockManager actors - * issue inter-dependent blocking Akka messages to each other at high frequencies. This happens, + * workaround for the issue, which is ultimately caused by the way the BlockManager endpoints + * issue inter-dependent blocking RPC messages to each other at high frequencies. This happens, * for instance, when the driver performs a GC and cleans up all broadcast blocks that are no * longer in scope. */ @@ -101,7 +101,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { * exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this * parameter by default disables blocking on shuffle cleanups. Note that this does not affect * the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround, - * until the real Akka issue (referred to in the comment above `blockOnCleanupTasks`) is + * until the real RPC issue (referred to in the comment above `blockOnCleanupTasks`) is * resolved. */ private val blockOnShuffleCleanupTasks = sc.conf.getBoolean( diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala deleted file mode 100644 index 46f9f9e9af..0000000000 --- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark - -import java.io.File - -import com.google.common.io.Files - -import org.apache.spark.util.Utils - -private[spark] class HttpFileServer( - conf: SparkConf, - securityManager: SecurityManager, - requestedPort: Int = 0) - extends Logging { - - var baseDir : File = null - var fileDir : File = null - var jarDir : File = null - var httpServer : HttpServer = null - var serverUri : String = null - - def initialize() { - baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd") - fileDir = new File(baseDir, "files") - jarDir = new File(baseDir, "jars") - fileDir.mkdir() - jarDir.mkdir() - logInfo("HTTP File server directory is " + baseDir) - httpServer = new HttpServer(conf, baseDir, securityManager, requestedPort, "HTTP file server") - httpServer.start() - serverUri = httpServer.uri - logDebug("HTTP file server started at: " + serverUri) - } - - def stop() { - httpServer.stop() - - // If we only stop sc, but the driver process still run as a services then we need to delete - // the tmp dir, if not, it will create too many tmp dirs - try { - Utils.deleteRecursively(baseDir) - } catch { - case e: Exception => - logWarning(s"Exception while deleting Spark temp dir: ${baseDir.getAbsolutePath}", e) - } - } - - def addFile(file: File) : String = { - addFileToDir(file, fileDir) - serverUri + "/files/" + Utils.encodeFileNameToURIRawPath(file.getName) - } - - def addJar(file: File) : String = { - addFileToDir(file, jarDir) - serverUri + "/jars/" + Utils.encodeFileNameToURIRawPath(file.getName) - } - - def addDirectory(path: String, resourceBase: String): String = { - httpServer.addDirectory(path, resourceBase) - serverUri + path - } - - def addFileToDir(file: File, dir: File) : String = { - // Check whether the file is a directory. If it is, throw a more meaningful exception. - // If we don't catch this, Guava throws a very confusing error message: - // java.io.FileNotFoundException: [file] (No such file or directory) - // even though the directory ([file]) exists. 
- if (file.isDirectory) { - throw new IllegalArgumentException(s"$file cannot be a directory.") - } - Files.copy(file, new File(dir, file.getName)) - dir + "/" + Utils.encodeFileNameToURIRawPath(file.getName) - } - -} diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 1b59beb8d6..eb2fdecc83 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -40,7 +40,7 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage private[spark] class MapOutputTrackerMasterEndpoint( override val rpcEnv: RpcEnv, tracker: MapOutputTrackerMaster, conf: SparkConf) extends RpcEndpoint with Logging { - val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) + val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case GetMapOutputStatuses(shuffleId: Int) => @@ -48,9 +48,10 @@ private[spark] class MapOutputTrackerMasterEndpoint( logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort) val mapOutputStatuses = tracker.getSerializedMapOutputStatuses(shuffleId) val serializedSize = mapOutputStatuses.length - if (serializedSize > maxAkkaFrameSize) { + if (serializedSize > maxRpcMessageSize) { + val msg = s"Map output statuses were $serializedSize bytes which " + - s"exceeds spark.akka.frameSize ($maxAkkaFrameSize bytes)." + s"exceeds spark.rpc.message.maxSize ($maxRpcMessageSize bytes)." /* For SPARK-1244 we'll opt for just logging an error and then sending it to the sender. * A bigger refactoring (SPARK-1239) will ultimately remove this entire code path. */ diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala index 261265f0b4..d755f07965 100644 --- a/core/src/main/scala/org/apache/spark/SSLOptions.scala +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -21,9 +21,6 @@ import java.io.File import java.security.NoSuchAlgorithmException import javax.net.ssl.SSLContext -import scala.collection.JavaConverters._ - -import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} import org.eclipse.jetty.util.ssl.SslContextFactory /** @@ -31,8 +28,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory * generate specific objects to configure SSL for different communication protocols. * * SSLOptions is intended to provide the maximum common set of SSL settings, which are supported - * by the protocol, which it can generate the configuration for. Since Akka doesn't support client - * authentication with SSL, SSLOptions cannot support it either. + * by the protocol, which it can generate the configuration for. * * @param enabled enables or disables SSL; if it is set to false, the rest of the * settings are disregarded @@ -88,43 +84,6 @@ private[spark] case class SSLOptions( } } - /** - * Creates an Akka configuration object which contains all the SSL settings represented by this - * object. It can be used then to compose the ultimate Akka configuration. 
- */ - def createAkkaConfig: Option[Config] = { - if (enabled) { - if (keyStoreType.isDefined) { - logWarning("Akka configuration does not support key store type."); - } - if (trustStoreType.isDefined) { - logWarning("Akka configuration does not support trust store type."); - } - - Some(ConfigFactory.empty() - .withValue("akka.remote.netty.tcp.security.key-store", - ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse(""))) - .withValue("akka.remote.netty.tcp.security.key-store-password", - ConfigValueFactory.fromAnyRef(keyStorePassword.getOrElse(""))) - .withValue("akka.remote.netty.tcp.security.trust-store", - ConfigValueFactory.fromAnyRef(trustStore.map(_.getAbsolutePath).getOrElse(""))) - .withValue("akka.remote.netty.tcp.security.trust-store-password", - ConfigValueFactory.fromAnyRef(trustStorePassword.getOrElse(""))) - .withValue("akka.remote.netty.tcp.security.key-password", - ConfigValueFactory.fromAnyRef(keyPassword.getOrElse(""))) - .withValue("akka.remote.netty.tcp.security.random-number-generator", - ConfigValueFactory.fromAnyRef("")) - .withValue("akka.remote.netty.tcp.security.protocol", - ConfigValueFactory.fromAnyRef(protocol.getOrElse(""))) - .withValue("akka.remote.netty.tcp.security.enabled-algorithms", - ConfigValueFactory.fromIterable(supportedAlgorithms.asJava)) - .withValue("akka.remote.netty.tcp.enable-ssl", - ConfigValueFactory.fromAnyRef(true))) - } else { - None - } - } - /* * The supportedAlgorithms set is a subset of the enabledAlgorithms that * are supported by the current Java security provider for this protocol. diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index c5aec05c03..0675957e16 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -67,17 +67,6 @@ import org.apache.spark.util.Utils * At this point spark has multiple communication protocols that need to be secured and * different underlying mechanisms are used depending on the protocol: * - * - Akka -> The only option here is to use the Akka Remote secure-cookie functionality. - * Akka remoting allows you to specify a secure cookie that will be exchanged - * and ensured to be identical in the connection handshake between the client - * and the server. If they are not identical then the client will be refused - * to connect to the server. There is no control of the underlying - * authentication mechanism so its not clear if the password is passed in - * plaintext or uses DIGEST-MD5 or some other mechanism. - * - * Akka also has an option to turn on SSL, this option is currently supported (see - * the details below). - * * - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty * for the HttpServer. Jetty supports multiple authentication mechanisms - * Basic, Digest, Form, Spengo, etc. It also supports multiple different login @@ -168,16 +157,16 @@ import org.apache.spark.util.Utils * denote the global configuration for all the supported protocols. In order to override the global * configuration for the particular protocol, the properties must be overwritten in the * protocol-specific namespace. Use `spark.ssl.yyy.xxx` settings to overwrite the global - * configuration for particular protocol denoted by `yyy`. Currently `yyy` can be either `akka` for - * Akka based connections or `fs` for broadcast and file server. + * configuration for particular protocol denoted by `yyy`. 
Currently `yyy` can be only`fs` for + * broadcast and file server. * * Refer to [[org.apache.spark.SSLOptions]] documentation for the list of * options that can be specified. * * SecurityManager initializes SSLOptions objects for different protocols separately. SSLOptions * object parses Spark configuration at a given namespace and builds the common representation - * of SSL settings. SSLOptions is then used to provide protocol-specific configuration like - * TypeSafe configuration for Akka or SSLContextFactory for Jetty. + * of SSL settings. SSLOptions is then used to provide protocol-specific SSLContextFactory for + * Jetty. * * SSL must be configured on each node and configured for each component involved in * communication using the particular protocol. In YARN clusters, the key-store can be prepared on diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 340e1f7824..36e240e618 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -344,17 +344,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { .map{case (k, v) => (k.substring(prefix.length), v)} } - /** Get all akka conf variables set on this SparkConf */ - def getAkkaConf: Seq[(String, String)] = - /* This is currently undocumented. If we want to make this public we should consider - * nesting options under the spark namespace to avoid conflicts with user akka options. - * Otherwise users configuring their own akka code via system properties could mess up - * spark's akka options. - * - * E.g. spark.akka.option.x.y.x = "value" - */ - getAll.filter { case (k, _) => isAkkaConf(k) } - /** * Returns the Spark application id, valid in the Driver after TaskScheduler registration and * from the start in the Executor. @@ -600,7 +589,9 @@ private[spark] object SparkConf extends Logging { "spark.yarn.max.executor.failures" -> Seq( AlternateConfig("spark.yarn.max.worker.failures", "1.5")), "spark.memory.offHeap.enabled" -> Seq( - AlternateConfig("spark.unsafe.offHeap", "1.6")) + AlternateConfig("spark.unsafe.offHeap", "1.6")), + "spark.rpc.message.maxSize" -> Seq( + AlternateConfig("spark.akka.frameSize", "1.6")) ) /** @@ -615,21 +606,13 @@ private[spark] object SparkConf extends Logging { }.toMap } - /** - * Return whether the given config is an akka config (e.g. akka.actor.provider). - * Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout). - */ - def isAkkaConf(name: String): Boolean = name.startsWith("akka.") - /** * Return whether the given config should be passed to an executor on start-up. * - * Certain akka and authentication configs are required from the executor when it connects to + * Certain authentication configs are required from the executor when it connects to * the scheduler, while the rest of the spark configs can be inherited from the driver later. */ def isExecutorStartupConf(name: String): Boolean = { - isAkkaConf(name) || - name.startsWith("spark.akka") || (name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) || name.startsWith("spark.ssl") || name.startsWith("spark.rpc") || @@ -664,12 +647,19 @@ private[spark] object SparkConf extends Logging { logWarning( s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " + s"may be removed in the future. 
${cfg.deprecationMessage}") + return } allAlternatives.get(key).foreach { case (newKey, cfg) => logWarning( s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " + s"and may be removed in the future. Please use the new key '$newKey' instead.") + return + } + if (key.startsWith("spark.akka") || key.startsWith("spark.ssl.akka")) { + logWarning( + s"The configuration key $key is not supported any more " + + s"because Spark doesn't use Akka since 2.0") } } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index ec43be0e2f..9461afdc54 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -23,7 +23,6 @@ import java.net.Socket import scala.collection.mutable import scala.util.Properties -import akka.actor.ActorSystem import com.google.common.collect.MapMaker import org.apache.spark.annotation.DeveloperApi @@ -39,12 +38,12 @@ import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinato import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.storage._ -import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils} +import org.apache.spark.util.{RpcUtils, Utils} /** * :: DeveloperApi :: * Holds all the runtime environment objects for a running Spark instance (either master or worker), - * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently + * including the serializer, RpcEnv, block manager, map output tracker, etc. Currently * Spark code finds the SparkEnv through a global variable, so all the threads can access the same * SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext). * @@ -55,7 +54,6 @@ import org.apache.spark.util.{AkkaUtils, RpcUtils, Utils} class SparkEnv ( val executorId: String, private[spark] val rpcEnv: RpcEnv, - _actorSystem: ActorSystem, // TODO Remove actorSystem val serializer: Serializer, val closureSerializer: Serializer, val cacheManager: CacheManager, @@ -71,10 +69,6 @@ class SparkEnv ( val outputCommitCoordinator: OutputCommitCoordinator, val conf: SparkConf) extends Logging { - // TODO Remove actorSystem - @deprecated("Actor system is no longer supported as of 1.4.0", "1.4.0") - val actorSystem: ActorSystem = _actorSystem - private[spark] var isStopped = false private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() @@ -96,13 +90,8 @@ class SparkEnv ( blockManager.master.stop() metricsSystem.stop() outputCommitCoordinator.stop() - actorSystem.shutdown() rpcEnv.shutdown() - - // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut - // down, but let's call it anyway in case it gets fixed in a later release - // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it. - // actorSystem.awaitTermination() + rpcEnv.awaitTermination() // Note that blockTransferService is stopped by BlockManager since it is started by it. 
@@ -152,8 +141,8 @@ class SparkEnv ( object SparkEnv extends Logging { @volatile private var env: SparkEnv = _ - private[spark] val driverActorSystemName = "sparkDriver" - private[spark] val executorActorSystemName = "sparkExecutor" + private[spark] val driverSystemName = "sparkDriver" + private[spark] val executorSystemName = "sparkExecutor" def set(e: SparkEnv) { env = e @@ -202,7 +191,7 @@ object SparkEnv extends Logging { /** * Create a SparkEnv for an executor. - * In coarse-grained mode, the executor provides an actor system that is already instantiated. + * In coarse-grained mode, the executor provides an RpcEnv that is already instantiated. */ private[spark] def createExecutorEnv( conf: SparkConf, @@ -245,28 +234,11 @@ object SparkEnv extends Logging { val securityManager = new SecurityManager(conf) - val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName - // Create the ActorSystem for Akka and get the port it binds to. - val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager, + val systemName = if (isDriver) driverSystemName else executorSystemName + val rpcEnv = RpcEnv.create(systemName, hostname, port, conf, securityManager, clientMode = !isDriver) - val actorSystem: ActorSystem = { - val actorSystemPort = - if (port == 0 || rpcEnv.address == null) { - port - } else { - rpcEnv.address.port + 1 - } - // Create a ActorSystem for legacy codes - AkkaUtils.createActorSystem( - actorSystemName + "ActorSystem", - hostname, - actorSystemPort, - conf, - securityManager - )._1 - } - // Figure out which port Akka actually bound to in case the original port is 0 or occupied. + // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied. // In the non-driver case, the RPC env's address may be null since it may not be listening // for incoming connections. 
if (isDriver) { @@ -325,7 +297,7 @@ object SparkEnv extends Logging { new MapOutputTrackerWorker(conf) } - // Have to assign trackerActor after initialization as MapOutputTrackerActor + // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint // requires the MapOutputTracker itself mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME, new MapOutputTrackerMasterEndpoint( @@ -398,7 +370,6 @@ object SparkEnv extends Logging { val envInstance = new SparkEnv( executorId, rpcEnv, - actorSystem, serializer, closureSerializer, cacheManager, diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 63a20ab41a..dcef03ef3e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -219,11 +219,7 @@ object Client { val conf = new SparkConf() val driverArgs = new ClientArguments(args) - if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) { - conf.set("spark.akka.logLifecycleEvents", "true") - } conf.set("spark.rpc.askTimeout", "10") - conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING")) Logger.getRootLogger.setLevel(driverArgs.logLevel) val rpcEnv = diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index a1e8da1ec8..a6749f7e38 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -183,7 +183,7 @@ object SparkSubmit { } // In standalone cluster mode, there are two submission gateways: - // (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper + // (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper // (2) The new REST-based gateway introduced in Spark 1.3 // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over // to use the legacy gateway if the master endpoint turns out to be not a REST server. 
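
The SparkEnv changes above collapse the former RpcEnv-plus-ActorSystem pair into a single RpcEnv that owns naming, transport and shutdown. A simplified sketch of the resulting lifecycle, using only calls that appear in this diff (internal Spark API; the literal host "localhost" and port 0 are placeholders, not values from the patch):

    // Illustration only -- condensed from SparkEnv.create/stop, not a drop-in replacement
    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.rpc.RpcEnv

    val conf = new SparkConf()
    val securityManager = new SecurityManager(conf)
    val isDriver = true
    val systemName = if (isDriver) "sparkDriver" else "sparkExecutor"
    val rpcEnv = RpcEnv.create(systemName, "localhost", 0, conf, securityManager,
      clientMode = !isDriver)
    // endpoints are registered on / looked up from rpcEnv by name, replacing remote actors
    rpcEnv.shutdown()
    rpcEnv.awaitTermination()
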
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 58bd9ca3d1..e3a6c4c07a 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -37,7 +37,6 @@ private[spark] class CoarseGrainedExecutorBackend( override val rpcEnv: RpcEnv, driverUrl: String, executorId: String, - hostPort: String, cores: Int, userClassPath: Seq[URL], env: SparkEnv) @@ -55,8 +54,7 @@ private[spark] class CoarseGrainedExecutorBackend( rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref => // This is a very fast action so we can use "ThreadUtils.sameThread" driver = Some(ref) - ref.ask[RegisterExecutorResponse]( - RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls)) + ref.ask[RegisterExecutorResponse](RegisterExecutor(executorId, self, cores, extractLogUrls)) }(ThreadUtils.sameThread).onComplete { // This is a very fast action so we can use "ThreadUtils.sameThread" case Success(msg) => Utils.tryLogNonFatalError { @@ -184,14 +182,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { val env = SparkEnv.createExecutorEnv( driverConf, executorId, hostname, port, cores, isLocal = false) - // SparkEnv will set spark.executor.port if the rpc env is listening for incoming - // connections (e.g., if it's using akka). Otherwise, the executor is running in - // client mode only, and does not accept incoming connections. - val sparkHostPort = env.conf.getOption("spark.executor.port").map { port => - hostname + ":" + port - }.orNull env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend( - env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env)) + env.rpcEnv, driverUrl, executorId, cores, userClassPath, env)) workerUrl.foreach { url => env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url)) } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 75d7e34d60..030ae41db4 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -40,7 +40,7 @@ import org.apache.spark.util._ * Spark executor, backed by a threadpool to run tasks. * * This can be used with Mesos, YARN, and the standalone scheduler. - * An internal RPC interface (at the moment Akka) is used for communication with the driver, + * An internal RPC interface is used for communication with the driver, * except in the case of Mesos fine-grained mode. */ private[spark] class Executor( @@ -97,9 +97,9 @@ private[spark] class Executor( // Set the classloader for serializer env.serializer.setDefaultClassLoader(replClassLoader) - // Akka's message frame size. If task result is bigger than this, we use the block manager + // Max RPC message size. If task result is bigger than this, we use the block manager // to send the result back. 
- private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) + private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) // Limit of bytes for total size of results (default is 1GB) private val maxResultSize = Utils.getMaxResultSize(conf) @@ -263,7 +263,7 @@ private[spark] class Executor( s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " + s"dropping it.") ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize)) - } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { + } else if (resultSize >= maxRpcMessageSize) { val blockId = TaskResultBlockId(taskId) env.blockManager.putBytes( blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index f3d0d85476..29e469c3f5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -48,7 +48,6 @@ private[spark] object CoarseGrainedClusterMessages { case class RegisterExecutor( executorId: String, executorRef: RpcEndpointRef, - hostPort: String, cores: Int, logUrls: Map[String, String]) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index b808993aa6..f69a3d371e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -27,7 +27,7 @@ import org.apache.spark.rpc._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.ENDPOINT_NAME -import org.apache.spark.util.{AkkaUtils, SerializableBuffer, ThreadUtils, Utils} +import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils} /** * A scheduler backend that waits for coarse-grained executors to connect. @@ -46,7 +46,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Total number of executors that are currently registered var totalRegisteredExecutors = new AtomicInteger(0) val conf = scheduler.sc.conf - private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) + private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) // Submit tasks only after (registered resources / total expected resources) // is equal to at least this value, that is double between 0 and 1. 
var minRegisteredRatio = @@ -134,7 +134,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) => + case RegisterExecutor(executorId, executorRef, cores, logUrls) => if (executorDataMap.contains(executorId)) { context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) } else { @@ -224,14 +224,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { val serializedTask = ser.serialize(task) - if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { + if (serializedTask.limit >= maxRpcMessageSize) { scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => try { var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + - "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " + - "spark.akka.frameSize or using broadcast variables for large values." - msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize, - AkkaUtils.reservedSizeBytes) + "spark.rpc.message.maxSize (%d bytes). Consider increasing " + + "spark.rpc.message.maxSize or using broadcast variables for large values." + msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize) taskSetMgr.abort(msg) } catch { case e: Exception => logError("Exception in error callback", e) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index 0a6f2c01c1..a298cf5ef9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -49,7 +49,7 @@ private[spark] class SimrSchedulerBackend( val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") logInfo("Writing to HDFS file: " + driverFilePath) - logInfo("Writing Akka address: " + driverUrl) + logInfo("Writing Driver address: " + driverUrl) logInfo("Writing Spark UI Address: " + appUIAddress) // Create temporary file to prevent race condition where executors get empty driverUrl file diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala deleted file mode 100644 index 3f4ac9b2f1..0000000000 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.util - -import scala.collection.JavaConverters._ - -import akka.actor.{ActorSystem, ExtendedActorSystem} -import com.typesafe.config.ConfigFactory -import org.apache.log4j.{Level, Logger} - -import org.apache.spark.{Logging, SecurityManager, SparkConf} - -/** - * Various utility classes for working with Akka. - */ -private[spark] object AkkaUtils extends Logging { - - /** - * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the - * ActorSystem itself and its port (which is hard to get from Akka). - * - * Note: the `name` parameter is important, as even if a client sends a message to right - * host + port, if the system name is incorrect, Akka will drop the message. - * - * If indestructible is set to true, the Actor System will continue running in the event - * of a fatal exception. This is used by [[org.apache.spark.executor.Executor]]. - */ - def createActorSystem( - name: String, - host: String, - port: Int, - conf: SparkConf, - securityManager: SecurityManager): (ActorSystem, Int) = { - val startService: Int => (ActorSystem, Int) = { actualPort => - doCreateActorSystem(name, host, actualPort, conf, securityManager) - } - Utils.startServiceOnPort(port, startService, conf, name) - } - - private def doCreateActorSystem( - name: String, - host: String, - port: Int, - conf: SparkConf, - securityManager: SecurityManager): (ActorSystem, Int) = { - - val akkaThreads = conf.getInt("spark.akka.threads", 4) - val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15) - val akkaTimeoutS = conf.getTimeAsSeconds("spark.akka.timeout", - conf.get("spark.network.timeout", "120s")) - val akkaFrameSize = maxFrameSizeBytes(conf) - val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false) - val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off" - if (!akkaLogLifecycleEvents) { - // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent. 
- // See: https://www.assembla.com/spaces/akka/tickets/3787#/ - Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL)) - } - - val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off" - - val akkaHeartBeatPausesS = conf.getTimeAsSeconds("spark.akka.heartbeat.pauses", "6000s") - val akkaHeartBeatIntervalS = conf.getTimeAsSeconds("spark.akka.heartbeat.interval", "1000s") - - val secretKey = securityManager.getSecretKey() - val isAuthOn = securityManager.isAuthenticationEnabled() - if (isAuthOn && secretKey == null) { - throw new Exception("Secret key is null with authentication on") - } - val requireCookie = if (isAuthOn) "on" else "off" - val secureCookie = if (isAuthOn) secretKey else "" - logDebug(s"In createActorSystem, requireCookie is: $requireCookie") - - val akkaSslConfig = securityManager.getSSLOptions("akka").createAkkaConfig - .getOrElse(ConfigFactory.empty()) - - val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap.asJava) - .withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString( - s""" - |akka.daemonic = on - |akka.loggers = [""akka.event.slf4j.Slf4jLogger""] - |akka.stdout-loglevel = "ERROR" - |akka.jvm-exit-on-fatal-error = off - |akka.remote.require-cookie = "$requireCookie" - |akka.remote.secure-cookie = "$secureCookie" - |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatIntervalS s - |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPausesS s - |akka.actor.provider = "akka.remote.RemoteActorRefProvider" - |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" - |akka.remote.netty.tcp.hostname = "$host" - |akka.remote.netty.tcp.port = $port - |akka.remote.netty.tcp.tcp-nodelay = on - |akka.remote.netty.tcp.connection-timeout = $akkaTimeoutS s - |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B - |akka.remote.netty.tcp.execution-pool-size = $akkaThreads - |akka.actor.default-dispatcher.throughput = $akkaBatchSize - |akka.log-config-on-start = $logAkkaConfig - |akka.remote.log-remote-lifecycle-events = $lifecycleEvents - |akka.log-dead-letters = $lifecycleEvents - |akka.log-dead-letters-during-shutdown = $lifecycleEvents - """.stripMargin)) - - val actorSystem = ActorSystem(name, akkaConf) - val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider - val boundPort = provider.getDefaultAddress.port.get - (actorSystem, boundPort) - } - - private val AKKA_MAX_FRAME_SIZE_IN_MB = Int.MaxValue / 1024 / 1024 - - /** Returns the configured max frame size for Akka messages in bytes. */ - def maxFrameSizeBytes(conf: SparkConf): Int = { - val frameSizeInMB = conf.getInt("spark.akka.frameSize", 128) - if (frameSizeInMB > AKKA_MAX_FRAME_SIZE_IN_MB) { - throw new IllegalArgumentException( - s"spark.akka.frameSize should not be greater than $AKKA_MAX_FRAME_SIZE_IN_MB MB") - } - frameSizeInMB * 1024 * 1024 - } - - /** Space reserved for extra data in an Akka message besides serialized task or task result. 
*/ - val reservedSizeBytes = 200 * 1024 - -} diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala index b68936f5c9..2bb8de568e 100644 --- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala @@ -53,4 +53,16 @@ private[spark] object RpcUtils { def lookupRpcTimeout(conf: SparkConf): RpcTimeout = { RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s") } + + private val MAX_MESSAGE_SIZE_IN_MB = Int.MaxValue / 1024 / 1024 + + /** Returns the configured max message size for messages in bytes. */ + def maxMessageSizeBytes(conf: SparkConf): Int = { + val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128) + if (maxSizeInMB > MAX_MESSAGE_SIZE_IN_MB) { + throw new IllegalArgumentException( + s"spark.rpc.message.maxSize should not be greater than $MAX_MESSAGE_SIZE_IN_MB MB") + } + maxSizeInMB * 1024 * 1024 + } } diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala deleted file mode 100644 index bc7059b77f..0000000000 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark - -import java.io._ -import java.net.URI -import java.util.jar.{JarEntry, JarOutputStream} -import javax.net.ssl.SSLException - -import com.google.common.io.{ByteStreams, Files} -import org.apache.commons.lang3.RandomUtils - -import org.apache.spark.util.Utils - -class FileServerSuite extends SparkFunSuite with LocalSparkContext { - - import SSLSampleConfigs._ - - @transient var tmpDir: File = _ - @transient var tmpFile: File = _ - @transient var tmpJarUrl: String = _ - - def newConf: SparkConf = new SparkConf(loadDefaults = false).set("spark.authenticate", "false") - - override def beforeEach() { - super.beforeEach() - resetSparkContext() - } - - override def beforeAll() { - super.beforeAll() - - tmpDir = Utils.createTempDir() - val testTempDir = new File(tmpDir, "test") - testTempDir.mkdir() - - val textFile = new File(testTempDir, "FileServerSuite.txt") - val pw = new PrintWriter(textFile) - // scalastyle:off println - pw.println("100") - // scalastyle:on println - pw.close() - - val jarFile = new File(testTempDir, "test.jar") - val jarStream = new FileOutputStream(jarFile) - val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest()) - - val jarEntry = new JarEntry(textFile.getName) - jar.putNextEntry(jarEntry) - - val in = new FileInputStream(textFile) - ByteStreams.copy(in, jar) - - in.close() - jar.close() - jarStream.close() - - tmpFile = textFile - tmpJarUrl = jarFile.toURI.toURL.toString - } - - override def afterAll() { - try { - Utils.deleteRecursively(tmpDir) - } finally { - super.afterAll() - } - } - - test("Distributing files locally") { - sc = new SparkContext("local[4]", "test", newConf) - sc.addFile(tmpFile.toString) - val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0)) - val result = sc.parallelize(testData).reduceByKey { - val path = SparkFiles.get("FileServerSuite.txt") - val in = new BufferedReader(new FileReader(path)) - val fileVal = in.readLine().toInt - in.close() - _ * fileVal + _ * fileVal - }.collect() - assert(result.toSet === Set((1, 200), (2, 300), (3, 500))) - } - - test("Distributing files locally security On") { - val sparkConf = new SparkConf(false) - sparkConf.set("spark.authenticate", "true") - sparkConf.set("spark.authenticate.secret", "good") - sc = new SparkContext("local[4]", "test", sparkConf) - - sc.addFile(tmpFile.toString) - assert(sc.env.securityManager.isAuthenticationEnabled() === true) - val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0)) - val result = sc.parallelize(testData).reduceByKey { - val path = SparkFiles.get("FileServerSuite.txt") - val in = new BufferedReader(new FileReader(path)) - val fileVal = in.readLine().toInt - in.close() - _ * fileVal + _ * fileVal - }.collect() - assert(result.toSet === Set((1, 200), (2, 300), (3, 500))) - } - - test("Distributing files locally using URL as input") { - // addFile("file:///....") - sc = new SparkContext("local[4]", "test", newConf) - sc.addFile(new File(tmpFile.toString).toURI.toString) - val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0)) - val result = sc.parallelize(testData).reduceByKey { - val path = SparkFiles.get("FileServerSuite.txt") - val in = new BufferedReader(new FileReader(path)) - val fileVal = in.readLine().toInt - in.close() - _ * fileVal + _ * fileVal - }.collect() - assert(result.toSet === Set((1, 200), (2, 300), (3, 500))) - } - - test ("Dynamically adding JARS locally") { - sc = new SparkContext("local[4]", "test", newConf) - sc.addJar(tmpJarUrl) - val testData = Array((1, 
1)) - sc.parallelize(testData).foreach { x => - if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) { - throw new SparkException("jar not added") - } - } - } - - test("Distributing files on a standalone cluster") { - sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf) - sc.addFile(tmpFile.toString) - val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0)) - val result = sc.parallelize(testData).reduceByKey { - val path = SparkFiles.get("FileServerSuite.txt") - val in = new BufferedReader(new FileReader(path)) - val fileVal = in.readLine().toInt - in.close() - _ * fileVal + _ * fileVal - }.collect() - assert(result.toSet === Set((1, 200), (2, 300), (3, 500))) - } - - test ("Dynamically adding JARS on a standalone cluster") { - sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf) - sc.addJar(tmpJarUrl) - val testData = Array((1, 1)) - sc.parallelize(testData).foreach { x => - if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) { - throw new SparkException("jar not added") - } - } - } - - test ("Dynamically adding JARS on a standalone cluster using local: URL") { - sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf) - sc.addJar(tmpJarUrl.replace("file", "local")) - val testData = Array((1, 1)) - sc.parallelize(testData).foreach { x => - if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) { - throw new SparkException("jar not added") - } - } - } - - test ("HttpFileServer should work with SSL") { - val sparkConf = sparkSSLConfig() - val sm = new SecurityManager(sparkConf) - val server = new HttpFileServer(sparkConf, sm, 0) - try { - server.initialize() - - fileTransferTest(server, sm) - } finally { - server.stop() - } - } - - test ("HttpFileServer should work with SSL and good credentials") { - val sparkConf = sparkSSLConfig() - sparkConf.set("spark.authenticate", "true") - sparkConf.set("spark.authenticate.secret", "good") - - val sm = new SecurityManager(sparkConf) - val server = new HttpFileServer(sparkConf, sm, 0) - try { - server.initialize() - - fileTransferTest(server, sm) - } finally { - server.stop() - } - } - - test ("HttpFileServer should not work with valid SSL and bad credentials") { - val sparkConf = sparkSSLConfig() - sparkConf.set("spark.authenticate", "true") - sparkConf.set("spark.authenticate.secret", "bad") - - val sm = new SecurityManager(sparkConf) - val server = new HttpFileServer(sparkConf, sm, 0) - try { - server.initialize() - - intercept[IOException] { - fileTransferTest(server) - } - } finally { - server.stop() - } - } - - test ("HttpFileServer should not work with SSL when the server is untrusted") { - val sparkConf = sparkSSLConfigUntrusted() - val sm = new SecurityManager(sparkConf) - val server = new HttpFileServer(sparkConf, sm, 0) - try { - server.initialize() - - intercept[SSLException] { - fileTransferTest(server) - } - } finally { - server.stop() - } - } - - def fileTransferTest(server: HttpFileServer, sm: SecurityManager = null): Unit = { - val randomContent = RandomUtils.nextBytes(100) - val file = File.createTempFile("FileServerSuite", "sslTests", tmpDir) - Files.write(randomContent, file) - server.addFile(file) - - val uri = new URI(server.serverUri + "/files/" + file.getName) - - val connection = if (sm != null && sm.isAuthenticationEnabled()) { - Utils.constructURIForAuthentication(uri, sm).toURL.openConnection() - } else { - uri.toURL.openConnection() - } - - if (sm != null) { - 
Utils.setupSecureURLConnection(connection, sm) - } - - val buf = ByteStreams.toByteArray(connection.getInputStream) - assert(buf === randomContent) - } - -} diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 18e5350840..c7f629a14b 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -175,9 +175,9 @@ class HeartbeatReceiverSuite val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1) val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2) fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse]( - RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "dummy:4040", 0, Map.empty)) + RegisterExecutor(executorId1, dummyExecutorEndpointRef1, 0, Map.empty)) fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse]( - RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty)) + RegisterExecutor(executorId2, dummyExecutorEndpointRef2, 0, Map.empty)) heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet) addExecutorAndVerify(executorId1) addExecutorAndVerify(executorId2) diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala index e1a0bf7c93..24ec99c7e5 100644 --- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala +++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala @@ -52,7 +52,7 @@ object LocalSparkContext { if (sc != null) { sc.stop() } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + // To avoid RPC rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") } diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 3819c0a8f3..6546def596 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -154,9 +154,9 @@ class MapOutputTrackerSuite extends SparkFunSuite { slaveRpcEnv.shutdown() } - test("remote fetch below akka frame size") { + test("remote fetch below max RPC message size") { val newConf = new SparkConf - newConf.set("spark.akka.frameSize", "1") + newConf.set("spark.rpc.message.maxSize", "1") newConf.set("spark.rpc.askTimeout", "1") // Fail fast val masterTracker = new MapOutputTrackerMaster(conf) @@ -164,7 +164,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf) rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint) - // Frame size should be ~123B, and no exception should be thrown + // Message size should be ~123B, and no exception should be thrown masterTracker.registerShuffle(10, 1) masterTracker.registerMapOutput(10, 0, MapStatus( BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0))) @@ -179,9 +179,9 @@ class MapOutputTrackerSuite extends SparkFunSuite { rpcEnv.shutdown() } - test("remote fetch exceeds akka frame size") { + test("remote fetch exceeds max RPC message size") { val newConf = new SparkConf - newConf.set("spark.akka.frameSize", "1") + newConf.set("spark.rpc.message.maxSize", "1") newConf.set("spark.rpc.askTimeout", 
"1") // Fail fast val masterTracker = new MapOutputTrackerMaster(conf) @@ -189,7 +189,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf) rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint) - // Frame size should be ~1.1MB, and MapOutputTrackerMasterEndpoint should throw exception. + // Message size should be ~1.1MB, and MapOutputTrackerMasterEndpoint should throw exception. // Note that the size is hand-selected here because map output statuses are compressed before // being sent. masterTracker.registerShuffle(20, 100) diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala index 7603cef773..8bdb237c28 100644 --- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala @@ -181,10 +181,8 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties { "SSL_DHE_RSA_WITH_AES_128_CBC_SHA256") val securityManager = new SecurityManager(conf) - val akkaSSLOptions = securityManager.getSSLOptions("akka") assert(securityManager.fileServerSSLOptions.enabled === true) - assert(akkaSSLOptions.enabled === true) assert(securityManager.sslSocketFactory.isDefined === true) assert(securityManager.hostnameVerifier.isDefined === true) @@ -198,16 +196,6 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties { assert(securityManager.fileServerSSLOptions.keyPassword === Some("password")) assert(securityManager.fileServerSSLOptions.protocol === Some("TLSv1.2")) assert(securityManager.fileServerSSLOptions.enabledAlgorithms === expectedAlgorithms) - - assert(akkaSSLOptions.trustStore.isDefined === true) - assert(akkaSSLOptions.trustStore.get.getName === "truststore") - assert(akkaSSLOptions.keyStore.isDefined === true) - assert(akkaSSLOptions.keyStore.get.getName === "keystore") - assert(akkaSSLOptions.trustStorePassword === Some("password")) - assert(akkaSSLOptions.keyStorePassword === Some("password")) - assert(akkaSSLOptions.keyPassword === Some("password")) - assert(akkaSSLOptions.protocol === Some("TLSv1.2")) - assert(akkaSSLOptions.enabledAlgorithms === expectedAlgorithms) } test("ssl off setup") { diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala index 190e4dd728..9c13c15281 100644 --- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala +++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala @@ -69,7 +69,7 @@ private[deploy] object DeployTestUtils { "publicAddress", new File("sparkHome"), new File("workDir"), - "akka://worker", + "spark://worker", new SparkConf, Seq("localDir"), ExecutorState.RUNNING) @@ -84,7 +84,7 @@ private[deploy] object DeployTestUtils { new File("sparkHome"), createDriverDesc(), null, - "akka://worker", + "spark://worker", new SecurityManager(conf)) } } diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index ab3d4cafeb..fdada0777f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -544,7 +544,7 @@ class StandaloneDynamicAllocationSuite val endpointRef = mock(classOf[RpcEndpointRef]) 
    val mockAddress = mock(classOf[RpcAddress])
    when(endpointRef.address).thenReturn(mockAddress)
-    val message = RegisterExecutor(id, endpointRef, s"localhost:$port", 10, Map.empty)
+    val message = RegisterExecutor(id, endpointRef, 10, Map.empty)
     val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
     backend.driverEndpoint.askWithRetry[CoarseGrainedClusterMessage](message)
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index bd8b0655f4..2a1696be36 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -34,7 +34,7 @@ class DriverRunnerTest extends SparkFunSuite {
     val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
     val conf = new SparkConf()
     new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
-      driverDescription, null, "akka://1.2.3.4/worker/", new SecurityManager(conf))
+      driverDescription, null, "spark://1.2.3.4/worker/", new SecurityManager(conf))
   }
 
   private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 64e486d791..6f4eda8b47 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -626,9 +626,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
       val e = intercept[Exception] {
         Await.result(f, 1 seconds)
       }
-      assert(e.isInstanceOf[TimeoutException] || // For Akka
-        e.isInstanceOf[NotSerializableException] // For Netty
-      )
+      assert(e.isInstanceOf[NotSerializableException])
     } finally {
       anotherEnv.shutdown()
       anotherEnv.awaitTermination()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 70f40fb26c..04cccc67e3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -18,16 +18,16 @@ package org.apache.spark.scheduler
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
-import org.apache.spark.util.{AkkaUtils, SerializableBuffer}
+import org.apache.spark.util.{RpcUtils, SerializableBuffer}
 
 class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext {
 
-  test("serialized task larger than akka frame size") {
+  test("serialized task larger than max RPC message size") {
     val conf = new SparkConf
-    conf.set("spark.akka.frameSize", "1")
+    conf.set("spark.rpc.message.maxSize", "1")
     conf.set("spark.default.parallelism", "1")
     sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
-    val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
+    val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
     val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
     val larger = sc.parallelize(Seq(buffer))
     val thrown = intercept[SparkException] {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index c87158d89f..58d217ffef 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.Matchers
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
 
 class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
   with ResetSystemProperties {
@@ -284,19 +284,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   }
 
   test("onTaskGettingResult() called when result fetched remotely") {
-    val conf = new SparkConf().set("spark.akka.frameSize", "1")
+    val conf = new SparkConf().set("spark.rpc.message.maxSize", "1")
     sc = new SparkContext("local", "SparkListenerSuite", conf)
     val listener = new SaveTaskEvents
     sc.addSparkListener(listener)
 
-    // Make a task whose result is larger than the akka frame size
-    val akkaFrameSize =
-      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
-    assert(akkaFrameSize === 1024 * 1024)
+    // Make a task whose result is larger than the RPC message size
+    val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+    assert(maxRpcMessageSize === 1024 * 1024)
     val result = sc.parallelize(Seq(1), 1)
-      .map { x => 1.to(akkaFrameSize).toArray }
+      .map { x => 1.to(maxRpcMessageSize).toArray }
       .reduce { case (x, y) => x }
-    assert(result === 1.to(akkaFrameSize).toArray)
+    assert(result === 1.to(maxRpcMessageSize).toArray)
 
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     val TASK_INDEX = 0
@@ -310,7 +309,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val listener = new SaveTaskEvents
     sc.addSparkListener(listener)
 
-    // Make a task whose result is larger than the akka frame size
+    // Make a task whose result is larger than the RPC message size
     val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x }
     assert(result === 2)
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index bc72c3685e..cc2557c2f1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
 import org.apache.spark.storage.TaskResultBlockId
 import org.apache.spark.TestUtils.JavaSourceFromString
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
+import org.apache.spark.util.{MutableURLClassLoader, RpcUtils, Utils}
 
 /**
  * Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter.
 */
@@ -77,22 +77,22 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
 */
 class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
 
-  // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
+  // Set the RPC message size to be as small as possible (it must be an integer, so 1 is as small
   // as we can make it) so the tests don't take too long.
-  def conf: SparkConf = new SparkConf().set("spark.akka.frameSize", "1")
+  def conf: SparkConf = new SparkConf().set("spark.rpc.message.maxSize", "1")
 
-  test("handling results smaller than Akka frame size") {
+  test("handling results smaller than max RPC message size") {
     sc = new SparkContext("local", "test", conf)
     val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
     assert(result === 2)
   }
 
-  test("handling results larger than Akka frame size") {
+  test("handling results larger than max RPC message size") {
     sc = new SparkContext("local", "test", conf)
-    val akkaFrameSize =
-      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
-    val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
-    assert(result === 1.to(akkaFrameSize).toArray)
+    val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+    val result =
+      sc.parallelize(Seq(1), 1).map(x => 1.to(maxRpcMessageSize).toArray).reduce((x, y) => x)
+    assert(result === 1.to(maxRpcMessageSize).toArray)
 
     val RESULT_BLOCK_ID = TaskResultBlockId(0)
     assert(sc.env.blockManager.master.getLocations(RESULT_BLOCK_ID).size === 0,
@@ -114,11 +114,11 @@
     }
     val resultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
     scheduler.taskResultGetter = resultGetter
-    val akkaFrameSize =
-      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
-    val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
+    val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+    val result =
+      sc.parallelize(Seq(1), 1).map(x => 1.to(maxRpcMessageSize).toArray).reduce((x, y) => x)
     assert(resultGetter.removeBlockSuccessfully)
-    assert(result === 1.to(akkaFrameSize).toArray)
+    assert(result === 1.to(maxRpcMessageSize).toArray)
 
     // Make sure two tasks were run (one failed one, and a second retried one).
     assert(scheduler.nextTaskId.get() === 2)
-- 
cgit v1.2.3
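Note on the pattern the test hunks above converge on (not part of the patch): instead of reading Akka's `akka.remote.netty.tcp.maximum-frame-size` from the ActorSystem, the suites now set `spark.rpc.message.maxSize` (a value in MB, replacing `spark.akka.frameSize`) and resolve the byte limit with `RpcUtils.maxMessageSizeBytes`. A minimal Scala sketch of that usage follows; the object name is made up for illustration, and it assumes the code sits in the `org.apache.spark` package because `RpcUtils` is package-private.

package org.apache.spark // RpcUtils is private[spark], so this sketch lives inside that package

import org.apache.spark.util.RpcUtils

// Illustrative object name; mirrors how the updated suites derive the RPC message size limit.
object RpcMessageSizeSketch {
  def main(args: Array[String]): Unit = {
    // The value is interpreted in MB, like the old spark.akka.frameSize; the suites above use
    // "1", the smallest integer setting, to keep the tests fast.
    val conf = new SparkConf().set("spark.rpc.message.maxSize", "1")
    // Replaces the old lookup of akka.remote.netty.tcp.maximum-frame-size via the ActorSystem.
    val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
    assert(maxRpcMessageSize == 1024 * 1024) // 1 MB expressed in bytes, as asserted in the suites
    println(s"max RPC message size: $maxRpcMessageSize bytes")
  }
}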