author     Matei Zaharia <matei@eecs.berkeley.edu>  2013-08-20 10:14:15 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2013-08-20 10:14:15 -0700
commit     aa2b89d98d6d195a38e36c1947d437ab7346e5c9 (patch)
tree       9638d8825a43f9e8b69dcf91a653d76d6e319ec0 /core
parent     d61337f6403da454b9fb6cf090fe3ab61892a11b (diff)
parent     6f6944c8079bffdd088ddb0a84fbf83356e294ea (diff)
Merge remote-tracking branch 'jey/hadoop-agnostic'
Conflicts:
	core/src/main/scala/spark/PairRDDFunctions.scala
Diffstat (limited to 'core')
-rw-r--r--  core/pom.xml  192
-rw-r--r--  core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala  27
-rw-r--r--  core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala  29
-rw-r--r--  core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala  47
-rw-r--r--  core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala  30
-rw-r--r--  core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala  30
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala  76
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala  351
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala  94
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala  327
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala  116
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala  217
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala  564
-rw-r--r--  core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala  52
-rw-r--r--  core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala  27
-rw-r--r--  core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala  30
-rw-r--r--  core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala  45
-rw-r--r--  core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala  69
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala  12
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala  11
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala  14
-rw-r--r--  core/src/main/scala/spark/SparkHadoopWriter.scala (renamed from core/src/main/scala/spark/HadoopWriter.scala)  6
-rw-r--r--  core/src/main/scala/spark/Utils.scala  7
-rw-r--r--  core/src/main/scala/spark/deploy/SparkHadoopUtil.scala (renamed from core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala)  13
-rw-r--r--  core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala  16
-rw-r--r--  core/src/main/scala/spark/rdd/CheckpointRDD.scala  14
-rw-r--r--  core/src/main/scala/spark/rdd/HadoopRDD.scala  6
-rw-r--r--  core/src/main/scala/spark/rdd/NewHadoopRDD.scala  2
-rw-r--r--  core/src/main/scala/spark/scheduler/InputFormatInfo.scala  9
29 files changed, 178 insertions, 2255 deletions
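Note: the per-version shims under core/src/hadoop1, core/src/hadoop2 and core/src/hadoop2-yarn are deleted in this merge and replaced by the single SparkHadoopMapRedUtil.scala and SparkHadoopMapReduceUtil.scala sources listed in the diffstat, whose bodies do not appear in this truncated diff. The sketch below illustrates the kind of reflection-based shim such a consolidation typically relies on, resolving the Hadoop-version-specific JobContext class at runtime instead of compiling a separate source tree per version; the names JobContextImpl and firstAvailableClass follow the pattern of the new files but should be read as illustrative assumptions, not the exact committed code.

    // Minimal sketch, assuming a reflection-based version shim.
    package org.apache.hadoop.mapred

    trait SparkHadoopMapRedUtil {
      def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
        // Hadoop 2.x provides a concrete JobContextImpl; Hadoop 1.x instantiates
        // JobContext directly, so try the newer class first and fall back.
        val klass = firstAvailableClass(
          "org.apache.hadoop.mapred.JobContextImpl",   // assumed Hadoop 2.x class name
          "org.apache.hadoop.mapred.JobContext")       // Hadoop 1.x class name
        val ctor = klass.getDeclaredConstructor(
          classOf[JobConf], classOf[org.apache.hadoop.mapreduce.JobID])
        ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
      }

      // Returns the first of the two classes that is present on the classpath.
      private def firstAvailableClass(first: String, second: String): Class[_] =
        try {
          Class.forName(first)
        } catch {
          case _: ClassNotFoundException => Class.forName(second)
        }
    }

The same pattern would extend to newTaskAttemptContext and to the org.apache.hadoop.mapreduce variants, which is what lets the Maven profiles below be dropped from core/pom.xml.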
diff --git a/core/pom.xml b/core/pom.xml
index 73426a9ec5..6627a87de1 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -33,6 +33,18 @@
<dependencies>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-ipc</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
@@ -126,7 +138,6 @@
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-json</artifactId>
</dependency>
-
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
@@ -204,183 +215,4 @@
</plugin>
</plugins>
</build>
-
- <profiles>
- <profile>
- <id>hadoop1</id>
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/scala</source>
- <source>src/hadoop1/scala</source>
- </sources>
- </configuration>
- </execution>
- <execution>
- <id>add-scala-test-sources</id>
- <phase>generate-test-sources</phase>
- <goals>
- <goal>add-test-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/test/scala</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <configuration>
- <classifier>hadoop1</classifier>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <id>hadoop2</id>
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/scala</source>
- <source>src/hadoop2/scala</source>
- </sources>
- </configuration>
- </execution>
- <execution>
- <id>add-scala-test-sources</id>
- <phase>generate-test-sources</phase>
- <goals>
- <goal>add-test-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/test/scala</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <configuration>
- <classifier>hadoop2</classifier>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- <profile>
- <id>hadoop2-yarn</id>
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-common</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-client</artifactId>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <executions>
- <execution>
- <id>add-source</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>add-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/main/scala</source>
- <source>src/hadoop2-yarn/scala</source>
- </sources>
- </configuration>
- </execution>
- <execution>
- <id>add-scala-test-sources</id>
- <phase>generate-test-sources</phase>
- <goals>
- <goal>add-test-source</goal>
- </goals>
- <configuration>
- <sources>
- <source>src/test/scala</source>
- </sources>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <configuration>
- <classifier>hadoop2-yarn</classifier>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </profile>
- </profiles>
</project>
diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
deleted file mode 100644
index 25386b2796..0000000000
--- a/core/src/hadoop1/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred
-
-trait HadoopMapRedUtil {
- def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContext(conf, jobId)
-
- def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
-
- def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
- jobId, isMap, taskId, attemptId)
-}
diff --git a/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
deleted file mode 100644
index b1002e0cac..0000000000
--- a/core/src/hadoop1/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapreduce
-
-import org.apache.hadoop.conf.Configuration
-
-trait HadoopMapReduceUtil {
- def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContext(conf, jobId)
-
- def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContext(conf, attemptId)
-
- def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
- jobId, isMap, taskId, attemptId)
-}
diff --git a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
deleted file mode 100644
index 617954cb98..0000000000
--- a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-
-
-/**
- * Contains util methods to interact with Hadoop from spark.
- */
-object SparkHadoopUtil {
-
- def getUserNameFromEnvironment(): String = {
- // defaulting to -D ...
- System.getProperty("user.name")
- }
-
- def runAsUser(func: (Product) => Unit, args: Product) {
-
- // Add support, if exists - for now, simply run func !
- func(args)
- }
-
- // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
- def newConfiguration(): Configuration = new Configuration()
-
- // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
- def addCredentials(conf: JobConf) {}
-
- def isYarnMode(): Boolean = { false }
-
-}
diff --git a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
deleted file mode 100644
index 0f972b7a0b..0000000000
--- a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred
-
-import org.apache.hadoop.mapreduce.TaskType
-
-trait HadoopMapRedUtil {
- def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
-
- def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-
- def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) =
- new TaskAttemptID(jtIdentifier, jobId, if (isMap) TaskType.MAP else TaskType.REDUCE, taskId, attemptId)
-}
diff --git a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
deleted file mode 100644
index 1a7cdf4788..0000000000
--- a/core/src/hadoop2-yarn/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapreduce
-
-import org.apache.hadoop.conf.Configuration
-import task.{TaskAttemptContextImpl, JobContextImpl}
-
-trait HadoopMapReduceUtil {
- def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
-
- def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-
- def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) =
- new TaskAttemptID(jtIdentifier, jobId, if (isMap) TaskType.MAP else TaskType.REDUCE, taskId, attemptId)
-}
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
deleted file mode 100644
index 6122fdced0..0000000000
--- a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy
-
-import collection.mutable.HashMap
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import java.security.PrivilegedExceptionAction
-
-/**
- * Contains util methods to interact with Hadoop from spark.
- */
-object SparkHadoopUtil {
-
- val yarnConf = newConfiguration()
-
- def getUserNameFromEnvironment(): String = {
- // defaulting to env if -D is not present ...
- val retval = System.getProperty(Environment.USER.name, System.getenv(Environment.USER.name))
-
- // If nothing found, default to user we are running as
- if (retval == null) System.getProperty("user.name") else retval
- }
-
- def runAsUser(func: (Product) => Unit, args: Product) {
- runAsUser(func, args, getUserNameFromEnvironment())
- }
-
- def runAsUser(func: (Product) => Unit, args: Product, user: String) {
- func(args)
- }
-
- // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
- def isYarnMode(): Boolean = {
- val yarnMode = System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))
- java.lang.Boolean.valueOf(yarnMode)
- }
-
- // Set an env variable indicating we are running in YARN mode.
- // Note that anything with SPARK prefix gets propagated to all (remote) processes
- def setYarnMode() {
- System.setProperty("SPARK_YARN_MODE", "true")
- }
-
- def setYarnMode(env: HashMap[String, String]) {
- env("SPARK_YARN_MODE") = "true"
- }
-
- // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
- // Always create a new config, dont reuse yarnConf.
- def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
-
- // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
- def addCredentials(conf: JobConf) {
- val jobCreds = conf.getCredentials();
- jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
- }
-}
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
deleted file mode 100644
index 1b06169739..0000000000
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMaster.scala
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.yarn
-
-import java.net.Socket
-import java.util.concurrent.CopyOnWriteArrayList
-import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import scala.collection.JavaConversions._
-import spark.{SparkContext, Logging, Utils}
-import org.apache.hadoop.security.UserGroupInformation
-import java.security.PrivilegedExceptionAction
-
-class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
-
- def this(args: ApplicationMasterArguments) = this(args, new Configuration())
-
- private var rpc: YarnRPC = YarnRPC.create(conf)
- private var resourceManager: AMRMProtocol = null
- private var appAttemptId: ApplicationAttemptId = null
- private var userThread: Thread = null
- private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
- private var yarnAllocator: YarnAllocationHandler = null
- private var isFinished:Boolean = false
-
- def run() {
-
- appAttemptId = getApplicationAttemptId()
- resourceManager = registerWithResourceManager()
- val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()
-
- // Compute number of threads for akka
- val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
-
- if (minimumMemory > 0) {
- val mem = args.workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
- val numCore = (mem / minimumMemory) + (if (0 != (mem % minimumMemory)) 1 else 0)
-
- if (numCore > 0) {
- // do not override - hits https://issues.apache.org/jira/browse/HADOOP-8406
- // TODO: Uncomment when hadoop is on a version which has this fixed.
- // args.workerCores = numCore
- }
- }
-
- // Workaround until hadoop moves to something which has
- // https://issues.apache.org/jira/browse/HADOOP-8406
- // ignore result
- // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
- // Hence args.workerCores = numCore disabled above. Any better option ?
- // org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(conf)
-
- ApplicationMaster.register(this)
- // Start the user's JAR
- userThread = startUserClass()
-
- // This a bit hacky, but we need to wait until the spark.driver.port property has
- // been set by the Thread executing the user class.
- waitForSparkMaster()
-
- // Allocate all containers
- allocateWorkers()
-
- // Wait for the user class to Finish
- userThread.join()
-
- System.exit(0)
- }
-
- private def getApplicationAttemptId(): ApplicationAttemptId = {
- val envs = System.getenv()
- val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
- val containerId = ConverterUtils.toContainerId(containerIdString)
- val appAttemptId = containerId.getApplicationAttemptId()
- logInfo("ApplicationAttemptId: " + appAttemptId)
- return appAttemptId
- }
-
- private def registerWithResourceManager(): AMRMProtocol = {
- val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
- logInfo("Connecting to ResourceManager at " + rmAddress)
- return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
- }
-
- private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
- logInfo("Registering the ApplicationMaster")
- val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
- .asInstanceOf[RegisterApplicationMasterRequest]
- appMasterRequest.setApplicationAttemptId(appAttemptId)
- // Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
- // Users can then monitor stderr/stdout on that node if required.
- appMasterRequest.setHost(Utils.localHostName())
- appMasterRequest.setRpcPort(0)
- // What do we provide here ? Might make sense to expose something sensible later ?
- appMasterRequest.setTrackingUrl("")
- return resourceManager.registerApplicationMaster(appMasterRequest)
- }
-
- private def waitForSparkMaster() {
- logInfo("Waiting for spark driver to be reachable.")
- var driverUp = false
- while(!driverUp) {
- val driverHost = System.getProperty("spark.driver.host")
- val driverPort = System.getProperty("spark.driver.port")
- try {
- val socket = new Socket(driverHost, driverPort.toInt)
- socket.close()
- logInfo("Master now available: " + driverHost + ":" + driverPort)
- driverUp = true
- } catch {
- case e: Exception =>
- logError("Failed to connect to driver at " + driverHost + ":" + driverPort)
- Thread.sleep(100)
- }
- }
- }
-
- private def startUserClass(): Thread = {
- logInfo("Starting the user JAR in a separate Thread")
- val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader)
- .getMethod("main", classOf[Array[String]])
- val t = new Thread {
- override def run() {
- var successed = false
- try {
- // Copy
- var mainArgs: Array[String] = new Array[String](args.userArgs.size())
- args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
- mainMethod.invoke(null, mainArgs)
- // some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR
- // userThread will stop here unless it has uncaught exception thrown out
- // It need shutdown hook to set SUCCEEDED
- successed = true
- } finally {
- if (successed) {
- ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
- } else {
- ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.FAILED)
- }
- }
- }
- }
- t.start()
- return t
- }
-
- private def allocateWorkers() {
- logInfo("Waiting for spark context initialization")
-
- try {
- var sparkContext: SparkContext = null
- ApplicationMaster.sparkContextRef.synchronized {
- var count = 0
- while (ApplicationMaster.sparkContextRef.get() == null) {
- logInfo("Waiting for spark context initialization ... " + count)
- count = count + 1
- ApplicationMaster.sparkContextRef.wait(10000L)
- }
- sparkContext = ApplicationMaster.sparkContextRef.get()
- assert(sparkContext != null)
- this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, sparkContext.preferredNodeLocationData)
- }
-
-
- logInfo("Allocating " + args.numWorkers + " workers.")
- // Wait until all containers have finished
- // TODO: This is a bit ugly. Can we make it nicer?
- // TODO: Handle container failure
- while(yarnAllocator.getNumWorkersRunning < args.numWorkers &&
- // If user thread exists, then quit !
- userThread.isAlive) {
-
- this.yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
- ApplicationMaster.incrementAllocatorLoop(1)
- Thread.sleep(100)
- }
- } finally {
- // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
- // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
- ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
- }
- logInfo("All workers have launched.")
-
- // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
- if (userThread.isAlive) {
- // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
-
- val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
- // must be <= timeoutInterval/ 2.
- // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
- // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
- val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
- launchReporterThread(interval)
- }
- }
-
- // TODO: We might want to extend this to allocate more containers in case they die !
- private def launchReporterThread(_sleepTime: Long): Thread = {
- val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
-
- val t = new Thread {
- override def run() {
- while (userThread.isAlive) {
- val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
- if (missingWorkerCount > 0) {
- logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
- yarnAllocator.allocateContainers(missingWorkerCount)
- }
- else sendProgress()
- Thread.sleep(sleepTime)
- }
- }
- }
- // setting to daemon status, though this is usually not a good idea.
- t.setDaemon(true)
- t.start()
- logInfo("Started progress reporter thread - sleep time : " + sleepTime)
- return t
- }
-
- private def sendProgress() {
- logDebug("Sending progress")
- // simulated with an allocate request with no nodes requested ...
- yarnAllocator.allocateContainers(0)
- }
-
- /*
- def printContainers(containers: List[Container]) = {
- for (container <- containers) {
- logInfo("Launching shell command on a new container."
- + ", containerId=" + container.getId()
- + ", containerNode=" + container.getNodeId().getHost()
- + ":" + container.getNodeId().getPort()
- + ", containerNodeURI=" + container.getNodeHttpAddress()
- + ", containerState" + container.getState()
- + ", containerResourceMemory"
- + container.getResource().getMemory())
- }
- }
- */
-
- def finishApplicationMaster(status: FinalApplicationStatus) {
-
- synchronized {
- if (isFinished) {
- return
- }
- isFinished = true
- }
-
- logInfo("finishApplicationMaster with " + status)
- val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
- .asInstanceOf[FinishApplicationMasterRequest]
- finishReq.setAppAttemptId(appAttemptId)
- finishReq.setFinishApplicationStatus(status)
- resourceManager.finishApplicationMaster(finishReq)
-
- }
-
-}
-
-object ApplicationMaster {
- // number of times to wait for the allocator loop to complete.
- // each loop iteration waits for 100ms, so maximum of 3 seconds.
- // This is to ensure that we have reasonable number of containers before we start
- // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be optimal as more
- // containers are available. Might need to handle this better.
- private val ALLOCATOR_LOOP_WAIT_COUNT = 30
- def incrementAllocatorLoop(by: Int) {
- val count = yarnAllocatorLoop.getAndAdd(by)
- if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
- yarnAllocatorLoop.synchronized {
- // to wake threads off wait ...
- yarnAllocatorLoop.notifyAll()
- }
- }
- }
-
- private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
-
- def register(master: ApplicationMaster) {
- applicationMasters.add(master)
- }
-
- val sparkContextRef: AtomicReference[SparkContext] = new AtomicReference[SparkContext](null)
- val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
-
- def sparkContextInitialized(sc: SparkContext): Boolean = {
- var modified = false
- sparkContextRef.synchronized {
- modified = sparkContextRef.compareAndSet(null, sc)
- sparkContextRef.notifyAll()
- }
-
- // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do System.exit
- // Should not really have to do this, but it helps yarn to evict resources earlier.
- // not to mention, prevent Client declaring failure even though we exit'ed properly.
- if (modified) {
- Runtime.getRuntime().addShutdownHook(new Thread with Logging {
- // This is not just to log, but also to ensure that log system is initialized for this instance when we actually are 'run'
- logInfo("Adding shutdown hook for context " + sc)
- override def run() {
- logInfo("Invoking sc stop from shutdown hook")
- sc.stop()
- // best case ...
- for (master <- applicationMasters) {
- master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
- }
- }
- } )
- }
-
- // Wait for initialization to complete and atleast 'some' nodes can get allocated
- yarnAllocatorLoop.synchronized {
- while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
- yarnAllocatorLoop.wait(1000L)
- }
- }
- modified
- }
-
- def main(argStrings: Array[String]) {
- val args = new ApplicationMasterArguments(argStrings)
- new ApplicationMaster(args).run()
- }
-}
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala
deleted file mode 100644
index 8de44b1f66..0000000000
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.yarn
-
-import spark.util.IntParam
-import collection.mutable.ArrayBuffer
-
-class ApplicationMasterArguments(val args: Array[String]) {
- var userJar: String = null
- var userClass: String = null
- var userArgs: Seq[String] = Seq[String]()
- var workerMemory = 1024
- var workerCores = 1
- var numWorkers = 2
-
- parseArgs(args.toList)
-
- private def parseArgs(inputArgs: List[String]): Unit = {
- val userArgsBuffer = new ArrayBuffer[String]()
-
- var args = inputArgs
-
- while (! args.isEmpty) {
-
- args match {
- case ("--jar") :: value :: tail =>
- userJar = value
- args = tail
-
- case ("--class") :: value :: tail =>
- userClass = value
- args = tail
-
- case ("--args") :: value :: tail =>
- userArgsBuffer += value
- args = tail
-
- case ("--num-workers") :: IntParam(value) :: tail =>
- numWorkers = value
- args = tail
-
- case ("--worker-memory") :: IntParam(value) :: tail =>
- workerMemory = value
- args = tail
-
- case ("--worker-cores") :: IntParam(value) :: tail =>
- workerCores = value
- args = tail
-
- case Nil =>
- if (userJar == null || userClass == null) {
- printUsageAndExit(1)
- }
-
- case _ =>
- printUsageAndExit(1, args)
- }
- }
-
- userArgs = userArgsBuffer.readOnly
- }
-
- def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
- if (unknownParam != null) {
- System.err.println("Unknown/unsupported param " + unknownParam)
- }
- System.err.println(
- "Usage: spark.deploy.yarn.ApplicationMaster [options] \n" +
- "Options:\n" +
- " --jar JAR_PATH Path to your application's JAR file (required)\n" +
- " --class CLASS_NAME Name of your application's main class (required)\n" +
- " --args ARGS Arguments to be passed to your application's main class.\n" +
- " Mutliple invocations are possible, each will be passed in order.\n" +
- " --num-workers NUM Number of workers to start (Default: 2)\n" +
- " --worker-cores NUM Number of cores for the workers (Default: 1)\n" +
- " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n")
- System.exit(exitCode)
- }
-}
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
deleted file mode 100644
index 8bcbfc2735..0000000000
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.yarn
-
-import java.net.{InetSocketAddress, URI}
-import java.nio.ByteBuffer
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.hadoop.mapred.Master
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.client.YarnClientImpl
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import scala.collection.mutable.HashMap
-import scala.collection.JavaConversions._
-import spark.{Logging, Utils}
-import org.apache.hadoop.yarn.util.{Apps, Records, ConverterUtils}
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import spark.deploy.SparkHadoopUtil
-
-class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
-
- def this(args: ClientArguments) = this(new Configuration(), args)
-
- var rpc: YarnRPC = YarnRPC.create(conf)
- val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
- val credentials = UserGroupInformation.getCurrentUser().getCredentials();
-
- def run() {
- init(yarnConf)
- start()
- logClusterResourceDetails()
-
- val newApp = super.getNewApplication()
- val appId = newApp.getApplicationId()
-
- verifyClusterResources(newApp)
- val appContext = createApplicationSubmissionContext(appId)
- val localResources = prepareLocalResources(appId, "spark")
- val env = setupLaunchEnv(localResources)
- val amContainer = createContainerLaunchContext(newApp, localResources, env)
-
- appContext.setQueue(args.amQueue)
- appContext.setAMContainerSpec(amContainer)
- appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
-
- submitApp(appContext)
-
- monitorApplication(appId)
- System.exit(0)
- }
-
-
- def logClusterResourceDetails() {
- val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
- logInfo("Got Cluster metric info from ASM, numNodeManagers=" + clusterMetrics.getNumNodeManagers)
-
- val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
- logInfo("Queue info .. queueName=" + queueInfo.getQueueName + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity +
- ", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size +
- ", queueChildQueueCount=" + queueInfo.getChildQueues.size)
- }
-
-
- def verifyClusterResources(app: GetNewApplicationResponse) = {
- val maxMem = app.getMaximumResourceCapability().getMemory()
- logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
-
- // if we have requested more then the clusters max for a single resource then exit.
- if (args.workerMemory > maxMem) {
- logError("the worker size is to large to run on this cluster " + args.workerMemory);
- System.exit(1)
- }
- val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
- if (amMem > maxMem) {
- logError("AM size is to large to run on this cluster " + amMem)
- System.exit(1)
- }
-
- // We could add checks to make sure the entire cluster has enough resources but that involves getting
- // all the node reports and computing ourselves
- }
-
- def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = {
- logInfo("Setting up application submission context for ASM")
- val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
- appContext.setApplicationId(appId)
- appContext.setApplicationName("Spark")
- return appContext
- }
-
- def prepareLocalResources(appId: ApplicationId, appName: String): HashMap[String, LocalResource] = {
- logInfo("Preparing Local resources")
- val locaResources = HashMap[String, LocalResource]()
- // Upload Spark and the application JAR to the remote file system
- // Add them as local resources to the AM
- val fs = FileSystem.get(conf)
-
- val delegTokenRenewer = Master.getMasterPrincipal(conf);
- if (UserGroupInformation.isSecurityEnabled()) {
- if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
- logError("Can't get Master Kerberos principal for use as renewer")
- System.exit(1)
- }
- }
-
- Map("spark.jar" -> System.getenv("SPARK_JAR"), "app.jar" -> args.userJar, "log4j.properties" -> System.getenv("SPARK_LOG4J_CONF"))
- .foreach { case(destName, _localPath) =>
- val localPath: String = if (_localPath != null) _localPath.trim() else ""
- if (! localPath.isEmpty()) {
- val src = new Path(localPath)
- val pathSuffix = appName + "/" + appId.getId() + destName
- val dst = new Path(fs.getHomeDirectory(), pathSuffix)
- logInfo("Uploading " + src + " to " + dst)
- fs.copyFromLocalFile(false, true, src, dst)
- val destStatus = fs.getFileStatus(dst)
-
- // get tokens for anything we upload to hdfs
- if (UserGroupInformation.isSecurityEnabled()) {
- fs.addDelegationTokens(delegTokenRenewer, credentials);
- }
-
- val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
- amJarRsrc.setType(LocalResourceType.FILE)
- amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
- amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst))
- amJarRsrc.setTimestamp(destStatus.getModificationTime())
- amJarRsrc.setSize(destStatus.getLen())
- locaResources(destName) = amJarRsrc
- }
- }
- UserGroupInformation.getCurrentUser().addCredentials(credentials);
- return locaResources
- }
-
- def setupLaunchEnv(localResources: HashMap[String, LocalResource]): HashMap[String, String] = {
- logInfo("Setting up the launch environment")
- val log4jConfLocalRes = localResources.getOrElse("log4j.properties", null)
-
- val env = new HashMap[String, String]()
-
- // If log4j present, ensure ours overrides all others
- if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
-
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
- Client.populateHadoopClasspath(yarnConf, env)
- SparkHadoopUtil.setYarnMode(env)
- env("SPARK_YARN_JAR_PATH") =
- localResources("spark.jar").getResource().getScheme.toString() + "://" +
- localResources("spark.jar").getResource().getFile().toString()
- env("SPARK_YARN_JAR_TIMESTAMP") = localResources("spark.jar").getTimestamp().toString()
- env("SPARK_YARN_JAR_SIZE") = localResources("spark.jar").getSize().toString()
-
- env("SPARK_YARN_USERJAR_PATH") =
- localResources("app.jar").getResource().getScheme.toString() + "://" +
- localResources("app.jar").getResource().getFile().toString()
- env("SPARK_YARN_USERJAR_TIMESTAMP") = localResources("app.jar").getTimestamp().toString()
- env("SPARK_YARN_USERJAR_SIZE") = localResources("app.jar").getSize().toString()
-
- if (log4jConfLocalRes != null) {
- env("SPARK_YARN_LOG4J_PATH") =
- log4jConfLocalRes.getResource().getScheme.toString() + "://" + log4jConfLocalRes.getResource().getFile().toString()
- env("SPARK_YARN_LOG4J_TIMESTAMP") = log4jConfLocalRes.getTimestamp().toString()
- env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString()
- }
-
-
- // Add each SPARK-* key to the environment
- System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
- return env
- }
-
- def userArgsToString(clientArgs: ClientArguments): String = {
- val prefix = " --args "
- val args = clientArgs.userArgs
- val retval = new StringBuilder()
- for (arg <- args){
- retval.append(prefix).append(" '").append(arg).append("' ")
- }
-
- retval.toString
- }
-
- def createContainerLaunchContext(newApp: GetNewApplicationResponse,
- localResources: HashMap[String, LocalResource],
- env: HashMap[String, String]): ContainerLaunchContext = {
- logInfo("Setting up container launch context")
- val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
- amContainer.setLocalResources(localResources)
- amContainer.setEnvironment(env)
-
- val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory()
-
- var amMemory = ((args.amMemory / minResMemory) * minResMemory) +
- (if (0 != (args.amMemory % minResMemory)) minResMemory else 0) - YarnAllocationHandler.MEMORY_OVERHEAD
-
- // Extra options for the JVM
- var JAVA_OPTS = ""
-
- // Add Xmx for am memory
- JAVA_OPTS += "-Xmx" + amMemory + "m "
-
- // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
- // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
- // node, spark gc effects all other containers performance (which can also be other spark containers)
- // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in multi-tenant environments. Not sure how default java gc behaves if it is
- // limited to subset of cores on a node.
- if (env.isDefinedAt("SPARK_USE_CONC_INCR_GC") && java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))) {
- // In our expts, using (default) throughput collector has severe perf ramnifications in multi-tenant machines
- JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
- JAVA_OPTS += " -XX:+CMSIncrementalMode "
- JAVA_OPTS += " -XX:+CMSIncrementalPacing "
- JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
- JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
- }
- if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
- JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
- }
-
- // Command for the ApplicationMaster
- var javaCommand = "java";
- val javaHome = System.getenv("JAVA_HOME")
- if (javaHome != null && !javaHome.isEmpty()) {
- javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
- }
-
- val commands = List[String](javaCommand +
- " -server " +
- JAVA_OPTS +
- " spark.deploy.yarn.ApplicationMaster" +
- " --class " + args.userClass +
- " --jar " + args.userJar +
- userArgsToString(args) +
- " --worker-memory " + args.workerMemory +
- " --worker-cores " + args.workerCores +
- " --num-workers " + args.numWorkers +
- " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
- " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
- logInfo("Command for the ApplicationMaster: " + commands(0))
- amContainer.setCommands(commands)
-
- val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
- // Memory for the ApplicationMaster
- capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
- amContainer.setResource(capability)
-
- // Setup security tokens
- val dob = new DataOutputBuffer()
- credentials.writeTokenStorageToStream(dob)
- amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData()))
-
- return amContainer
- }
-
- def submitApp(appContext: ApplicationSubmissionContext) = {
- // Submit the application to the applications manager
- logInfo("Submitting application to ASM")
- super.submitApplication(appContext)
- }
-
- def monitorApplication(appId: ApplicationId): Boolean = {
- while(true) {
- Thread.sleep(1000)
- val report = super.getApplicationReport(appId)
-
- logInfo("Application report from ASM: \n" +
- "\t application identifier: " + appId.toString() + "\n" +
- "\t appId: " + appId.getId() + "\n" +
- "\t clientToken: " + report.getClientToken() + "\n" +
- "\t appDiagnostics: " + report.getDiagnostics() + "\n" +
- "\t appMasterHost: " + report.getHost() + "\n" +
- "\t appQueue: " + report.getQueue() + "\n" +
- "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
- "\t appStartTime: " + report.getStartTime() + "\n" +
- "\t yarnAppState: " + report.getYarnApplicationState() + "\n" +
- "\t distributedFinalState: " + report.getFinalApplicationStatus() + "\n" +
- "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" +
- "\t appUser: " + report.getUser()
- )
-
- val state = report.getYarnApplicationState()
- val dsStatus = report.getFinalApplicationStatus()
- if (state == YarnApplicationState.FINISHED ||
- state == YarnApplicationState.FAILED ||
- state == YarnApplicationState.KILLED) {
- return true
- }
- }
- return true
- }
-}
-
-object Client {
- def main(argStrings: Array[String]) {
- val args = new ClientArguments(argStrings)
- SparkHadoopUtil.setYarnMode()
- new Client(args).run
- }
-
- // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
- def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) {
- for (c <- conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim)
- }
- }
-}
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
deleted file mode 100644
index 67aff03781..0000000000
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.yarn
-
-import spark.util.MemoryParam
-import spark.util.IntParam
-import collection.mutable.{ArrayBuffer, HashMap}
-import spark.scheduler.{InputFormatInfo, SplitInfo}
-
-// TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
-class ClientArguments(val args: Array[String]) {
- var userJar: String = null
- var userClass: String = null
- var userArgs: Seq[String] = Seq[String]()
- var workerMemory = 1024
- var workerCores = 1
- var numWorkers = 2
- var amQueue = System.getProperty("QUEUE", "default")
- var amMemory: Int = 512
- // TODO
- var inputFormatInfo: List[InputFormatInfo] = null
-
- parseArgs(args.toList)
-
- private def parseArgs(inputArgs: List[String]): Unit = {
- val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]()
- val inputFormatMap: HashMap[String, InputFormatInfo] = new HashMap[String, InputFormatInfo]()
-
- var args = inputArgs
-
- while (! args.isEmpty) {
-
- args match {
- case ("--jar") :: value :: tail =>
- userJar = value
- args = tail
-
- case ("--class") :: value :: tail =>
- userClass = value
- args = tail
-
- case ("--args") :: value :: tail =>
- userArgsBuffer += value
- args = tail
-
- case ("--master-memory") :: MemoryParam(value) :: tail =>
- amMemory = value
- args = tail
-
- case ("--num-workers") :: IntParam(value) :: tail =>
- numWorkers = value
- args = tail
-
- case ("--worker-memory") :: MemoryParam(value) :: tail =>
- workerMemory = value
- args = tail
-
- case ("--worker-cores") :: IntParam(value) :: tail =>
- workerCores = value
- args = tail
-
- case ("--queue") :: value :: tail =>
- amQueue = value
- args = tail
-
- case Nil =>
- if (userJar == null || userClass == null) {
- printUsageAndExit(1)
- }
-
- case _ =>
- printUsageAndExit(1, args)
- }
- }
-
- userArgs = userArgsBuffer.readOnly
- inputFormatInfo = inputFormatMap.values.toList
- }
-
-
- def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
- if (unknownParam != null) {
- System.err.println("Unknown/unsupported param " + unknownParam)
- }
- System.err.println(
- "Usage: spark.deploy.yarn.Client [options] \n" +
- "Options:\n" +
- " --jar JAR_PATH Path to your application's JAR file (required)\n" +
- " --class CLASS_NAME Name of your application's main class (required)\n" +
- " --args ARGS Arguments to be passed to your application's main class.\n" +
- " Mutliple invocations are possible, each will be passed in order.\n" +
- " --num-workers NUM Number of workers to start (Default: 2)\n" +
- " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" +
- " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
- " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
- " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')"
- )
- System.exit(exitCode)
- }
-
-}
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
deleted file mode 100644
index f458f2f6a1..0000000000
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.yarn
-
-import java.net.URI
-import java.nio.ByteBuffer
-import java.security.PrivilegedExceptionAction
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.net.NetUtils
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
-
-import spark.{Logging, Utils}
-
-class WorkerRunnable(container: Container, conf: Configuration, masterAddress: String,
- slaveId: String, hostname: String, workerMemory: Int, workerCores: Int)
- extends Runnable with Logging {
-
- var rpc: YarnRPC = YarnRPC.create(conf)
- var cm: ContainerManager = null
- val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-
- def run = {
- logInfo("Starting Worker Container")
- cm = connectToCM
- startContainer
- }
-
- def startContainer = {
- logInfo("Setting up ContainerLaunchContext")
-
- val ctx = Records.newRecord(classOf[ContainerLaunchContext])
- .asInstanceOf[ContainerLaunchContext]
-
- ctx.setContainerId(container.getId())
- ctx.setResource(container.getResource())
- val localResources = prepareLocalResources
- ctx.setLocalResources(localResources)
-
- val env = prepareEnvironment
- ctx.setEnvironment(env)
-
- // Extra options for the JVM
- var JAVA_OPTS = ""
- // Set the JVM memory
- val workerMemoryString = workerMemory + "m"
- JAVA_OPTS += "-Xms" + workerMemoryString + " -Xmx" + workerMemoryString + " "
- if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
- JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
- }
-    // Commenting this out for now so that people can refer to the properties if required. Remove it once the cpuset version is pushed out.
-    // The context: the default GC for server-class machines ends up using all cores to do GC - hence if there are multiple containers on the same
-    // node, Spark's GC affects the performance of all other containers (which can also be other Spark containers).
-    // Instead of using this, rely on cpusets enforced by YARN so that Spark behaves 'properly' in multi-tenant environments. Not sure how the default Java GC behaves if it is
-    // limited to a subset of cores on a node.
-/*
- else {
-      // If no java_opts are specified, default to using -XX:+CMSIncrementalMode.
-      // It might be possible that other modes/configs are being set in SPARK_JAVA_OPTS, so we don't want to mess with it.
-      // In our experiments, using the (default) throughput collector has severe perf ramifications on multi-tenant machines.
- // The options are based on
- // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline
- JAVA_OPTS += " -XX:+UseConcMarkSweepGC "
- JAVA_OPTS += " -XX:+CMSIncrementalMode "
- JAVA_OPTS += " -XX:+CMSIncrementalPacing "
- JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
- JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
- }
-*/
-
- ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
-
- val credentials = UserGroupInformation.getCurrentUser().getCredentials()
- val dob = new DataOutputBuffer()
- credentials.writeTokenStorageToStream(dob)
- ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
-
- var javaCommand = "java";
- val javaHome = System.getenv("JAVA_HOME")
- if (javaHome != null && !javaHome.isEmpty()) {
- javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
- }
-
- val commands = List[String](javaCommand +
- " -server " +
- // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
- // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in an inconsistent state.
-      // TODO: If the OOM is not recoverable by rescheduling it on a different node, then do 'something' to fail the job ... akin to blacklisting trackers in mapred?
- " -XX:OnOutOfMemoryError='kill %p' " +
- JAVA_OPTS +
- " spark.executor.StandaloneExecutorBackend " +
- masterAddress + " " +
- slaveId + " " +
- hostname + " " +
- workerCores +
- " 1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
- " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
- logInfo("Setting up worker with commands: " + commands)
- ctx.setCommands(commands)
-
- // Send the start request to the ContainerManager
- val startReq = Records.newRecord(classOf[StartContainerRequest])
- .asInstanceOf[StartContainerRequest]
- startReq.setContainerLaunchContext(ctx)
- cm.startContainer(startReq)
- }
-
-
- def prepareLocalResources: HashMap[String, LocalResource] = {
- logInfo("Preparing Local resources")
- val locaResources = HashMap[String, LocalResource]()
-
- // Spark JAR
- val sparkJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
- sparkJarResource.setType(LocalResourceType.FILE)
- sparkJarResource.setVisibility(LocalResourceVisibility.APPLICATION)
- sparkJarResource.setResource(ConverterUtils.getYarnUrlFromURI(
- new URI(System.getenv("SPARK_YARN_JAR_PATH"))))
- sparkJarResource.setTimestamp(System.getenv("SPARK_YARN_JAR_TIMESTAMP").toLong)
- sparkJarResource.setSize(System.getenv("SPARK_YARN_JAR_SIZE").toLong)
- locaResources("spark.jar") = sparkJarResource
- // User JAR
- val userJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
- userJarResource.setType(LocalResourceType.FILE)
- userJarResource.setVisibility(LocalResourceVisibility.APPLICATION)
- userJarResource.setResource(ConverterUtils.getYarnUrlFromURI(
- new URI(System.getenv("SPARK_YARN_USERJAR_PATH"))))
- userJarResource.setTimestamp(System.getenv("SPARK_YARN_USERJAR_TIMESTAMP").toLong)
- userJarResource.setSize(System.getenv("SPARK_YARN_USERJAR_SIZE").toLong)
- locaResources("app.jar") = userJarResource
-
- // Log4j conf - if available
- if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
- val log4jConfResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
- log4jConfResource.setType(LocalResourceType.FILE)
- log4jConfResource.setVisibility(LocalResourceVisibility.APPLICATION)
- log4jConfResource.setResource(ConverterUtils.getYarnUrlFromURI(
- new URI(System.getenv("SPARK_YARN_LOG4J_PATH"))))
- log4jConfResource.setTimestamp(System.getenv("SPARK_YARN_LOG4J_TIMESTAMP").toLong)
- log4jConfResource.setSize(System.getenv("SPARK_YARN_LOG4J_SIZE").toLong)
- locaResources("log4j.properties") = log4jConfResource
- }
-
-
- logInfo("Prepared Local resources " + locaResources)
- return locaResources
- }
-
- def prepareEnvironment: HashMap[String, String] = {
- val env = new HashMap[String, String]()
-
- // If log4j present, ensure ours overrides all others
- if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
- // Which is correct ?
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./log4j.properties")
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
- }
-
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*")
- Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH")
- Client.populateHadoopClasspath(yarnConf, env)
-
- System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
- return env
- }
-
- def connectToCM: ContainerManager = {
- val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort()
- val cmAddress = NetUtils.createSocketAddr(cmHostPortStr)
- logInfo("Connecting to ContainerManager at " + cmHostPortStr)
-
- // use doAs and remoteUser here so we can add the container token and not
-    // pollute the current user's credentials with all of the individual container tokens.
- val user = UserGroupInformation.createRemoteUser(container.getId().toString());
- val containerToken = container.getContainerToken();
- if (containerToken != null) {
- user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
- }
-
- val proxy = user
- .doAs(new PrivilegedExceptionAction[ContainerManager] {
- def run: ContainerManager = {
- return rpc.getProxy(classOf[ContainerManager],
- cmAddress, conf).asInstanceOf[ContainerManager]
- }
- });
- return proxy;
- }
-
-}
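
prepareLocalResources above repeats the same APPLICATION-visible FILE resource setup for the Spark JAR, the user JAR and the optional log4j config, each driven by a (<NAME>_PATH, <NAME>_TIMESTAMP, <NAME>_SIZE) environment-variable triple. A minimal sketch of that pattern factored into one helper (the helper name is illustrative, not part of the commit):

  import java.net.URI
  import org.apache.hadoop.yarn.api.records.{LocalResource, LocalResourceType, LocalResourceVisibility}
  import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

  // Builds one LocalResource from the environment-variable triple written by the Client.
  def resourceFromEnv(envPrefix: String): LocalResource = {
    val res = Records.newRecord(classOf[LocalResource])
    res.setType(LocalResourceType.FILE)
    res.setVisibility(LocalResourceVisibility.APPLICATION)
    res.setResource(ConverterUtils.getYarnUrlFromURI(new URI(System.getenv(envPrefix + "_PATH"))))
    res.setTimestamp(System.getenv(envPrefix + "_TIMESTAMP").toLong)
    res.setSize(System.getenv(envPrefix + "_SIZE").toLong)
    res
  }
  // e.g. locaResources("spark.jar") = resourceFromEnv("SPARK_YARN_JAR")
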
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala
deleted file mode 100644
index b0af8baf08..0000000000
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/YarnAllocationHandler.scala
+++ /dev/null
@@ -1,564 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.deploy.yarn
-
-import spark.{Logging, Utils}
-import spark.scheduler.SplitInfo
-import scala.collection
-import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId, ContainerId, Priority, Resource, ResourceRequest, ContainerStatus, Container}
-import spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}
-import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
-import org.apache.hadoop.yarn.util.{RackResolver, Records}
-import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
-import java.util.concurrent.atomic.AtomicInteger
-import org.apache.hadoop.yarn.api.AMRMProtocol
-import collection.JavaConversions._
-import collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import org.apache.hadoop.conf.Configuration
-import java.util.{Collections, Set => JSet}
-import java.lang.{Boolean => JBoolean}
-
-object AllocationType extends Enumeration ("HOST", "RACK", "ANY") {
- type AllocationType = Value
- val HOST, RACK, ANY = Value
-}
-
-// too many params ? refactor it 'somehow' ?
-// needs to be mt-safe
-// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive : should make it
-// more proactive and decoupled.
-// Note that right now, we assume all node asks are uniform in terms of capabilities and priority.
-// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for more info
-// on how we are requesting for containers.
-private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceManager: AMRMProtocol,
- val appAttemptId: ApplicationAttemptId,
- val maxWorkers: Int, val workerMemory: Int, val workerCores: Int,
- val preferredHostToCount: Map[String, Int],
- val preferredRackToCount: Map[String, Int])
- extends Logging {
-
-
- // These three are locked on allocatedHostToContainersMap. Complementary data structures
- // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
- // allocatedContainerToHostMap: container to host mapping
- private val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]]()
- private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
- // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an allocated node)
- // As with the two data structures above, tightly coupled with them, and to be locked on allocatedHostToContainersMap
- private val allocatedRackCount = new HashMap[String, Int]()
-
- // containers which have been released.
- private val releasedContainerList = new CopyOnWriteArrayList[ContainerId]()
- // containers to be released in next request to RM
- private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
-
- private val numWorkersRunning = new AtomicInteger()
- // Used to generate a unique id per worker
- private val workerIdCounter = new AtomicInteger()
- private val lastResponseId = new AtomicInteger()
-
- def getNumWorkersRunning: Int = numWorkersRunning.intValue
-
-
- def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
- }
-
- def allocateContainers(workersToRequest: Int) {
- // We need to send the request only once from what I understand ... but for now, not modifying this much.
-
- // Keep polling the Resource Manager for containers
- val amResp = allocateWorkerResources(workersToRequest).getAMResponse
-
- val _allocatedContainers = amResp.getAllocatedContainers()
- if (_allocatedContainers.size > 0) {
-
-
- logDebug("Allocated " + _allocatedContainers.size + " containers, current count " +
- numWorkersRunning.get() + ", to-be-released " + releasedContainerList +
- ", pendingReleaseContainers : " + pendingReleaseContainers)
- logDebug("Cluster Resources: " + amResp.getAvailableResources)
-
- val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
-
- // ignore if not satisfying constraints {
- for (container <- _allocatedContainers) {
- if (isResourceConstraintSatisfied(container)) {
- // allocatedContainers += container
-
- val host = container.getNodeId.getHost
- val containers = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]())
-
- containers += container
- }
- // Add all ignored containers to released list
- else releasedContainerList.add(container.getId())
- }
-
- // Find the appropriate containers to use
- // Slightly non trivial groupBy I guess ...
- val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
- val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
- val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
-
- for (candidateHost <- hostToContainers.keySet)
- {
- val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
- val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
-
- var remainingContainers = hostToContainers.get(candidateHost).getOrElse(null)
- assert(remainingContainers != null)
-
- if (requiredHostCount >= remainingContainers.size){
- // Since we got <= required containers, add all to dataLocalContainers
- dataLocalContainers.put(candidateHost, remainingContainers)
- // all consumed
- remainingContainers = null
- }
- else if (requiredHostCount > 0) {
- // container list has more containers than we need for data locality.
- // Split into two : data local container count of (remainingContainers.size - requiredHostCount)
- // and rest as remainingContainer
- val (dataLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredHostCount)
- dataLocalContainers.put(candidateHost, dataLocal)
- // remainingContainers = remaining
-
-          // YARN has a nasty habit of allocating a tonne of containers on a host - discourage this:
-          // add the remainder to the release list. If we have insufficient containers, the next allocation cycle
-          // will reallocate (but won't treat them as data local).
- for (container <- remaining) releasedContainerList.add(container.getId())
- remainingContainers = null
- }
-
- // now rack local
- if (remainingContainers != null){
- val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
-
- if (rack != null){
- val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
- val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
- rackLocalContainers.get(rack).getOrElse(List()).size
-
-
- if (requiredRackCount >= remainingContainers.size){
- // Add all to dataLocalContainers
- dataLocalContainers.put(rack, remainingContainers)
- // all consumed
- remainingContainers = null
- }
- else if (requiredRackCount > 0) {
- // container list has more containers than we need for data locality.
- // Split into two : data local container count of (remainingContainers.size - requiredRackCount)
- // and rest as remainingContainer
- val (rackLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredRackCount)
- val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack, new ArrayBuffer[Container]())
-
- existingRackLocal ++= rackLocal
- remainingContainers = remaining
- }
- }
- }
-
- // If still not consumed, then it is off rack host - add to that list.
- if (remainingContainers != null){
- offRackContainers.put(candidateHost, remainingContainers)
- }
- }
-
- // Now that we have split the containers into various groups, go through them in order :
- // first host local, then rack local and then off rack (everything else).
- // Note that the list we create below tries to ensure that not all containers end up within a host
-      // if there is a sufficiently large number of hosts/containers.
-
- val allocatedContainers = new ArrayBuffer[Container](_allocatedContainers.size)
- allocatedContainers ++= ClusterScheduler.prioritizeContainers(dataLocalContainers)
- allocatedContainers ++= ClusterScheduler.prioritizeContainers(rackLocalContainers)
- allocatedContainers ++= ClusterScheduler.prioritizeContainers(offRackContainers)
-
- // Run each of the allocated containers
- for (container <- allocatedContainers) {
- val numWorkersRunningNow = numWorkersRunning.incrementAndGet()
- val workerHostname = container.getNodeId.getHost
- val containerId = container.getId
-
- assert (container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
-
- if (numWorkersRunningNow > maxWorkers) {
- logInfo("Ignoring container " + containerId + " at host " + workerHostname +
- " .. we already have required number of containers")
- releasedContainerList.add(containerId)
- // reset counter back to old value.
- numWorkersRunning.decrementAndGet()
- }
- else {
- // deallocate + allocate can result in reusing id's wrongly - so use a different counter (workerIdCounter)
- val workerId = workerIdCounter.incrementAndGet().toString
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
- System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
- StandaloneSchedulerBackend.ACTOR_NAME)
-
- logInfo("launching container on " + containerId + " host " + workerHostname)
- // just to be safe, simply remove it from pendingReleaseContainers. Should not be there, but ..
- pendingReleaseContainers.remove(containerId)
-
- val rack = YarnAllocationHandler.lookupRack(conf, workerHostname)
- allocatedHostToContainersMap.synchronized {
- val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname, new HashSet[ContainerId]())
-
- containerSet += containerId
- allocatedContainerToHostMap.put(containerId, workerHostname)
- if (rack != null) allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
- }
-
- new Thread(
- new WorkerRunnable(container, conf, driverUrl, workerId,
- workerHostname, workerMemory, workerCores)
- ).start()
- }
- }
- logDebug("After allocated " + allocatedContainers.size + " containers (orig : " +
- _allocatedContainers.size + "), current count " + numWorkersRunning.get() +
- ", to-be-released " + releasedContainerList + ", pendingReleaseContainers : " + pendingReleaseContainers)
- }
-
-
- val completedContainers = amResp.getCompletedContainersStatuses()
- if (completedContainers.size > 0){
- logDebug("Completed " + completedContainers.size + " containers, current count " + numWorkersRunning.get() +
- ", to-be-released " + releasedContainerList + ", pendingReleaseContainers : " + pendingReleaseContainers)
-
- for (completedContainer <- completedContainers){
- val containerId = completedContainer.getContainerId
-
- // Was this released by us ? If yes, then simply remove from containerSet and move on.
- if (pendingReleaseContainers.containsKey(containerId)) {
- pendingReleaseContainers.remove(containerId)
- }
- else {
- // simply decrement count - next iteration of ReporterThread will take care of allocating !
- numWorkersRunning.decrementAndGet()
- logInfo("Container completed ? nodeId: " + containerId + ", state " + completedContainer.getState +
- " httpaddress: " + completedContainer.getDiagnostics)
- }
-
- allocatedHostToContainersMap.synchronized {
- if (allocatedContainerToHostMap.containsKey(containerId)) {
- val host = allocatedContainerToHostMap.get(containerId).getOrElse(null)
- assert (host != null)
-
- val containerSet = allocatedHostToContainersMap.get(host).getOrElse(null)
- assert (containerSet != null)
-
- containerSet -= containerId
- if (containerSet.isEmpty) allocatedHostToContainersMap.remove(host)
- else allocatedHostToContainersMap.update(host, containerSet)
-
- allocatedContainerToHostMap -= containerId
-
- // doing this within locked context, sigh ... move to outside ?
- val rack = YarnAllocationHandler.lookupRack(conf, host)
- if (rack != null) {
- val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
- if (rackCount > 0) allocatedRackCount.put(rack, rackCount)
- else allocatedRackCount.remove(rack)
- }
- }
- }
- }
- logDebug("After completed " + completedContainers.size + " containers, current count " +
- numWorkersRunning.get() + ", to-be-released " + releasedContainerList +
- ", pendingReleaseContainers : " + pendingReleaseContainers)
- }
- }
-
- def createRackResourceRequests(hostContainers: List[ResourceRequest]): List[ResourceRequest] = {
- // First generate modified racks and new set of hosts under it : then issue requests
- val rackToCounts = new HashMap[String, Int]()
-
- // Within this lock - used to read/write to the rack related maps too.
- for (container <- hostContainers) {
- val candidateHost = container.getHostName
- val candidateNumContainers = container.getNumContainers
- assert(YarnAllocationHandler.ANY_HOST != candidateHost)
-
- val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
- if (rack != null) {
- var count = rackToCounts.getOrElse(rack, 0)
- count += candidateNumContainers
- rackToCounts.put(rack, count)
- }
- }
-
- val requestedContainers: ArrayBuffer[ResourceRequest] =
- new ArrayBuffer[ResourceRequest](rackToCounts.size)
- for ((rack, count) <- rackToCounts){
- requestedContainers +=
- createResourceRequest(AllocationType.RACK, rack, count, YarnAllocationHandler.PRIORITY)
- }
-
- requestedContainers.toList
- }
-
- def allocatedContainersOnHost(host: String): Int = {
- var retval = 0
- allocatedHostToContainersMap.synchronized {
- retval = allocatedHostToContainersMap.getOrElse(host, Set()).size
- }
- retval
- }
-
- def allocatedContainersOnRack(rack: String): Int = {
- var retval = 0
- allocatedHostToContainersMap.synchronized {
- retval = allocatedRackCount.getOrElse(rack, 0)
- }
- retval
- }
-
- private def allocateWorkerResources(numWorkers: Int): AllocateResponse = {
-
- var resourceRequests: List[ResourceRequest] = null
-
- // default.
- if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
- logDebug("numWorkers: " + numWorkers + ", host preferences ? " + preferredHostToCount.isEmpty)
- resourceRequests = List(
- createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY))
- }
- else {
- // request for all hosts in preferred nodes and for numWorkers -
- // candidates.size, request by default allocation policy.
- val hostContainerRequests: ArrayBuffer[ResourceRequest] =
- new ArrayBuffer[ResourceRequest](preferredHostToCount.size)
- for ((candidateHost, candidateCount) <- preferredHostToCount) {
- val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
-
- if (requiredCount > 0) {
- hostContainerRequests +=
- createResourceRequest(AllocationType.HOST, candidateHost, requiredCount, YarnAllocationHandler.PRIORITY)
- }
- }
- val rackContainerRequests: List[ResourceRequest] = createRackResourceRequests(hostContainerRequests.toList)
-
- val anyContainerRequests: ResourceRequest =
- createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY)
-
- val containerRequests: ArrayBuffer[ResourceRequest] =
- new ArrayBuffer[ResourceRequest](hostContainerRequests.size() + rackContainerRequests.size() + 1)
-
- containerRequests ++= hostContainerRequests
- containerRequests ++= rackContainerRequests
- containerRequests += anyContainerRequests
-
- resourceRequests = containerRequests.toList
- }
-
- val req = Records.newRecord(classOf[AllocateRequest])
- req.setResponseId(lastResponseId.incrementAndGet)
- req.setApplicationAttemptId(appAttemptId)
-
- req.addAllAsks(resourceRequests)
-
- val releasedContainerList = createReleasedContainerList()
- req.addAllReleases(releasedContainerList)
-
-
-
- if (numWorkers > 0) {
- logInfo("Allocating " + numWorkers + " worker containers with " + (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + " of memory each.")
- }
- else {
- logDebug("Empty allocation req .. release : " + releasedContainerList)
- }
-
- for (req <- resourceRequests) {
- logInfo("rsrcRequest ... host : " + req.getHostName + ", numContainers : " + req.getNumContainers +
- ", p = " + req.getPriority().getPriority + ", capability: " + req.getCapability)
- }
- resourceManager.allocate(req)
- }
-
-
- private def createResourceRequest(requestType: AllocationType.AllocationType,
- resource:String, numWorkers: Int, priority: Int): ResourceRequest = {
-
-    // If a hostname is specified, we need at least two requests - node local and rack local.
- // There must be a third request - which is ANY : that will be specially handled.
- requestType match {
- case AllocationType.HOST => {
- assert (YarnAllocationHandler.ANY_HOST != resource)
-
- val hostname = resource
- val nodeLocal = createResourceRequestImpl(hostname, numWorkers, priority)
-
- // add to host->rack mapping
- YarnAllocationHandler.populateRackInfo(conf, hostname)
-
- nodeLocal
- }
-
- case AllocationType.RACK => {
- val rack = resource
- createResourceRequestImpl(rack, numWorkers, priority)
- }
-
- case AllocationType.ANY => {
- createResourceRequestImpl(YarnAllocationHandler.ANY_HOST, numWorkers, priority)
- }
-
- case _ => throw new IllegalArgumentException("Unexpected/unsupported request type .. " + requestType)
- }
- }
-
- private def createResourceRequestImpl(hostname:String, numWorkers: Int, priority: Int): ResourceRequest = {
-
- val rsrcRequest = Records.newRecord(classOf[ResourceRequest])
- val memCapability = Records.newRecord(classOf[Resource])
- // There probably is some overhead here, let's reserve a bit more memory.
- memCapability.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
- rsrcRequest.setCapability(memCapability)
-
- val pri = Records.newRecord(classOf[Priority])
- pri.setPriority(priority)
- rsrcRequest.setPriority(pri)
-
- rsrcRequest.setHostName(hostname)
-
- rsrcRequest.setNumContainers(java.lang.Math.max(numWorkers, 0))
- rsrcRequest
- }
-
- def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
-
- val retval = new ArrayBuffer[ContainerId](1)
- // iterator on COW list ...
- for (container <- releasedContainerList.iterator()){
- retval += container
- }
- // remove from the original list.
- if (! retval.isEmpty) {
- releasedContainerList.removeAll(retval)
- for (v <- retval) pendingReleaseContainers.put(v, true)
- logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " +
- pendingReleaseContainers)
- }
-
- retval
- }
-}
-
-object YarnAllocationHandler {
-
- val ANY_HOST = "*"
-  // All requests are issued with the same priority: we do not (yet) have any distinction between
-  // request types (like map/reduce in Hadoop, for example).
- val PRIORITY = 1
-
- // Additional memory overhead - in mb
- val MEMORY_OVERHEAD = 384
-
- // host to rack map - saved from allocation requests
- // We are expecting this not to change.
- // Note that it is possible for this to change : and RM will indicate that to us via update
- // response to allocate. But we are punting on handling that for now.
- private val hostToRack = new ConcurrentHashMap[String, String]()
- private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
-
- def newAllocator(conf: Configuration,
- resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
- map: collection.Map[String, collection.Set[SplitInfo]]): YarnAllocationHandler = {
-
- val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
-
-
- new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers,
- args.workerMemory, args.workerCores, hostToCount, rackToCount)
- }
-
- def newAllocator(conf: Configuration,
- resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId,
- maxWorkers: Int, workerMemory: Int, workerCores: Int,
- map: collection.Map[String, collection.Set[SplitInfo]]): YarnAllocationHandler = {
-
- val (hostToCount, rackToCount) = generateNodeToWeight(conf, map)
-
- new YarnAllocationHandler(conf, resourceManager, appAttemptId, maxWorkers,
- workerMemory, workerCores, hostToCount, rackToCount)
- }
-
- // A simple method to copy the split info map.
- private def generateNodeToWeight(conf: Configuration, input: collection.Map[String, collection.Set[SplitInfo]]) :
- // host to count, rack to count
- (Map[String, Int], Map[String, Int]) = {
-
- if (input == null) return (Map[String, Int](), Map[String, Int]())
-
- val hostToCount = new HashMap[String, Int]
- val rackToCount = new HashMap[String, Int]
-
- for ((host, splits) <- input) {
- val hostCount = hostToCount.getOrElse(host, 0)
- hostToCount.put(host, hostCount + splits.size)
-
- val rack = lookupRack(conf, host)
- if (rack != null){
- val rackCount = rackToCount.getOrElse(host, 0)
- rackToCount.put(host, rackCount + splits.size)
- }
- }
-
- (hostToCount.toMap, rackToCount.toMap)
- }
-
- def lookupRack(conf: Configuration, host: String): String = {
- if (! hostToRack.contains(host)) populateRackInfo(conf, host)
- hostToRack.get(host)
- }
-
- def fetchCachedHostsForRack(rack: String): Option[Set[String]] = {
- val set = rackToHostSet.get(rack)
- if (set == null) return None
-
- // No better way to get a Set[String] from JSet ?
- val convertedSet: collection.mutable.Set[String] = set
- Some(convertedSet.toSet)
- }
-
- def populateRackInfo(conf: Configuration, hostname: String) {
- Utils.checkHost(hostname)
-
- if (!hostToRack.containsKey(hostname)) {
-      // If there are repeated failures to resolve, add to an ignore list?
- val rackInfo = RackResolver.resolve(conf, hostname)
- if (rackInfo != null && rackInfo.getNetworkLocation != null) {
- val rack = rackInfo.getNetworkLocation
- hostToRack.put(hostname, rack)
- if (! rackToHostSet.containsKey(rack)) {
- rackToHostSet.putIfAbsent(rack, Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
- }
- rackToHostSet.get(rack).add(hostname)
-
- // Since RackResolver caches, we are disabling this for now ...
- } /* else {
-        // right? Else we will keep calling the rack resolver in case we can't resolve rack info ...
- hostToRack.put(hostname, null)
- } */
- }
- }
-}
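
Every container ask issued by createResourceRequestImpl above pads the requested worker memory with YarnAllocationHandler.MEMORY_OVERHEAD (384 MB). A condensed sketch of that request assembly, using only the YARN record calls already present in the deleted code (the function name is illustrative):

  import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
  import org.apache.hadoop.yarn.util.Records

  // One ask for `num` containers of (workerMemory + overhead) MB at `host` ("*" means any host).
  def buildRequest(host: String, workerMemory: Int, num: Int, priority: Int): ResourceRequest = {
    val capability = Records.newRecord(classOf[Resource])
    capability.setMemory(workerMemory + 384)          // MEMORY_OVERHEAD, in MB
    val pri = Records.newRecord(classOf[Priority])
    pri.setPriority(priority)
    val req = Records.newRecord(classOf[ResourceRequest])
    req.setHostName(host)
    req.setCapability(capability)
    req.setPriority(pri)
    req.setNumContainers(math.max(num, 0))
    req
  }
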
diff --git a/core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala b/core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
deleted file mode 100644
index bb58353e0c..0000000000
--- a/core/src/hadoop2-yarn/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler.cluster
-
-import spark._
-import spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
-import org.apache.hadoop.conf.Configuration
-
-/**
- *
- * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of the ApplicationMaster, etc. is done.
- */
-private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
-
- def this(sc: SparkContext) = this(sc, new Configuration())
-
-  // Nothing else for now ... initialize the application master, which needs the SparkContext to determine how to allocate.
-  // Note that only the first creation of a SparkContext has any influence (and ideally there should be only one SparkContext, right?).
-  // Subsequent creations are ignored - since nodes are already allocated by then.
-
-
- // By default, rack is unknown
- override def getRackForHost(hostPort: String): Option[String] = {
- val host = Utils.parseHostPort(hostPort)._1
- val retval = YarnAllocationHandler.lookupRack(conf, host)
- if (retval != null) Some(retval) else None
- }
-
- override def postStartHook() {
- val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
- if (sparkContextInitialized){
- // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
- Thread.sleep(3000L)
- }
- logInfo("YarnClusterScheduler.postStartHook done")
- }
-}
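
A small illustrative call against the scheduler above, assuming an existing SparkContext named sc and a hypothetical host; the rack string comes from the allocation handler's cached RackResolver lookup:

  val scheduler = new YarnClusterScheduler(sc)
  val rack: Option[String] = scheduler.getRackForHost("worker-1.example.com:45123")
  // Some("/default-rack") when the resolver can place the host, None otherwise.
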
diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
deleted file mode 100644
index 4b3d84670c..0000000000
--- a/core/src/hadoop2/scala/org/apache/hadoop/mapred/HadoopMapRedUtil.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred
-
-trait HadoopMapRedUtil {
- def newJobContext(conf: JobConf, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
-
- def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-
- def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
- jobId, isMap, taskId, attemptId)
-}
diff --git a/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala b/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
deleted file mode 100644
index aa3b1ed3a5..0000000000
--- a/core/src/hadoop2/scala/org/apache/hadoop/mapreduce/HadoopMapReduceUtil.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapreduce
-
-import org.apache.hadoop.conf.Configuration
-import task.{TaskAttemptContextImpl, JobContextImpl}
-
-trait HadoopMapReduceUtil {
- def newJobContext(conf: Configuration, jobId: JobID): JobContext = new JobContextImpl(conf, jobId)
-
- def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
-
- def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = new TaskAttemptID(jtIdentifier,
- jobId, isMap, taskId, attemptId)
-}
diff --git a/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
new file mode 100644
index 0000000000..f87460039b
--- /dev/null
+++ b/core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred
+
+trait SparkHadoopMapRedUtil {
+ def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
+ val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl", "org.apache.hadoop.mapred.JobContext");
+ val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[org.apache.hadoop.mapreduce.JobID])
+ ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
+ }
+
+ def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
+ val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl", "org.apache.hadoop.mapred.TaskAttemptContext")
+ val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
+ ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
+ }
+
+ def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
+ new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
+ }
+
+ private def firstAvailableClass(first: String, second: String): Class[_] = {
+ try {
+ Class.forName(first)
+ } catch {
+ case e: ClassNotFoundException =>
+ Class.forName(second)
+ }
+ }
+}
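
The trait above is what makes the single core build hadoop-agnostic for the old mapred API: Hadoop 1.x exposes the concrete JobContext/TaskAttemptContext classes, Hadoop 2.x exposes JobContextImpl/TaskAttemptContextImpl, and firstAvailableClass picks whichever is on the classpath at runtime. A brief sketch of mixing it in (the class name is hypothetical; SparkHadoopWriter does the same thing later in this diff):

  import org.apache.hadoop.mapred.{JobConf, JobID, SparkHadoopMapRedUtil, TaskAttemptID}

  class CompatWriter extends SparkHadoopMapRedUtil {
    def makeContexts(conf: JobConf, jobId: JobID, attemptId: TaskAttemptID) = {
      val jobCtx  = newJobContext(conf, jobId)             // JobContextImpl on hadoop2, JobContext on hadoop1
      val taskCtx = newTaskAttemptContext(conf, attemptId) // same fallback for the task attempt context
      (jobCtx, taskCtx)
    }
  }
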
diff --git a/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
new file mode 100644
index 0000000000..93180307fa
--- /dev/null
+++ b/core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce
+
+import org.apache.hadoop.conf.Configuration
+import java.lang.{Integer => JInteger, Boolean => JBoolean}
+
+trait SparkHadoopMapReduceUtil {
+ def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
+ val klass = firstAvailableClass(
+ "org.apache.hadoop.mapreduce.task.JobContextImpl", // hadoop2, hadoop2-yarn
+ "org.apache.hadoop.mapreduce.JobContext") // hadoop1
+ val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[JobID])
+ ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
+ }
+
+ def newTaskAttemptContext(conf: Configuration, attemptId: TaskAttemptID): TaskAttemptContext = {
+ val klass = firstAvailableClass(
+ "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl", // hadoop2, hadoop2-yarn
+ "org.apache.hadoop.mapreduce.TaskAttemptContext") // hadoop1
+ val ctor = klass.getDeclaredConstructor(classOf[Configuration], classOf[TaskAttemptID])
+ ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
+ }
+
+ def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
+ val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID");
+ try {
+ // first, attempt to use the old-style constructor that takes a boolean isMap (not available in YARN)
+ val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean],
+ classOf[Int], classOf[Int])
+ ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId), new
+ JInteger(attemptId)).asInstanceOf[TaskAttemptID]
+ } catch {
+ case exc: NoSuchMethodException => {
+ // failed, look for the new ctor that takes a TaskType (not available in 1.x)
+ val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType").asInstanceOf[Class[Enum[_]]]
+ val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(taskTypeClass, if(isMap) "MAP" else "REDUCE")
+ val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass,
+ classOf[Int], classOf[Int])
+ ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId), new
+ JInteger(attemptId)).asInstanceOf[TaskAttemptID]
+ }
+ }
+ }
+
+ private def firstAvailableClass(first: String, second: String): Class[_] = {
+ try {
+ Class.forName(first)
+ } catch {
+ case e: ClassNotFoundException =>
+ Class.forName(second)
+ }
+ }
+}
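
newTaskAttemptID above hides an incompatible constructor change: the Hadoop 1.x TaskAttemptID constructor takes a boolean isMap, while the YARN line replaced it with a TaskType enum, and the reflective fallback tries one and then the other. A short usage sketch (the object name is hypothetical):

  import org.apache.hadoop.mapreduce.{SparkHadoopMapReduceUtil, TaskAttemptID}

  object AttemptIds extends SparkHadoopMapReduceUtil {
    // The boolean (isMap = true here) is routed to either the old constructor or to TaskType.MAP.
    def forMapTask(jtIdentifier: String, jobId: Int, taskId: Int, attempt: Int): TaskAttemptID =
      newTaskAttemptID(jtIdentifier, jobId, true, taskId, attempt)
  }
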
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index e7d4a7f562..cc1285dd95 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -31,12 +31,14 @@ import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.mapred.FileOutputCommitter
import org.apache.hadoop.mapred.FileOutputFormat
-import org.apache.hadoop.mapred.HadoopWriter
+import org.apache.hadoop.mapred.SparkHadoopWriter
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat,
+ RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, SparkHadoopMapReduceUtil}
+import org.apache.hadoop.security.UserGroupInformation
import spark.partial.BoundedDouble
import spark.partial.PartialResult
@@ -50,7 +52,7 @@ import spark.Partitioner._
*/
class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
extends Logging
- with HadoopMapReduceUtil
+ with SparkHadoopMapReduceUtil
with Serializable {
/**
@@ -627,7 +629,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
}
conf.setOutputCommitter(classOf[FileOutputCommitter])
- FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf))
+ FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
saveAsHadoopDataset(conf)
}
@@ -653,7 +655,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
- val writer = new HadoopWriter(conf)
+ val writer = new SparkHadoopWriter(conf)
writer.preSetup()
def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
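
The rename from HadoopWriter to SparkHadoopWriter does not change the user-facing save API; a pair RDD is still written out with saveAsHadoopFile. A minimal, illustrative call, assuming an existing SparkContext sc and a placeholder HDFS path:

  import org.apache.hadoop.io.{IntWritable, Text}
  import org.apache.hadoop.mapred.TextOutputFormat
  import spark.SparkContext._   // implicit conversion to PairRDDFunctions

  val counts = sc.parallelize(Seq(("spark", 2), ("yarn", 1)))
    .map { case (word, n) => (new Text(word), new IntWritable(n)) }

  // Goes through the renamed SparkHadoopWriter under the hood.
  counts.saveAsHadoopFile("hdfs://namenode:8020/tmp/word-counts",
    classOf[Text], classOf[IntWritable], classOf[TextOutputFormat[Text, IntWritable]])
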
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 185c76366f..fdd2dfa810 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -52,7 +52,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
import org.apache.mesos.MesosNativeLibrary
-import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import spark.deploy.LocalSparkCluster
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD,
OrderedRDDFunctions}
@@ -235,7 +235,8 @@ class SparkContext(
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
- val conf = SparkHadoopUtil.newConfiguration()
+ val env = SparkEnv.get
+ val conf = env.hadoop.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
@@ -623,10 +624,11 @@ class SparkContext(
logWarning("null specified as parameter to addJar",
new SparkException("null specified as parameter to addJar"))
} else {
+ val env = SparkEnv.get
val uri = new URI(path)
val key = uri.getScheme match {
case null | "file" =>
- if (SparkHadoopUtil.isYarnMode()) {
+ if (env.hadoop.isYarnMode()) {
logWarning("local jar specified as parameter to addJar under Yarn mode")
return
}
@@ -809,8 +811,9 @@ class SparkContext(
* prevent accidental overriding of checkpoint files in the existing directory.
*/
def setCheckpointDir(dir: String, useExisting: Boolean = false) {
+ val env = SparkEnv.get
val path = new Path(dir)
- val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration())
+ val fs = path.getFileSystem(env.hadoop.newConfiguration())
if (!useExisting) {
if (fs.exists(path)) {
throw new Exception("Checkpoint directory '" + path + "' already exists.")
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index bca90886a3..1f66e9cc7f 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -25,6 +25,7 @@ import akka.remote.RemoteActorRefProvider
import spark.broadcast.BroadcastManager
import spark.metrics.MetricsSystem
+import spark.deploy.SparkHadoopUtil
import spark.storage.BlockManager
import spark.storage.BlockManagerMaster
import spark.network.ConnectionManager
@@ -58,6 +59,19 @@ class SparkEnv (
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
+ val hadoop = {
+ val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+ if(yarnMode) {
+ try {
+ Class.forName("spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
+ } catch {
+ case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+ }
+ } else {
+ new SparkHadoopUtil
+ }
+ }
+
def stop() {
pythonWorkers.foreach { case(key, worker) => worker.stop() }
httpFileServer.stop()
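
With SparkEnv.hadoop in place, call sites no longer bind to a hadoop-profile-specific SparkHadoopUtil object; the YARN variant is only loaded reflectively when SPARK_YARN_MODE is set. The call pattern used in the SparkContext, CheckpointRDD and HadoopRDD hunks of this diff looks like this (it assumes a running driver or executor so that SparkEnv.get is populated):

  import org.apache.hadoop.mapred.JobConf
  import spark.SparkEnv

  val env = SparkEnv.get
  val conf = env.hadoop.newConfiguration()     // plain Configuration, or a YARN-aware one in YARN mode
  env.hadoop.addCredentials(new JobConf(conf)) // adds delegation tokens where the Hadoop flavour needs them
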
diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/SparkHadoopWriter.scala
index b1fe0075a3..6b330ef572 100644
--- a/core/src/main/scala/spark/HadoopWriter.scala
+++ b/core/src/main/scala/spark/SparkHadoopWriter.scala
@@ -36,7 +36,7 @@ import spark.SerializableWritable
* Saves the RDD using a JobConf, which should contain an output key class, an output value class,
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
-class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRedUtil with Serializable {
+class SparkHadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoopMapRedUtil with Serializable {
private val now = new Date()
private val conf = new SerializableWritable(jobConf)
@@ -165,7 +165,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
splitID = splitid
attemptID = attemptid
- jID = new SerializableWritable[JobID](HadoopWriter.createJobID(now, jobid))
+ jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid))
taID = new SerializableWritable[TaskAttemptID](
new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
}
@@ -179,7 +179,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with HadoopMapRe
}
}
-object HadoopWriter {
+object SparkHadoopWriter {
def createJobID(time: Date, id: Int): JobID = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index a05dcdcd97..bb8aad3f4c 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -266,8 +266,9 @@ private object Utils extends Logging {
}
case _ =>
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
+ val env = SparkEnv.get
val uri = new URI(url)
- val conf = SparkHadoopUtil.newConfiguration()
+ val conf = env.hadoop.newConfiguration()
val fs = FileSystem.get(uri, conf)
val in = fs.open(new Path(uri))
val out = new FileOutputStream(tempFile)
@@ -406,10 +407,6 @@ private object Utils extends Logging {
try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
}
- def getUserNameFromEnvironment(): String = {
- SparkHadoopUtil.getUserNameFromEnvironment
- }
-
   // Typically, this will be of the order of the number of nodes in the cluster.
// If not, we should change it to LRUCache or something.
private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()
diff --git a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala
index 617954cb98..882161e669 100644
--- a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/spark/deploy/SparkHadoopUtil.scala
@@ -23,18 +23,7 @@ import org.apache.hadoop.mapred.JobConf
/**
* Contains util methods to interact with Hadoop from spark.
*/
-object SparkHadoopUtil {
-
- def getUserNameFromEnvironment(): String = {
- // defaulting to -D ...
- System.getProperty("user.name")
- }
-
- def runAsUser(func: (Product) => Unit, args: Product) {
-
- // Add support, if exists - for now, simply run func !
- func(args)
- }
+class SparkHadoopUtil {
   // Return an appropriate subclass of Configuration. Creating a config can initialize some Hadoop subsystems.
def newConfiguration(): Configuration = new Configuration()
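
Turning SparkHadoopUtil from an object into a class is what allows SparkEnv to substitute a YARN-specific subclass via reflection. The real spark.deploy.yarn.YarnSparkHadoopUtil is not part of this diff; the sketch below only illustrates the kind of overrides such a subclass would provide:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.mapred.JobConf
  import org.apache.hadoop.security.UserGroupInformation
  import org.apache.hadoop.yarn.conf.YarnConfiguration
  import spark.deploy.SparkHadoopUtil

  class ExampleYarnHadoopUtil extends SparkHadoopUtil {
    // Pick up yarn-site.xml settings.
    override def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())

    // Merge the current user's tokens into the job so tasks can reach secure HDFS.
    override def addCredentials(conf: JobConf) {
      conf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
    }

    override def isYarnMode(): Boolean = true
  }
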
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index e47fe50021..b5fb6dbe29 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -22,9 +22,8 @@ import java.nio.ByteBuffer
import akka.actor.{ActorRef, Actor, Props, Terminated}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
-import spark.{Logging, Utils}
+import spark.{Logging, Utils, SparkEnv}
import spark.TaskState.TaskState
-import spark.deploy.SparkHadoopUtil
import spark.scheduler.cluster.StandaloneClusterMessages._
import spark.util.AkkaUtils
@@ -82,19 +81,6 @@ private[spark] class StandaloneExecutorBackend(
private[spark] object StandaloneExecutorBackend {
def run(driverUrl: String, executorId: String, hostname: String, cores: Int) {
- SparkHadoopUtil.runAsUser(run0, Tuple4[Any, Any, Any, Any] (driverUrl, executorId, hostname, cores))
- }
-
- // This will be run 'as' the user
- def run0(args: Product) {
- assert(4 == args.productArity)
- runImpl(args.productElement(0).asInstanceOf[String],
- args.productElement(1).asInstanceOf[String],
- args.productElement(2).asInstanceOf[String],
- args.productElement(3).asInstanceOf[Int])
- }
-
- private def runImpl(driverUrl: String, executorId: String, hostname: String, cores: Int) {
// Debug code
Utils.checkHost(hostname)
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index 6794e0e201..1ad5fe6539 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -25,7 +25,6 @@ import org.apache.hadoop.util.ReflectionUtils
import org.apache.hadoop.fs.Path
import java.io.{File, IOException, EOFException}
import java.text.NumberFormat
-import spark.deploy.SparkHadoopUtil
private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
@@ -82,8 +81,9 @@ private[spark] object CheckpointRDD extends Logging {
}
def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
+ val env = SparkEnv.get
val outputDir = new Path(path)
- val fs = outputDir.getFileSystem(SparkHadoopUtil.newConfiguration())
+ val fs = outputDir.getFileSystem(env.hadoop.newConfiguration())
val finalOutputName = splitIdToFile(ctx.splitId)
val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -101,7 +101,7 @@ private[spark] object CheckpointRDD extends Logging {
// This is mainly for testing purpose
fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
}
- val serializer = SparkEnv.get.serializer.newInstance()
+ val serializer = env.serializer.newInstance()
val serializeStream = serializer.serializeStream(fileOutputStream)
serializeStream.writeAll(iterator)
serializeStream.close()
@@ -121,10 +121,11 @@ private[spark] object CheckpointRDD extends Logging {
}
def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
- val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration())
+ val env = SparkEnv.get
+ val fs = path.getFileSystem(env.hadoop.newConfiguration())
val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
val fileInputStream = fs.open(path, bufferSize)
- val serializer = SparkEnv.get.serializer.newInstance()
+ val serializer = env.serializer.newInstance()
val deserializeStream = serializer.deserializeStream(fileInputStream)
// Register an on-task-completion callback to close the input stream.
@@ -140,10 +141,11 @@ private[spark] object CheckpointRDD extends Logging {
import spark._
val Array(cluster, hdfsPath) = args
+ val env = SparkEnv.get
val sc = new SparkContext(cluster, "CheckpointRDD Test")
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
val path = new Path(hdfsPath, "temp")
- val fs = path.getFileSystem(SparkHadoopUtil.newConfiguration())
+ val fs = path.getFileSystem(env.hadoop.newConfiguration())
sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index fd00d59c77..6c41b97780 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -32,8 +32,7 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
-import spark.deploy.SparkHadoopUtil
-import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
+import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, SparkEnv, TaskContext}
import spark.util.NextIterator
import org.apache.hadoop.conf.Configurable
@@ -68,7 +67,8 @@ class HadoopRDD[K, V](
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
override def getPartitions: Array[Partition] = {
- SparkHadoopUtil.addCredentials(conf);
+ val env = SparkEnv.get
+ env.hadoop.addCredentials(conf)
val inputFormat = createInputFormat(conf)
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(conf)
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index 0b71608169..184685528e 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -43,7 +43,7 @@ class NewHadoopRDD[K, V](
valueClass: Class[V],
@transient conf: Configuration)
extends RDD[(K, V)](sc, Nil)
- with HadoopMapReduceUtil
+ with SparkHadoopMapReduceUtil
with Logging {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
diff --git a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
index 65f8c3200e..8f1b9b29b5 100644
--- a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
@@ -17,7 +17,7 @@
package spark.scheduler
-import spark.Logging
+import spark.{Logging, SparkEnv}
import scala.collection.immutable.Set
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.hadoop.security.UserGroupInformation
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.conf.Configuration
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.collection.JavaConversions._
-import spark.deploy.SparkHadoopUtil
/**
@@ -88,8 +87,9 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
+ val env = SparkEnv.get
val conf = new JobConf(configuration)
- SparkHadoopUtil.addCredentials(conf);
+ env.hadoop.addCredentials(conf)
FileInputFormat.setInputPaths(conf, path)
val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
@@ -108,8 +108,9 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
// This method does not expect failures, since validate has already passed ...
private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
+ val env = SparkEnv.get
val jobConf = new JobConf(configuration)
- SparkHadoopUtil.addCredentials(jobConf);
+ env.hadoop.addCredentials(jobConf)
FileInputFormat.setInputPaths(jobConf, path)
val instance: org.apache.hadoop.mapred.InputFormat[_, _] =