about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkContext.scala                     |  2
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala           | 30
-rw-r--r-- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala               |  6
-rw-r--r-- yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala | 12
4 files changed, 34 insertions, 16 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index bf3aeb488d..0c72adfb95 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1756,6 +1756,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
SparkEnv.set(null)
}
+ // Unset the YARN mode system property, to allow switching between cluster types.
+ System.clearProperty("SPARK_YARN_MODE")
SparkContext.clearActiveContext()
logInfo("Successfully stopped SparkContext")
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index a0b7365df9..d606b80c03 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -385,20 +385,13 @@ class SparkHadoopUtil extends Logging {
object SparkHadoopUtil {
- private val hadoop = {
- val yarnMode = java.lang.Boolean.valueOf(
- System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
- if (yarnMode) {
- try {
- Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
- .newInstance()
- .asInstanceOf[SparkHadoopUtil]
- } catch {
- case e: Exception => throw new SparkException("Unable to load YARN support", e)
- }
- } else {
- new SparkHadoopUtil
- }
+ private lazy val hadoop = new SparkHadoopUtil
+ private lazy val yarn = try {
+ Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
+ .newInstance()
+ .asInstanceOf[SparkHadoopUtil]
+ } catch {
+ case e: Exception => throw new SparkException("Unable to load YARN support", e)
}
val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"
@@ -406,6 +399,13 @@ object SparkHadoopUtil {
val SPARK_YARN_CREDS_COUNTER_DELIM = "-"
def get: SparkHadoopUtil = {
- hadoop
+ // Check each time to support changing to/from YARN
+ val yarnMode = java.lang.Boolean.valueOf(
+ System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+ if (yarnMode) {
+ yarn
+ } else {
+ hadoop
+ }
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index a2c4bc2f54..8c53c24a79 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -86,7 +86,11 @@ private[spark] class Client(
private val fireAndForget = isClusterMode &&
!sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
- def stop(): Unit = yarnClient.stop()
+ def stop(): Unit = {
+ yarnClient.stop()
+ // Unset the YARN mode system property, to allow switching between cluster types.
+ System.clearProperty("SPARK_YARN_MODE")
+ }
/**
* Submit an application running our ApplicationMaster to the ResourceManager.
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 49bee0866d..e1c67db765 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -30,6 +30,7 @@ import org.scalatest.Matchers
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
@@ -233,4 +234,15 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
}
assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
}
+
+ test("check different hadoop utils based on env variable") {
+ try {
+ System.setProperty("SPARK_YARN_MODE", "true")
+ assert(SparkHadoopUtil.get.getClass === classOf[YarnSparkHadoopUtil])
+ System.setProperty("SPARK_YARN_MODE", "false")
+ assert(SparkHadoopUtil.get.getClass === classOf[SparkHadoopUtil])
+ } finally {
+ System.clearProperty("SPARK_YARN_MODE")
+ }
+ }
}