aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/tests.py
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2016-03-14 12:22:02 -0700
committerJosh Rosen <joshrosen@databricks.com>2016-03-14 12:22:02 -0700
commit07cb323e7a128b87ef265ddc66f033365d9de463 (patch)
tree41de1d7201b8592ffd8742aa09a98ee1beb207e6 /python/pyspark/streaming/tests.py
parent6a4bfcd62b7effcfbb865bdd301d41a0ba6e5c94 (diff)
downloadspark-07cb323e7a128b87ef265ddc66f033365d9de463.tar.gz
spark-07cb323e7a128b87ef265ddc66f033365d9de463.tar.bz2
spark-07cb323e7a128b87ef265ddc66f033365d9de463.zip
[SPARK-13848][SPARK-5185] Update to Py4J 0.9.2 in order to fix classloading issue
This patch upgrades Py4J from 0.9.1 to 0.9.2 in order to include a patch which modifies Py4J to use the current thread's ContextClassLoader when performing reflection / class loading. This is necessary in order to fix [SPARK-5185](https://issues.apache.org/jira/browse/SPARK-5185), a longstanding issue affecting the use of `--jars` and `--packages` in PySpark. In order to demonstrate that the fix works, I removed the workarounds which were added as part of [SPARK-6027](https://issues.apache.org/jira/browse/SPARK-6027) / #4779 and other patches. Py4J diff: https://github.com/bartdag/py4j/compare/0.9.1...0.9.2 /cc zsxwing tdas davies brkyvz Author: Josh Rosen <joshrosen@databricks.com> Closes #11687 from JoshRosen/py4j-0.9.2.
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r--python/pyspark/streaming/tests.py25
1 files changed, 5 insertions, 20 deletions
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 469c068134..f4bbb1b128 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1006,10 +1006,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
def setUp(self):
super(KafkaStreamTests, self).setUp()
-
- kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
- .loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils")
- self._kafkaTestUtils = kafkaTestUtilsClz.newInstance()
+ self._kafkaTestUtils = self.ssc._jvm.org.apache.spark.streaming.kafka.KafkaTestUtils()
self._kafkaTestUtils.setup()
def tearDown(self):
@@ -1271,10 +1268,7 @@ class FlumeStreamTests(PySparkStreamingTestCase):
def setUp(self):
super(FlumeStreamTests, self).setUp()
-
- utilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
- .loadClass("org.apache.spark.streaming.flume.FlumeTestUtils")
- self._utils = utilsClz.newInstance()
+ self._utils = self.ssc._jvm.org.apache.spark.streaming.flume.FlumeTestUtils()
def tearDown(self):
if self._utils is not None:
@@ -1339,10 +1333,7 @@ class FlumePollingStreamTests(PySparkStreamingTestCase):
maxAttempts = 5
def setUp(self):
- utilsClz = \
- self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
- .loadClass("org.apache.spark.streaming.flume.PollingFlumeTestUtils")
- self._utils = utilsClz.newInstance()
+ self._utils = self.sc._jvm.org.apache.spark.streaming.flume.PollingFlumeTestUtils()
def tearDown(self):
if self._utils is not None:
@@ -1419,10 +1410,7 @@ class MQTTStreamTests(PySparkStreamingTestCase):
def setUp(self):
super(MQTTStreamTests, self).setUp()
-
- MQTTTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
- .loadClass("org.apache.spark.streaming.mqtt.MQTTTestUtils")
- self._MQTTTestUtils = MQTTTestUtilsClz.newInstance()
+ self._MQTTTestUtils = self.ssc._jvm.org.apache.spark.streaming.mqtt.MQTTTestUtils()
self._MQTTTestUtils.setup()
def tearDown(self):
@@ -1498,10 +1486,7 @@ class KinesisStreamTests(PySparkStreamingTestCase):
import random
kinesisAppName = ("KinesisStreamTests-%d" % abs(random.randint(0, 10000000)))
- kinesisTestUtilsClz = \
- self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
- .loadClass("org.apache.spark.streaming.kinesis.KinesisTestUtils")
- kinesisTestUtils = kinesisTestUtilsClz.newInstance()
+ kinesisTestUtils = self.ssc._jvm.org.apache.spark.streaming.kinesis.KinesisTestUtils()
try:
kinesisTestUtils.createStream()
aWSCredentials = kinesisTestUtils.getAWSCredentials()