From 5b77e66dd6a128c5992ab3bde418613f84be7009 Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Tue, 11 Oct 2016 14:56:26 -0700
Subject: [SPARK-17387][PYSPARK] Creating SparkContext() from python without
 spark-submit ignores user conf

## What changes were proposed in this pull request?

The root cause of ignoring SparkConf when launching the JVM is that SparkConf requires the JVM to be created first:
https://github.com/apache/spark/blob/master/python/pyspark/conf.py#L106

In this PR, I defer launching the JVM until SparkContext is created, so that we can pass SparkConf to the JVM correctly (see the sketch below).

## How was this patch tested?

Use the example code in the description of SPARK-17387:

```
$ SPARK_HOME=$PWD PYTHONPATH=python:python/lib/py4j-0.10.3-src.zip python
Python 2.7.12 (default, Jul 1 2016, 15:12:24)
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from pyspark import SparkContext
>>> from pyspark import SparkConf
>>> conf = SparkConf().set("spark.driver.memory", "4g")
>>> sc = SparkContext(conf=conf)
```

And verify that spark.driver.memory is correctly picked up:

```
...op/ -Xmx4g org.apache.spark.deploy.SparkSubmit --conf spark.driver.memory=4g pyspark-shell
```

Author: Jeff Zhang

Closes #14959 from zjffdu/SPARK-17387.
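To make the mechanism concrete, here is a minimal, self-contained sketch of the buffering pattern the diff below follows: settings are staged in a plain Python dict while no JVM exists, then written through once a JVM-backed conf is available. `DeferredConf` and `attach_jvm` are hypothetical names used only for illustration; in the actual change the staging dict is `SparkConf._conf`, and the hand-off happens when SparkContext launches the JVM.

```
class DeferredConf(object):
    """Sketch: stage settings in Python until a JVM-backed conf exists."""

    def __init__(self):
        self._jconf = None  # JVM-side conf object, once the gateway is up
        self._conf = {}     # pure-Python staging area until then

    def set(self, key, value):
        if self._jconf is not None:
            self._jconf.set(key, value)  # JVM running: write through
        else:
            self._conf[key] = value      # no JVM yet: stage locally
        return self

    def get(self, key, default=None):
        if self._jconf is not None:
            return self._jconf.get(key, default)
        return self._conf.get(key, default)

    def attach_jvm(self, jconf):
        # Hypothetical hand-off: replay everything staged before the JVM
        # existed, then delegate all further calls to the JVM-side conf.
        for k, v in self._conf.items():
            jconf.set(k, v)
        self._jconf, self._conf = jconf, None
```

This pattern keeps set() usable at any point in the lifecycle without forcing eager JVM startup, which is also why setMaster and friends are rewritten below in terms of set().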
---
 python/pyspark/conf.py | 71 ++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 52 insertions(+), 19 deletions(-)

(limited to 'python/pyspark/conf.py')

diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py
index 924da3eecf..64b6f238e9 100644
--- a/python/pyspark/conf.py
+++ b/python/pyspark/conf.py
@@ -52,6 +52,14 @@ spark.home=/path
 >>> sorted(conf.getAll(), key=lambda p: p[0])
 [(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), \
 (u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')]
+>>> conf._jconf.setExecutorEnv("VAR5", "value5")
+JavaObject id...
+>>> print(conf.toDebugString())
+spark.executorEnv.VAR1=value1
+spark.executorEnv.VAR3=value3
+spark.executorEnv.VAR4=value4
+spark.executorEnv.VAR5=value5
+spark.home=/path
 """
 
 __all__ = ['SparkConf']
@@ -101,13 +109,24 @@ class SparkConf(object):
             self._jconf = _jconf
         else:
             from pyspark.context import SparkContext
-            SparkContext._ensure_initialized()
             _jvm = _jvm or SparkContext._jvm
-            self._jconf = _jvm.SparkConf(loadDefaults)
+
+            if _jvm is not None:
+                # JVM is created, so create self._jconf directly through JVM
+                self._jconf = _jvm.SparkConf(loadDefaults)
+                self._conf = None
+            else:
+                # JVM is not created, so store data in self._conf first
+                self._jconf = None
+                self._conf = {}
 
     def set(self, key, value):
         """Set a configuration property."""
-        self._jconf.set(key, unicode(value))
+        # Try to set self._jconf first if JVM is created, set self._conf if JVM is not created yet.
+        if self._jconf is not None:
+            self._jconf.set(key, unicode(value))
+        else:
+            self._conf[key] = unicode(value)
         return self
 
     def setIfMissing(self, key, value):
@@ -118,17 +137,17 @@ class SparkConf(object):
 
     def setMaster(self, value):
         """Set master URL to connect to."""
-        self._jconf.setMaster(value)
+        self.set("spark.master", value)
         return self
 
     def setAppName(self, value):
         """Set application name."""
-        self._jconf.setAppName(value)
+        self.set("spark.app.name", value)
         return self
 
     def setSparkHome(self, value):
         """Set path where Spark is installed on worker nodes."""
-        self._jconf.setSparkHome(value)
+        self.set("spark.home", value)
         return self
 
     def setExecutorEnv(self, key=None, value=None, pairs=None):
@@ -136,10 +155,10 @@ class SparkConf(object):
         if (key is not None and pairs is not None) or (key is None and pairs is None):
             raise Exception("Either pass one key-value pair or a list of pairs")
         elif key is not None:
-            self._jconf.setExecutorEnv(key, value)
+            self.set("spark.executorEnv." + key, value)
         elif pairs is not None:
             for (k, v) in pairs:
-                self._jconf.setExecutorEnv(k, v)
+                self.set("spark.executorEnv." + k, v)
         return self
 
     def setAll(self, pairs):
@@ -149,35 +168,49 @@ class SparkConf(object):
         :param pairs: list of key-value pairs to set
         """
         for (k, v) in pairs:
-            self._jconf.set(k, v)
+            self.set(k, v)
         return self
 
     def get(self, key, defaultValue=None):
         """Get the configured value for some key, or return a default otherwise."""
         if defaultValue is None:   # Py4J doesn't call the right get() if we pass None
-            if not self._jconf.contains(key):
-                return None
-            return self._jconf.get(key)
+            if self._jconf is not None:
+                if not self._jconf.contains(key):
+                    return None
+                return self._jconf.get(key)
+            else:
+                if key not in self._conf:
+                    return None
+                return self._conf[key]
         else:
-            return self._jconf.get(key, defaultValue)
+            if self._jconf is not None:
+                return self._jconf.get(key, defaultValue)
+            else:
+                return self._conf.get(key, defaultValue)
 
     def getAll(self):
         """Get all values as a list of key-value pairs."""
-        pairs = []
-        for elem in self._jconf.getAll():
-            pairs.append((elem._1(), elem._2()))
-        return pairs
+        if self._jconf is not None:
+            return [(elem._1(), elem._2()) for elem in self._jconf.getAll()]
+        else:
+            return self._conf.items()
 
     def contains(self, key):
         """Does this configuration contain a given key?"""
-        return self._jconf.contains(key)
+        if self._jconf is not None:
+            return self._jconf.contains(key)
+        else:
+            return key in self._conf
 
     def toDebugString(self):
         """
         Returns a printable version of the configuration, as a list
         of key=value pairs, one per line.
         """
-        return self._jconf.toDebugString()
+        if self._jconf is not None:
+            return self._jconf.toDebugString()
+        else:
+            return '\n'.join('%s=%s' % (k, v) for k, v in self._conf.items())
 
 
 def _test():
--
cgit v1.2.3
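As a usage note, with this change a SparkConf created before any SparkContext behaves like a plain dictionary until the JVM starts. Below is a hypothetical interactive session exercising only the methods touched above, from a fresh Python 2 shell with no JVM launched; the output order of toDebugString may vary, since the pure-Python fallback stores entries in an unordered dict.

```
>>> from pyspark import SparkConf
>>> conf = SparkConf().setMaster("local[2]").setAppName("demo")
>>> conf.get("spark.app.name")
u'demo'
>>> conf.contains("spark.master")
True
>>> print(conf.toDebugString())
spark.master=local[2]
spark.app.name=demo
```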