aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-10-10 14:03:56 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2013-10-10 15:13:44 +0530
commit34da58ae50cd97a1136d45484130addcf6ac8a33 (patch)
tree1fa4ac83574411bad58346f50ab145d3a4523441
parent026ab7566167e6c8ab1b0cce75b9e09bbd485bee (diff)
downloadspark-34da58ae50cd97a1136d45484130addcf6ac8a33.tar.gz
spark-34da58ae50cd97a1136d45484130addcf6ac8a33.tar.bz2
spark-34da58ae50cd97a1136d45484130addcf6ac8a33.zip
Changed message-frame-size to maximum-frame-size as property.
Removed a test accidentally added during merge.
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/util/AkkaUtils.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/DistributedSuite.scala13
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala4
4 files changed, 5 insertions, 18 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index b4153f3533..3800063234 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -105,7 +105,7 @@ private[spark] class Executor(
SparkEnv.set(env)
env.metricsSystem.registerSource(executorSource)
- private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.message-frame-size")
+ private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
// Start worker thread pool
val threadPool = new ThreadPoolExecutor(
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index af1c36b34d..8daf50ab69 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -54,8 +54,8 @@ private[spark] object AkkaUtils {
akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
akka.remote.netty.tcp.hostname = "%s"
akka.remote.netty.tcp.port = %d
- akka.remote.netty.tcp.connection-timeout = %ds
- akka.remote.netty.tcp.message-frame-size = %d MiB
+ akka.remote.netty.tcp.connection-timeout = %d s
+ akka.remote.netty.tcp.maximum-frame-size = %dMiB
akka.remote.netty.tcp.execution-pool-size = %d
akka.actor.default-dispatcher.throughput = %d
akka.remote.log-remote-lifecycle-events = %s
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 25b9c3eb78..988ab1747d 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -320,19 +320,6 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
}
}
- test("job should fail if TaskResult exceeds Akka frame size") {
- // We must use local-cluster mode since results are returned differently
- // when running under LocalScheduler:
- sc = new SparkContext("local-cluster[1,1,512]", "test")
- val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.message-frame-size").toInt
- val rdd = sc.parallelize(Seq(1)).map{x => new Array[Byte](akkaFrameSize)}
- val exception = intercept[SparkException] {
- rdd.reduce((x, y) => x)
- }
- exception.getMessage should endWith("result exceeded Akka frame size")
- }
-
}
object DistributedSuite {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
index 370a3eb0eb..a00198db8c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
@@ -81,7 +81,7 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
test("handling results larger than Akka frame size") {
val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.message-frame-size").toInt
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
assert(result === 1.to(akkaFrameSize).toArray)
@@ -102,7 +102,7 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
}
scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.message-frame-size").toInt
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.maximum-frame-size").toInt
val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
assert(result === 1.to(akkaFrameSize).toArray)