diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-12-16 22:42:21 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-12-16 22:42:21 -0800 |
commit | 7a8169be9a0b6b3d0d53a98aa38940d47b201296 (patch) | |
tree | b30e8f222e936f8a0ef930f7336b3b973385da58 | |
parent | 964a3b6971716823f473f672611951d1e489a552 (diff) | |
parent | 10c0ffa1eb23c6190f460be224ea555573458980 (diff) | |
download | spark-7a8169be9a0b6b3d0d53a98aa38940d47b201296.tar.gz spark-7a8169be9a0b6b3d0d53a98aa38940d47b201296.tar.bz2 spark-7a8169be9a0b6b3d0d53a98aa38940d47b201296.zip |
Merge pull request #268 from pwendell/shaded-protobuf
Add support for Hadoop 2.2 to master (via shaded jars)
This patch does a few related things. NOTE: This may not compile correctly for ~24 hours until artifacts fully propagate to Maven Central.
1. Uses shaded versions of akka/protobuf. For more information on how these versions were prepared, see [1].
2. Brings the `new-yarn` project up-to-date with the changes for Akka 2.2.3.
3. Some clean-up of the build now that we don't have to switch akka groups for different YARN versions.
[1]
https://github.com/pwendell/spark-utils/tree/933a309ef85c22643e8e4b5e365652101c4e95de/shaded-protobuf
-rw-r--r-- | README.md | 6 | ||||
-rw-r--r-- | core/pom.xml | 8 | ||||
-rw-r--r-- | new-yarn/pom.xml | 6 | ||||
-rw-r--r-- | new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala | 13 | ||||
-rw-r--r-- | new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 2 | ||||
-rw-r--r-- | pom.xml | 79 | ||||
-rw-r--r-- | project/SparkBuild.scala | 32 |
7 files changed, 64 insertions, 82 deletions
@@ -54,7 +54,7 @@ versions without YARN, use: # Cloudera CDH 4.2.0 with MapReduce v1 $ SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 sbt/sbt assembly -For Apache Hadoop 2.0.X, 2.1.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions +For Apache Hadoop 2.2.X, 2.1.X, 2.0.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions with YARN, also set `SPARK_YARN=true`: # Apache Hadoop 2.0.5-alpha @@ -63,10 +63,8 @@ with YARN, also set `SPARK_YARN=true`: # Cloudera CDH 4.2.0 with MapReduce v2 $ SPARK_HADOOP_VERSION=2.0.0-cdh4.2.0 SPARK_YARN=true sbt/sbt assembly -When building for Hadoop 2.2.X and newer, you'll need to include the additional `new-yarn` profile: - # Apache Hadoop 2.2.X and newer - $ mvn -Dyarn.version=2.2.0 -Dhadoop.version=2.2.0 -Pnew-yarn + $ SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt assembly When developing a Spark application, specify the Hadoop version by adding the "hadoop-client" artifact to your project's dependencies. For example, if you're diff --git a/core/pom.xml b/core/pom.xml index cdbaa52731..043f6cf68d 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -81,10 +81,6 @@ <artifactId>asm</artifactId> </dependency> <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </dependency> - <dependency> <groupId>com.twitter</groupId> <artifactId>chill_${scala.binary.version}</artifactId> <version>0.3.1</version> @@ -96,10 +92,6 @@ </dependency> <dependency> <groupId>${akka.group}</groupId> - <artifactId>akka-actor_${scala.binary.version}</artifactId> - </dependency> - <dependency> - <groupId>${akka.group}</groupId> <artifactId>akka-remote_${scala.binary.version}</artifactId> </dependency> <dependency> diff --git a/new-yarn/pom.xml b/new-yarn/pom.xml index 8a065c6d7d..4cd28f34e3 100644 --- a/new-yarn/pom.xml +++ b/new-yarn/pom.xml @@ -25,7 +25,7 @@ </parent> <groupId>org.apache.spark</groupId> - <artifactId>spark-yarn_2.9.3</artifactId> + <artifactId>spark-yarn_2.10</artifactId> <packaging>jar</packaging> 
<name>Spark Project YARN Support</name> <url>http://spark.incubator.apache.org/</url> @@ -33,7 +33,7 @@ <dependencies> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.9.3</artifactId> + <artifactId>spark-core_2.10</artifactId> <version>${project.version}</version> </dependency> <dependency> @@ -63,7 +63,7 @@ </dependency> <dependency> <groupId>org.scalatest</groupId> - <artifactId>scalatest_2.9.3</artifactId> + <artifactId>scalatest_2.10</artifactId> <scope>test</scope> </dependency> <dependency> diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index c38f33e212..bc31bb2eb0 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import akka.actor._ -import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent} +import akka.remote._ import akka.actor.Terminated import org.apache.spark.{SparkContext, Logging} import org.apache.spark.util.{Utils, AkkaUtils} @@ -54,17 +54,16 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte // This actor just working as a monitor to watch on Driver Actor. 
class MonitorActor(driverUrl: String) extends Actor { - var driver: ActorRef = null + var driver: ActorSelection = null override def preStart() { logInfo("Listen to driver: " + driverUrl) - driver = context.actorFor(driverUrl) - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(driver) // Doesn't work with remote actors, but useful for testing + driver = context.actorSelection(driverUrl) + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } override def receive = { - case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => + case x: DisassociatedEvent => logInfo("Driver terminated or disconnected! Shutting down.") driverClosed = true } @@ -140,7 +139,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte System.setProperty("spark.driver.host", driverHost) System.setProperty("spark.driver.port", driverPort.toString) - val driverUrl = "akka://spark@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME) actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM") diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index dba0f7640e..c27257cda4 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -253,7 +253,7 @@ private[yarn] class YarnAllocationHandler( numWorkersRunning.decrementAndGet() } else { val workerId = workerIdCounter.incrementAndGet().toString - val driverUrl = "akka://spark@%s:%s/user/%s".format( + val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), 
CoarseGrainedSchedulerBackend.ACTOR_NAME) @@ -104,12 +104,12 @@ <scala.version>2.10.3</scala.version> <scala.binary.version>2.10</scala.binary.version> <mesos.version>0.13.0</mesos.version> - <akka.version>2.2.3</akka.version> - <akka.group>com.typesafe.akka</akka.group> - <protobuf.version>2.4.1</protobuf.version> + <akka.group>org.spark-project.akka</akka.group> + <akka.version>2.2.3-shaded-protobuf</akka.version> <slf4j.version>1.7.2</slf4j.version> <log4j.version>1.2.17</log4j.version> <hadoop.version>1.0.4</hadoop.version> + <protobuf.version>2.4.1</protobuf.version> <yarn.version>0.23.7</yarn.version> <hbase.version>0.94.6</hbase.version> @@ -121,7 +121,7 @@ <repository> <id>maven-repo</id> <!-- This should be at top, it makes maven try the central repo first and then others and hence faster dep resolution --> <name>Maven Repository</name> - <url>http://repo.maven.apache.org/maven2/</url> + <url>http://repo.maven.apache.org/maven2</url> <releases> <enabled>true</enabled> </releases> @@ -132,7 +132,7 @@ <repository> <id>jboss-repo</id> <name>JBoss Repository</name> - <url>http://repository.jboss.org/nexus/content/repositories/releases/</url> + <url>http://repository.jboss.org/nexus/content/repositories/releases</url> <releases> <enabled>true</enabled> </releases> @@ -143,7 +143,7 @@ <repository> <id>mqtt-repo</id> <name>MQTT Repository</name> - <url>https://repo.eclipse.org/content/repositories/paho-releases/</url> + <url>https://repo.eclipse.org/content/repositories/paho-releases</url> <releases> <enabled>true</enabled> </releases> @@ -200,6 +200,11 @@ <artifactId>asm</artifactId> <version>4.0</version> </dependency> + <!-- In theory we need not directly depend on protobuf since Spark does not directly + use it. However, when building with Hadoop/YARN 2.2 Maven doesn't correctly bump + the protobuf version up from the one Mesos gives. For now we include this variable + to explicitly bump the version when building with YARN. 
It would be nice to figure + out why Maven can't resolve this correctly (like SBT does). --> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> @@ -461,6 +466,7 @@ </exclusion> </exclusions> </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-client</artifactId> @@ -716,6 +722,7 @@ <hadoop.major.version>2</hadoop.major.version> <!-- 0.23.* is same as 2.0.* - except hardened to run production jobs --> <hadoop.version>0.23.7</hadoop.version> + <protobuf.version>2.5.0</protobuf.version> <!--<hadoop.version>2.0.5-alpha</hadoop.version> --> </properties> @@ -727,7 +734,7 @@ <repository> <id>maven-root</id> <name>Maven root repository</name> - <url>http://repo1.maven.org/maven2/</url> + <url>http://repo1.maven.org/maven2</url> <releases> <enabled>true</enabled> </releases> @@ -743,39 +750,37 @@ </dependencyManagement> </profile> - <!-- <profile> --> - <!-- <id>new-yarn</id> --> - <!-- <properties> --> - <!-- <akka.group>org.spark-project</akka.group> --> - <!-- <akka.version>2.0.5-protobuf-2.5-java-1.5</akka.version> --> - <!-- <hadoop.major.version>2</hadoop.major.version> --> - <!-- <hadoop.version>2.2.0</hadoop.version> --> - <!-- <protobuf.version>2.5.0</protobuf.version> --> - <!-- </properties> --> + <profile> + <id>new-yarn</id> + <properties> + <hadoop.major.version>2</hadoop.major.version> + <hadoop.version>2.2.0</hadoop.version> + <protobuf.version>2.5.0</protobuf.version> + </properties> - <!-- <modules> --> - <!-- <module>new-yarn</module> --> - <!-- </modules> --> + <modules> + <module>new-yarn</module> + </modules> - <!-- <repositories> --> - <!-- <repository> --> - <!-- <id>maven-root</id> --> - <!-- <name>Maven root repository</name> --> - <!-- <url>http://repo1.maven.org/maven2/</url> --> - <!-- <releases> --> - <!-- <enabled>true</enabled> --> - <!-- </releases> --> - <!-- <snapshots> --> - <!-- <enabled>false</enabled> --> - <!-- </snapshots> --> - <!-- </repository> --> - 
<!-- </repositories> --> + <repositories> + <repository> + <id>maven-root</id> + <name>Maven root repository</name> + <url>http://repo1.maven.org/maven2</url> + <releases> + <enabled>true</enabled> + </releases> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + </repositories> - <!-- <dependencyManagement> --> - <!-- <dependencies> --> - <!-- </dependencies> --> - <!-- </dependencyManagement> --> - <!-- </profile> --> + <dependencyManagement> + <dependencies> + </dependencies> + </dependencyManagement> + </profile> <profile> <id>repl-bin</id> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 441dcc18fb..29f4a4b9ff 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -84,21 +84,10 @@ object SparkBuild extends Build { case Some(v) => v.toBoolean } - if (isNewHadoop && isYarnEnabled) { - println( """Yarn with Hadoop version 2.2.x is not yet expected to work. - Please set env SPARK_HADOOP_VERSION to appropriate version or set SPARK_YARN to false.""") - throw new Exception("Yarn with Hadoop version 2.2.x is not yet expected to work.") - } - - // Build against a protobuf-2.5 compatible Akka if Hadoop 2 is used. 
- // lazy val protobufVersion = if (isNewHadoop) "2.5.0" else "2.4.1" - // lazy val akkaVersion = if (isNewHadoop) "2.0.5-protobuf-2.5-java-1.5" else "2.0.5" - // lazy val akkaGroup = if (isNewHadoop) "org.spark-project" else "com.typesafe.akka" - // Conditionally include the yarn sub-project - //lazy val yarn = Project("yarn", file(if (isNewHadoop) "new-yarn" else "yarn"), settings = yarnSettings) dependsOn(core) + lazy val yarn = Project("yarn", file(if (isNewHadoop) "new-yarn" else "yarn"), settings = yarnSettings) dependsOn(core) - lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core) + //lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core) lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]() lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]() @@ -235,9 +224,8 @@ object SparkBuild extends Build { "com.ning" % "compress-lzf" % "0.8.4", "org.xerial.snappy" % "snappy-java" % "1.0.5", "org.ow2.asm" % "asm" % "4.0", - "com.google.protobuf" % "protobuf-java" % "2.4.1", - "com.typesafe.akka" %% "akka-remote" % "2.2.3" excludeAll(excludeNetty), - "com.typesafe.akka" %% "akka-slf4j" % "2.2.3" excludeAll(excludeNetty), + "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), + "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty), "net.liftweb" %% "lift-json" % "2.5.1" excludeAll(excludeNetty), "it.unimi.dsi" % "fastutil" % "6.4.4", "colt" % "colt" % "1.2.0", @@ -312,16 +300,16 @@ object SparkBuild extends Build { ), libraryDependencies ++= Seq( - "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy), - "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1" + "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy), + "com.sksamuel.kafka" %% "kafka" % 
"0.8.0-beta1" exclude("com.sun.jdmk", "jmxtools") exclude("com.sun.jmx", "jmxri") exclude("net.sf.jopt-simple", "jopt-simple") excludeAll(excludeNetty), - "org.eclipse.paho" % "mqtt-client" % "0.4.0", - "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty), - "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty), - "com.typesafe.akka" %% "akka-zeromq" % "2.2.3" excludeAll(excludeNetty) + "org.eclipse.paho" % "mqtt-client" % "0.4.0", + "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty), + "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty), + "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty) ) ) |