author      Prashant Sharma <scrapcodes@gmail.com>   2013-09-22 08:20:12 +0530
committer   Prashant Sharma <scrapcodes@gmail.com>   2013-09-22 08:20:12 +0530
commit      276c37a51c9a6188dbbe02754935540ace338dd1 (patch)
tree        42f08c5255bf7cb58e06580bd1812573bf487dbc /core
parent      69fd42aee3f3fed8dbb5f2933413cbf31cac74d1 (diff)
Akka 2.2 migration
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/Client.scala | 10
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 31
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala | 30
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/AkkaUtils.scala | 23
-rw-r--r--  core/src/test/scala/org/apache/spark/DistributedSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/DriverSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 2
13 files changed, 78 insertions, 54 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 1e63b54b7a..a267407c67 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -169,7 +169,7 @@ object SparkEnv extends Logging {
val driverHost: String = System.getProperty("spark.driver.host", "localhost")
val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
Utils.checkHost(driverHost, "Expected hostname")
- val url = "akka://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
+ val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
logInfo("Connecting to " + name + ": " + url)
actorSystem.actorFor(url)
}
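Akka 2.2 moves the transport into the URL scheme, so every remote actor path Spark builds changes from akka:// to akka.tcp:// (the Netty TCP transport). A minimal sketch of the new URL shape, with placeholder host, port and actor name rather than Spark's real configuration lookup:

    // Not Spark code: illustrates only the akka:// -> akka.tcp:// scheme change.
    object DriverUrlSketch {
      def driverActorUrl(driverHost: String, driverPort: Int, name: String): String =
        // Akka 2.2 remote paths carry the transport in the scheme.
        "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)

      def main(args: Array[String]): Unit =
        // Prints akka.tcp://spark@localhost:7077/user/MapOutputTracker
        println(driverActorUrl("localhost", 7077, "MapOutputTracker"))
    }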
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 14a90934f6..164386782c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -26,9 +26,7 @@ import akka.actor._
import akka.actor.Terminated
import akka.pattern.AskTimeoutException
import akka.pattern.ask
-import akka.remote.RemoteClientDisconnected
-import akka.remote.RemoteClientLifeCycleEvent
-import akka.remote.RemoteClientShutdown
+import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
import org.apache.spark.Logging
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
@@ -61,7 +59,7 @@ private[spark] class Client(
master = context.actorFor(Master.toAkkaUrl(masterUrl))
masterAddress = master.path.address
master ! RegisterApplication(appDescription)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
} catch {
case e: Exception =>
@@ -99,12 +97,12 @@ private[spark] class Client(
markDisconnected()
context.stop(self)
- case RemoteClientDisconnected(transport, address) if address == masterAddress =>
+ case DisassociatedEvent(_, address, _) if address == masterAddress =>
logError("Connection to master failed; stopping client")
markDisconnected()
context.stop(self)
- case RemoteClientShutdown(transport, address) if address == masterAddress =>
+ case AssociationErrorEvent(_, _, address, _) if address == masterAddress =>
logError("Connection to master failed; stopping client")
markDisconnected()
context.stop(self)
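The RemoteClient* lifecycle events no longer exist in Akka 2.2; the replacement is to subscribe to RemotingLifecycleEvent on the event stream and match DisassociatedEvent / AssociationErrorEvent, extracting the remote Address from the event. A minimal, self-contained sketch of that pattern, assuming a masterAddress field like the one Client keeps (the actor and its reaction are stand-ins, not the Client code):

    import akka.actor.{Actor, Address}
    import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}

    class RemoteWatcherSketch(masterAddress: Address) extends Actor {
      override def preStart(): Unit =
        // Akka 2.2 publishes remoting lifecycle events on the system event stream.
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

      def receive = {
        case DisassociatedEvent(_, remoteAddress, _) if remoteAddress == masterAddress =>
          // The association to the master dropped (old RemoteClientDisconnected case).
          context.stop(self)
        case AssociationErrorEvent(_, _, remoteAddress, _) if remoteAddress == masterAddress =>
          // An association attempt failed (roughly the old RemoteClientShutdown case).
          context.stop(self)
        case _ => // other lifecycle events are ignored in this sketch
      }
    }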
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2efd16bca0..cb0fe6a850 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -25,9 +25,8 @@ import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor._
-import akka.actor.Terminated
import akka.pattern.ask
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+import akka.remote._
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
@@ -36,6 +35,22 @@ import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils}
import akka.util.Timeout
+import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
+import org.apache.spark.deploy.DeployMessages.KillExecutor
+import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import scala.Some
+import org.apache.spark.deploy.DeployMessages.WebUIPortResponse
+import org.apache.spark.deploy.DeployMessages.LaunchExecutor
+import org.apache.spark.deploy.DeployMessages.RegisteredApplication
+import org.apache.spark.deploy.DeployMessages.RegisterWorker
+import org.apache.spark.deploy.DeployMessages.ExecutorUpdated
+import org.apache.spark.deploy.DeployMessages.MasterStateResponse
+import org.apache.spark.deploy.DeployMessages.ExecutorAdded
+import org.apache.spark.deploy.DeployMessages.RegisterApplication
+import org.apache.spark.deploy.DeployMessages.ApplicationRemoved
+import org.apache.spark.deploy.DeployMessages.Heartbeat
+import org.apache.spark.deploy.DeployMessages.RegisteredWorker
+import akka.actor.Terminated
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
@@ -81,7 +96,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
override def preStart() {
logInfo("Starting Spark master at spark://" + host + ":" + port)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.start()
import context.dispatcher
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
@@ -165,13 +180,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
actorToApp.get(actor).foreach(finishApplication)
}
- case RemoteClientDisconnected(transport, address) => {
+ case DisassociatedEvent(_, address, _) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
}
- case RemoteClientShutdown(transport, address) => {
+ case AssociationErrorEvent(_, _, address, _) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
@@ -376,11 +391,11 @@ private[spark] object Master {
actorSystem.awaitTermination()
}
- /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
+ /** Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
def toAkkaUrl(sparkUrl: String): String = {
sparkUrl match {
case sparkUrlRegex(host, port) =>
- "akka://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
+ "akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
case _ =>
throw new SparkException("Invalid master URL: " + sparkUrl)
}
@@ -388,7 +403,7 @@ private[spark] object Master {
def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
- val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName)
+ val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName)
val timeoutDuration = Duration.create(
System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
implicit val timeout = Timeout(timeoutDuration)
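Actor creation changes style as well: Akka 2.2 favours Props(classOf[...], args...) over the Props(new ...) by-name form, which can capture the enclosing scope and is awkward to serialize. A minimal sketch of the new form with a made-up Greeter actor (not a Spark class):

    import akka.actor.{Actor, ActorSystem, Props}

    // Made-up actor, only to show constructor arguments passed through Props.
    class Greeter(name: String, port: Int) extends Actor {
      def receive = { case msg => println(name + ":" + port + " got " + msg) }
    }

    object PropsSketch {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("sketch")
        // Akka 2.2 style: class plus constructor arguments, no closure over this scope.
        val greeter = system.actorOf(Props(classOf[Greeter], "worker-1", 7078), name = "Greeter")
        greeter ! "hello"
        system.shutdown()
      }
    }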
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index a0a9d1040a..1f04c1eea5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -25,9 +25,9 @@ import scala.collection.mutable.HashMap
import scala.concurrent.duration._
import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
+import akka.remote.{RemotingLifecycleEvent, AssociationErrorEvent, DisassociatedEvent}
-import org.apache.spark.{Logging}
+import org.apache.spark.Logging
import org.apache.spark.deploy.ExecutorState
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
@@ -113,7 +113,7 @@ private[spark] class Worker(
logInfo("Connecting to master " + masterUrl)
master = context.actorFor(Master.toAkkaUrl(masterUrl))
master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
}
@@ -165,7 +165,7 @@ private[spark] class Worker(
logInfo("Asked to kill unknown executor " + fullId)
}
- case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ case _: Terminated | _: DisassociatedEvent | _: AssociationErrorEvent =>
masterDisconnected()
case RequestWorkerState => {
@@ -207,8 +207,8 @@ private[spark] object Worker {
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
- val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory,
- masterUrl, workDir)), name = "Worker")
+ actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
+ masterUrl, workDir), name = "Worker")
(actorSystem, boundPort)
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index ceae3b8289..99a4a95e82 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -105,7 +105,7 @@ private[spark] class Executor(
SparkEnv.set(env)
env.metricsSystem.registerSource(executorSource)
- private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
+ private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.message-frame-size")
// Start worker thread pool
val threadPool = new ThreadPoolExecutor(
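The frame-size setting moves under the transport-specific path akka.remote.netty.tcp; Config.getBytes already parses size strings such as "10 MiB" into a byte count, so only the lookup path changes. A small sketch against a standalone Config (the value is an assumed example, not Spark's own default handling):

    import com.typesafe.config.ConfigFactory

    object FrameSizeSketch {
      def main(args: Array[String]): Unit = {
        val conf = ConfigFactory.parseString(
          "akka.remote.netty.tcp.message-frame-size = 10 MiB")
        // getBytes understands HOCON size units; this prints 10485760.
        val frameSize: Long = conf.getBytes("akka.remote.netty.tcp.message-frame-size")
        println(frameSize)
      }
    }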
diff --git a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
index 7839023868..46f0ef2cc6 100644
--- a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
@@ -19,13 +19,25 @@ package org.apache.spark.executor
import java.nio.ByteBuffer
-import akka.actor.{ActorRef, Actor, Props, Terminated}
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
+import akka.actor._
+import akka.remote._
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.scheduler.cluster.StandaloneClusterMessages.RegisteredExecutor
+import org.apache.spark.scheduler.cluster.StandaloneClusterMessages.LaunchTask
+import akka.remote.DisassociatedEvent
+import org.apache.spark.scheduler.cluster.StandaloneClusterMessages.RegisterExecutor
+import org.apache.spark.scheduler.cluster.StandaloneClusterMessages.RegisterExecutorFailed
+import org.apache.spark.scheduler.cluster.StandaloneClusterMessages.RegisteredExecutor
+import org.apache.spark.scheduler.cluster.StandaloneClusterMessages.LaunchTask
+import akka.remote.AssociationErrorEvent
+import akka.remote.DisassociatedEvent
+import akka.actor.Terminated
+import org.apache.spark.scheduler.cluster.StandaloneClusterMessages.RegisterExecutor
+import org.apache.spark.scheduler.cluster.StandaloneClusterMessages.RegisterExecutorFailed
private[spark] class StandaloneExecutorBackend(
@@ -40,14 +52,14 @@ private[spark] class StandaloneExecutorBackend(
Utils.checkHostPort(hostPort, "Expected hostport")
var executor: Executor = null
- var driver: ActorRef = null
+ var driver: ActorSelection = null
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
- driver = context.actorFor(driverUrl)
+ driver = context.actorSelection(driverUrl)
driver ! RegisterExecutor(executorId, hostPort, cores)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(driver) // Doesn't work with remote actors, but useful for testing
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+ // context.watch(driver) // Doesn't work with remote actors, but useful for testing
}
override def receive = {
@@ -69,7 +81,7 @@ private[spark] class StandaloneExecutorBackend(
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
}
- case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ case _: Terminated | _: DisassociatedEvent | _: AssociationErrorEvent =>
logError("Driver terminated or disconnected! Shutting down.")
System.exit(1)
}
@@ -90,8 +102,8 @@ private[spark] object StandaloneExecutorBackend {
// set it
val sparkHostPort = hostname + ":" + boundPort
System.setProperty("spark.hostPort", sparkHostPort)
- val actor = actorSystem.actorOf(
- Props(new StandaloneExecutorBackend(driverUrl, executorId, sparkHostPort, cores)),
+ actorSystem.actorOf(
+ Props(classOf[StandaloneExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
name = "Executor")
actorSystem.awaitTermination()
}
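context.actorFor is deprecated in Akka 2.2 in favour of context.actorSelection, which returns an ActorSelection rather than an ActorRef. A selection can still be sent messages with !, but it cannot be passed to context.watch, which is why the watch call on the driver is commented out in this file. A minimal sketch of the changed lookup (the Register message and the URL handling are placeholders, not the backend's real protocol):

    import akka.actor.{Actor, ActorSelection}

    // Placeholder registration message for the sketch.
    case class Register(id: String)

    class BackendSketch(driverUrl: String, executorId: String) extends Actor {
      // Akka 2.2: hold an ActorSelection instead of an ActorRef for a remote peer.
      var driver: ActorSelection = null

      override def preStart(): Unit = {
        driver = context.actorSelection(driverUrl)
        // ActorSelection supports !, but context.watch needs an ActorRef, so
        // Terminated-based monitoring of the remote driver no longer applies here.
        driver ! Register(executorId)
      }

      def receive = {
        case msg => println("got " + msg)
      }
    }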
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9c49768c0c..fa83ae19d6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -42,7 +42,7 @@ private[spark] class SparkDeploySchedulerBackend(
super.start()
// The endpoint for executors to talk to us
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index addfa077c1..49f668eb32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -25,7 +25,7 @@ import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
-import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
+import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{SparkException, Logging, TaskState}
import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
@@ -53,7 +53,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
override def preStart() {
// Listen for remote client disconnection events, since they don't go through Akka's watch()
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
// Periodically revive offers to allow delay scheduling to work
val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong
@@ -101,11 +101,11 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
case Terminated(actor) =>
actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
- case RemoteClientDisconnected(transport, address) =>
- addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disconnected"))
+ case DisassociatedEvent(_, remoteAddress, _) =>
+ addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client disconnected"))
- case RemoteClientShutdown(transport, address) =>
- addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client shutdown"))
+ case AssociationErrorEvent(_, _, remoteAddress, _) =>
+ addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_, "remote Akka client shutdown"))
}
// Make fake resource offers on all executors
diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 3dbe61d706..babe875fa1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -119,7 +119,7 @@ private[spark] class CoarseMesosSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"),
System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index e674d120ea..af1c36b34d 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -48,28 +48,27 @@ private[spark] object AkkaUtils {
val akkaConf = ConfigFactory.parseString("""
akka.daemonic = on
- akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
+ akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
akka.stdout-loglevel = "ERROR"
akka.actor.provider = "akka.remote.RemoteActorRefProvider"
- akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
- akka.remote.netty.hostname = "%s"
- akka.remote.netty.port = %d
- akka.remote.netty.connection-timeout = %ds
- akka.remote.netty.message-frame-size = %d MiB
- akka.remote.netty.execution-pool-size = %d
+ akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
+ akka.remote.netty.tcp.hostname = "%s"
+ akka.remote.netty.tcp.port = %d
+ akka.remote.netty.tcp.connection-timeout = %ds
+ akka.remote.netty.tcp.message-frame-size = %d MiB
+ akka.remote.netty.tcp.execution-pool-size = %d
akka.actor.default-dispatcher.throughput = %d
akka.remote.log-remote-lifecycle-events = %s
- akka.remote.netty.write-timeout = %ds
- """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize,
- lifecycleEvents, akkaWriteTimeout))
+ """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize,
+ lifecycleEvents))
val actorSystem = ActorSystem(name, akkaConf)
// Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
// hack because Akka doesn't let you figure out the port through the public API yet.
val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
- val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
- return (actorSystem, boundPort)
+ val boundPort = provider.getDefaultAddress.port.get
+ (actorSystem, boundPort)
}
}
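On the configuration side, the Netty settings move from akka.remote.netty.* to akka.remote.netty.tcp.*, logging is configured through akka.loggers, and the bound port is read from the provider's default address rather than the old transport field. A condensed, runnable sketch of creating such a system and recovering the port, assuming akka-remote 2.2 on the classpath (only a minimal subset of the keys above; the loggers, timeout and frame-size settings move the same way):

    import akka.actor.{ActorSystem, ExtendedActorSystem}
    import com.typesafe.config.ConfigFactory

    object ActorSystemSketch {
      def main(args: Array[String]): Unit = {
        val host = "127.0.0.1"
        val conf = ConfigFactory.parseString("""
          akka.daemonic = on
          akka.actor.provider = "akka.remote.RemoteActorRefProvider"
          akka.remote.netty.tcp.hostname = "%s"
          akka.remote.netty.tcp.port = 0
          """.format(host))
        val system = ActorSystem("sketch", conf)
        // Akka 2.2: the bound address (and the port picked for port = 0) comes from
        // the provider's default address; RemoteActorRefProvider.transport is gone.
        val boundPort = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port.get
        println(boundPort)
        system.shutdown()
      }
    }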
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 7a856d4081..c719a54a61 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -325,7 +325,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
// when running under LocalScheduler:
sc = new SparkContext("local-cluster[1,1,512]", "test")
val akkaFrameSize =
- sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.message-frame-size").toInt
val rdd = sc.parallelize(Seq(1)).map{x => new Array[Byte](akkaFrameSize)}
val exception = intercept[SparkException] {
rdd.reduce((x, y) => x)
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
index 01a72d8401..6d1695eae7 100644
--- a/core/src/test/scala/org/apache/spark/DriverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -34,7 +34,7 @@ class DriverSuite extends FunSuite with Timeouts {
// Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
forAll(masters) { (master: String) =>
- failAfter(30 seconds) {
+ failAfter(60 seconds) {
Utils.execute(Seq("./spark-class", "org.apache.spark.DriverWithoutCleanup", master),
new File(System.getenv("SPARK_HOME")))
}
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 6013320eaa..18fb1bf590 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -109,7 +109,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0)
val slaveTracker = new MapOutputTracker()
slaveTracker.trackerActor = slaveSystem.actorFor(
- "akka://spark@localhost:" + boundPort + "/user/MapOutputTracker")
+ "akka.tcp://spark@localhost:" + boundPort + "/user/MapOutputTracker")
masterTracker.registerShuffle(10, 1)
masterTracker.incrementEpoch()