author    Kay Ousterhout <kayousterhout@gmail.com>  2013-12-20 14:39:03 -0800
committer Kay Ousterhout <kayousterhout@gmail.com>  2013-12-20 14:39:30 -0800
commit    c06945cfe0717006ae0d44da797a4fbb1a48954d (patch)
tree      9eddb7d47b5829bdb2f117b5676ecc18b11328c7 /yarn
parent    9228ec847e841a17c7dff7e75bc2e06bea799ea4 (diff)
parent    0bc57c576792ba800eca0ec196c92a4d29cb3953 (diff)
Merge remote branch 'upstream/master' into consolidate_schedulers
Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
	core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
	core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/pom.xml                                                                 10
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala        1
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala        17
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala  4
4 files changed, 15 insertions(+), 17 deletions(-)
diff --git a/yarn/pom.xml b/yarn/pom.xml
index 8a065c6d7d..bc64a190fd 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -25,7 +25,7 @@
   </parent>
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-yarn_2.9.3</artifactId>
+  <artifactId>spark-yarn_2.10</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project YARN Support</name>
   <url>http://spark.incubator.apache.org/</url>
@@ -33,7 +33,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_2.9.3</artifactId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>
@@ -63,7 +63,7 @@
     </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_2.9.3</artifactId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
@@ -74,8 +74,8 @@
   </dependencies>
   <build>
-    <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
-    <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index b9dbc3fb87..b3a7886d93 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -89,6 +89,7 @@ class ClientArguments(val args: Array[String]) {
case ("--name") :: value :: tail =>
appName = value
+ args = tail
case ("--addJars") :: value :: tail =>
addJars = value
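
Note: the added "args = tail" matters because ClientArguments parses its options by matching a mutable List[String] in a loop, and each case must rebind args to the remaining tail or the loop never advances past that option. A minimal sketch of the pattern, simplified and hypothetical rather than the actual Spark parser:

// Sketch of a list-based option parser like the one patched above
// (hypothetical standalone example; the real class handles many more flags).
object ParseSketch {
  def main(argv: Array[String]) {
    var args = argv.toList
    var appName = ""
    while (!args.isEmpty) {
      args match {
        case "--name" :: value :: tail =>
          appName = value
          args = tail // without this rebind, --name would loop forever
        case _ :: tail =>
          args = tail // skip anything unrecognized
        case Nil =>
      }
    }
    println("appName=" + appName)
  }
}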
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index 421a83c87a..69038844bb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -27,10 +27,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
-import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
-import akka.remote.RemoteClientShutdown
+import akka.remote._
 import akka.actor.Terminated
-import akka.remote.RemoteClientDisconnected
 import org.apache.spark.{SparkContext, Logging}
 import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -55,19 +53,18 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
   // This actor just working as a monitor to watch on Driver Actor.
   class MonitorActor(driverUrl: String) extends Actor {
-    var driver: ActorRef = null
+    var driver: ActorSelection = null
     override def preStart() {
       logInfo("Listen to driver: " + driverUrl)
-      driver = context.actorFor(driverUrl)
+      driver = context.actorSelection(driverUrl)
       driver ! "hello"
-      context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
-      context.watch(driver) // Doesn't work with remote actors, but useful for testing
+      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     }
     override def receive = {
-      case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
-        logInfo("Driver terminated or disconnected! Shutting down.")
+      case x: DisassociatedEvent =>
+        logInfo(s"Driver terminated or disconnected! Shutting down. $x")
         driverClosed = true
     }
   }
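
Note: this hunk follows the Akka 2.0 -> 2.2 remoting migration: context.actorFor is replaced by context.actorSelection, remote death watch (context.watch on a remote ref) no longer applies, and the RemoteClient* lifecycle events become RemotingLifecycleEvent, with DisassociatedEvent signalling a lost peer. A minimal standalone sketch of the same monitoring pattern, assuming the Akka 2.2-era API rather than reproducing the full Spark class:

import akka.actor._
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

// Monitors a remote actor by subscribing to remoting lifecycle events;
// a DisassociatedEvent means the remote system has gone away.
class PeerMonitor(peerUrl: String) extends Actor {
  override def preStart() {
    // actorSelection resolves the path lazily, unlike the removed actorFor.
    context.actorSelection(peerUrl) ! "hello"
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  }
  override def receive = {
    case e: DisassociatedEvent =>
      // React to the lost connection; Spark's version sets a shutdown flag instead.
      context.stop(self)
  }
}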
@@ -168,7 +165,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
System.setProperty("spark.driver.host", driverHost)
System.setProperty("spark.driver.port", driverPort.toString)
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index f15f3c7c11..9ab2073529 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResp
 import org.apache.hadoop.yarn.util.{RackResolver, Records}
-object AllocationType extends Enumeration ("HOST", "RACK", "ANY") {
+object AllocationType extends Enumeration {
   type AllocationType = Value
   val HOST, RACK, ANY = Value
 }
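
Note: Scala 2.10 removed the deprecated Enumeration constructor that took the value names as string arguments, which is why the old line no longer compiles. Names are now derived reflectively from the val identifiers, so the visible behaviour should be unchanged; a quick hedged check (sketch names, not Spark code):

object AllocationTypeSketch extends Enumeration {
  type AllocationType = Value
  val HOST, RACK, ANY = Value
}

object EnumCheck {
  def main(args: Array[String]) {
    // The names still resolve as before, now via reflection on the val names:
    assert(AllocationTypeSketch.HOST.toString == "HOST")
    assert(AllocationTypeSketch.withName("RACK") == AllocationTypeSketch.RACK)
  }
}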
@@ -238,7 +238,7 @@ private[yarn] class YarnAllocationHandler(
         // Deallocate + allocate can result in reusing id's wrongly - so use a different counter
         // (workerIdCounter)
         val workerId = workerIdCounter.incrementAndGet().toString
-        val driverUrl = "akka://spark@%s:%s/user/%s".format(
+        val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
           System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
           CoarseGrainedSchedulerBackend.ACTOR_NAME)
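
Note: as in WorkerLauncher above, the scheme changes from akka:// to akka.tcp:// because Akka 2.2 names the transport in remote actor paths (akka.tcp for the default Netty TCP transport), and paths built with the bare akka scheme fail to resolve against a 2.2 remote system. A sketch of the resulting URL shape, with illustrative host, port, and actor name (the real values come from spark.driver.host, spark.driver.port, and CoarseGrainedSchedulerBackend.ACTOR_NAME):

object DriverUrlSketch {
  def main(args: Array[String]) {
    // Illustrative values only, not read from a live Spark configuration.
    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
      "driver.example.com", "34512", "CoarseGrainedScheduler")
    println(driverUrl)
    // => akka.tcp://spark@driver.example.com:34512/user/CoarseGrainedScheduler
  }
}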