Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--  core/src/test/scala/org/apache/spark/FileServerSuite.scala | 264
-rw-r--r--  core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/LocalSparkContext.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 12
-rw-r--r--  core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala | 12
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala | 8
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala | 17
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala | 26
12 files changed, 39 insertions, 318 deletions
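
The recurring change across the hunks below is the replacement of the Akka frame-size setting with the transport-agnostic RPC limit, read through RpcUtils instead of the Akka actor system. A minimal sketch of the new pattern, assuming Spark is on the classpath (the object name is illustrative; the 1 MB expectation matches the assertion in the updated SparkListenerSuite):

    import org.apache.spark.SparkConf
    import org.apache.spark.util.RpcUtils

    object RpcMessageSizeSketch {
      def main(args: Array[String]): Unit = {
        // The renamed key takes a value in MB; "1" is the smallest the tests can use.
        val conf = new SparkConf().set("spark.rpc.message.maxSize", "1")
        // RpcUtils converts the configured MB value into bytes.
        val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
        assert(maxRpcMessageSize == 1024 * 1024)
      }
    }
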
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
deleted file mode 100644
index bc7059b77f..0000000000
--- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io._
-import java.net.URI
-import java.util.jar.{JarEntry, JarOutputStream}
-import javax.net.ssl.SSLException
-
-import com.google.common.io.{ByteStreams, Files}
-import org.apache.commons.lang3.RandomUtils
-
-import org.apache.spark.util.Utils
-
-class FileServerSuite extends SparkFunSuite with LocalSparkContext {
-
- import SSLSampleConfigs._
-
- @transient var tmpDir: File = _
- @transient var tmpFile: File = _
- @transient var tmpJarUrl: String = _
-
- def newConf: SparkConf = new SparkConf(loadDefaults = false).set("spark.authenticate", "false")
-
- override def beforeEach() {
- super.beforeEach()
- resetSparkContext()
- }
-
- override def beforeAll() {
- super.beforeAll()
-
- tmpDir = Utils.createTempDir()
- val testTempDir = new File(tmpDir, "test")
- testTempDir.mkdir()
-
- val textFile = new File(testTempDir, "FileServerSuite.txt")
- val pw = new PrintWriter(textFile)
- // scalastyle:off println
- pw.println("100")
- // scalastyle:on println
- pw.close()
-
- val jarFile = new File(testTempDir, "test.jar")
- val jarStream = new FileOutputStream(jarFile)
- val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
-
- val jarEntry = new JarEntry(textFile.getName)
- jar.putNextEntry(jarEntry)
-
- val in = new FileInputStream(textFile)
- ByteStreams.copy(in, jar)
-
- in.close()
- jar.close()
- jarStream.close()
-
- tmpFile = textFile
- tmpJarUrl = jarFile.toURI.toURL.toString
- }
-
- override def afterAll() {
- try {
- Utils.deleteRecursively(tmpDir)
- } finally {
- super.afterAll()
- }
- }
-
- test("Distributing files locally") {
- sc = new SparkContext("local[4]", "test", newConf)
- sc.addFile(tmpFile.toString)
- val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
- val result = sc.parallelize(testData).reduceByKey {
- val path = SparkFiles.get("FileServerSuite.txt")
- val in = new BufferedReader(new FileReader(path))
- val fileVal = in.readLine().toInt
- in.close()
- _ * fileVal + _ * fileVal
- }.collect()
- assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
- }
-
- test("Distributing files locally security On") {
- val sparkConf = new SparkConf(false)
- sparkConf.set("spark.authenticate", "true")
- sparkConf.set("spark.authenticate.secret", "good")
- sc = new SparkContext("local[4]", "test", sparkConf)
-
- sc.addFile(tmpFile.toString)
- assert(sc.env.securityManager.isAuthenticationEnabled() === true)
- val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
- val result = sc.parallelize(testData).reduceByKey {
- val path = SparkFiles.get("FileServerSuite.txt")
- val in = new BufferedReader(new FileReader(path))
- val fileVal = in.readLine().toInt
- in.close()
- _ * fileVal + _ * fileVal
- }.collect()
- assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
- }
-
- test("Distributing files locally using URL as input") {
- // addFile("file:///....")
- sc = new SparkContext("local[4]", "test", newConf)
- sc.addFile(new File(tmpFile.toString).toURI.toString)
- val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
- val result = sc.parallelize(testData).reduceByKey {
- val path = SparkFiles.get("FileServerSuite.txt")
- val in = new BufferedReader(new FileReader(path))
- val fileVal = in.readLine().toInt
- in.close()
- _ * fileVal + _ * fileVal
- }.collect()
- assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
- }
-
- test ("Dynamically adding JARS locally") {
- sc = new SparkContext("local[4]", "test", newConf)
- sc.addJar(tmpJarUrl)
- val testData = Array((1, 1))
- sc.parallelize(testData).foreach { x =>
- if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
- throw new SparkException("jar not added")
- }
- }
- }
-
- test("Distributing files on a standalone cluster") {
- sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
- sc.addFile(tmpFile.toString)
- val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0))
- val result = sc.parallelize(testData).reduceByKey {
- val path = SparkFiles.get("FileServerSuite.txt")
- val in = new BufferedReader(new FileReader(path))
- val fileVal = in.readLine().toInt
- in.close()
- _ * fileVal + _ * fileVal
- }.collect()
- assert(result.toSet === Set((1, 200), (2, 300), (3, 500)))
- }
-
- test ("Dynamically adding JARS on a standalone cluster") {
- sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
- sc.addJar(tmpJarUrl)
- val testData = Array((1, 1))
- sc.parallelize(testData).foreach { x =>
- if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
- throw new SparkException("jar not added")
- }
- }
- }
-
- test ("Dynamically adding JARS on a standalone cluster using local: URL") {
- sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf)
- sc.addJar(tmpJarUrl.replace("file", "local"))
- val testData = Array((1, 1))
- sc.parallelize(testData).foreach { x =>
- if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
- throw new SparkException("jar not added")
- }
- }
- }
-
- test ("HttpFileServer should work with SSL") {
- val sparkConf = sparkSSLConfig()
- val sm = new SecurityManager(sparkConf)
- val server = new HttpFileServer(sparkConf, sm, 0)
- try {
- server.initialize()
-
- fileTransferTest(server, sm)
- } finally {
- server.stop()
- }
- }
-
- test ("HttpFileServer should work with SSL and good credentials") {
- val sparkConf = sparkSSLConfig()
- sparkConf.set("spark.authenticate", "true")
- sparkConf.set("spark.authenticate.secret", "good")
-
- val sm = new SecurityManager(sparkConf)
- val server = new HttpFileServer(sparkConf, sm, 0)
- try {
- server.initialize()
-
- fileTransferTest(server, sm)
- } finally {
- server.stop()
- }
- }
-
- test ("HttpFileServer should not work with valid SSL and bad credentials") {
- val sparkConf = sparkSSLConfig()
- sparkConf.set("spark.authenticate", "true")
- sparkConf.set("spark.authenticate.secret", "bad")
-
- val sm = new SecurityManager(sparkConf)
- val server = new HttpFileServer(sparkConf, sm, 0)
- try {
- server.initialize()
-
- intercept[IOException] {
- fileTransferTest(server)
- }
- } finally {
- server.stop()
- }
- }
-
- test ("HttpFileServer should not work with SSL when the server is untrusted") {
- val sparkConf = sparkSSLConfigUntrusted()
- val sm = new SecurityManager(sparkConf)
- val server = new HttpFileServer(sparkConf, sm, 0)
- try {
- server.initialize()
-
- intercept[SSLException] {
- fileTransferTest(server)
- }
- } finally {
- server.stop()
- }
- }
-
- def fileTransferTest(server: HttpFileServer, sm: SecurityManager = null): Unit = {
- val randomContent = RandomUtils.nextBytes(100)
- val file = File.createTempFile("FileServerSuite", "sslTests", tmpDir)
- Files.write(randomContent, file)
- server.addFile(file)
-
- val uri = new URI(server.serverUri + "/files/" + file.getName)
-
- val connection = if (sm != null && sm.isAuthenticationEnabled()) {
- Utils.constructURIForAuthentication(uri, sm).toURL.openConnection()
- } else {
- uri.toURL.openConnection()
- }
-
- if (sm != null) {
- Utils.setupSecureURLConnection(connection, sm)
- }
-
- val buf = ByteStreams.toByteArray(connection.getInputStream)
- assert(buf === randomContent)
- }
-
-}
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 18e5350840..c7f629a14b 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -175,9 +175,9 @@ class HeartbeatReceiverSuite
val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
- RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "dummy:4040", 0, Map.empty))
+ RegisterExecutor(executorId1, dummyExecutorEndpointRef1, 0, Map.empty))
fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
- RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty))
+ RegisterExecutor(executorId2, dummyExecutorEndpointRef2, 0, Map.empty))
heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
addExecutorAndVerify(executorId1)
addExecutorAndVerify(executorId2)
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index e1a0bf7c93..24ec99c7e5 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -52,7 +52,7 @@ object LocalSparkContext {
if (sc != null) {
sc.stop()
}
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ // To avoid RPC rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.driver.port")
}
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 3819c0a8f3..6546def596 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -154,9 +154,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
slaveRpcEnv.shutdown()
}
- test("remote fetch below akka frame size") {
+ test("remote fetch below max RPC message size") {
val newConf = new SparkConf
- newConf.set("spark.akka.frameSize", "1")
+ newConf.set("spark.rpc.message.maxSize", "1")
newConf.set("spark.rpc.askTimeout", "1") // Fail fast
val masterTracker = new MapOutputTrackerMaster(conf)
@@ -164,7 +164,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf)
rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
- // Frame size should be ~123B, and no exception should be thrown
+ // Message size should be ~123B, and no exception should be thrown
masterTracker.registerShuffle(10, 1)
masterTracker.registerMapOutput(10, 0, MapStatus(
BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
@@ -179,9 +179,9 @@ class MapOutputTrackerSuite extends SparkFunSuite {
rpcEnv.shutdown()
}
- test("remote fetch exceeds akka frame size") {
+ test("remote fetch exceeds max RPC message size") {
val newConf = new SparkConf
- newConf.set("spark.akka.frameSize", "1")
+ newConf.set("spark.rpc.message.maxSize", "1")
newConf.set("spark.rpc.askTimeout", "1") // Fail fast
val masterTracker = new MapOutputTrackerMaster(conf)
@@ -189,7 +189,7 @@ class MapOutputTrackerSuite extends SparkFunSuite {
val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf)
rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint)
- // Frame size should be ~1.1MB, and MapOutputTrackerMasterEndpoint should throw exception.
+ // Message size should be ~1.1MB, and MapOutputTrackerMasterEndpoint should throw exception.
// Note that the size is hand-selected here because map output statuses are compressed before
// being sent.
masterTracker.registerShuffle(20, 100)
diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
index 7603cef773..8bdb237c28 100644
--- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -181,10 +181,8 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
"SSL_DHE_RSA_WITH_AES_128_CBC_SHA256")
val securityManager = new SecurityManager(conf)
- val akkaSSLOptions = securityManager.getSSLOptions("akka")
assert(securityManager.fileServerSSLOptions.enabled === true)
- assert(akkaSSLOptions.enabled === true)
assert(securityManager.sslSocketFactory.isDefined === true)
assert(securityManager.hostnameVerifier.isDefined === true)
@@ -198,16 +196,6 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
assert(securityManager.fileServerSSLOptions.keyPassword === Some("password"))
assert(securityManager.fileServerSSLOptions.protocol === Some("TLSv1.2"))
assert(securityManager.fileServerSSLOptions.enabledAlgorithms === expectedAlgorithms)
-
- assert(akkaSSLOptions.trustStore.isDefined === true)
- assert(akkaSSLOptions.trustStore.get.getName === "truststore")
- assert(akkaSSLOptions.keyStore.isDefined === true)
- assert(akkaSSLOptions.keyStore.get.getName === "keystore")
- assert(akkaSSLOptions.trustStorePassword === Some("password"))
- assert(akkaSSLOptions.keyStorePassword === Some("password"))
- assert(akkaSSLOptions.keyPassword === Some("password"))
- assert(akkaSSLOptions.protocol === Some("TLSv1.2"))
- assert(akkaSSLOptions.enabledAlgorithms === expectedAlgorithms)
}
test("ssl off setup") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
index 190e4dd728..9c13c15281 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala
@@ -69,7 +69,7 @@ private[deploy] object DeployTestUtils {
"publicAddress",
new File("sparkHome"),
new File("workDir"),
- "akka://worker",
+ "spark://worker",
new SparkConf,
Seq("localDir"),
ExecutorState.RUNNING)
@@ -84,7 +84,7 @@ private[deploy] object DeployTestUtils {
new File("sparkHome"),
createDriverDesc(),
null,
- "akka://worker",
+ "spark://worker",
new SecurityManager(conf))
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index ab3d4cafeb..fdada0777f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -544,7 +544,7 @@ class StandaloneDynamicAllocationSuite
val endpointRef = mock(classOf[RpcEndpointRef])
val mockAddress = mock(classOf[RpcAddress])
when(endpointRef.address).thenReturn(mockAddress)
- val message = RegisterExecutor(id, endpointRef, s"localhost:$port", 10, Map.empty)
+ val message = RegisterExecutor(id, endpointRef, 10, Map.empty)
val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
backend.driverEndpoint.askWithRetry[CoarseGrainedClusterMessage](message)
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index bd8b0655f4..2a1696be36 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -34,7 +34,7 @@ class DriverRunnerTest extends SparkFunSuite {
val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
val conf = new SparkConf()
new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"),
- driverDescription, null, "akka://1.2.3.4/worker/", new SecurityManager(conf))
+ driverDescription, null, "spark://1.2.3.4/worker/", new SecurityManager(conf))
}
private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = {
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 64e486d791..6f4eda8b47 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -626,9 +626,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
val e = intercept[Exception] {
Await.result(f, 1 seconds)
}
- assert(e.isInstanceOf[TimeoutException] || // For Akka
- e.isInstanceOf[NotSerializableException] // For Netty
- )
+ assert(e.isInstanceOf[NotSerializableException])
} finally {
anotherEnv.shutdown()
anotherEnv.awaitTermination()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 70f40fb26c..04cccc67e3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -18,16 +18,16 @@
package org.apache.spark.scheduler
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
-import org.apache.spark.util.{AkkaUtils, SerializableBuffer}
+import org.apache.spark.util.{RpcUtils, SerializableBuffer}
class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext {
- test("serialized task larger than akka frame size") {
+ test("serialized task larger than max RPC message size") {
val conf = new SparkConf
- conf.set("spark.akka.frameSize", "1")
+ conf.set("spark.rpc.message.maxSize", "1")
conf.set("spark.default.parallelism", "1")
sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
- val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf)
+ val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize))
val larger = sc.parallelize(Seq(buffer))
val thrown = intercept[SparkException] {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index c87158d89f..58d217ffef 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.Matchers
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
with ResetSystemProperties {
@@ -284,19 +284,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}
test("onTaskGettingResult() called when result fetched remotely") {
- val conf = new SparkConf().set("spark.akka.frameSize", "1")
+ val conf = new SparkConf().set("spark.rpc.message.maxSize", "1")
sc = new SparkContext("local", "SparkListenerSuite", conf)
val listener = new SaveTaskEvents
sc.addSparkListener(listener)
- // Make a task whose result is larger than the akka frame size
- val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
- assert(akkaFrameSize === 1024 * 1024)
+ // Make a task whose result is larger than the RPC message size
+ val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+ assert(maxRpcMessageSize === 1024 * 1024)
val result = sc.parallelize(Seq(1), 1)
- .map { x => 1.to(akkaFrameSize).toArray }
+ .map { x => 1.to(maxRpcMessageSize).toArray }
.reduce { case (x, y) => x }
- assert(result === 1.to(akkaFrameSize).toArray)
+ assert(result === 1.to(maxRpcMessageSize).toArray)
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
val TASK_INDEX = 0
@@ -310,7 +309,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val listener = new SaveTaskEvents
sc.addSparkListener(listener)
- // Make a task whose result is larger than the akka frame size
+ // Make a task whose result is larger than the RPC message size
val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x }
assert(result === 2)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index bc72c3685e..cc2557c2f1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark._
import org.apache.spark.storage.TaskResultBlockId
import org.apache.spark.TestUtils.JavaSourceFromString
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
+import org.apache.spark.util.{MutableURLClassLoader, RpcUtils, Utils}
/**
* Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter.
@@ -77,22 +77,22 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule
*/
class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
- // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
+ // Set the RPC message size to be as small as possible (it must be an integer, so 1 is as small
// as we can make it) so the tests don't take too long.
- def conf: SparkConf = new SparkConf().set("spark.akka.frameSize", "1")
+ def conf: SparkConf = new SparkConf().set("spark.rpc.message.maxSize", "1")
- test("handling results smaller than Akka frame size") {
+ test("handling results smaller than max RPC message size") {
sc = new SparkContext("local", "test", conf)
val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
assert(result === 2)
}
- test("handling results larger than Akka frame size") {
+ test("handling results larger than max RPC message size") {
sc = new SparkContext("local", "test", conf)
- val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
- val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
- assert(result === 1.to(akkaFrameSize).toArray)
+ val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+ val result =
+ sc.parallelize(Seq(1), 1).map(x => 1.to(maxRpcMessageSize).toArray).reduce((x, y) => x)
+ assert(result === 1.to(maxRpcMessageSize).toArray)
val RESULT_BLOCK_ID = TaskResultBlockId(0)
assert(sc.env.blockManager.master.getLocations(RESULT_BLOCK_ID).size === 0,
@@ -114,11 +114,11 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local
}
val resultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
scheduler.taskResultGetter = resultGetter
- val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
- val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
+ val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+ val result =
+ sc.parallelize(Seq(1), 1).map(x => 1.to(maxRpcMessageSize).toArray).reduce((x, y) => x)
assert(resultGetter.removeBlockSuccessfully)
- assert(result === 1.to(akkaFrameSize).toArray)
+ assert(result === 1.to(maxRpcMessageSize).toArray)
// Make sure two tasks were run (one failed one, and a second retried one).
assert(scheduler.nextTaskId.get() === 2)
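
A condensed, self-contained version of the pattern the updated TaskResultGetterSuite relies on: with spark.rpc.message.maxSize set to 1 MB, a task result larger than the limit is still returned to the driver, just not inline over the RPC channel. This is a sketch only; the object and application names are illustrative and not part of the patch:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.util.RpcUtils

    object LargeResultSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().set("spark.rpc.message.maxSize", "1") // 1 MB limit
        val sc = new SparkContext("local", "large-result-sketch", conf)
        try {
          val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
          // The result is bigger than the RPC limit, so it is fetched indirectly
          // (via the block manager) rather than sent back with the status update.
          val result = sc.parallelize(Seq(1), 1)
            .map(_ => 1.to(maxRpcMessageSize).toArray)
            .reduce((x, y) => x)
          assert(result.length == maxRpcMessageSize)
        } finally {
          sc.stop()
        }
      }
    }
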