-rw-r--r--  core/src/main/scala/org/apache/spark/Logging.scala                                  | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala                                | 23
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala                   |  2
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                               |  9
-rw-r--r--  repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala                     |  7
-rw-r--r--  repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala                     | 32
-rw-r--r--  repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala                |  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                 |  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala       |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala | 11
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala            |  5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala        |  7
12 files changed, 62 insertions(+), 56 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index 9e0a840b72..efab61e132 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -43,7 +43,7 @@ private[spark] trait Logging {
// Method to get or create the logger for this object
protected def log: Logger = {
if (log_ == null) {
- initializeIfNecessary()
+ initializeLogIfNecessary(false)
log_ = LoggerFactory.getLogger(logName)
}
log_
@@ -95,17 +95,17 @@ private[spark] trait Logging {
log.isTraceEnabled
}
- private def initializeIfNecessary() {
+ protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit = {
if (!Logging.initialized) {
Logging.initLock.synchronized {
if (!Logging.initialized) {
- initializeLogging()
+ initializeLogging(isInterpreter)
}
}
}
}
- private def initializeLogging() {
+ private def initializeLogging(isInterpreter: Boolean): Unit = {
// Don't use a logger in here, as this is itself occurring during initialization of a logger
// If Log4j 1.2 is being used, but is not initialized, load a default properties file
val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
@@ -127,11 +127,11 @@ private[spark] trait Logging {
}
}
- if (Utils.isInInterpreter) {
+ if (isInterpreter) {
// Use the repl's main class to define the default log level when running the shell,
// overriding the root logger's config if they're different.
val rootLogger = LogManager.getRootLogger()
- val replLogger = LogManager.getLogger("org.apache.spark.repl.Main")
+ val replLogger = LogManager.getLogger(logName)
val replLevel = Option(replLogger.getLevel()).getOrElse(Level.WARN)
if (replLevel != rootLogger.getEffectiveLevel()) {
System.err.printf("Setting default log level to \"%s\".\n", replLevel)
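Commentary: the Logging changes replace reflective REPL detection (the Utils.isInInterpreter helper removed further down) with an explicit isInterpreter flag, and the repl logger is now looked up via logName so each Scala version's Main resolves its own logger. A REPL-style entry point opts in by invoking the new hook itself, as both repl Main objects below do. A minimal sketch of the pattern, using a hypothetical MyShell object (Logging is private[spark], so this is only available to Spark's own modules):

```scala
import org.apache.spark.Logging

object MyShell extends Logging {
  // Passing isInterpreter = true makes initializeLogging() copy this
  // object's logger level (defaulting to WARN) onto the root logger,
  // keeping the shell quiet unless the user configured log4j otherwise.
  initializeLogIfNecessary(true)

  def main(args: Array[String]): Unit = {
    logInfo("starting shell") // suppressed under the default WARN level
  }
}
```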
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index ff8c631585..0e2d51f9e7 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -47,7 +47,7 @@ import org.apache.spark.util.Utils
*
* @param loadDefaults whether to also load values from Java system properties
*/
-class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
+class SparkConf private[spark] (loadDefaults: Boolean) extends Cloneable with Logging {
import SparkConf._
@@ -57,21 +57,32 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
private val settings = new ConcurrentHashMap[String, String]()
if (loadDefaults) {
+ loadFromSystemProperties(false)
+ }
+
+ private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
// Load any spark.* system properties
for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
- set(key, value)
+ set(key, value, silent)
}
+ this
}
/** Set a configuration variable. */
def set(key: String, value: String): SparkConf = {
+ set(key, value, false)
+ }
+
+ private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
if (key == null) {
throw new NullPointerException("null key")
}
if (value == null) {
throw new NullPointerException("null value for " + key)
}
- logDeprecationWarning(key)
+ if (!silent) {
+ logDeprecationWarning(key)
+ }
settings.put(key, value)
this
}
@@ -395,7 +406,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Copy this object */
override def clone: SparkConf = {
- new SparkConf(false).setAll(getAll)
+ val cloned = new SparkConf(false)
+ settings.entrySet().asScala.foreach { e =>
+ cloned.set(e.getKey(), e.getValue(), true)
+ }
+ cloned
}
/**
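Commentary: SparkConf gains silent variants so internal construction paths stop double-logging deprecation warnings: clone now copies entries through the silent overload, and SparkHadoopUtil below builds its conf with loadFromSystemProperties(true). A rough usage sketch (the constructor and both new methods are private[spark], so this only compiles inside Spark itself):

```scala
import org.apache.spark.SparkConf

// Build an empty conf, then pull in spark.* system properties without
// emitting a deprecation warning for each key:
val conf = new SparkConf(false).loadFromSystemProperties(true)

// The public setter still warns about deprecated keys as before...
conf.set("spark.app.name", "example")

// ...while clone no longer re-logs a warning for every copied entry.
val copy = conf.clone
```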
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 06b5101b1f..270ca84e24 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -45,7 +45,7 @@ import org.apache.spark.util.Utils
*/
@DeveloperApi
class SparkHadoopUtil extends Logging {
- private val sparkConf = new SparkConf()
+ private val sparkConf = new SparkConf(false).loadFromSystemProperties(true)
val conf: Configuration = newConfiguration(sparkConf)
UserGroupInformation.setConfiguration(conf)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 37c6c9bf90..63b9d34b79 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1820,15 +1820,6 @@ private[spark] object Utils extends Logging {
}
}
- lazy val isInInterpreter: Boolean = {
- try {
- val interpClass = classForName("org.apache.spark.repl.Main")
- interpClass.getMethod("interp").invoke(null) != null
- } catch {
- case _: ClassNotFoundException => false
- }
- }
-
/**
* Return a well-formed URI for the file described by a user input string.
*
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
index 14b448d076..5fe5c86289 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
@@ -19,7 +19,12 @@ package org.apache.spark.repl
import scala.collection.mutable.Set
-object Main {
+import org.apache.spark.Logging
+
+object Main extends Logging {
+
+ initializeLogIfNecessary(true)
+
private var _interp: SparkILoop = _
def interp = _interp
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 999e7ad3cc..a58f4234da 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.SQLContext
object Main extends Logging {
+ initializeLogIfNecessary(true)
+
val conf = new SparkConf()
val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
@@ -50,39 +52,27 @@ object Main extends Logging {
// Visible for testing
private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
interp = _interp
+ val jars = conf.getOption("spark.jars")
+ .map(_.replace(",", File.pathSeparator))
+ .getOrElse("")
val interpArguments = List(
"-Yrepl-class-based",
"-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
- "-classpath", getAddedJars.mkString(File.pathSeparator)
+ "-classpath", jars
) ++ args.toList
val settings = new GenericRunnerSettings(scalaOptionError)
settings.processArguments(interpArguments, true)
if (!hasErrors) {
- if (getMaster == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
interp.process(settings) // Repl starts and goes in loop of R.E.P.L
Option(sparkContext).map(_.stop)
}
}
- def getAddedJars: Array[String] = {
- val envJars = sys.env.get("ADD_JARS")
- if (envJars.isDefined) {
- logWarning("ADD_JARS environment variable is deprecated, use --jar spark submit argument instead")
- }
- val propJars = sys.props.get("spark.jars").flatMap { p => if (p == "") None else Some(p) }
- val jars = propJars.orElse(envJars).getOrElse("")
- Utils.resolveURIs(jars).split(",").filter(_.nonEmpty)
- }
-
def createSparkContext(): SparkContext = {
val execUri = System.getenv("SPARK_EXECUTOR_URI")
- val jars = getAddedJars
- val conf = new SparkConf()
- .setMaster(getMaster)
- .setJars(jars)
- .setIfMissing("spark.app.name", "Spark shell")
+ conf.setIfMissing("spark.app.name", "Spark shell")
// SparkContext will detect this configuration and register it with the RpcEnv's
// file server, setting spark.repl.class.uri to the actual URI for executors to
// use. This is sort of ugly but since executors are started as part of SparkContext
@@ -115,12 +105,4 @@ object Main extends Logging {
sqlContext
}
- private def getMaster: String = {
- val master = {
- val envMaster = sys.env.get("MASTER")
- val propMaster = sys.props.get("spark.master")
- propMaster.orElse(envMaster).getOrElse("local[*]")
- }
- master
- }
}
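Commentary: the Scala 2.11 Main now derives the interpreter classpath directly from spark.jars (which spark-submit populates) instead of the removed getAddedJars/getMaster helpers, and it reuses the object-level conf rather than building a second SparkConf in createSparkContext(). The comma-to-classpath conversion, shown in isolation:

```scala
import java.io.File

// spark.jars holds a comma-separated list; scalac's -classpath expects
// the platform path separator instead.
val sparkJars = "a.jar,b.jar" // example value, normally set by spark-submit
val classpath = sparkJars.replace(",", File.pathSeparator)
// On Unix-like systems: "a.jar:b.jar"
```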
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 239096be79..6bee880640 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -48,8 +48,8 @@ class ReplSuite extends SparkFunSuite {
val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH)
System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath)
- System.setProperty("spark.master", master)
- Main.doMain(Array("-classpath", classpath), new SparkILoop(in, new PrintWriter(out)))
+ Main.conf.set("spark.master", master)
+ Main.doMain(Array("-classpath", classpath), new SparkILoop(in, new PrintWriter(out)))
if (oldExecutorClasspath != null) {
System.setProperty(CONF_EXECUTOR_CLASSPATH, oldExecutorClasspath)
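Commentary: with Main exposing its conf, the suite can scope the master setting to the repl instead of mutating a JVM-wide system property that could leak into other suites:

```scala
import org.apache.spark.repl.Main

// Before: System.setProperty("spark.master", master), which is process-global.
// After: confined to the repl's own SparkConf.
Main.conf.set("spark.master", "local[2]") // "local[2]" is an example value
```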
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 9725dcfde1..8244dd4230 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -210,6 +210,7 @@ class HiveContext private[hive](
logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
val loader = new IsolatedClientLoader(
version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
+ sparkConf = sc.conf,
execJars = Seq(),
config = newTemporaryConfiguration(useInMemoryDerby = true),
isolationOn = false,
@@ -278,6 +279,7 @@ class HiveContext private[hive](
s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
new IsolatedClientLoader(
version = metaVersion,
+ sparkConf = sc.conf,
execJars = jars.toSeq,
config = allConfig,
isolationOn = true,
@@ -290,6 +292,7 @@ class HiveContext private[hive](
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = hiveMetastoreVersion,
hadoopVersion = VersionInfo.getVersion,
+ sparkConf = sc.conf,
config = allConfig,
barrierPrefixes = hiveMetastoreBarrierPrefixes,
sharedPrefixes = hiveMetastoreSharedPrefixes)
@@ -317,6 +320,7 @@ class HiveContext private[hive](
s"using ${jars.mkString(":")}")
new IsolatedClientLoader(
version = metaVersion,
+ sparkConf = sc.conf,
execJars = jars.toSeq,
config = allConfig,
isolationOn = true,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index c1c8e631ee..c108750c38 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -60,6 +60,7 @@ import org.apache.spark.util.{CircularBuffer, Utils}
*/
private[hive] class HiveClientImpl(
override val version: HiveVersion,
+ sparkConf: SparkConf,
config: Map[String, String],
initClassLoader: ClassLoader,
val clientLoader: IsolatedClientLoader)
@@ -90,7 +91,6 @@ private[hive] class HiveClientImpl(
// instance of SparkConf is needed for the original value of spark.yarn.keytab
// and spark.yarn.principal set in SparkSubmit, as yarn.Client resets the
// keytab configuration for the link name in distributed cache
- val sparkConf = new SparkConf
if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
val principalName = sparkConf.get("spark.yarn.principal")
val keytabFileName = sparkConf.get("spark.yarn.keytab")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 1653371d89..024f4dfeba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -27,7 +27,7 @@ import scala.util.Try
import org.apache.commons.io.{FileUtils, IOUtils}
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveContext
@@ -41,6 +41,7 @@ private[hive] object IsolatedClientLoader extends Logging {
def forVersion(
hiveMetastoreVersion: String,
hadoopVersion: String,
+ sparkConf: SparkConf,
config: Map[String, String] = Map.empty,
ivyPath: Option[String] = None,
sharedPrefixes: Seq[String] = Seq.empty,
@@ -75,7 +76,8 @@ private[hive] object IsolatedClientLoader extends Logging {
}
new IsolatedClientLoader(
- version = hiveVersion(hiveMetastoreVersion),
+ hiveVersion(hiveMetastoreVersion),
+ sparkConf,
execJars = files,
config = config,
sharesHadoopClasses = sharesHadoopClasses,
@@ -146,6 +148,7 @@ private[hive] object IsolatedClientLoader extends Logging {
*/
private[hive] class IsolatedClientLoader(
val version: HiveVersion,
+ val sparkConf: SparkConf,
val execJars: Seq[URL] = Seq.empty,
val config: Map[String, String] = Map.empty,
val isolationOn: Boolean = true,
@@ -235,7 +238,7 @@ private[hive] class IsolatedClientLoader(
/** The isolated client interface to Hive. */
private[hive] def createClient(): HiveClient = {
if (!isolationOn) {
- return new HiveClientImpl(version, config, baseClassLoader, this)
+ return new HiveClientImpl(version, sparkConf, config, baseClassLoader, this)
}
// Pre-reflective instantiation setup.
logDebug("Initializing the logger to avoid disaster...")
@@ -246,7 +249,7 @@ private[hive] class IsolatedClientLoader(
classLoader
.loadClass(classOf[HiveClientImpl].getName)
.getConstructors.head
- .newInstance(version, config, classLoader, this)
+ .newInstance(version, sparkConf, config, classLoader, this)
.asInstanceOf[HiveClient]
} catch {
case e: InvocationTargetException =>
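Commentary: IsolatedClientLoader and HiveClientImpl now receive the caller's SparkConf instead of HiveClientImpl constructing `new SparkConf` itself; inside the isolated classloader a fresh conf could miss values such as spark.yarn.principal and spark.yarn.keytab set by SparkSubmit. Every construction site threads the conf through, as the HiveContext and test changes show. A sketch of the call chain, assuming an in-scope SparkContext `sc` (the loader is private[hive], so this is only reachable from Spark's Hive module; external code would read the conf via sc.getConf):

```scala
import org.apache.hadoop.util.VersionInfo
import org.apache.spark.sql.hive.client.IsolatedClientLoader

val client = IsolatedClientLoader.forVersion(
  hiveMetastoreVersion = "1.2.1",   // example metastore version
  hadoopVersion = VersionInfo.getVersion,
  sparkConf = sc.conf,              // mirrors the HiveContext changes above
  config = Map.empty).createClient()
```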
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
index f557abcd52..2809f9439b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql.hive
import org.apache.hadoop.util.VersionInfo
+import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader}
import org.apache.spark.util.Utils
-
/**
* Test suite for the [[HiveCatalog]].
*/
@@ -32,7 +32,8 @@ class HiveCatalogSuite extends CatalogTestCases {
private val client: HiveClient = {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
- hadoopVersion = VersionInfo.getVersion).createClient()
+ hadoopVersion = VersionInfo.getVersion,
+ sparkConf = new SparkConf()).createClient()
}
protected override val tableInputFormat: String =
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index d850d522be..6292f6c3af 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -21,7 +21,7 @@ import java.io.File
import org.apache.hadoop.util.VersionInfo
-import org.apache.spark.{Logging, SparkFunSuite}
+import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.util.quietly
@@ -39,6 +39,8 @@ import org.apache.spark.util.Utils
@ExtendedHiveTest
class VersionsSuite extends SparkFunSuite with Logging {
+ private val sparkConf = new SparkConf()
+
// In order to speed up test execution during development or in Jenkins, you can specify the path
// of an existing Ivy cache:
private val ivyPath: Option[String] = {
@@ -59,6 +61,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
val badClient = IsolatedClientLoader.forVersion(
hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
hadoopVersion = VersionInfo.getVersion,
+ sparkConf = sparkConf,
config = buildConf(),
ivyPath = ivyPath).createClient()
val db = new CatalogDatabase("default", "desc", "loc", Map())
@@ -93,6 +96,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = "13",
hadoopVersion = VersionInfo.getVersion,
+ sparkConf = sparkConf,
config = buildConf(),
ivyPath = ivyPath).createClient()
}
@@ -112,6 +116,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
IsolatedClientLoader.forVersion(
hiveMetastoreVersion = version,
hadoopVersion = VersionInfo.getVersion,
+ sparkConf = sparkConf,
config = buildConf(),
ivyPath = ivyPath).createClient()
}