about summary refs log tree commit diff
path: root/yarn
diff options
context:
space:
mode:
author    Jey Kottalam <jey@cs.berkeley.edu>  2013-07-24 14:01:48 -0700
committer Jey Kottalam <jey@cs.berkeley.edu>  2013-08-15 16:50:37 -0700
commit    e2d7656ca3d561f0a6fc8dd81ef46e4aa6ba608e (patch)
tree      d8bca46dc6f9aa1d4a431c0000dd5d0c9c6b4e70 /yarn
parent    bd0bab47c9602462628b1d3c90d5eb5d889f4596 (diff)
download  spark-e2d7656ca3d561f0a6fc8dd81ef46e4aa6ba608e.tar.gz
          spark-e2d7656ca3d561f0a6fc8dd81ef46e4aa6ba608e.tar.bz2
          spark-e2d7656ca3d561f0a6fc8dd81ef46e4aa6ba608e.zip
re-enable YARN support
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala | 4
-rw-r--r--  yarn/src/main/scala/spark/deploy/yarn/Client.scala | 7
-rw-r--r--  yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala (renamed from yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala) | 26
3 files changed, 13 insertions, 24 deletions
diff --git a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
index 1b06169739..d69a969d42 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/ApplicationMaster.scala
@@ -130,11 +130,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
try {
val socket = new Socket(driverHost, driverPort.toInt)
socket.close()
- logInfo("Master now available: " + driverHost + ":" + driverPort)
+ logInfo("Driver now available: " + driverHost + ":" + driverPort)
driverUp = true
} catch {
case e: Exception =>
- logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
+ logWarning("Failed to connect to driver at " + driverHost + ":" + driverPort + ", retrying")
Thread.sleep(100)
}
}
diff --git a/yarn/src/main/scala/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/spark/deploy/yarn/Client.scala
index 8bcbfc2735..9d3860b863 100644
--- a/yarn/src/main/scala/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/Client.scala
@@ -165,7 +165,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
Client.populateHadoopClasspath(yarnConf, env)
- SparkHadoopUtil.setYarnMode(env)
+ env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_JAR_PATH") =
localResources("spark.jar").getResource().getScheme.toString() + "://" +
localResources("spark.jar").getResource().getFile().toString()
@@ -313,8 +313,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
object Client {
def main(argStrings: Array[String]) {
+ // Set an env variable indicating we are running in YARN mode.
+ // Note that anything with SPARK prefix gets propagated to all (remote) processes
+ System.setProperty("SPARK_YARN_MODE", "true")
+
val args = new ClientArguments(argStrings)
- SparkHadoopUtil.setYarnMode()
new Client(args).run
}
diff --git a/yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala b/yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index a812bcf867..77c4ee7f3f 100644
--- a/yarn/src/main/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/yarn/src/main/scala/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package spark.deploy
+package spark.deploy.yarn
+import spark.deploy.SparkHadoopUtil
import collection.mutable.HashMap
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation
@@ -28,32 +29,17 @@ import java.security.PrivilegedExceptionAction
/**
* Contains util methods to interact with Hadoop from spark.
*/
-object SparkHadoopUtil {
-
- val yarnConf = newConfiguration()
+class YarnSparkHadoopUtil extends SparkHadoopUtil {
// Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
- def isYarnMode(): Boolean = {
- val yarnMode = System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))
- java.lang.Boolean.valueOf(yarnMode)
- }
-
- // Set an env variable indicating we are running in YARN mode.
- // Note that anything with SPARK prefix gets propagated to all (remote) processes
- def setYarnMode() {
- System.setProperty("SPARK_YARN_MODE", "true")
- }
-
- def setYarnMode(env: HashMap[String, String]) {
- env("SPARK_YARN_MODE") = "true"
- }
+ override def isYarnMode(): Boolean = { true }
// Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
// Always create a new config, dont reuse yarnConf.
- def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
+ override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
// add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
- def addCredentials(conf: JobConf) {
+ override def addCredentials(conf: JobConf) {
val jobCreds = conf.getCredentials();
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}