about summary refs log tree commit diff
diff options
context:
space:
mode:
authorMarcelo Vanzin <vanzin@cloudera.com>2017-01-25 12:08:08 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2017-01-25 12:08:08 -0800
commit92afaa93a0b67f561a790822ccdd2b814455edcc (patch)
tree16ce02d79b9c132cd7adc66e9b3f2e6d1181b070
parentf6480b1467d0432fb2aa48c7a3a8a6e6679fd481 (diff)
downloadspark-92afaa93a0b67f561a790822ccdd2b814455edcc.tar.gz
spark-92afaa93a0b67f561a790822ccdd2b814455edcc.tar.bz2
spark-92afaa93a0b67f561a790822ccdd2b814455edcc.zip
[SPARK-19307][PYSPARK] Make sure user conf is propagated to SparkContext.
The code was failing to propagate the user conf in the case where the JVM was already initialized, which happens when a user submits a python script via spark-submit. Tested with new unit test and by running a python script in a real cluster. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #16682 from vanzin/SPARK-19307.
-rw-r--r-- python/pyspark/context.py | 3
-rw-r--r-- python/pyspark/tests.py | 20
2 files changed, 23 insertions, 0 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 5c4e79cb04..ac4b2b035f 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -132,6 +132,9 @@ class SparkContext(object):
self._conf = conf
else:
self._conf = SparkConf(_jvm=SparkContext._jvm)
+ if conf is not None:
+ for k, v in conf.getAll():
+ self._conf.set(k, v)
self._batchSize = batchSize # -1 represents an unlimited batch size
self._unbatched_serializer = serializer
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index c383d9ab67..e908b1e739 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -2035,6 +2035,26 @@ class SparkSubmitTests(unittest.TestCase):
self.assertEqual(0, proc.returncode)
self.assertIn("[2, 4, 6]", out.decode('utf-8'))
+ def test_user_configuration(self):
+ """Make sure user configuration is respected (SPARK-19307)"""
+ script = self.createTempFile("test.py", """
+ |from pyspark import SparkConf, SparkContext
+ |
+ |conf = SparkConf().set("spark.test_config", "1")
+ |sc = SparkContext(conf = conf)
+ |try:
+ | if sc._conf.get("spark.test_config") != "1":
+ | raise Exception("Cannot find spark.test_config in SparkContext's conf.")
+ |finally:
+ | sc.stop()
+ """)
+ proc = subprocess.Popen(
+ [self.sparkSubmit, "--master", "local", script],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode, msg="Process failed with error:\n {0}".format(out))
+
class ContextTests(unittest.TestCase):