From c6f95e603e2c58b2fab6255c824a839807f03026 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sun, 15 Dec 2013 20:30:21 -0800
Subject: Attempt with extra repositories

---
 README.md                                          |  6 +-
 core/pom.xml                                       |  9 +--
 new-yarn/pom.xml                                   |  6 +-
 .../apache/spark/deploy/yarn/WorkerLauncher.scala  | 10 +--
 .../spark/deploy/yarn/YarnAllocationHandler.scala  |  2 +-
 pom.xml                                            | 76 ++++++++++++----------
 project/SparkBuild.scala                           | 32 +++------
 7 files changed, 65 insertions(+), 76 deletions(-)

diff --git a/README.md b/README.md
index 80bbe311a9..1550a8b551 100644
--- a/README.md
+++ b/README.md
@@ -54,7 +54,7 @@ versions without YARN, use:
     # Cloudera CDH 4.2.0 with MapReduce v1
     $ SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 sbt/sbt assembly

-For Apache Hadoop 2.0.X, 2.1.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
+For Apache Hadoop 2.2.X, 2.1.X, 2.0.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
 with YARN, also set `SPARK_YARN=true`:

     # Apache Hadoop 2.0.5-alpha
@@ -63,10 +63,8 @@ with YARN, also set `SPARK_YARN=true`:
     # Cloudera CDH 4.2.0 with MapReduce v2
     $ SPARK_HADOOP_VERSION=2.0.0-cdh4.2.0 SPARK_YARN=true sbt/sbt assembly

-When building for Hadoop 2.2.X and newer, you'll need to include the additional `new-yarn` profile:
-
     # Apache Hadoop 2.2.X and newer
-    $ mvn -Dyarn.version=2.2.0 -Dhadoop.version=2.2.0 -Pnew-yarn
+    $ SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt assembly

 When developing a Spark application, specify the Hadoop version by adding the
 "hadoop-client" artifact to your project's dependencies. For example, if you're
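
A note on the README's closing paragraph above: in an sbt build, the
"hadoop-client" dependency it describes amounts to a single line. A minimal
sketch; the version shown is only an example and should match the cluster's
actual Hadoop release, it is not something this patch prescribes:

    // build.sbt fragment (illustrative); match the version to your cluster
    libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.2.0"
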
diff --git a/core/pom.xml b/core/pom.xml
index cdbaa52731..b83a2a8779 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -80,10 +80,6 @@
       <groupId>org.ow2.asm</groupId>
       <artifactId>asm</artifactId>
     </dependency>
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-    </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>chill_${scala.binary.version}</artifactId>
@@ -94,10 +90,6 @@
       <artifactId>chill-java</artifactId>
       <version>0.3.1</version>
     </dependency>
-    <dependency>
-      <groupId>${akka.group}</groupId>
-      <artifactId>akka-actor_${scala.binary.version}</artifactId>
-    </dependency>
     <dependency>
       <groupId>${akka.group}</groupId>
      <artifactId>akka-remote_${scala.binary.version}</artifactId>
@@ -105,6 +97,7 @@
       <groupId>${akka.group}</groupId>
       <artifactId>akka-slf4j_${scala.binary.version}</artifactId>
     </dependency>
+      org.spark-project.akka
     <dependency>
       <groupId>org.scala-lang</groupId>
diff --git a/new-yarn/pom.xml b/new-yarn/pom.xml
index 8a065c6d7d..4cd28f34e3 100644
--- a/new-yarn/pom.xml
+++ b/new-yarn/pom.xml
@@ -25,7 +25,7 @@
   <groupId>org.apache.spark</groupId>
-  <artifactId>spark-yarn_2.9.3</artifactId>
+  <artifactId>spark-yarn_2.10</artifactId>
   <packaging>jar</packaging>
   <name>Spark Project YARN Support</name>
   <url>http://spark.incubator.apache.org/</url>
@@ -33,7 +33,7 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-core_2.9.3</artifactId>
+      <artifactId>spark-core_2.10</artifactId>
       <version>${project.version}</version>
     </dependency>
@@ -63,7 +63,7 @@
     <dependency>
       <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_2.9.3</artifactId>
+      <artifactId>scalatest_2.10</artifactId>
       <scope>test</scope>
     </dependency>
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
index c38f33e212..11da1c4e73 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 import akka.actor._
-import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
+import akka.remote._
 import akka.actor.Terminated
 import org.apache.spark.{SparkContext, Logging}
 import org.apache.spark.util.{Utils, AkkaUtils}
@@ -59,12 +59,12 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
     override def preStart() {
       logInfo("Listen to driver: " + driverUrl)
       driver = context.actorFor(driverUrl)
-      context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
-      context.watch(driver) // Doesn't work with remote actors, but useful for testing
+      driver ! "hello"
+      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     }

     override def receive = {
-      case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+      case x: DisassociatedEvent =>
         logInfo("Driver terminated or disconnected! Shutting down.")
         driverClosed = true
     }
@@ -140,7 +140,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte
     System.setProperty("spark.driver.host", driverHost)
     System.setProperty("spark.driver.port", driverPort.toString)

-    val driverUrl = "akka://spark@%s:%s/user/%s".format(
+    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       driverHost, driverPort.toString,
       CoarseGrainedSchedulerBackend.ACTOR_NAME)

     actor = actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index dba0f7640e..c27257cda4 100644
--- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -253,7 +253,7 @@ private[yarn] class YarnAllocationHandler(
         numWorkersRunning.decrementAndGet()
       } else {
         val workerId = workerIdCounter.incrementAndGet().toString
-        val driverUrl = "akka://spark@%s:%s/user/%s".format(
+        val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
           System.getProperty("spark.driver.host"),
           System.getProperty("spark.driver.port"),
           CoarseGrainedSchedulerBackend.ACTOR_NAME)
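
The WorkerLauncher and YarnAllocationHandler hunks above track Akka's 2.1 to
2.2 remoting changes: the RemoteClient* events were folded into
RemotingLifecycleEvent and its DisassociatedEvent subtype, and remote actor
paths now carry the akka.tcp:// scheme. A minimal sketch of the resulting
monitoring pattern, assuming Akka 2.2.x; DriverMonitor is an illustrative
name, and it uses actorSelection, which later commits in this series adopt in
place of the deprecated actorFor:

    import akka.actor._
    import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}

    // Monitors a remote driver by listening for remoting lifecycle events,
    // since plain DeathWatch is not a reliable remote-failure signal here.
    class DriverMonitor(driverUrl: String) extends Actor {
      var driver: ActorSelection = _

      override def preStart() {
        // driverUrl has the form "akka.tcp://spark@host:port/user/<name>"
        driver = context.actorSelection(driverUrl)
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      }

      def receive = {
        case _: DisassociatedEvent =>
          // The remote side disconnected or shut down; stop monitoring.
          context.stop(self)
      }
    }
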
diff --git a/pom.xml b/pom.xml
index fd99fabc15..39c8a8cc5e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,12 +104,11 @@
     <scala.version>2.10.3</scala.version>
     <scala.binary.version>2.10</scala.binary.version>
     <mesos.version>0.13.0</mesos.version>
-    <akka.version>2.2.3</akka.version>
-    <akka.group>com.typesafe.akka</akka.group>
-    <protobuf.version>2.4.1</protobuf.version>
+    <akka.version>2.2.3-shaded-protobuf</akka.version>
     <slf4j.version>1.7.2</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
     <hadoop.version>1.0.4</hadoop.version>
+    <protobuf.version>2.4.1</protobuf.version>
     <yarn.version>0.23.7</yarn.version>
     <hbase.version>0.94.6</hbase.version>
@@ -200,6 +199,11 @@
         <artifactId>asm</artifactId>
         <version>4.0</version>
       </dependency>
+
       <dependency>
         <groupId>com.google.protobuf</groupId>
         <artifactId>protobuf-java</artifactId>
@@ -216,6 +220,7 @@
         <version>0.3.1</version>
       </dependency>
+<<<<<<< HEAD
       <dependency>
         <groupId>${akka.group}</groupId>
         <artifactId>akka-actor_${scala.binary.version}</artifactId>
         <version>${akka.version}</version>
@@ -249,8 +254,13 @@
       <dependency>
+<<<<<<< HEAD
         <groupId>${akka.group}</groupId>
         <artifactId>akka-zeromq_${scala.binary.version}</artifactId>
+=======
+        <groupId>org.spark-project.akka</groupId>
+        <artifactId>akka-zeromq_2.10</artifactId>
+>>>>>>> Attempt with extra repositories
         <version>${akka.version}</version>
@@ -461,6 +471,7 @@
       </dependency>
+
       <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-yarn-client</artifactId>
@@ -716,6 +727,7 @@
         <hadoop.major.version>2</hadoop.major.version>
         <hadoop.version>0.23.7</hadoop.version>
+        <protobuf.version>2.5.0</protobuf.version>
@@ -743,39 +755,37 @@
-
-
-
-
-
-
-
-
-
+    <profile>
+      <id>new-yarn</id>
+      <properties>
+        <hadoop.major.version>2</hadoop.major.version>
+        <hadoop.version>2.2.0</hadoop.version>
+        <protobuf.version>2.5.0</protobuf.version>
+      </properties>
-
-
-
+      <modules>
+        <module>new-yarn</module>
+      </modules>
-
-
-
-
-
-
-
-
-
-
-
-
-
+      <repositories>
+        <repository>
+          <id>maven-root</id>
+          <name>Maven root repository</name>
+          <url>http://repo1.maven.org/maven2/</url>
+          <releases>
+            <enabled>true</enabled>
+          </releases>
+          <snapshots>
+            <enabled>false</enabled>
+          </snapshots>
+        </repository>
+      </repositories>
-
-
-
-
-
+
+
+
+
+    </profile>

     <profile>
       <id>repl-bin</id>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 441dcc18fb..29f4a4b9ff 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -84,21 +84,10 @@ object SparkBuild extends Build {
     case Some(v) => v.toBoolean
   }

-  if (isNewHadoop && isYarnEnabled) {
-    println( """Yarn with Hadoop version 2.2.x is not yet expected to work.
-      Please set env SPARK_HADOOP_VERSION to appropriate version or set SPARK_YARN to false.""")
-    throw new Exception("Yarn with Hadoop version 2.2.x is not yet expected to work.")
-  }
-
-  // Build against a protobuf-2.5 compatible Akka if Hadoop 2 is used.
-  // lazy val protobufVersion = if (isNewHadoop) "2.5.0" else "2.4.1"
-  // lazy val akkaVersion = if (isNewHadoop) "2.0.5-protobuf-2.5-java-1.5" else "2.0.5"
-  // lazy val akkaGroup = if (isNewHadoop) "org.spark-project" else "com.typesafe.akka"
-
   // Conditionally include the yarn sub-project
-  //lazy val yarn = Project("yarn", file(if (isNewHadoop) "new-yarn" else "yarn"), settings = yarnSettings) dependsOn(core)
+  lazy val yarn = Project("yarn", file(if (isNewHadoop) "new-yarn" else "yarn"), settings = yarnSettings) dependsOn(core)

-  lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core)
+  //lazy val yarn = Project("yarn", file("yarn"), settings = yarnSettings) dependsOn(core)

   lazy val maybeYarn = if (isYarnEnabled) Seq[ClasspathDependency](yarn) else Seq[ClasspathDependency]()
   lazy val maybeYarnRef = if (isYarnEnabled) Seq[ProjectReference](yarn) else Seq[ProjectReference]()
@@ -235,9 +224,8 @@ object SparkBuild extends Build {
         "com.ning" % "compress-lzf" % "0.8.4",
         "org.xerial.snappy" % "snappy-java" % "1.0.5",
         "org.ow2.asm" % "asm" % "4.0",
-        "com.google.protobuf" % "protobuf-java" % "2.4.1",
-        "com.typesafe.akka" %% "akka-remote" % "2.2.3" excludeAll(excludeNetty),
-        "com.typesafe.akka" %% "akka-slf4j" % "2.2.3" excludeAll(excludeNetty),
+        "org.spark-project.akka" %% "akka-remote" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty),
+        "org.spark-project.akka" %% "akka-slf4j" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty),
         "net.liftweb" %% "lift-json" % "2.5.1" excludeAll(excludeNetty),
         "it.unimi.dsi" % "fastutil" % "6.4.4",
         "colt" % "colt" % "1.2.0",
@@ -312,16 +300,16 @@
     ),

     libraryDependencies ++= Seq(
-        "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
-        "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
+      "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
+      "com.sksamuel.kafka" %% "kafka" % "0.8.0-beta1"
         exclude("com.sun.jdmk", "jmxtools")
         exclude("com.sun.jmx", "jmxri")
         exclude("net.sf.jopt-simple", "jopt-simple")
         excludeAll(excludeNetty),
-        "org.eclipse.paho" % "mqtt-client" % "0.4.0",
-        "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
-        "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
-        "com.typesafe.akka" %% "akka-zeromq" % "2.2.3" excludeAll(excludeNetty)
+      "org.eclipse.paho" % "mqtt-client" % "0.4.0",
+      "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
+      "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
+      "org.spark-project.akka" %% "akka-zeromq" % "2.2.3-shaded-protobuf" excludeAll(excludeNetty)
     )
   )
--
cgit v1.2.3
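
For context on the Akka coordinates this commit settles on: Hadoop 2.2
depends on protobuf 2.5.0, while the stock com.typesafe.akka 2.2.3 artifacts
were built against protobuf 2.4.1, and the two protobuf versions are not
binary compatible on one classpath. The org.spark-project.akka artifacts
republish Akka with protobuf shaded in, which is the idea the removed
SparkBuild comments were sketching. A rough illustration of the version
selection those comments implied; the regex and defaults are illustrative,
not code from this patch:

    // Hypothetical fragment: choose protobuf/Akka coordinates by Hadoop line.
    val hadoopVersion   = sys.env.getOrElse("SPARK_HADOOP_VERSION", "1.0.4")
    val isNewHadoop     = hadoopVersion.matches("2\\.[2-9]+.*")
    val protobufVersion = if (isNewHadoop) "2.5.0" else "2.4.1"
    val akkaGroup       = if (isNewHadoop) "org.spark-project.akka" else "com.typesafe.akka"
    val akkaVersion     = if (isNewHadoop) "2.2.3-shaded-protobuf" else "2.2.3"
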
From ceb013f8b97051ee96c65a8da7489a2b251ef799 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Mon, 16 Dec 2013 12:38:01 -0800
Subject: Remove trailing slashes from repository specifications.

The correct format has no trailing slash. For me the slash caused
non-deterministic failures when fetching certain artifacts: some of the
Maven caches would fail to fetch an artifact (because of the way the
artifact path was concatenated with the repository URL), which
short-circuited the download process silently. Here is what the log
output looked like:

    Downloading: http://repo.maven.apache.org/maven2/org/spark-project/akka/akka-remote_2.10/2.2.3-shaded-protobuf/akka-remote_2.10-2.2.3-shaded-protobuf.pom
    [WARNING] The POM for org.spark-project.akka:akka-remote_2.10:jar:2.2.3-shaded-protobuf is missing, no dependency information available

This was hard to debug because no error message appeared anywhere, and
the path *looks* correct as reported by the Maven log.
---
 pom.xml | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/pom.xml b/pom.xml
index 39c8a8cc5e..30ef928618 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,7 +120,7 @@
       <id>maven-repo</id>
       <name>Maven Repository</name>
-      <url>http://repo.maven.apache.org/maven2/</url>
+      <url>http://repo.maven.apache.org/maven2</url>
       <releases>
         <enabled>true</enabled>
       </releases>
@@ -131,7 +131,7 @@
       <id>jboss-repo</id>
       <name>JBoss Repository</name>
-      <url>http://repository.jboss.org/nexus/content/repositories/releases/</url>
+      <url>http://repository.jboss.org/nexus/content/repositories/releases</url>
       <releases>
         <enabled>true</enabled>
       </releases>
@@ -142,7 +142,7 @@
       <id>mqtt-repo</id>
       <name>MQTT Repository</name>
-      <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
+      <url>https://repo.eclipse.org/content/repositories/paho-releases</url>
       <releases>
         <enabled>true</enabled>
       </releases>
@@ -739,7 +739,7 @@
           <id>maven-root</id>
           <name>Maven root repository</name>
-          <url>http://repo1.maven.org/maven2/</url>
+          <url>http://repo1.maven.org/maven2</url>
           <releases>
             <enabled>true</enabled>
           </releases>
@@ -771,7 +771,7 @@
           <id>maven-root</id>
           <name>Maven root repository</name>
-          <url>http://repo1.maven.org/maven2/</url>
+          <url>http://repo1.maven.org/maven2</url>
           <releases>
             <enabled>true</enabled>
           </releases>
--
cgit v1.2.3
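
The failure mode above is easy to see in miniature. If a resolver forms the
artifact URL as repository root plus "/" plus relative path (one reading of
"the way the artifact path was concatenated with the repository URL"), a
trailing slash on the root produces a double slash that some caches then
reject silently. A hypothetical sketch of the join and its normalization:

    // Illustrative only: why a trailing slash on the repo root can hurt.
    val repo = "http://repo1.maven.org/maven2/"   // note the trailing slash
    val path = "org/spark-project/akka/akka-remote_2.10/2.2.3-shaded-protobuf/" +
               "akka-remote_2.10-2.2.3-shaded-protobuf.pom"
    val bad  = repo + "/" + path                   // ".../maven2//org/..."
    val good = repo.stripSuffix("/") + "/" + path  // ".../maven2/org/..."
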
"hello" context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } -- cgit v1.2.3 From c1fec89895f03dbdbb6f445ea3cdcd2d050555c4 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 16 Dec 2013 21:56:21 -0800 Subject: Cleanup --- core/pom.xml | 1 - pom.xml | 6 ------ 2 files changed, 7 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index b83a2a8779..043f6cf68d 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -97,7 +97,6 @@ ${akka.group} akka-slf4j_${scala.binary.version} - org.spark-project.akka org.scala-lang diff --git a/pom.xml b/pom.xml index 30ef928618..21d9da84b0 100644 --- a/pom.xml +++ b/pom.xml @@ -220,7 +220,6 @@ 0.3.1 -<<<<<<< HEAD ${akka.group} akka-actor_${scala.binary.version} ${akka.version} @@ -254,13 +253,8 @@ -<<<<<<< HEAD ${akka.group} akka-zeromq_${scala.binary.version} -======= - org.spark-project.akka - akka-zeromq_2.10 ->>>>>>> Attempt with extra repositories ${akka.version} -- cgit v1.2.3 From c1c0f8099f4559ff3ff9982878da6cd36d198ba0 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 16 Dec 2013 22:01:27 -0800 Subject: Clean-up --- .../src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala | 2 +- pom.xml | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index ac519fc99c..dca389d64e 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -58,7 +58,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte override def preStart() { logInfo("Listen to driver: " + driverUrl) - driver = context.actorFor(driverUrl) + driver = context.actorSelection(driverUrl) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } diff --git a/pom.xml b/pom.xml index 21d9da84b0..57e843596f 100644 --- a/pom.xml +++ b/pom.xml @@ -104,6 +104,7 @@ 2.10.3 2.10 0.13.0 + org.spark-project.akka 2.2.3-shaded-protobuf 1.7.2 1.2.17 -- cgit v1.2.3 From 10c0ffa1eb23c6190f460be224ea555573458980 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 16 Dec 2013 22:10:55 -0800 Subject: One other fix --- .../src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala index dca389d64e..bc31bb2eb0 100644 --- a/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala +++ b/new-yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala @@ -54,7 +54,7 @@ class WorkerLauncher(args: ApplicationMasterArguments, conf: Configuration) exte // This actor just working as a monitor to watch on Driver Actor. class MonitorActor(driverUrl: String) extends Actor { - var driver: ActorRef = null + var driver: ActorSelection = null override def preStart() { logInfo("Listen to driver: " + driverUrl) -- cgit v1.2.3