12 files changed, 62 insertions, 56 deletions
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index 9e0a840b72..efab61e132 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -43,7 +43,7 @@ private[spark] trait Logging {
   // Method to get or create the logger for this object
   protected def log: Logger = {
     if (log_ == null) {
-      initializeIfNecessary()
+      initializeLogIfNecessary(false)
       log_ = LoggerFactory.getLogger(logName)
     }
     log_
@@ -95,17 +95,17 @@ private[spark] trait Logging {
     log.isTraceEnabled
   }
 
-  private def initializeIfNecessary() {
+  protected def initializeLogIfNecessary(isInterpreter: Boolean): Unit = {
     if (!Logging.initialized) {
       Logging.initLock.synchronized {
         if (!Logging.initialized) {
-          initializeLogging()
+          initializeLogging(isInterpreter)
         }
       }
     }
   }
 
-  private def initializeLogging() {
+  private def initializeLogging(isInterpreter: Boolean): Unit = {
     // Don't use a logger in here, as this is itself occurring during initialization of a logger
     // If Log4j 1.2 is being used, but is not initialized, load a default properties file
     val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
@@ -127,11 +127,11 @@ private[spark] trait Logging {
       }
     }
 
-    if (Utils.isInInterpreter) {
+    if (isInterpreter) {
       // Use the repl's main class to define the default log level when running the shell,
       // overriding the root logger's config if they're different.
       val rootLogger = LogManager.getRootLogger()
-      val replLogger = LogManager.getLogger("org.apache.spark.repl.Main")
+      val replLogger = LogManager.getLogger(logName)
       val replLevel = Option(replLogger.getLevel()).getOrElse(Level.WARN)
       if (replLevel != rootLogger.getEffectiveLevel()) {
         System.err.printf("Setting default log level to \"%s\".\n", replLevel)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index ff8c631585..0e2d51f9e7 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -47,7 +47,7 @@ import org.apache.spark.util.Utils
  *
  * @param loadDefaults whether to also load values from Java system properties
  */
-class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
+class SparkConf private[spark] (loadDefaults: Boolean) extends Cloneable with Logging {
 
   import SparkConf._
 
@@ -57,21 +57,32 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   private val settings = new ConcurrentHashMap[String, String]()
 
   if (loadDefaults) {
+    loadFromSystemProperties(false)
+  }
+
+  private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
     // Load any spark.* system properties
     for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
-      set(key, value)
+      set(key, value, silent)
     }
+    this
   }
 
   /** Set a configuration variable. */
   def set(key: String, value: String): SparkConf = {
+    set(key, value, false)
+  }
+
+  private[spark] def set(key: String, value: String, silent: Boolean): SparkConf = {
     if (key == null) {
       throw new NullPointerException("null key")
     }
     if (value == null) {
       throw new NullPointerException("null value for " + key)
     }
-    logDeprecationWarning(key)
+    if (!silent) {
+      logDeprecationWarning(key)
+    }
     settings.put(key, value)
     this
   }
@@ -395,7 +406,11 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Copy this object */
   override def clone: SparkConf = {
-    new SparkConf(false).setAll(getAll)
+    val cloned = new SparkConf(false)
+    settings.entrySet().asScala.foreach { e =>
+      cloned.set(e.getKey(), e.getValue(), true)
+    }
+    cloned
   }
 
   /**
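The two changes above are coupled: initializeLogIfNecessary becomes protected so that objects mixing in Logging (here, the REPL's Main) can force log4j setup before anything else runs, and SparkConf gains a silent path so that internal copies of the configuration do not re-emit deprecation warnings. A minimal, self-contained sketch of the silent-set pattern; MiniConf and its members are illustrative names, not Spark's API:

    import scala.collection.JavaConverters._

    class MiniConf {
      private val settings = new java.util.concurrent.ConcurrentHashMap[String, String]()

      // Public setter: always runs the deprecation check. (Illustrative class.)
      def set(key: String, value: String): MiniConf = set(key, value, silent = false)

      // Internal setter: callers that merely copy existing state pass silent = true.
      private def set(key: String, value: String, silent: Boolean): MiniConf = {
        require(key != null, "null key")
        require(value != null, s"null value for $key")
        if (!silent) Console.err.println(s"[deprecation check] $key")
        settings.put(key, value)
        this
      }

      // clone copies every entry through the silent path, mirroring SparkConf.clone above.
      override def clone: MiniConf = {
        val cloned = new MiniConf
        settings.entrySet().asScala.foreach(e => cloned.set(e.getKey, e.getValue, silent = true))
        cloned
      }
    }

The initializeLogIfNecessary path itself keeps the existing double-checked locking in Logging: the initialized flag is tested once without the lock and again inside it, so initialization runs at most once however many threads race to log first.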
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 06b5101b1f..270ca84e24 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -45,7 +45,7 @@ import org.apache.spark.util.Utils
  */
 @DeveloperApi
 class SparkHadoopUtil extends Logging {
-  private val sparkConf = new SparkConf()
+  private val sparkConf = new SparkConf(false).loadFromSystemProperties(true)
   val conf: Configuration = newConfiguration(sparkConf)
   UserGroupInformation.setConfiguration(conf)
 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 37c6c9bf90..63b9d34b79 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1820,15 +1820,6 @@ private[spark] object Utils extends Logging {
     }
   }
 
-  lazy val isInInterpreter: Boolean = {
-    try {
-      val interpClass = classForName("org.apache.spark.repl.Main")
-      interpClass.getMethod("interp").invoke(null) != null
-    } catch {
-      case _: ClassNotFoundException => false
-    }
-  }
-
   /**
    * Return a well-formed URI for the file described by a user input string.
   *
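SparkHadoopUtil is constructed early and in many processes, so building its private conf via new SparkConf(false).loadFromSystemProperties(true) keeps it from logging deprecation warnings that the user-facing SparkConf will log anyway. The deleted Utils.isInInterpreter had detected the REPL reflectively; isolated below for contrast (detectRepl is an illustrative name, and this is the pattern being removed, not an API). An explicit isInterpreter flag passed by the caller avoids both the fragile reflective lookup and core's implicit dependence on the repl module:

    // The reflective REPL detection this commit removes, in isolation:
    // true only if org.apache.spark.repl.Main is on the classpath and its
    // interp accessor returns a non-null value.
    def detectRepl(): Boolean =
      try {
        val cls = Class.forName("org.apache.spark.repl.Main")
        cls.getMethod("interp").invoke(null) != null
      } catch {
        case _: ClassNotFoundException => false
      }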
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
index 14b448d076..5fe5c86289 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
@@ -19,7 +19,12 @@ package org.apache.spark.repl
 
 import scala.collection.mutable.Set
 
-object Main {
+import org.apache.spark.Logging
+
+object Main extends Logging {
+
+  initializeLogIfNecessary(true)
+
   private var _interp: SparkILoop = _
 
   def interp = _interp
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 999e7ad3cc..a58f4234da 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.SQLContext
 
 object Main extends Logging {
 
+  initializeLogIfNecessary(true)
+
   val conf = new SparkConf()
   val rootDir = conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
   val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
@@ -50,39 +52,27 @@ object Main extends Logging {
   // Visible for testing
   private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
     interp = _interp
+    val jars = conf.getOption("spark.jars")
+      .map(_.replace(",", File.pathSeparator))
+      .getOrElse("")
     val interpArguments = List(
       "-Yrepl-class-based",
       "-Yrepl-outdir", s"${outputDir.getAbsolutePath}",
-      "-classpath", getAddedJars.mkString(File.pathSeparator)
+      "-classpath", jars
     ) ++ args.toList
 
     val settings = new GenericRunnerSettings(scalaOptionError)
     settings.processArguments(interpArguments, true)
 
     if (!hasErrors) {
-      if (getMaster == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
       interp.process(settings) // Repl starts and goes in loop of R.E.P.L
       Option(sparkContext).map(_.stop)
     }
   }
 
-  def getAddedJars: Array[String] = {
-    val envJars = sys.env.get("ADD_JARS")
-    if (envJars.isDefined) {
-      logWarning("ADD_JARS environment variable is deprecated, use --jar spark submit argument instead")
-    }
-    val propJars = sys.props.get("spark.jars").flatMap { p => if (p == "") None else Some(p) }
-    val jars = propJars.orElse(envJars).getOrElse("")
-    Utils.resolveURIs(jars).split(",").filter(_.nonEmpty)
-  }
-
   def createSparkContext(): SparkContext = {
     val execUri = System.getenv("SPARK_EXECUTOR_URI")
-    val jars = getAddedJars
-    val conf = new SparkConf()
-      .setMaster(getMaster)
-      .setJars(jars)
-      .setIfMissing("spark.app.name", "Spark shell")
+    conf.setIfMissing("spark.app.name", "Spark shell")
     // SparkContext will detect this configuration and register it with the RpcEnv's
     // file server, setting spark.repl.class.uri to the actual URI for executors to
     // use. This is sort of ugly but since executors are started as part of SparkContext
@@ -115,12 +105,4 @@ object Main extends Logging {
     sqlContext
   }
 
-  private def getMaster: String = {
-    val master = {
-      val envMaster = sys.env.get("MASTER")
-      val propMaster = sys.props.get("spark.master")
-      propMaster.orElse(envMaster).getOrElse("local[*]")
-    }
-    master
-  }
 }
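With initializeLogIfNecessary(true) running in the Main constructors, logging is configured before the eagerly created SparkConf can emit anything, and logName (now org.apache.spark.repl.Main via the mixin) replaces the hard-coded logger lookup that was removed from Logging. The deleted getAddedJars helper is replaced by an inline conversion of the comma-separated spark.jars value into a platform classpath; a small runnable sketch, with an illustrative Map standing in for the conf:

    import java.io.File

    // spark.jars is comma-separated; scalac's -classpath wants the
    // platform path separator (':' on Unix, ';' on Windows).
    val conf = Map("spark.jars" -> "/tmp/a.jar,/tmp/b.jar")
    val jars = conf.get("spark.jars")
      .map(_.replace(",", File.pathSeparator))
      .getOrElse("")
    println(jars) // /tmp/a.jar:/tmp/b.jar on Unix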
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 239096be79..6bee880640 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -48,8 +48,8 @@ class ReplSuite extends SparkFunSuite {
     val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH)
     System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath)
 
-    System.setProperty("spark.master", master)
-    Main.doMain(Array("-classpath", classpath), new SparkILoop(in, new PrintWriter(out)))
+    Main.conf.set("spark.master", master)
+    Main.doMain(Array("-classpath", classpath), new SparkILoop(in, new PrintWriter(out)))
 
     if (oldExecutorClasspath != null) {
       System.setProperty(CONF_EXECUTOR_CLASSPATH, oldExecutorClasspath)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 9725dcfde1..8244dd4230 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -210,6 +210,7 @@ class HiveContext private[hive](
     logInfo(s"Initializing execution hive, version $hiveExecutionVersion")
     val loader = new IsolatedClientLoader(
       version = IsolatedClientLoader.hiveVersion(hiveExecutionVersion),
+      sparkConf = sc.conf,
       execJars = Seq(),
       config = newTemporaryConfiguration(useInMemoryDerby = true),
       isolationOn = false,
@@ -278,6 +279,7 @@ class HiveContext private[hive](
         s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
       new IsolatedClientLoader(
         version = metaVersion,
+        sparkConf = sc.conf,
         execJars = jars.toSeq,
         config = allConfig,
         isolationOn = true,
@@ -290,6 +292,7 @@ class HiveContext private[hive](
       IsolatedClientLoader.forVersion(
         hiveMetastoreVersion = hiveMetastoreVersion,
         hadoopVersion = VersionInfo.getVersion,
+        sparkConf = sc.conf,
         config = allConfig,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
@@ -317,6 +320,7 @@ class HiveContext private[hive](
           s"using ${jars.mkString(":")}")
       new IsolatedClientLoader(
         version = metaVersion,
+        sparkConf = sc.conf,
         execJars = jars.toSeq,
         config = allConfig,
         isolationOn = true,
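The test now sets the master on Main.conf instead of a JVM-global system property, which cannot leak into other suites running in the same JVM, while HiveContext threads sc.conf into every loader it builds rather than letting downstream code call new SparkConf() and silently re-read system properties. The injection pattern reduced to a runnable sketch; Client and Loader are illustrative names, not Spark's:

    // Pass the caller's existing configuration down instead of rebuilding
    // it from system properties in a leaf class.
    class Client(conf: Map[String, String]) {
      def principal: Option[String] = conf.get("spark.yarn.principal")
    }

    class Loader(conf: Map[String, String]) {
      def createClient(): Client = new Client(conf) // same conf instance, no re-read
    }

    val driverConf = Map("spark.yarn.principal" -> "user@EXAMPLE.COM")
    assert(new Loader(driverConf).createClient().principal.isDefined)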
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index c1c8e631ee..c108750c38 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -60,6 +60,7 @@ import org.apache.spark.util.{CircularBuffer, Utils}
  */
 private[hive] class HiveClientImpl(
     override val version: HiveVersion,
+    sparkConf: SparkConf,
     config: Map[String, String],
     initClassLoader: ClassLoader,
     val clientLoader: IsolatedClientLoader)
@@ -90,7 +91,6 @@ private[hive] class HiveClientImpl(
     // instance of SparkConf is needed for the original value of spark.yarn.keytab
     // and spark.yarn.principal set in SparkSubmit, as yarn.Client resets the
     // keytab configuration for the link name in distributed cache
-    val sparkConf = new SparkConf
     if (sparkConf.contains("spark.yarn.principal") && sparkConf.contains("spark.yarn.keytab")) {
       val principalName = sparkConf.get("spark.yarn.principal")
       val keytabFileName = sparkConf.get("spark.yarn.keytab")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 1653371d89..024f4dfeba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -27,7 +27,7 @@ import scala.util.Try
 
 import org.apache.commons.io.{FileUtils, IOUtils}
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.hive.HiveContext
@@ -41,6 +41,7 @@ private[hive] object IsolatedClientLoader extends Logging {
   def forVersion(
       hiveMetastoreVersion: String,
       hadoopVersion: String,
+      sparkConf: SparkConf,
       config: Map[String, String] = Map.empty,
       ivyPath: Option[String] = None,
       sharedPrefixes: Seq[String] = Seq.empty,
@@ -75,7 +76,8 @@ private[hive] object IsolatedClientLoader extends Logging {
     }
 
     new IsolatedClientLoader(
-      version = hiveVersion(hiveMetastoreVersion),
+      hiveVersion(hiveMetastoreVersion),
+      sparkConf,
       execJars = files,
       config = config,
       sharesHadoopClasses = sharesHadoopClasses,
@@ -146,6 +148,7 @@ private[hive] object IsolatedClientLoader extends Logging {
  */
 private[hive] class IsolatedClientLoader(
     val version: HiveVersion,
+    val sparkConf: SparkConf,
     val execJars: Seq[URL] = Seq.empty,
     val config: Map[String, String] = Map.empty,
     val isolationOn: Boolean = true,
@@ -235,7 +238,7 @@ private[hive] class IsolatedClientLoader(
   /** The isolated client interface to Hive. */
   private[hive] def createClient(): HiveClient = {
     if (!isolationOn) {
-      return new HiveClientImpl(version, config, baseClassLoader, this)
+      return new HiveClientImpl(version, sparkConf, config, baseClassLoader, this)
     }
     // Pre-reflective instantiation setup.
     logDebug("Initializing the logger to avoid disaster...")
@@ -246,7 +249,7 @@ private[hive] class IsolatedClientLoader(
       classLoader
         .loadClass(classOf[HiveClientImpl].getName)
         .getConstructors.head
-        .newInstance(version, config, classLoader, this)
+        .newInstance(version, sparkConf, config, classLoader, this)
         .asInstanceOf[HiveClient]
     } catch {
       case e: InvocationTargetException =>
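HiveClientImpl is normally constructed reflectively inside the isolated classloader, so the new sparkConf parameter must occupy the same position in the newInstance argument list as in the constructor; the compiler cannot catch a mismatch there, which is why both sites change in lockstep. The constraint in miniature (Widget is an illustrative class, not Spark's):

    // Reflective construction matches arguments by position, not by name.
    class Widget(val version: String, val conf: Map[String, String])

    val ctor = classOf[Widget].getConstructors.head
    val w = ctor.newInstance("1.2.1", Map("k" -> "v")).asInstanceOf[Widget]
    println(w.version) // 1.2.1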
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
index f557abcd52..2809f9439b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql.hive
 
 import org.apache.hadoop.util.VersionInfo
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.hive.client.{HiveClient, IsolatedClientLoader}
 import org.apache.spark.util.Utils
 
-
 /**
  * Test suite for the [[HiveCatalog]].
  */
@@ -32,7 +32,8 @@ class HiveCatalogSuite extends CatalogTestCases {
   private val client: HiveClient = {
     IsolatedClientLoader.forVersion(
       hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
-      hadoopVersion = VersionInfo.getVersion).createClient()
+      hadoopVersion = VersionInfo.getVersion,
+      sparkConf = new SparkConf()).createClient()
   }
 
   protected override val tableInputFormat: String =
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index d850d522be..6292f6c3af 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.apache.hadoop.util.VersionInfo
 
-import org.apache.spark.{Logging, SparkFunSuite}
+import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.util.quietly
@@ -39,6 +39,8 @@ import org.apache.spark.util.Utils
 @ExtendedHiveTest
 class VersionsSuite extends SparkFunSuite with Logging {
 
+  private val sparkConf = new SparkConf()
+
   // In order to speed up test execution during development or in Jenkins, you can specify the path
   // of an existing Ivy cache:
   private val ivyPath: Option[String] = {
@@ -59,6 +61,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
     val badClient = IsolatedClientLoader.forVersion(
       hiveMetastoreVersion = HiveContext.hiveExecutionVersion,
       hadoopVersion = VersionInfo.getVersion,
+      sparkConf = sparkConf,
       config = buildConf(),
       ivyPath = ivyPath).createClient()
     val db = new CatalogDatabase("default", "desc", "loc", Map())
@@ -93,6 +96,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       IsolatedClientLoader.forVersion(
         hiveMetastoreVersion = "13",
         hadoopVersion = VersionInfo.getVersion,
+        sparkConf = sparkConf,
         config = buildConf(),
         ivyPath = ivyPath).createClient()
     }
@@ -112,6 +116,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       IsolatedClientLoader.forVersion(
         hiveMetastoreVersion = version,
         hadoopVersion = VersionInfo.getVersion,
+        sparkConf = sparkConf,
         config = buildConf(),
         ivyPath = ivyPath).createClient()
     }
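Because forVersion gains the required sparkConf parameter among parameters that have defaults, every existing caller fails to compile until it supplies one, which is why each test call site above adds sparkConf by name. The same signature pattern in miniature (illustrative names, not Spark's):

    // Adding a required parameter to a method whose other parameters are
    // defaulted: named arguments keep the call sites readable.
    def forVersion(
        version: String,
        sparkConf: Map[String, String],  // newly required, no default
        ivyPath: Option[String] = None): String =
      s"loader(version=$version, ivy=$ivyPath)"

    println(forVersion("1.2.1", sparkConf = Map.empty))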