 README.md                                                                         |  6
 core/pom.xml                                                                      |  8
 new-yarn/pom.xml                                                                  |  6
 new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala         | 10
 new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala  |  2
 pom.xml                                                                           | 71
 project/SparkBuild.scala                                                          | 32
 7 files changed, 59 insertions(+), 76 deletions(-)
diff --git a/README.md b/README.md
index 80bbe311a9..1550a8b551 100644
--- a/README.md
+++ b/README.md
@@ -54,7 +54,7 @@ versions without YARN, use:
# Cloudera CDH 4.2.0 with MapReduce v1
$ SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 sbt/sbt assembly
-For Apache Hadoop 2.0.X, 2.1.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
+For Apache Hadoop 2.2.X, 2.1.X, 2.0.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
with YARN, also set `SPARK_YARN=true`:
# Apache Hadoop 2.0.5-alpha
@@ -63,10 +63,8 @@ with YARN, also set `SPARK_YARN=true`:
# Cloudera CDH 4.2.0 with MapReduce v2
$ SPARK_HADOOP_VERSION=2.0.0-cdh4.2.0 SPARK_YARN=true sbt/sbt assembly
-When building for Hadoop 2.2.X and newer, you'll need to include the additional `new-yarn` profile:
-
# Apache Hadoop 2.2.X and newer
- $ mvn -Dyarn.version=2.2.0 -Dhadoop.version=2.2.0 -Pnew-yarn
+ $ SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt assembly
When developing a Spark application, specify the Hadoop version by adding the
"hadoop-client" artifact to your project's dependencies. For example, if you're
diff --git a/core/pom.xml b/core/pom.xml
index cdbaa52731..b83a2a8779 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -81,10 +81,6 @@
<artifactId>asm</artifactId>
</dependency>
<dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </dependency>
- <dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
<version>0.3.1</version>
@@ -96,15 +92,11 @@
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
- <artifactId>akka-actor_${scala.binary.version}</artifactId>
- </dependency>
- <dependency>
- <groupId>${akka.group}</groupId>
<artifactId>akka-remote_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>${akka.group}</groupId>
<artifactId>akka-slf4j_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
diff --git a/new-yarn/pom.xml b/new-yarn/pom.xml
index 8a065c6d7d..4cd28f34e3 100644
--- a/new-yarn/pom.xml
+++ b/new-yarn/pom.xml
@@ -25,7 +25,7 @@
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-yarn_2.9.3</artifactId>
+ <artifactId>spark-yarn_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project YARN Support</name>
<url>http://spark.incubator.apache.org/</url>
@@ -33,7 +33,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.9.3</artifactId>
+ <artifactId>spark-core_2.10</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -63,7 +63,7 @@
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
- <artifactId>scalatest_2.9.3</artifactId>
+ <artifactId>scalatest_2.10</artifactId>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index c38f33e212..11da1c4e73 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import akka.actor._
-import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
+import akka.remote._
import akka.actor.Terminated
import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.util.{Utils, AkkaUtils}
@@ -59,12 +59,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
override def preStart() {
logInfo("Listen to driver: " + driverUrl)
driver = context.actorFor(driverUrl)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(driver) // Doesn't work with remote actors, but useful for testing
+ driver ! "hello"
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
override def receive = {
- case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ case x: DisassociatedEvent =>
logInfo("Driver terminated or disconnected! Shutting down.")
driverClosed = true
}
@@ -140,7 +140,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
System.setProperty("spark.driver.host", driverHost)
System.setProperty("spark.driver.port", driverPort.toString)
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
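
The WorkerLauncher hunks above track the Akka 2.2 remoting API: the old RemoteClient* lifecycle events are gone, remote connections are now observed by subscribing to RemotingLifecycleEvent messages on the event stream (the throwaway `driver ! "hello"` forces an association so those events actually fire), and remote actor paths must name their transport in the URL scheme (`akka.tcp://...` instead of `akka://...`), which is why the driverUrl format strings change here and in YarnAllocationHandler below. A minimal self-contained sketch of the new pattern (class name and log messages are illustrative, not Spark's):

    import akka.actor._
    import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

    // Watches a remote actor the Akka 2.2 way: remoting lifecycle events
    // instead of remote death watch.
    class DriverMonitor(driverUrl: String) extends Actor with ActorLogging {
      override def preStart() {
        // Looking up a remote ref no longer fails eagerly, so send any
        // message to force an association with the remote system...
        val driver = context.actorFor(driverUrl)
        driver ! "hello"
        // ...then listen for remoting lifecycle events in place of the old
        // RemoteClientDisconnected / RemoteClientShutdown notifications.
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      }

      def receive = {
        case d: DisassociatedEvent =>
          log.info("Remote address " + d.remoteAddress + " disassociated; shutting down.")
          context.stop(self)
      }
    }

Note that DisassociatedEvent fires for any lost association on the actor system, not just the driver's, so a production version would check d.remoteAddress against the driver's address before shutting down.
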
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index dba0f7640e..c27257cda4 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -253,7 +253,7 @@ private[yarn] class YarnAllocationHandler(
numWorkersRunning.decrementAndGet()
} else {
val workerId = workerIdCounter.incrementAndGet().toString
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"),
System.getProperty("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
diff --git a/pom.xml b/pom.xml
index fd99fabc15..39c8a8cc5e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,12 +104,12 @@
<scala.version>2.10.3</scala.version>
<scala.binary.version>2.10</scala.binary.version>
<mesos.version>0.13.0</mesos.version>
- <akka.version>2.2.3</akka.version>
- <akka.group>com.typesafe.akka</akka.group>
- <protobuf.version>2.4.1</protobuf.version>
+    <akka.group>org.spark-project.akka</akka.group>
+    <akka.version>2.2.3-shaded-protobuf</akka.version>
<slf4j.version>1.7.2</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<hadoop.version>1.0.4</hadoop.version>
+ <protobuf.version>2.4.1</protobuf.version>
<yarn.version>0.23.7</yarn.version>
<hbase.version>0.94.6</hbase.version>
@@ -200,6 +200,11 @@
<artifactId>asm</artifactId>
<version>4.0</version>
</dependency>
+    <!-- In theory we need not directly depend on protobuf since Spark does not directly
+         use it. However, when building with Hadoop/YARN 2.2 Maven doesn't correctly bump
+         the protobuf version up from the one Mesos gives. For now we include this dependency
+         to explicitly bump the version when building with YARN. It would be nice to figure
+         out why Maven can't resolve this correctly (like SBT does). -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
@@ -461,6 +466,7 @@
</exclusion>
</exclusions>
</dependency>
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
@@ -716,6 +722,7 @@
<hadoop.major.version>2</hadoop.major.version>
<!-- 0.23.* is same as 2.0.* - except hardened to run production jobs -->
<hadoop.version>0.23.7</hadoop.version>
+ <protobuf.version>2.5.0</protobuf.version>
<!--<hadoop.version>2.0.5-alpha</hadoop.version> -->
</properties>
@@ -743,39 +750,37 @@
</dependencyManagement>
</profile>
- <!-- <profile> -->
- <!-- <id>new-yarn</id> -->
- <!-- <properties> -->
- <!-- <akka.group>org.spark-project</akka.group> -->
- <!-- <akka.version>2.0.5-protobuf-2.5-java-1.5</akka.version> -->
- <!-- <hadoop.major.version>2</hadoop.major.version> -->
- <!-- <hadoop.version>2.2.0</hadoop.version> -->
- <!-- <protobuf.version>2.5.0</protobuf.version> -->
- <!-- </properties> -->
+ <profile>
+ <id>new-yarn</id>
+ <properties>
+ <hadoop.major.version>2</hadoop.major.version>
+ <hadoop.version>2.2.0</hadoop.version>
+ <protobuf.version>2.5.0</protobuf.version>
+ </properties>
- <!-- <modules> -->
- <!-- <module>new-yarn</module> -->
- <!-- </modules> -->
+ <modules>
+ <module>new-yarn</module>
+ </modules>
- <!-- <repositories> -->
- <!-- <repository> -->
- <!-- <id>maven-root</id> -->
- <!-- <name>Maven root repository</name> -->
- <!-- <url>http://repo1.maven.org/maven2/</url> -->
- <!-- <releases> -->
- <!-- <enabled>true</enabled> -->
- <!-- </releases> -->
- <!-- <snapshots> -->
- <!-- <enabled>false</enabled> -->
- <!-- </snapshots> -->
- <!-- </repository> -->
- <!-- </repositories> -->
+ <repositories>
+ <repository>
+ <id>maven-root</id>
+ <name>Maven root repository</name>
+ <url>http://repo1.maven.org/maven2/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
- <!-- <dependencyManagement> -->
- <!-- <dependencies> -->
- <!-- </dependencies> -->
- <!-- </dependencyManagement> -->
- <!-- </profile> -->
+ <dependencyManagement>
+ <dependencies>
+ </dependencies>
+ </dependencyManagement>
+ </profile>
<profile>
<id>repl-bin</id>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 441dcc18fb..29f4a4b9ff 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -84,21 +84,10 @@ object SparkBuild extends Build {
case Some(v) => v.toBoolean
}
- if (isNewHadoop && isYarnEnabled) {
- println( """Yarn with Hadoop version 2.2.x is not yet expected to work.
- Please set env SPARK_HADOOP_VERSION to appropriate version or set SPARK_YARN to false.""")
- throw new Exception("Yarn with Hadoop version 2.2.x is not yet expected to work.")
- }
-
- // Build against a protobuf-2.5 compatible Akka if Hadoop 2 is used.
- // lazy val protobufVersion = if (isNewHadoop) "2.5.0" else "2.4.1"
- // lazy val akkaVersion = if (isNewHadoop) "2.0.5-protobuf-2.5-java-1.5" else "2.0.5"
- // lazy val akkaGroup = if (isNewHadoop) "org.spark-project" else "com.typesafe.akka"
-
// Conditionally include the yarn sub-project
- //lazy val yarn = Project("yarn", file(if (isNewHadoop) "new-yarn" else "yarn"), settings = yarnSettings) dependsOn(core)
+ lazy val yarn = Project("yarn", file(if (isNewHadoop) "new-yarn" else "yarn"), settings = yarnSettings) dependsOn(core)
- lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core)
+ //lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core)
lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
@@ -235,9 +224,8 @@ object SparkBuild extends Build {
"com.ning" % "compress-lzf" % "0.8.4",
"org.xerial.snappy" % "snappy-java" % "1.0.5",
"org.ow2.asm" % "asm" % "4.0",
- "com.google.protobuf" % "protobuf-java" % "2.4.1",
- "com.typesafe.akka" %% "akka-remote" % "2.2.3" excludeAll(excludeNetty),
- "com.typesafe.akka" %% "akka-slf4j" % "2.2.3" excludeAll(excludeNetty),
+ "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty),
+ "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty),
"net.liftweb" %% "lift-json" % "2.5.1" excludeAll(excludeNetty),
"it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0",
@@ -312,16 +300,16 @@ object SparkBuild extends Build {
),
libraryDependencies ++= Seq(
- "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
- "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
+ "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
+ "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
exclude("net.sf.jopt-simple", "jopt-simple")
excludeAll(excludeNetty),
- "org.eclipse.paho" % "mqtt-client" % "0.4.0",
- "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
- "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
- "com.typesafe.akka" %% "akka-zeromq" % "2.2.3" excludeAll(excludeNetty)
+ "org.eclipse.paho" % "mqtt-client" % "0.4.0",
+ "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
+ "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
+ "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty)
)
)