From 276c37a51c9a6188dbbe02754935540ace338dd1 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Sun, 22 Sep 2013 08:20:12 +0530 Subject: Akka 2.2 migration --- .../main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'yarn/src') diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 6d6ef149cc..d222f412a0 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -209,7 +209,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM else { // deallocate + allocate can result in reusing id's wrongly - so use a different counter (workerIdCounter) val workerId = workerIdCounter.incrementAndGet().toString - val driverUrl = "akka://spark@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), StandaloneSchedulerBackend.ACTOR_NAME) -- cgit v1.2.3 From 4e70480038e9654426876e8e6b2fc356b7f0c8ca Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Fri, 6 Dec 2013 12:25:32 +0530 Subject: A left over akka -> akka.tcp changes --- .../scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala | 2 +- yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'yarn/src') diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index e000531a26..e8fecec4a6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -36,7 +36,7 @@ private[spark] class SimrSchedulerBackend( override def start() { super.start() - val driverUrl = "akka://spark@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index 421a83c87a..b67e068844 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -168,7 +168,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte System.setProperty("spark.driver.host", driverHost) System.setProperty("spark.driver.port", driverPort.toString) - val driverUrl = "akka://spark@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME) actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM") -- cgit v1.2.3 From c1201f47e0d44e92da42adb23d27f60d9d494642 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Mon, 9 Dec 2013 12:55:19 +0530 Subject: fixed yarn build --- .../org/apache/spark/deploy/yarn/WorkerLauncher.scala | 15 ++++++--------- .../apache/spark/deploy/yarn/YarnAllocationHandler.scala | 4 ++-- 2 files changed, 8 insertions(+), 11 deletions(-) (limited to 'yarn/src') diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index b67e068844..69038844bb 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -27,10 +27,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ -import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent} -import akka.remote.RemoteClientShutdown +import akka.remote._ import akka.actor.Terminated -import akka.remote.RemoteClientDisconnected import org.apache.spark.{SparkContext, Logging} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -55,19 +53,18 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte // This actor just working as a monitor to watch on Driver Actor. class MonitorActor(driverUrl: String) extends Actor { - var driver: ActorRef = null + var driver: ActorSelection = null override def preStart() { logInfo("Listen to driver: " + driverUrl) - driver = context.actorFor(driverUrl) + driver = context.actorSelection(driverUrl) driver ! "hello" - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(driver) // Doesn't work with remote actors, but useful for testing + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } override def receive = { - case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => - logInfo("Driver terminated or disconnected! Shutting down.") + case x: DisassociatedEvent => + logInfo(s"Driver terminated or disconnected! Shutting down. $x") driverClosed = true } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index baa030b4a4..a6ce1b60a7 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.conf.Configuration import java.util.{Collections, Set => JSet} import java.lang.{Boolean => JBoolean} -object AllocationType extends Enumeration ("HOST", "RACK", "ANY") { +object AllocationType extends Enumeration { type AllocationType = Value val HOST, RACK, ANY = Value } @@ -370,7 +370,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY) val containerRequests: ArrayBuffer[ResourceRequest] = - new ArrayBuffer[ResourceRequest](hostContainerRequests.size() + rackContainerRequests.size() + 1) + new ArrayBuffer[ResourceRequest](hostContainerRequests.size + rackContainerRequests.size + 1) containerRequests ++= hostContainerRequests containerRequests ++= rackContainerRequests -- cgit v1.2.3 From 842eb55fb53e08daed34d161167854b8b093fbd5 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Thu, 12 Dec 2013 11:11:09 -0600 Subject: Fix the --name option for Spark on Yarn --- .../src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 1 + yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala | 1 + 2 files changed, 2 insertions(+) (limited to 'yarn/src') diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 9efb28a942..70be15d0a3 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -92,6 +92,7 @@ class ClientArguments(val args: Array[String]) { case ("--name") :: value :: tail => appName = value + args = tail case ("--addJars") :: value :: tail => addJars = value diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index b9dbc3fb87..b3a7886d93 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -89,6 +89,7 @@ class ClientArguments(val args: Array[String]) { case ("--name") :: value :: tail => appName = value + args = tail case ("--addJars") :: value :: tail => addJars = value -- cgit v1.2.3