aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala')
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala17
1 files changed, 8 insertions, 9 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index c87158d89f..58d217ffef 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.Matchers
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.ResetSystemProperties
+import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
with ResetSystemProperties {
@@ -284,19 +284,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}
test("onTaskGettingResult() called when result fetched remotely") {
- val conf = new SparkConf().set("spark.akka.frameSize", "1")
+ val conf = new SparkConf().set("spark.rpc.message.maxSize", "1")
sc = new SparkContext("local", "SparkListenerSuite", conf)
val listener = new SaveTaskEvents
sc.addSparkListener(listener)
- // Make a task whose result is larger than the akka frame size
- val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
- assert(akkaFrameSize === 1024 * 1024)
+ // Make a task whose result is larger than the RPC message size
+ val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
+ assert(maxRpcMessageSize === 1024 * 1024)
val result = sc.parallelize(Seq(1), 1)
- .map { x => 1.to(akkaFrameSize).toArray }
+ .map { x => 1.to(maxRpcMessageSize).toArray }
.reduce { case (x, y) => x }
- assert(result === 1.to(akkaFrameSize).toArray)
+ assert(result === 1.to(maxRpcMessageSize).toArray)
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
val TASK_INDEX = 0
@@ -310,7 +309,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val listener = new SaveTaskEvents
sc.addSparkListener(listener)
- // Make a task whose result is larger than the akka frame size
+ // Make a task whose result is larger than the RPC message size
val result = sc.parallelize(Seq(1), 1).map(2 * _).reduce { case (x, y) => x }
assert(result === 2)