diff options
author | Josh Rosen <joshrosen@databricks.com> | 2016-03-14 12:22:02 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2016-03-14 12:22:02 -0700 |
commit | 07cb323e7a128b87ef265ddc66f033365d9de463 (patch) | |
tree | 41de1d7201b8592ffd8742aa09a98ee1beb207e6 /python/pyspark/streaming/tests.py | |
parent | 6a4bfcd62b7effcfbb865bdd301d41a0ba6e5c94 (diff) | |
download | spark-07cb323e7a128b87ef265ddc66f033365d9de463.tar.gz spark-07cb323e7a128b87ef265ddc66f033365d9de463.tar.bz2 spark-07cb323e7a128b87ef265ddc66f033365d9de463.zip |
[SPARK-13848][SPARK-5185] Update to Py4J 0.9.2 in order to fix classloading issue
This patch upgrades Py4J from 0.9.1 to 0.9.2 in order to include a patch which modifies Py4J to use the current thread's ContextClassLoader when performing reflection / class loading. This is necessary in order to fix [SPARK-5185](https://issues.apache.org/jira/browse/SPARK-5185), a longstanding issue affecting the use of `--jars` and `--packages` in PySpark.
In order to demonstrate that the fix works, I removed the workarounds which were added as part of [SPARK-6027](https://issues.apache.org/jira/browse/SPARK-6027) / #4779 and other patches.
Py4J diff: https://github.com/bartdag/py4j/compare/0.9.1...0.9.2
/cc zsxwing tdas davies brkyvz
Author: Josh Rosen <joshrosen@databricks.com>
Closes #11687 from JoshRosen/py4j-0.9.2.
Diffstat (limited to 'python/pyspark/streaming/tests.py')
-rw-r--r-- | python/pyspark/streaming/tests.py | 25 |
1 file changed, 5 insertions(+), 20 deletions(-)
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 469c068134..f4bbb1b128 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1006,10 +1006,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 
     def setUp(self):
         super(KafkaStreamTests, self).setUp()
-
-        kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
-            .loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils")
-        self._kafkaTestUtils = kafkaTestUtilsClz.newInstance()
+        self._kafkaTestUtils = self.ssc._jvm.org.apache.spark.streaming.kafka.KafkaTestUtils()
         self._kafkaTestUtils.setup()
 
     def tearDown(self):
@@ -1271,10 +1268,7 @@ class FlumeStreamTests(PySparkStreamingTestCase):
 
     def setUp(self):
         super(FlumeStreamTests, self).setUp()
-
-        utilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-            .loadClass("org.apache.spark.streaming.flume.FlumeTestUtils")
-        self._utils = utilsClz.newInstance()
+        self._utils = self.ssc._jvm.org.apache.spark.streaming.flume.FlumeTestUtils()
 
     def tearDown(self):
         if self._utils is not None:
@@ -1339,10 +1333,7 @@ class FlumePollingStreamTests(PySparkStreamingTestCase):
     maxAttempts = 5
 
     def setUp(self):
-        utilsClz = \
-            self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-            .loadClass("org.apache.spark.streaming.flume.PollingFlumeTestUtils")
-        self._utils = utilsClz.newInstance()
+        self._utils = self.sc._jvm.org.apache.spark.streaming.flume.PollingFlumeTestUtils()
 
     def tearDown(self):
         if self._utils is not None:
@@ -1419,10 +1410,7 @@ class MQTTStreamTests(PySparkStreamingTestCase):
 
     def setUp(self):
         super(MQTTStreamTests, self).setUp()
-
-        MQTTTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-            .loadClass("org.apache.spark.streaming.mqtt.MQTTTestUtils")
-        self._MQTTTestUtils = MQTTTestUtilsClz.newInstance()
+        self._MQTTTestUtils = self.ssc._jvm.org.apache.spark.streaming.mqtt.MQTTTestUtils()
         self._MQTTTestUtils.setup()
 
     def tearDown(self):
@@ -1498,10 +1486,7 @@ class KinesisStreamTests(PySparkStreamingTestCase):
         import random
         kinesisAppName = ("KinesisStreamTests-%d" % abs(random.randint(0, 10000000)))
-        kinesisTestUtilsClz = \
-            self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-            .loadClass("org.apache.spark.streaming.kinesis.KinesisTestUtils")
-        kinesisTestUtils = kinesisTestUtilsClz.newInstance()
+        kinesisTestUtils = self.ssc._jvm.org.apache.spark.streaming.kinesis.KinesisTestUtils()
         try:
             kinesisTestUtils.createStream()
             aWSCredentials = kinesisTestUtils.getAWSCredentials()