author     Yin Huai <yhuai@databricks.com>    2016-06-28 07:54:44 -0700
committer  Davies Liu <davies.liu@gmail.com>  2016-06-28 07:54:44 -0700
commit     0923c4f5676691e28e70ecb05890e123540b91f0 (patch)
tree       fc4b3fff3d5ab0f07080ec90cabeae9786bae147 /python
parent     e158478a9fff5e63ae0336a54b3f360d0cd38921 (diff)
[SPARK-16224] [SQL] [PYSPARK] SparkSession builder's configs need to be set to the existing Scala SparkContext's SparkConf
## What changes were proposed in this pull request?

When we create a SparkSession on the Python side, a SparkContext may already exist. In that case, the configs set on the SparkSession builder need to be propagated to the existing Scala SparkContext's SparkConf, because conf changes made on an active Python SparkContext are not propagated to the JVM side. Otherwise we may create a wrong SparkSession (e.g. Hive support is not enabled even though enableHiveSupport was called).

## How was this patch tested?

New tests and manual tests.

Author: Yin Huai <yhuai@databricks.com>

Closes #13931 from yhuai/SPARK-16224.
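The scenario above can be reproduced with a short script. The sketch below is not part of the patch; it assumes a local PySpark installation built with Hive support and simply checks whether the builder's Hive setting reached the JVM-side conf when a SparkContext already exists.

```python
# Illustration only (not part of the patch): reproduce the SPARK-16224 scenario.
# Assumes a local PySpark installation with Hive classes available.
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext(master="local[1]", appName="spark16224-demo")  # context created first

# enableHiveSupport() records spark.sql.catalogImplementation=hive on the builder;
# before this patch that setting never reached the JVM SparkContext created above.
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# With the patch, the effective JVM-side conf reflects the builder's setting.
print(spark.sparkContext.getConf().get("spark.sql.catalogImplementation", "in-memory"))

spark.stop()
```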
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/context.py      |  2
-rw-r--r--  python/pyspark/sql/session.py  |  7
-rw-r--r--  python/pyspark/sql/tests.py    | 43
-rw-r--r--  python/pyspark/tests.py        |  8
4 files changed, 59 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 7217a9907a..6e9f24ef10 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -166,6 +166,8 @@ class SparkContext(object):
# Create the Java SparkContext through Py4J
self._jsc = jsc or self._initialize_context(self._conf._jconf)
+ # Reset the SparkConf to the one actually used by the SparkContext in JVM.
+ self._conf = SparkConf(_jconf=self._jsc.sc().conf())
# Create a single Accumulator in Java that we'll send all our updates through;
# they will be passed back to us through a TCP server
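A hedged sketch of what the context.py change above provides: once `self._conf` wraps the JVM SparkConf, values resolved on the Scala side become visible from Python. Note that `_conf` is an internal attribute; it is only read here to mirror what the patch and its tests do.

```python
# Illustration only: after the change above, sc._conf mirrors the JVM SparkConf,
# so values the JVM resolved (master URL, app name, defaults) can be read from Python.
from pyspark import SparkContext

sc = SparkContext(master="local[1]", appName="conf-mirror-demo")
print(sc._conf.get("spark.master"))    # read back from the JVM-side conf
print(sc._conf.get("spark.app.name"))
sc.stop()
```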
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 0c8024e4a8..b4152a34ad 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -165,6 +165,13 @@ class SparkSession(object):
for key, value in self._options.items():
sparkConf.set(key, value)
sc = SparkContext.getOrCreate(sparkConf)
+ # This SparkContext may be an existing one.
+ for key, value in self._options.items():
+ # we need to propagate the confs
+ # before we create the SparkSession. Otherwise, confs like
+ # warehouse path and metastore url will not be set correctly (
+ # these confs cannot be changed once the SparkSession is created).
+ sc._conf.set(key, value)
session = SparkSession(sc)
for key, value in self._options.items():
session.conf.set(key, value)
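A usage-level sketch of the effect of the session.py change: builder configs are now pushed into a pre-existing SparkContext's conf before the SparkSession is constructed, so session-scoped settings such as the warehouse directory take effect. The warehouse path below is a placeholder, not a value from the patch.

```python
# Illustration only: builder configs reach the existing SparkContext's conf
# before the SparkSession is constructed (see the loop added above).
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext(master="local[1]", appName="builder-propagation-demo")  # pre-existing
spark = (SparkSession.builder
         .config("spark.sql.warehouse.dir", "/tmp/spark16224-warehouse")  # placeholder path
         .getOrCreate())

# The setting was applied before session construction, so it is honored.
print(spark.conf.get("spark.sql.warehouse.dir"))
spark.stop()
```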
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 3f564110ed..f863485e6c 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -22,6 +22,7 @@ individual modules.
"""
import os
import sys
+import subprocess
import pydoc
import shutil
import tempfile
@@ -48,7 +49,7 @@ else:
from pyspark.sql import SparkSession, HiveContext, Column, Row
from pyspark.sql.types import *
from pyspark.sql.types import UserDefinedType, _infer_type
-from pyspark.tests import ReusedPySparkTestCase
+from pyspark.tests import ReusedPySparkTestCase, SparkSubmitTests
from pyspark.sql.functions import UserDefinedFunction, sha2
from pyspark.sql.window import Window
from pyspark.sql.utils import AnalysisException, ParseException, IllegalArgumentException
@@ -1619,6 +1620,46 @@ class SQLTests(ReusedPySparkTestCase):
lambda: spark.catalog.uncacheTable("does_not_exist"))
+class HiveSparkSubmitTests(SparkSubmitTests):
+
+ def test_hivecontext(self):
+ # This test checks that HiveContext is using Hive metastore (SPARK-16224).
+ # It sets a metastore url and checks if there is a derby dir created by
+ # Hive metastore. If this derby dir exists, HiveContext is using
+ # Hive metastore.
+ metastore_path = os.path.join(tempfile.mkdtemp(), "spark16224_metastore_db")
+ metastore_URL = "jdbc:derby:;databaseName=" + metastore_path + ";create=true"
+ hive_site_dir = os.path.join(self.programDir, "conf")
+ hive_site_file = self.createTempFile("hive-site.xml", ("""
+ |<configuration>
+ | <property>
+ | <name>javax.jdo.option.ConnectionURL</name>
+ | <value>%s</value>
+ | </property>
+ |</configuration>
+ """ % metastore_URL).lstrip(), "conf")
+ script = self.createTempFile("test.py", """
+ |import os
+ |
+ |from pyspark.conf import SparkConf
+ |from pyspark.context import SparkContext
+ |from pyspark.sql import HiveContext
+ |
+ |conf = SparkConf()
+ |sc = SparkContext(conf=conf)
+ |hive_context = HiveContext(sc)
+ |print(hive_context.sql("show databases").collect())
+ """)
+ proc = subprocess.Popen(
+ [self.sparkSubmit, "--master", "local-cluster[1,1,1024]",
+ "--driver-class-path", hive_site_dir, script],
+ stdout=subprocess.PIPE)
+ out, err = proc.communicate()
+ self.assertEqual(0, proc.returncode)
+ self.assertIn("default", out.decode('utf-8'))
+ self.assertTrue(os.path.exists(metastore_path))
+
+
class HiveContextSQLTests(ReusedPySparkTestCase):
@classmethod
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 222c5ca5f4..0a029b6e74 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -1921,6 +1921,14 @@ class ContextTests(unittest.TestCase):
post_parallalize_temp_files = os.listdir(sc._temp_dir)
self.assertEqual(temp_files, post_parallalize_temp_files)
+ def test_set_conf(self):
+ # This is for an internal use case. When there is an existing SparkContext,
+ # SparkSession's builder needs to set configs into SparkContext's conf.
+ sc = SparkContext()
+ sc._conf.set("spark.test.SPARK16224", "SPARK16224")
+ self.assertEqual(sc._jsc.sc().conf().get("spark.test.SPARK16224"), "SPARK16224")
+ sc.stop()
+
def test_stop(self):
sc = SparkContext()
self.assertNotEqual(SparkContext._active_spark_context, None)