aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorroot <root@ip-10-226-118-223.ec2.internal>2012-10-07 04:19:54 +0000
committerroot <root@ip-10-226-118-223.ec2.internal>2012-10-07 04:19:54 +0000
commita73b25826be808a1be1de8b61c4c7d2df2bcd5aa (patch)
tree4800cd4d7a66e76e0a17064d2c4ff6f344eb2371 /core/src/main
parentce915cadee1de8e265f090b7be2f6e70d1b4062e (diff)
downloadspark-a73b25826be808a1be1de8b61c4c7d2df2bcd5aa.tar.gz
spark-a73b25826be808a1be1de8b61c4c7d2df2bcd5aa.tar.bz2
spark-a73b25826be808a1be1de8b61c4c7d2df2bcd5aa.zip
Made Akka thread pool and message batch sizes configurable
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/util/AkkaUtils.scala8
1 files changed, 5 insertions, 3 deletions
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index f670ccb709..b466b5239c 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -23,6 +23,8 @@ private[spark] object AkkaUtils {
* ActorSystem itself and its port (which is hard to get from Akka).
*/
def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
+ val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
+ val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
val akkaConf = ConfigFactory.parseString("""
akka.daemonic = on
akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
@@ -31,9 +33,9 @@ private[spark] object AkkaUtils {
akka.remote.netty.hostname = "%s"
akka.remote.netty.port = %d
akka.remote.netty.connection-timeout = 1s
- akka.remote.netty.execution-pool-size = 8
- akka.actor.default-dispatcher.throughput = 30
- """.format(host, port))
+ akka.remote.netty.execution-pool-size = %d
+ akka.actor.default-dispatcher.throughput = %d
+ """.format(host, port, akkaThreads, akkaBatchSize))
val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)