diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-01-22 21:20:04 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-01-22 21:20:04 -0800 |
commit | bc1babd63da4ee56e6d371eb24805a5d714e8295 (patch) | |
tree | 8aec6a20e3d23574f53d818752df61a28c64d635 /core/src/test/scala/org | |
parent | d8fefab4d8149f0638282570c75271ef35c65cff (diff) | |
download | spark-bc1babd63da4ee56e6d371eb24805a5d714e8295.tar.gz spark-bc1babd63da4ee56e6d371eb24805a5d714e8295.tar.bz2 spark-bc1babd63da4ee56e6d371eb24805a5d714e8295.zip |
[SPARK-7997][CORE] Remove Akka from Spark Core and Streaming
- Remove Akka dependency from core. Note: the streaming-akka project still uses Akka.
- Remove HttpFileServer
- Remove Akka configs from SparkConf and SSLOptions
- Rename `spark.akka.frameSize` to `spark.rpc.message.maxSize`. I think it's still worth to keep this config because using `DirectTaskResult` or `IndirectTaskResult` depends on it.
- Update comments and docs
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #10854 from zsxwing/remove-akka.
Diffstat (limited to 'core/src/test/scala/org')
12 files changed, 39 insertions, 318 deletions
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala deleted file mode 100644 index bc7059b77f..0000000000 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark - -import java.io._ -import java.net.URI -import java.util.jar.{JarEntry, JarOutputStream} -import javax.net.ssl.SSLException - -import com.google.common.io.{ByteStreams, Files} -import org.apache.commons.lang3.RandomUtils - -import org.apache.spark.util.Utils - -class FileServerSuite extends SparkFunSuite with LocalSparkContext { - - import SSLSampleConfigs._ - - @transient var tmpDir: File = _ - @transient var tmpFile: File = _ - @transient var tmpJarUrl: String = _ - - def newConf: SparkConf = new SparkConf(loadDefaults = false).set("spark.authenticate", "false") - - override def beforeEach() { - super.beforeEach() - resetSparkContext() - } - - override def beforeAll() { - super.beforeAll() - - tmpDir = Utils.createTempDir() - val testTempDir = new File(tmpDir, "test") - testTempDir.mkdir() - - val textFile = new File(testTempDir, "FileServerSuite.txt") - val pw = new PrintWriter(textFile) - // scalastyle:off println - pw.println("100") - // scalastyle:on println - pw.close() - - val jarFile = new File(testTempDir, "test.jar") - val jarStream = new FileOutputStream(jarFile) - val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest()) - - val jarEntry = new JarEntry(textFile.getName) - jar.putNextEntry(jarEntry) - - val in = new FileInputStream(textFile) - ByteStreams.copy(in, jar) - - in.close() - jar.close() - jarStream.close() - - tmpFile = textFile - tmpJarUrl = jarFile.toURI.toURL.toString - } - - override def afterAll() { - try { - Utils.deleteRecursively(tmpDir) - } finally { - super.afterAll() - } - } - - test("Distributing files locally") { - sc = new SparkContext("local[4]", "test", newConf) - sc.addFile(tmpFile.toString) - val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0)) - val result = sc.parallelize(testData).reduceByKey { - val path = SparkFiles.get("FileServerSuite.txt") - val in = new BufferedReader(new FileReader(path)) - val fileVal = in.readLine().toInt - in.close() - _ * fileVal + _ * fileVal - }.collect() - assert(result.toSet === Set((1, 200), (2, 300), (3, 500))) - } - - test("Distributing files locally security On") { - val sparkConf = new SparkConf(false) - sparkConf.set("spark.authenticate", "true") - sparkConf.set("spark.authenticate.secret", "good") - sc = new SparkContext("local[4]", "test", sparkConf) - - sc.addFile(tmpFile.toString) - assert(sc.env.securityManager.isAuthenticationEnabled() === true) - val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0)) - val result = sc.parallelize(testData).reduceByKey { - val path = SparkFiles.get("FileServerSuite.txt") - val in = new BufferedReader(new FileReader(path)) - val fileVal = in.readLine().toInt - in.close() - _ * fileVal + _ * fileVal - }.collect() - assert(result.toSet === Set((1, 200), (2, 300), (3, 500))) - } - - test("Distributing files locally using URL as input") { - // addFile("file:///....") - sc = new SparkContext("local[4]", "test", newConf) - sc.addFile(new File(tmpFile.toString).toURI.toString) - val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0)) - val result = sc.parallelize(testData).reduceByKey { - val path = SparkFiles.get("FileServerSuite.txt") - val in = new BufferedReader(new FileReader(path)) - val fileVal = in.readLine().toInt - in.close() - _ * fileVal + _ * fileVal - }.collect() - assert(result.toSet === Set((1, 200), (2, 300), (3, 500))) - } - - test ("Dynamically adding JARS locally") { - sc = new SparkContext("local[4]", "test", newConf) - sc.addJar(tmpJarUrl) - val testData = Array((1, 1)) - sc.parallelize(testData).foreach { x => - if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) { - throw new SparkException("jar not added") - } - } - } - - test("Distributing files on a standalone cluster") { - sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf) - sc.addFile(tmpFile.toString) - val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0)) - val result = sc.parallelize(testData).reduceByKey { - val path = SparkFiles.get("FileServerSuite.txt") - val in = new BufferedReader(new FileReader(path)) - val fileVal = in.readLine().toInt - in.close() - _ * fileVal + _ * fileVal - }.collect() - assert(result.toSet === Set((1, 200), (2, 300), (3, 500))) - } - - test ("Dynamically adding JARS on a standalone cluster") { - sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf) - sc.addJar(tmpJarUrl) - val testData = Array((1, 1)) - sc.parallelize(testData).foreach { x => - if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) { - throw new SparkException("jar not added") - } - } - } - - test ("Dynamically adding JARS on a standalone cluster using local: URL") { - sc = new SparkContext("local-cluster[1,1,1024]", "test", newConf) - sc.addJar(tmpJarUrl.replace("file", "local")) - val testData = Array((1, 1)) - sc.parallelize(testData).foreach { x => - if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) { - throw new SparkException("jar not added") - } - } - } - - test ("HttpFileServer should work with SSL") { - val sparkConf = sparkSSLConfig() - val sm = new SecurityManager(sparkConf) - val server = new HttpFileServer(sparkConf, sm, 0) - try { - server.initialize() - - fileTransferTest(server, sm) - } finally { - server.stop() - } - } - - test ("HttpFileServer should work with SSL and good credentials") { - val sparkConf = sparkSSLConfig() - sparkConf.set("spark.authenticate", "true") - sparkConf.set("spark.authenticate.secret", "good") - - val sm = new SecurityManager(sparkConf) - val server = new HttpFileServer(sparkConf, sm, 0) - try { - server.initialize() - - fileTransferTest(server, sm) - } finally { - server.stop() - } - } - - test ("HttpFileServer should not work with valid SSL and bad credentials") { - val sparkConf = sparkSSLConfig() - sparkConf.set("spark.authenticate", "true") - sparkConf.set("spark.authenticate.secret", "bad") - - val sm = new SecurityManager(sparkConf) - val server = new HttpFileServer(sparkConf, sm, 0) - try { - server.initialize() - - intercept[IOException] { - fileTransferTest(server) - } - } finally { - server.stop() - } - } - - test ("HttpFileServer should not work with SSL when the server is untrusted") { - val sparkConf = sparkSSLConfigUntrusted() - val sm = new SecurityManager(sparkConf) - val server = new HttpFileServer(sparkConf, sm, 0) - try { - server.initialize() - - intercept[SSLException] { - fileTransferTest(server) - } - } finally { - server.stop() - } - } - - def fileTransferTest(server: HttpFileServer, sm: SecurityManager = null): Unit = { - val randomContent = RandomUtils.nextBytes(100) - val file = File.createTempFile("FileServerSuite", "sslTests", tmpDir) - Files.write(randomContent, file) - server.addFile(file) - - val uri = new URI(server.serverUri + "/files/" + file.getName) - - val connection = if (sm != null && sm.isAuthenticationEnabled()) { - Utils.constructURIForAuthentication(uri, sm).toURL.openConnection() - } else { - uri.toURL.openConnection() - } - - if (sm != null) { - Utils.setupSecureURLConnection(connection, sm) - } - - val buf = ByteStreams.toByteArray(connection.getInputStream) - assert(buf === randomContent) - } - -} diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 18e5350840..c7f629a14b 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -175,9 +175,9 @@ class HeartbeatReceiverSuite val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1) val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2) fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse]( - RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "dummy:4040", 0, Map.empty)) + RegisterExecutor(executorId1, dummyExecutorEndpointRef1, 0, Map.empty)) fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse]( - RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty)) + RegisterExecutor(executorId2, dummyExecutorEndpointRef2, 0, Map.empty)) heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet) addExecutorAndVerify(executorId1) addExecutorAndVerify(executorId2) diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala index e1a0bf7c93..24ec99c7e5 100644 --- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala +++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala @@ -52,7 +52,7 @@ object LocalSparkContext { if (sc != null) { sc.stop() } - // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + // To avoid RPC rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.driver.port") } diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 3819c0a8f3..6546def596 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -154,9 +154,9 @@ class MapOutputTrackerSuite extends SparkFunSuite { slaveRpcEnv.shutdown() } - test("remote fetch below akka frame size") { + test("remote fetch below max RPC message size") { val newConf = new SparkConf - newConf.set("spark.akka.frameSize", "1") + newConf.set("spark.rpc.message.maxSize", "1") newConf.set("spark.rpc.askTimeout", "1") // Fail fast val masterTracker = new MapOutputTrackerMaster(conf) @@ -164,7 +164,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf) rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint) - // Frame size should be ~123B, and no exception should be thrown + // Message size should be ~123B, and no exception should be thrown masterTracker.registerShuffle(10, 1) masterTracker.registerMapOutput(10, 0, MapStatus( BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0))) @@ -179,9 +179,9 @@ class MapOutputTrackerSuite extends SparkFunSuite { rpcEnv.shutdown() } - test("remote fetch exceeds akka frame size") { + test("remote fetch exceeds max RPC message size") { val newConf = new SparkConf - newConf.set("spark.akka.frameSize", "1") + newConf.set("spark.rpc.message.maxSize", "1") newConf.set("spark.rpc.askTimeout", "1") // Fail fast val masterTracker = new MapOutputTrackerMaster(conf) @@ -189,7 +189,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { val masterEndpoint = new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, newConf) rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME, masterEndpoint) - // Frame size should be ~1.1MB, and MapOutputTrackerMasterEndpoint should throw exception. + // Message size should be ~1.1MB, and MapOutputTrackerMasterEndpoint should throw exception. // Note that the size is hand-selected here because map output statuses are compressed before // being sent. masterTracker.registerShuffle(20, 100) diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala index 7603cef773..8bdb237c28 100644 --- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala @@ -181,10 +181,8 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties { "SSL_DHE_RSA_WITH_AES_128_CBC_SHA256") val securityManager = new SecurityManager(conf) - val akkaSSLOptions = securityManager.getSSLOptions("akka") assert(securityManager.fileServerSSLOptions.enabled === true) - assert(akkaSSLOptions.enabled === true) assert(securityManager.sslSocketFactory.isDefined === true) assert(securityManager.hostnameVerifier.isDefined === true) @@ -198,16 +196,6 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties { assert(securityManager.fileServerSSLOptions.keyPassword === Some("password")) assert(securityManager.fileServerSSLOptions.protocol === Some("TLSv1.2")) assert(securityManager.fileServerSSLOptions.enabledAlgorithms === expectedAlgorithms) - - assert(akkaSSLOptions.trustStore.isDefined === true) - assert(akkaSSLOptions.trustStore.get.getName === "truststore") - assert(akkaSSLOptions.keyStore.isDefined === true) - assert(akkaSSLOptions.keyStore.get.getName === "keystore") - assert(akkaSSLOptions.trustStorePassword === Some("password")) - assert(akkaSSLOptions.keyStorePassword === Some("password")) - assert(akkaSSLOptions.keyPassword === Some("password")) - assert(akkaSSLOptions.protocol === Some("TLSv1.2")) - assert(akkaSSLOptions.enabledAlgorithms === expectedAlgorithms) } test("ssl off setup") { diff --git a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala index 190e4dd728..9c13c15281 100644 --- a/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala +++ b/core/src/test/scala/org/apache/spark/deploy/DeployTestUtils.scala @@ -69,7 +69,7 @@ private[deploy] object DeployTestUtils { "publicAddress", new File("sparkHome"), new File("workDir"), - "akka://worker", + "spark://worker", new SparkConf, Seq("localDir"), ExecutorState.RUNNING) @@ -84,7 +84,7 @@ private[deploy] object DeployTestUtils { new File("sparkHome"), createDriverDesc(), null, - "akka://worker", + "spark://worker", new SecurityManager(conf)) } } diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index ab3d4cafeb..fdada0777f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -544,7 +544,7 @@ class StandaloneDynamicAllocationSuite val endpointRef = mock(classOf[RpcEndpointRef]) val mockAddress = mock(classOf[RpcAddress]) when(endpointRef.address).thenReturn(mockAddress) - val message = RegisterExecutor(id, endpointRef, s"localhost:$port", 10, Map.empty) + val message = RegisterExecutor(id, endpointRef, 10, Map.empty) val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend] backend.driverEndpoint.askWithRetry[CoarseGrainedClusterMessage](message) } diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala index bd8b0655f4..2a1696be36 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala @@ -34,7 +34,7 @@ class DriverRunnerTest extends SparkFunSuite { val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command) val conf = new SparkConf() new DriverRunner(conf, "driverId", new File("workDir"), new File("sparkHome"), - driverDescription, null, "akka://1.2.3.4/worker/", new SecurityManager(conf)) + driverDescription, null, "spark://1.2.3.4/worker/", new SecurityManager(conf)) } private def createProcessBuilderAndProcess(): (ProcessBuilderLike, Process) = { diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index 64e486d791..6f4eda8b47 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -626,9 +626,7 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { val e = intercept[Exception] { Await.result(f, 1 seconds) } - assert(e.isInstanceOf[TimeoutException] || // For Akka - e.isInstanceOf[NotSerializableException] // For Netty - ) + assert(e.isInstanceOf[NotSerializableException]) } finally { anotherEnv.shutdown() anotherEnv.awaitTermination() diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index 70f40fb26c..04cccc67e3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -18,16 +18,16 @@ package org.apache.spark.scheduler import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite} -import org.apache.spark.util.{AkkaUtils, SerializableBuffer} +import org.apache.spark.util.{RpcUtils, SerializableBuffer} class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext { - test("serialized task larger than akka frame size") { + test("serialized task larger than max RPC message size") { val conf = new SparkConf - conf.set("spark.akka.frameSize", "1") + conf.set("spark.rpc.message.maxSize", "1") conf.set("spark.default.parallelism", "1") sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf) - val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf) + val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf) val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize)) val larger = sc.parallelize(Seq(buffer)) val thrown = intercept[SparkException] { diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index c87158d89f..58d217ffef 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -26,7 +26,7 @@ import org.scalatest.Matchers import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite} import org.apache.spark.executor.TaskMetrics -import org.apache.spark.util.ResetSystemProperties +import org.apache.spark.util.{ResetSystemProperties, RpcUtils} class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers with ResetSystemProperties { @@ -284,19 +284,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } test("onTaskGettingResult() called when result fetched remotely") { - val conf = new SparkConf().set("spark.akka.frameSize", "1") + val conf = new SparkConf().set("spark.rpc.message.maxSize", "1") sc = new SparkContext("local", "SparkListenerSuite", conf) val listener = new SaveTaskEvents sc.addSparkListener(listener) - // Make a task whose result is larger than the akka frame size - val akkaFrameSize = - sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt - assert(akkaFrameSize === 1024 * 1024) + // Make a task whose result is larger than the RPC message size + val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) + assert(maxRpcMessageSize === 1024 * 1024) val result = sc.parallelize(Seq(1), 1) - .map { x => 1.to(akkaFrameSize).toArray } + .map { x => 1.to(maxRpcMessageSize).toArray } .reduce { case (x, y) => x } - assert(result === 1.to(akkaFrameSize).toArray) + assert(result === 1.to(maxRpcMessageSize).toArray) sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) val TASK_INDEX = 0 @@ -310,7 +309,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val listener = new SaveTaskEvents sc.addSparkListener(listener) - // Make a task whose result is larger than the akka frame size + // Make a task whose result is larger than the RPC message size val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x } assert(result === 2) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index bc72c3685e..cc2557c2f1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -31,7 +31,7 @@ import org.scalatest.concurrent.Eventually._ import org.apache.spark._ import org.apache.spark.storage.TaskResultBlockId import org.apache.spark.TestUtils.JavaSourceFromString -import org.apache.spark.util.{MutableURLClassLoader, Utils} +import org.apache.spark.util.{MutableURLClassLoader, RpcUtils, Utils} /** * Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter. @@ -77,22 +77,22 @@ class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedule */ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext { - // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small + // Set the RPC message size to be as small as possible (it must be an integer, so 1 is as small // as we can make it) so the tests don't take too long. - def conf: SparkConf = new SparkConf().set("spark.akka.frameSize", "1") + def conf: SparkConf = new SparkConf().set("spark.rpc.message.maxSize", "1") - test("handling results smaller than Akka frame size") { + test("handling results smaller than max RPC message size") { sc = new SparkContext("local", "test", conf) val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x) assert(result === 2) } - test("handling results larger than Akka frame size") { + test("handling results larger than max RPC message size") { sc = new SparkContext("local", "test", conf) - val akkaFrameSize = - sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt - val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x) - assert(result === 1.to(akkaFrameSize).toArray) + val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) + val result = + sc.parallelize(Seq(1), 1).map(x => 1.to(maxRpcMessageSize).toArray).reduce((x, y) => x) + assert(result === 1.to(maxRpcMessageSize).toArray) val RESULT_BLOCK_ID = TaskResultBlockId(0) assert(sc.env.blockManager.master.getLocations(RESULT_BLOCK_ID).size === 0, @@ -114,11 +114,11 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local } val resultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler) scheduler.taskResultGetter = resultGetter - val akkaFrameSize = - sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt - val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x) + val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) + val result = + sc.parallelize(Seq(1), 1).map(x => 1.to(maxRpcMessageSize).toArray).reduce((x, y) => x) assert(resultGetter.removeBlockSuccessfully) - assert(result === 1.to(akkaFrameSize).toArray) + assert(result === 1.to(maxRpcMessageSize).toArray) // Make sure two tasks were run (one failed one, and a second retried one). assert(scheduler.nextTaskId.get() === 2) |