Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala                     2
-rw-r--r--  core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala                          2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala   2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala              2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala                 2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala                 8
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala          6
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala                          30
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala                 71
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcAddressSuite.scala            1
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala                4
-rw-r--r--  core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala                     360
12 files changed, 26 insertions(+), 464 deletions(-)
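
The recurring change in the hunks below is that endpoint lookups drop the Akka actor-system name: every three-argument setupEndpointRef(systemName, address, endpointName) call becomes the two-argument form. A minimal Scala sketch of the new lookup, built only from calls that appear in this diff; the env name "demo" and endpoint name "echo" are placeholders, not names from the change itself:

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}

// Create an RpcEnv and register a trivial endpoint on it.
val conf = new SparkConf(loadDefaults = false)
val env = RpcEnv.create("demo", "localhost", 0, conf, new SecurityManager(conf))
env.setupEndpoint("echo", new RpcEndpoint {
  override val rpcEnv = env
  override def receive = { case _ => }  // ignore one-way messages
})

// Old Akka-era lookup, removed by this change:
//   env.setupEndpointRef("demo", env.address, "echo")
// New lookup: the actor-system name is gone; the remote endpoint is
// identified by its RpcAddress and endpoint name alone.
val ref = env.setupEndpointRef(env.address, "echo")
env.shutdown()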
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 7e70308bb3..5b29d69cd9 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -125,7 +125,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val slaveRpcEnv = createRpcEnv("spark-slave", hostname, 0, new SecurityManager(conf))
val slaveTracker = new MapOutputTrackerWorker(conf)
slaveTracker.trackerEndpoint =
- slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
+ slaveRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
masterTracker.registerShuffle(10, 1)
masterTracker.incrementEpoch()
diff --git a/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala b/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
index 2d14249855..33270bec62 100644
--- a/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
+++ b/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
@@ -41,7 +41,6 @@ object SSLSampleConfigs {
def sparkSSLConfig(): SparkConf = {
val conf = new SparkConf(loadDefaults = false)
- conf.set("spark.rpc", "akka")
conf.set("spark.ssl.enabled", "true")
conf.set("spark.ssl.keyStore", keyStorePath)
conf.set("spark.ssl.keyStorePassword", "password")
@@ -55,7 +54,6 @@ object SSLSampleConfigs {
def sparkSSLConfigUntrusted(): SparkConf = {
val conf = new SparkConf(loadDefaults = false)
- conf.set("spark.rpc", "akka")
conf.set("spark.ssl.enabled", "true")
conf.set("spark.ssl.keyStore", untrustedKeyStorePath)
conf.set("spark.ssl.keyStorePassword", "password")
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 85c1c1bbf3..ab3d4cafeb 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -474,7 +474,7 @@ class StandaloneDynamicAllocationSuite
(0 until numWorkers).map { i =>
val rpcEnv = workerRpcEnvs(i)
val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address),
- Worker.SYSTEM_NAME + i, Worker.ENDPOINT_NAME, null, conf, securityManager)
+ Worker.ENDPOINT_NAME, null, conf, securityManager)
rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker)
worker
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index 415e2b37db..eb794b6739 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -147,7 +147,7 @@ class AppClientSuite extends SparkFunSuite with LocalSparkContext with BeforeAnd
(0 until numWorkers).map { i =>
val rpcEnv = workerRpcEnvs(i)
val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address),
- Worker.SYSTEM_NAME + i, Worker.ENDPOINT_NAME, null, conf, securityManager)
+ Worker.ENDPOINT_NAME, null, conf, securityManager)
rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker)
worker
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 242bf4b556..10e33a32ba 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -98,7 +98,7 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
Master.startRpcEnvAndEndpoint("127.0.0.1", 0, 0, conf)
try {
- rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, rpcEnv.address, Master.ENDPOINT_NAME)
+ rpcEnv.setupEndpointRef(rpcEnv.address, Master.ENDPOINT_NAME)
CustomPersistenceEngine.lastInstance.isDefined shouldBe true
val persistenceEngine = CustomPersistenceEngine.lastInstance.get
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index faed4bdc68..082d5e86eb 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -67,7 +67,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
conf.set("spark.worker.ui.retainedExecutors", 2.toString)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
- "sparkWorker1", "Worker", "/tmp", conf, new SecurityManager(conf))
+ "Worker", "/tmp", conf, new SecurityManager(conf))
// initialize workers
for (i <- 0 until 5) {
worker.executors += s"app1/$i" -> createExecutorRunner(i)
@@ -93,7 +93,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
conf.set("spark.worker.ui.retainedExecutors", 30.toString)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
- "sparkWorker1", "Worker", "/tmp", conf, new SecurityManager(conf))
+ "Worker", "/tmp", conf, new SecurityManager(conf))
// initialize workers
for (i <- 0 until 50) {
worker.executors += s"app1/$i" -> createExecutorRunner(i)
@@ -128,7 +128,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
conf.set("spark.worker.ui.retainedDrivers", 2.toString)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
- "sparkWorker1", "Worker", "/tmp", conf, new SecurityManager(conf))
+ "Worker", "/tmp", conf, new SecurityManager(conf))
// initialize workers
for (i <- 0 until 5) {
val driverId = s"driverId-$i"
@@ -154,7 +154,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
conf.set("spark.worker.ui.retainedDrivers", 30.toString)
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
- "sparkWorker1", "Worker", "/tmp", conf, new SecurityManager(conf))
+ "Worker", "/tmp", conf, new SecurityManager(conf))
// initialize workers
for (i <- 0 until 50) {
val driverId = s"driverId-$i"
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
index 40c24bdecc..0ffd91d8ff 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
@@ -19,13 +19,13 @@ package org.apache.spark.deploy.worker
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.SecurityManager
-import org.apache.spark.rpc.{RpcAddress, RpcEnv}
+import org.apache.spark.rpc.{RpcEndpointAddress, RpcAddress, RpcEnv}
class WorkerWatcherSuite extends SparkFunSuite {
test("WorkerWatcher shuts down on valid disassociation") {
val conf = new SparkConf()
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
- val targetWorkerUrl = rpcEnv.uriOf("test", RpcAddress("1.2.3.4", 1234), "Worker")
+ val targetWorkerUrl = RpcEndpointAddress(RpcAddress("1.2.3.4", 1234), "Worker").toString
val workerWatcher = new WorkerWatcher(rpcEnv, targetWorkerUrl, isTesting = true)
rpcEnv.setupEndpoint("worker-watcher", workerWatcher)
workerWatcher.onDisconnected(RpcAddress("1.2.3.4", 1234))
@@ -36,7 +36,7 @@ class WorkerWatcherSuite extends SparkFunSuite {
test("WorkerWatcher stays alive on invalid disassociation") {
val conf = new SparkConf()
val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new SecurityManager(conf))
- val targetWorkerUrl = rpcEnv.uriOf("test", RpcAddress("1.2.3.4", 1234), "Worker")
+ val targetWorkerUrl = RpcEndpointAddress(RpcAddress("1.2.3.4", 1234), "Worker").toString
val otherRpcAddress = RpcAddress("4.3.2.1", 1234)
val workerWatcher = new WorkerWatcher(rpcEnv, targetWorkerUrl, isTesting = true)
rpcEnv.setupEndpoint("worker-watcher", workerWatcher)
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 9c850c0da5..924fce7f61 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -94,7 +94,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef
- val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "send-remotely")
+ val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "send-remotely")
try {
rpcEndpointRef.send("hello")
eventually(timeout(5 seconds), interval(10 millis)) {
@@ -148,7 +148,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef
- val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-remotely")
+ val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "ask-remotely")
try {
val reply = rpcEndpointRef.askWithRetry[String]("hello")
assert("hello" === reply)
@@ -176,7 +176,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
conf.set("spark.rpc.numRetries", "1")
val anotherEnv = createRpcEnv(conf, "remote", 0, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef
- val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "ask-timeout")
+ val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "ask-timeout")
try {
// Any exception thrown in askWithRetry is wrapped with a SparkException and set as the cause
val e = intercept[SparkException] {
@@ -435,7 +435,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef
- val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "sendWithReply-remotely")
+ val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "sendWithReply-remotely")
try {
val f = rpcEndpointRef.ask[String]("hello")
val ack = Await.result(f, 5 seconds)
@@ -475,8 +475,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef
- val rpcEndpointRef = anotherEnv.setupEndpointRef(
- "local", env.address, "sendWithReply-remotely-error")
+ val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "sendWithReply-remotely-error")
try {
val f = rpcEndpointRef.ask[String]("hello")
val e = intercept[SparkException] {
@@ -527,8 +526,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val (_, events) = setupNetworkEndpoint(serverEnv1, "network-events")
val (serverRef2, _) = setupNetworkEndpoint(serverEnv2, "network-events")
try {
- val serverRefInServer2 =
- serverEnv1.setupEndpointRef("server2", serverRef2.address, serverRef2.name)
+ val serverRefInServer2 = serverEnv1.setupEndpointRef(serverRef2.address, serverRef2.name)
// Send a message to set up the connection
serverRefInServer2.send("hello")
@@ -556,8 +554,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val (serverRef, events) = setupNetworkEndpoint(serverEnv, "network-events")
val clientEnv = createRpcEnv(new SparkConf(), "client", 0, clientMode = true)
try {
- val serverRefInClient =
- clientEnv.setupEndpointRef("server", serverRef.address, serverRef.name)
+ val serverRefInClient = clientEnv.setupEndpointRef(serverRef.address, serverRef.name)
// Send a message to set up the connection
serverRefInClient.send("hello")
@@ -588,8 +585,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val (_, events) = setupNetworkEndpoint(clientEnv, "network-events")
val (serverRef, _) = setupNetworkEndpoint(serverEnv, "network-events")
try {
- val serverRefInClient =
- clientEnv.setupEndpointRef("server", serverRef.address, serverRef.name)
+ val serverRefInClient = clientEnv.setupEndpointRef(serverRef.address, serverRef.name)
// Send a message to set up the connection
serverRefInClient.send("hello")
@@ -623,8 +619,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val anotherEnv = createRpcEnv(new SparkConf(), "remote", 0, clientMode = true)
// Use anotherEnv to find out the RpcEndpointRef
- val rpcEndpointRef = anotherEnv.setupEndpointRef(
- "local", env.address, "sendWithReply-unserializable-error")
+ val rpcEndpointRef =
+ anotherEnv.setupEndpointRef(env.address, "sendWithReply-unserializable-error")
try {
val f = rpcEndpointRef.ask[String]("hello")
val e = intercept[Exception] {
@@ -661,8 +657,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
case msg: String => message = msg
}
})
- val rpcEndpointRef =
- remoteEnv.setupEndpointRef("authentication-local", localEnv.address, "send-authentication")
+ val rpcEndpointRef = remoteEnv.setupEndpointRef(localEnv.address, "send-authentication")
rpcEndpointRef.send("hello")
eventually(timeout(5 seconds), interval(10 millis)) {
assert("hello" === message)
@@ -693,8 +688,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
}
}
})
- val rpcEndpointRef =
- remoteEnv.setupEndpointRef("authentication-local", localEnv.address, "ask-authentication")
+ val rpcEndpointRef = remoteEnv.setupEndpointRef(localEnv.address, "ask-authentication")
val reply = rpcEndpointRef.askWithRetry[String]("hello")
assert("hello" === reply)
} finally {
diff --git a/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala
deleted file mode 100644
index 7aac02775e..0000000000
--- a/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rpc.akka
-
-import org.apache.spark.rpc._
-import org.apache.spark.{SSLSampleConfigs, SecurityManager, SparkConf}
-
-class AkkaRpcEnvSuite extends RpcEnvSuite {
-
- override def createRpcEnv(conf: SparkConf,
- name: String,
- port: Int,
- clientMode: Boolean = false): RpcEnv = {
- new AkkaRpcEnvFactory().create(
- RpcEnvConfig(conf, name, "localhost", port, new SecurityManager(conf), clientMode))
- }
-
- test("setupEndpointRef: systemName, address, endpointName") {
- val ref = env.setupEndpoint("test_endpoint", new RpcEndpoint {
- override val rpcEnv = env
-
- override def receive = {
- case _ =>
- }
- })
- val conf = new SparkConf()
- val newRpcEnv = new AkkaRpcEnvFactory().create(
- RpcEnvConfig(conf, "test", "localhost", 0, new SecurityManager(conf), false))
- try {
- val newRef = newRpcEnv.setupEndpointRef("local", ref.address, "test_endpoint")
- assert(s"akka.tcp://local@${env.address}/user/test_endpoint" ===
- newRef.asInstanceOf[AkkaRpcEndpointRef].actorRef.path.toString)
- } finally {
- newRpcEnv.shutdown()
- }
- }
-
- test("uriOf") {
- val uri = env.uriOf("local", RpcAddress("1.2.3.4", 12345), "test_endpoint")
- assert("akka.tcp://local@1.2.3.4:12345/user/test_endpoint" === uri)
- }
-
- test("uriOf: ssl") {
- val conf = SSLSampleConfigs.sparkSSLConfig()
- val securityManager = new SecurityManager(conf)
- val rpcEnv = new AkkaRpcEnvFactory().create(
- RpcEnvConfig(conf, "test", "localhost", 0, securityManager, false))
- try {
- val uri = rpcEnv.uriOf("local", RpcAddress("1.2.3.4", 12345), "test_endpoint")
- assert("akka.ssl.tcp://local@1.2.3.4:12345/user/test_endpoint" === uri)
- } finally {
- rpcEnv.shutdown()
- }
- }
-
-}
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcAddressSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcAddressSuite.scala
index 56743ba650..4fcdb619f9 100644
--- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcAddressSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcAddressSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.rpc.netty
import org.apache.spark.SparkFunSuite
+import org.apache.spark.rpc.RpcEndpointAddress
class NettyRpcAddressSuite extends SparkFunSuite {
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
index ce83087ec0..994a58836b 100644
--- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
@@ -33,9 +33,9 @@ class NettyRpcEnvSuite extends RpcEnvSuite {
}
test("non-existent endpoint") {
- val uri = env.uriOf("test", env.address, "nonexist-endpoint")
+ val uri = RpcEndpointAddress(env.address, "nonexist-endpoint").toString
val e = intercept[RpcEndpointNotFoundException] {
- env.setupEndpointRef("test", env.address, "nonexist-endpoint")
+ env.setupEndpointRef(env.address, "nonexist-endpoint")
}
assert(e.getMessage.contains(uri))
}
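
With the Akka backend gone, a lookup that cannot resolve surfaces as RpcEndpointNotFoundException, where the deleted suites below intercepted akka.actor.ActorNotFound for the same scenario. A self-contained sketch of that failure path, using plain try/catch instead of ScalaTest's intercept; names are placeholders:

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.{RpcEndpointAddress, RpcEndpointNotFoundException, RpcEnv}

val conf = new SparkConf(loadDefaults = false)
val env = RpcEnv.create("demo", "localhost", 0, conf, new SecurityManager(conf))
try {
  // No endpoint named "no-such-endpoint" was registered, so this throws.
  env.setupEndpointRef(env.address, "no-such-endpoint")
} catch {
  case e: RpcEndpointNotFoundException =>
    // The exception message carries the unresolved endpoint URI.
    assert(e.getMessage.contains(
      RpcEndpointAddress(env.address, "no-such-endpoint").toString))
} finally {
  env.shutdown()
}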
diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
deleted file mode 100644
index 0af4b6098b..0000000000
--- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.collection.mutable.ArrayBuffer
-
-import java.util.concurrent.TimeoutException
-
-import akka.actor.ActorNotFound
-
-import org.apache.spark._
-import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.scheduler.MapStatus
-import org.apache.spark.storage.{BlockManagerId, ShuffleBlockId}
-import org.apache.spark.SSLSampleConfigs._
-
-
-/**
- * Test the AkkaUtils with various security settings.
- */
-class AkkaUtilsSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties {
-
- test("remote fetch security bad password") {
- val conf = new SparkConf
- conf.set("spark.rpc", "akka")
- conf.set("spark.authenticate", "true")
- conf.set("spark.authenticate.secret", "good")
-
- val securityManager = new SecurityManager(conf)
- val hostname = "localhost"
- val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager)
- System.setProperty("spark.hostPort", rpcEnv.address.hostPort)
- assert(securityManager.isAuthenticationEnabled() === true)
-
- val masterTracker = new MapOutputTrackerMaster(conf)
- masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
- new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf))
-
- val badconf = new SparkConf
- badconf.set("spark.rpc", "akka")
- badconf.set("spark.authenticate", "true")
- badconf.set("spark.authenticate.secret", "bad")
- val securityManagerBad = new SecurityManager(badconf)
-
- assert(securityManagerBad.isAuthenticationEnabled() === true)
-
- val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, conf, securityManagerBad)
- val slaveTracker = new MapOutputTrackerWorker(conf)
- intercept[akka.actor.ActorNotFound] {
- slaveTracker.trackerEndpoint =
- slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
- }
-
- rpcEnv.shutdown()
- slaveRpcEnv.shutdown()
- }
-
- test("remote fetch security off") {
- val conf = new SparkConf
- conf.set("spark.authenticate", "false")
- conf.set("spark.authenticate.secret", "bad")
- val securityManager = new SecurityManager(conf)
-
- val hostname = "localhost"
- val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager)
- System.setProperty("spark.hostPort", rpcEnv.address.hostPort)
-
- assert(securityManager.isAuthenticationEnabled() === false)
-
- val masterTracker = new MapOutputTrackerMaster(conf)
- masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
- new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf))
-
- val badconf = new SparkConf
- badconf.set("spark.authenticate", "false")
- badconf.set("spark.authenticate.secret", "good")
- val securityManagerBad = new SecurityManager(badconf)
-
- val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, badconf, securityManagerBad)
- val slaveTracker = new MapOutputTrackerWorker(conf)
- slaveTracker.trackerEndpoint =
- slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
-
- assert(securityManagerBad.isAuthenticationEnabled() === false)
-
- masterTracker.registerShuffle(10, 1)
- masterTracker.incrementEpoch()
- slaveTracker.updateEpoch(masterTracker.getEpoch)
-
- val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
- masterTracker.registerMapOutput(10, 0,
- MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L)))
- masterTracker.incrementEpoch()
- slaveTracker.updateEpoch(masterTracker.getEpoch)
-
- // this should succeed since security off
- assert(slaveTracker.getMapSizesByExecutorId(10, 0).toSeq ===
- Seq((BlockManagerId("a", "hostA", 1000),
- ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000)))))
-
- rpcEnv.shutdown()
- slaveRpcEnv.shutdown()
- }
-
- test("remote fetch security pass") {
- val conf = new SparkConf
- conf.set("spark.authenticate", "true")
- conf.set("spark.authenticate.secret", "good")
- val securityManager = new SecurityManager(conf)
-
- val hostname = "localhost"
- val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager)
- System.setProperty("spark.hostPort", rpcEnv.address.hostPort)
-
- assert(securityManager.isAuthenticationEnabled() === true)
-
- val masterTracker = new MapOutputTrackerMaster(conf)
- masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
- new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf))
-
- val goodconf = new SparkConf
- goodconf.set("spark.authenticate", "true")
- goodconf.set("spark.authenticate.secret", "good")
- val securityManagerGood = new SecurityManager(goodconf)
-
- assert(securityManagerGood.isAuthenticationEnabled() === true)
-
- val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, goodconf, securityManagerGood)
- val slaveTracker = new MapOutputTrackerWorker(conf)
- slaveTracker.trackerEndpoint =
- slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
-
- masterTracker.registerShuffle(10, 1)
- masterTracker.incrementEpoch()
- slaveTracker.updateEpoch(masterTracker.getEpoch)
-
- val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
- masterTracker.registerMapOutput(10, 0, MapStatus(
- BlockManagerId("a", "hostA", 1000), Array(1000L)))
- masterTracker.incrementEpoch()
- slaveTracker.updateEpoch(masterTracker.getEpoch)
-
- // this should succeed since security on and passwords match
- assert(slaveTracker.getMapSizesByExecutorId(10, 0) ===
- Seq((BlockManagerId("a", "hostA", 1000),
- ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000)))))
-
- rpcEnv.shutdown()
- slaveRpcEnv.shutdown()
- }
-
- test("remote fetch security off client") {
- val conf = new SparkConf
- conf.set("spark.rpc", "akka")
- conf.set("spark.authenticate", "true")
- conf.set("spark.authenticate.secret", "good")
-
- val securityManager = new SecurityManager(conf)
-
- val hostname = "localhost"
- val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager)
- System.setProperty("spark.hostPort", rpcEnv.address.hostPort)
-
- assert(securityManager.isAuthenticationEnabled() === true)
-
- val masterTracker = new MapOutputTrackerMaster(conf)
- masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
- new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf))
-
- val badconf = new SparkConf
- badconf.set("spark.rpc", "akka")
- badconf.set("spark.authenticate", "false")
- badconf.set("spark.authenticate.secret", "bad")
- val securityManagerBad = new SecurityManager(badconf)
-
- assert(securityManagerBad.isAuthenticationEnabled() === false)
-
- val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, badconf, securityManagerBad)
- val slaveTracker = new MapOutputTrackerWorker(conf)
- intercept[akka.actor.ActorNotFound] {
- slaveTracker.trackerEndpoint =
- slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
- }
-
- rpcEnv.shutdown()
- slaveRpcEnv.shutdown()
- }
-
- test("remote fetch ssl on") {
- val conf = sparkSSLConfig()
- val securityManager = new SecurityManager(conf)
-
- val hostname = "localhost"
- val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager)
- System.setProperty("spark.hostPort", rpcEnv.address.hostPort)
-
- assert(securityManager.isAuthenticationEnabled() === false)
-
- val masterTracker = new MapOutputTrackerMaster(conf)
- masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
- new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf))
-
- val slaveConf = sparkSSLConfig()
- val securityManagerBad = new SecurityManager(slaveConf)
-
- val slaveRpcEnv = RpcEnv.create("spark-slaves", hostname, 0, slaveConf, securityManagerBad)
- val slaveTracker = new MapOutputTrackerWorker(conf)
- slaveTracker.trackerEndpoint =
- slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
-
- assert(securityManagerBad.isAuthenticationEnabled() === false)
-
- masterTracker.registerShuffle(10, 1)
- masterTracker.incrementEpoch()
- slaveTracker.updateEpoch(masterTracker.getEpoch)
-
- val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
- masterTracker.registerMapOutput(10, 0,
- MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L)))
- masterTracker.incrementEpoch()
- slaveTracker.updateEpoch(masterTracker.getEpoch)
-
- // this should succeed since security off
- assert(slaveTracker.getMapSizesByExecutorId(10, 0) ===
- Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000)))))
-
- rpcEnv.shutdown()
- slaveRpcEnv.shutdown()
- }
-
-
- test("remote fetch ssl on and security enabled") {
- val conf = sparkSSLConfig()
- conf.set("spark.authenticate", "true")
- conf.set("spark.authenticate.secret", "good")
- val securityManager = new SecurityManager(conf)
-
- val hostname = "localhost"
- val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager)
- System.setProperty("spark.hostPort", rpcEnv.address.hostPort)
-
- assert(securityManager.isAuthenticationEnabled() === true)
-
- val masterTracker = new MapOutputTrackerMaster(conf)
- masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
- new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf))
-
- val slaveConf = sparkSSLConfig()
- slaveConf.set("spark.authenticate", "true")
- slaveConf.set("spark.authenticate.secret", "good")
- val securityManagerBad = new SecurityManager(slaveConf)
-
- val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, slaveConf, securityManagerBad)
- val slaveTracker = new MapOutputTrackerWorker(conf)
- slaveTracker.trackerEndpoint =
- slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
-
- assert(securityManagerBad.isAuthenticationEnabled() === true)
-
- masterTracker.registerShuffle(10, 1)
- masterTracker.incrementEpoch()
- slaveTracker.updateEpoch(masterTracker.getEpoch)
-
- val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L))
- masterTracker.registerMapOutput(10, 0,
- MapStatus(BlockManagerId("a", "hostA", 1000), Array(1000L)))
- masterTracker.incrementEpoch()
- slaveTracker.updateEpoch(masterTracker.getEpoch)
-
- assert(slaveTracker.getMapSizesByExecutorId(10, 0) ===
- Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000)))))
-
- rpcEnv.shutdown()
- slaveRpcEnv.shutdown()
- }
-
-
- test("remote fetch ssl on and security enabled - bad credentials") {
- val conf = sparkSSLConfig()
- conf.set("spark.rpc", "akka")
- conf.set("spark.authenticate", "true")
- conf.set("spark.authenticate.secret", "good")
- val securityManager = new SecurityManager(conf)
-
- val hostname = "localhost"
- val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager)
- System.setProperty("spark.hostPort", rpcEnv.address.hostPort)
-
- assert(securityManager.isAuthenticationEnabled() === true)
-
- val masterTracker = new MapOutputTrackerMaster(conf)
- masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
- new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf))
-
- val slaveConf = sparkSSLConfig()
- slaveConf.set("spark.rpc", "akka")
- slaveConf.set("spark.authenticate", "true")
- slaveConf.set("spark.authenticate.secret", "bad")
- val securityManagerBad = new SecurityManager(slaveConf)
-
- val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, slaveConf, securityManagerBad)
- val slaveTracker = new MapOutputTrackerWorker(conf)
- intercept[akka.actor.ActorNotFound] {
- slaveTracker.trackerEndpoint =
- slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
- }
-
- rpcEnv.shutdown()
- slaveRpcEnv.shutdown()
- }
-
-
- test("remote fetch ssl on - untrusted server") {
- val conf = sparkSSLConfigUntrusted()
- val securityManager = new SecurityManager(conf)
-
- val hostname = "localhost"
- val rpcEnv = RpcEnv.create("spark", hostname, 0, conf, securityManager)
- System.setProperty("spark.hostPort", rpcEnv.address.hostPort)
-
- assert(securityManager.isAuthenticationEnabled() === false)
-
- val masterTracker = new MapOutputTrackerMaster(conf)
- masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
- new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, conf))
-
- val slaveConf = sparkSSLConfig()
- .set("spark.rpc.askTimeout", "5s")
- .set("spark.rpc.lookupTimeout", "5s")
- val securityManagerBad = new SecurityManager(slaveConf)
-
- val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, slaveConf, securityManagerBad)
- try {
- slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
- fail("should receive either ActorNotFound or TimeoutException")
- } catch {
- case e: ActorNotFound =>
- case e: TimeoutException =>
- }
-
- rpcEnv.shutdown()
- slaveRpcEnv.shutdown()
- }
-
-}