From e2d7656ca3d561f0a6fc8dd81ef46e4aa6ba608e Mon Sep 17 00:00:00 2001
From: Jey Kottalam
Date: Wed, 24 Jul 2013 14:01:48 -0700
Subject: re-enable YARN support

---
 .../main/scala/spark/deploy/SparkHadoopUtil.scala  | 60 ----------------------
 .../spark/deploy/yarn/ApplicationMaster.scala      |  4 +-
 yarn/src/main/scala/spark/deploy/yarn/Client.scala |  7 ++-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala    | 46 +++++++++++++++++
 4 files changed, 53 insertions(+), 64 deletions(-)
 delete mode 100644 yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala
 create mode 100644 yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala

(limited to 'yarn/src')

diff --git a/yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala b/yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala
deleted file mode 100644
index a812bcf867..0000000000
--- a/yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy
-
-import collection.mutable.HashMap
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import java.security.PrivilegedExceptionAction
-
-/**
- * Contains util methods to interact with Hadoop from spark.
- */
-object SparkHadoopUtil {
-
-  val yarnConf = newConfiguration()
-
-  // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
-  def isYarnMode(): Boolean = {
-    val yarnMode = System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))
-    java.lang.Boolean.valueOf(yarnMode)
-  }
-
-  // Set an env variable indicating we are running in YARN mode.
-  // Note that anything with SPARK prefix gets propagated to all (remote) processes
-  def setYarnMode() {
-    System.setProperty("SPARK_YARN_MODE", "true")
-  }
-
-  def setYarnMode(env: HashMap[String, String]) {
-    env("SPARK_YARN_MODE") = "true"
-  }
-
-  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
-  // Always create a new config, dont reuse yarnConf.
-  def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
-
-  // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
-  def addCredentials(conf: JobConf) {
-    val jobCreds = conf.getCredentials();
-    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
-  }
-}
diff --git a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
index 1b06169739..d69a969d42 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -130,11 +130,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       try {
         val socket = new Socket(driverHost, driverPort.toInt)
         socket.close()
-        logInfo("Master now available: " + driverHost + ":" + driverPort)
+        logInfo("Driver now available: " + driverHost + ":" + driverPort)
         driverUp = true
       } catch {
         case e: Exception =>
-          logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
+          logWarning("Failed to connect to driver at " + driverHost + ":" + driverPort + ", retrying")
           Thread.sleep(100)
       }
     }
diff --git a/yarn/src/main/scala/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/spark/deploy/yarn/Client.scala
index 8bcbfc2735..9d3860b863 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/Client.scala
@@ -165,7 +165,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
     Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
     Client.populateHadoopClasspath(yarnConf, env)
-    SparkHadoopUtil.setYarnMode(env)
+    env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_JAR_PATH") =
       localResources("spark.jar").getResource().getScheme.toString() + "://" +
       localResources("spark.jar").getResource().getFile().toString()
@@ -313,8 +313,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl

 object Client {
   def main(argStrings: Array[String]) {
+    // Set an env variable indicating we are running in YARN mode.
+    // Note that anything with SPARK prefix gets propagated to all (remote) processes
+    System.setProperty("SPARK_YARN_MODE", "true")
+
     val args = new ClientArguments(argStrings)
-    SparkHadoopUtil.setYarnMode()
     new Client(args).run
   }

diff --git a/yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala
new file mode 100644
index 0000000000..77c4ee7f3f
--- /dev/null
+++ b/yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.deploy.yarn
+
+import spark.deploy.SparkHadoopUtil
+import collection.mutable.HashMap
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import java.security.PrivilegedExceptionAction
+
+/**
+ * Contains util methods to interact with Hadoop from spark.
+ */
+class YarnSparkHadoopUtil extends SparkHadoopUtil {
+
+  // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
+  override def isYarnMode(): Boolean = { true }
+
+  // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
+  // Always create a new config, dont reuse yarnConf.
+  override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
+
+  // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+  override def addCredentials(conf: JobConf) {
+    val jobCreds = conf.getCredentials();
+    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
+  }
+}
-- 
cgit v1.2.3
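
The piece this patch depends on but does not show (the view above is limited to 'yarn/src') is how the core side obtains a YarnSparkHadoopUtil instance once SPARK_YARN_MODE is set. Below is a minimal sketch of such a selector, assuming a concrete spark.deploy.SparkHadoopUtil base class on the core side; the SparkHadoopUtilSelector object and its get() method are illustrative inventions, not code from this commit:

package spark.deploy

// Illustrative sketch only -- NOT part of this commit. Assumes the base
// SparkHadoopUtil class is concrete and that the YarnSparkHadoopUtil class
// added by this patch is on the classpath when running under YARN.
object SparkHadoopUtilSelector {

  // SPARK_YARN_MODE is checked as a system property first, then as an
  // environment variable, mirroring the isYarnMode() logic deleted above:
  // anything prefixed with SPARK is propagated to all (remote) processes.
  private def isYarnMode: Boolean = {
    val flag = System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))
    java.lang.Boolean.valueOf(flag)
  }

  // Load the YARN subclass by name so the core module compiles without a
  // compile-time dependency on the yarn/ module.
  def get(): SparkHadoopUtil = {
    if (isYarnMode) {
      Class.forName("spark.deploy.yarn.YarnSparkHadoopUtil")
           .newInstance()
           .asInstanceOf[SparkHadoopUtil]
    } else {
      new SparkHadoopUtil()
    }
  }
}

Under this sketch, a caller invoking SparkHadoopUtilSelector.get().newConfiguration() receives a YarnConfiguration when Client.main has set SPARK_YARN_MODE, and a plain Hadoop Configuration otherwise.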