diff options
author | Kay Ousterhout <kayousterhout@gmail.com> | 2013-12-20 14:39:03 -0800 |
---|---|---|
committer | Kay Ousterhout <kayousterhout@gmail.com> | 2013-12-20 14:39:30 -0800 |
commit | c06945cfe0717006ae0d44da797a4fbb1a48954d (patch) | |
tree | 9eddb7d47b5829bdb2f117b5676ecc18b11328c7 /yarn | |
parent | 9228ec847e841a17c7dff7e75bc2e06bea799ea4 (diff) | |
parent | 0bc57c576792ba800eca0ec196c92a4d29cb3953 (diff) | |
download | spark-c06945cfe0717006ae0d44da797a4fbb1a48954d.tar.gz spark-c06945cfe0717006ae0d44da797a4fbb1a48954d.tar.bz2 spark-c06945cfe0717006ae0d44da797a4fbb1a48954d.zip |
Merge remote branch 'upstream/master' into consolidate_schedulers
Conflicts:
core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
Diffstat (limited to 'yarn')
4 files changed, 15 insertions, 17 deletions
diff --git a/yarn/pom.xml b/yarn/pom.xml index 8a065c6d7d..bc64a190fd 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -25,7 +25,7 @@ </parent> <groupId>org.apache.spark</groupId> - <artifactId>spark-yarn_2.9.3</artifactId> + <artifactId>spark-yarn_2.10</artifactId> <packaging>jar</packaging> <name>Spark Project YARN Support</name> <url>http://spark.incubator.apache.org/</url> @@ -33,7 +33,7 @@ <dependencies> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.9.3</artifactId> + <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> <dependency> @@ -63,7 +63,7 @@ </dependency> <dependency> <groupId>org.scalatest</groupId> - <artifactId>scalatest_2.9.3</artifactId> + <artifactId>scalatest_${scala.binary.version}</artifactId> <scope>test</scope> </dependency> <dependency> @@ -74,8 +74,8 @@ </dependencies> <build> - <outputDirectory>target/scala-${scala.version}/classes</outputDirectory> - <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index b9dbc3fb87..b3a7886d93 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -89,6 +89,7 @@ class ClientArguments(val args: Array[String]) { case ("--name") :: value :: tail => appName = value + args = tail case ("--addJars") :: value :: tail => addJars = value diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index 421a83c87a..69038844bb 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -27,10 +27,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ -import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent} -import akka.remote.RemoteClientShutdown +import akka.remote._ import akka.actor.Terminated -import akka.remote.RemoteClientDisconnected import org.apache.spark.{SparkContext, Logging} import org.apache.spark.util.{Utils, AkkaUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -55,19 +53,18 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte // This actor just working as a monitor to watch on Driver Actor. class MonitorActor(driverUrl: String) extends Actor { - var driver: ActorRef = null + var driver: ActorSelection = null override def preStart() { logInfo("Listen to driver: " + driverUrl) - driver = context.actorFor(driverUrl) + driver = context.actorSelection(driverUrl) driver ! "hello" - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(driver) // Doesn't work with remote actors, but useful for testing + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } override def receive = { - case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => - logInfo("Driver terminated or disconnected! Shutting down.") + case x: DisassociatedEvent => + logInfo(s"Driver terminated or disconnected! Shutting down. $x") driverClosed = true } } @@ -168,7 +165,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte System.setProperty("spark.driver.host", driverHost) System.setProperty("spark.driver.port", driverPort.toString) - val driverUrl = "akka://spark@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME) actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM") diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index f15f3c7c11..9ab2073529 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResp import org.apache.hadoop.yarn.util.{RackResolver, Records} -object AllocationType extends Enumeration ("HOST", "RACK", "ANY") { +object AllocationType extends Enumeration { type AllocationType = Value val HOST, RACK, ANY = Value } @@ -238,7 +238,7 @@ private[yarn] class YarnAllocationHandler( // Deallocate + allocate can result in reusing id's wrongly - so use a different counter // (workerIdCounter) val workerId = workerIdCounter.incrementAndGet().toString - val driverUrl = "akka://spark@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) |