aboutsummaryrefslogtreecommitdiff
path: root/yarn/src
diff options
context:
space:
mode:
authorKay Ousterhout <kayousterhout@gmail.com>2013-12-20 14:39:03 -0800
committerKay Ousterhout <kayousterhout@gmail.com>2013-12-20 14:39:30 -0800
commitc06945cfe0717006ae0d44da797a4fbb1a48954d (patch)
tree9eddb7d47b5829bdb2f117b5676ecc18b11328c7 /yarn/src
parent9228ec847e841a17c7dff7e75bc2e06bea799ea4 (diff)
parent0bc57c576792ba800eca0ec196c92a4d29cb3953 (diff)
downloadspark-c06945cfe0717006ae0d44da797a4fbb1a48954d.tar.gz
spark-c06945cfe0717006ae0d44da797a4fbb1a48954d.tar.bz2
spark-c06945cfe0717006ae0d44da797a4fbb1a48954d.zip
Merge remote branch 'upstream/master' into consolidate_schedulers
Conflicts: core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
Diffstat (limited to 'yarn/src')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala1
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala17
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala4
3 files changed, 10 insertions, 12 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index b9dbc3fb87..b3a7886d93 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -89,6 +89,7 @@ class ClientArguments(val args: Array[String]) {
case ("--name") :: value :: tail =>
appName = value
+ args = tail
case ("--addJars") :: value :: tail =>
addJars = value
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 421a83c87a..69038844bb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -27,10 +27,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
-import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
-import akka.remote.RemoteClientShutdown
+import akka.remote._
import akka.actor.Terminated
-import akka.remote.RemoteClientDisconnected
import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -55,19 +53,18 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
// This actor just working as a monitor to watch on Driver Actor.
class MonitorActor(driverUrl: String) extends Actor {
- var driver: ActorRef = null
+ var driver: ActorSelection = null
override def preStart() {
logInfo("Listen to driver: " + driverUrl)
- driver = context.actorFor(driverUrl)
+ driver = context.actorSelection(driverUrl)
driver ! "hello"
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(driver) // Doesn't work with remote actors, but useful for testing
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
override def receive = {
- case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
- logInfo("Driver terminated or disconnected! Shutting down.")
+ case x: DisassociatedEvent =>
+ logInfo(s"Driver terminated or disconnected! Shutting down. $x")
driverClosed = true
}
}
@@ -168,7 +165,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
System.setProperty("spark.driver.host", driverHost)
System.setProperty("spark.driver.port", driverPort.toString)
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index f15f3c7c11..9ab2073529 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResp
import org.apache.hadoop.yarn.util.{RackResolver, Records}
-object AllocationType extends Enumeration ("HOST", "RACK", "ANY") {
+object AllocationType extends Enumeration {
type AllocationType = Value
val HOST, RACK, ANY = Value
}
@@ -238,7 +238,7 @@ private[yarn] class YarnAllocationHandler(
// Deallocate + allocate can result in reusing id's wrongly - so use a different counter
// (workerIdCounter)
val workerId = workerIdCounter.incrementAndGet().toString
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)