| author | Josh Rosen <joshrosen@databricks.com> | 2016-03-14 12:22:02 -0700 |
|---|---|---|
| committer | Josh Rosen <joshrosen@databricks.com> | 2016-03-14 12:22:02 -0700 |
| commit | 07cb323e7a128b87ef265ddc66f033365d9de463 (patch) | |
| tree | 41de1d7201b8592ffd8742aa09a98ee1beb207e6 /python/pyspark | |
| parent | 6a4bfcd62b7effcfbb865bdd301d41a0ba6e5c94 (diff) | |
| download | spark-07cb323e7a128b87ef265ddc66f033365d9de463.tar.gz, spark-07cb323e7a128b87ef265ddc66f033365d9de463.tar.bz2, spark-07cb323e7a128b87ef265ddc66f033365d9de463.zip | |
[SPARK-13848][SPARK-5185] Update to Py4J 0.9.2 in order to fix classloading issue
This patch upgrades Py4J from 0.9.1 to 0.9.2 in order to include a patch which modifies Py4J to use the current thread's ContextClassLoader when performing reflection / class loading. This is necessary in order to fix [SPARK-5185](https://issues.apache.org/jira/browse/SPARK-5185), a longstanding issue affecting the use of `--jars` and `--packages` in PySpark.
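To make the contrast concrete, here is a minimal sketch (assuming an active `SparkContext` bound to `sc`; the helper class is one of those touched in the diff below) of the old workaround versus the direct access that Py4J 0.9.2 enables:

```python
# Old workaround (Py4J <= 0.9.1): explicitly load the class through the
# thread's ContextClassLoader, which *can* see jars added via --jars/--packages:
helper_class = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
    .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
helper = helper_class.newInstance()

# With Py4J 0.9.2, reflection itself consults the ContextClassLoader, so the
# plain _jvm form now resolves classes shipped via --jars/--packages as well:
helper = sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()
```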
In order to demonstrate that the fix works, I removed the workarounds which were added as part of [SPARK-6027](https://issues.apache.org/jira/browse/SPARK-6027) / #4779 and other patches.
Py4J diff: https://github.com/bartdag/py4j/compare/0.9.1...0.9.2
/cc zsxwing tdas davies brkyvz
Author: Josh Rosen <joshrosen@databricks.com>
Closes #11687 from JoshRosen/py4j-0.9.2.
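As a user-level illustration of what the SPARK-5185 fix unblocks (the broker, topic, and app names here are hypothetical, and the exact `--packages` coordinate depends on your Spark and Scala versions), a PySpark streaming driver can now pick up the Kafka helper from a jar supplied at submit time:

```python
# Sketch of a driver launched with, e.g.:
#   spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:<version> app.py
# Before this fix, KafkaUtils could not find KafkaUtilsPythonHelper in jars
# added this way, because Py4J's reflection ignored the context classloader.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="kafka-packages-demo")
ssc = StreamingContext(sc, batchDuration=2)

# Internally this calls sc._jvm...KafkaUtilsPythonHelper(), the code path
# simplified by this patch.
stream = KafkaUtils.createStream(ssc, "zk-host:2181", "demo-group", {"demo-topic": 1})
stream.pprint()

ssc.start()
ssc.awaitTermination()
```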
Diffstat (limited to 'python/pyspark')
| -rw-r--r-- | python/pyspark/streaming/flume.py | 9 |
| -rw-r--r-- | python/pyspark/streaming/kafka.py | 10 |
| -rw-r--r-- | python/pyspark/streaming/kinesis.py | 14 |
| -rw-r--r-- | python/pyspark/streaming/mqtt.py | 13 |
| -rw-r--r-- | python/pyspark/streaming/tests.py | 25 |

5 files changed, 22 insertions, 49 deletions
```diff
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
index edd5886a85..cd30483fc6 100644
--- a/python/pyspark/streaming/flume.py
+++ b/python/pyspark/streaming/flume.py
@@ -111,12 +111,9 @@ class FlumeUtils(object):
     @staticmethod
     def _get_helper(sc):
         try:
-            helperClass = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-                .loadClass("org.apache.spark.streaming.flume.FlumeUtilsPythonHelper")
-            return helperClass.newInstance()
-        except Py4JJavaError as e:
-            # TODO: use --jar once it also work on driver
-            if 'ClassNotFoundException' in str(e.java_exception):
+            return sc._jvm.org.apache.spark.streaming.flume.FlumeUtilsPythonHelper()
+        except TypeError as e:
+            if str(e) == "'JavaPackage' object is not callable":
                 FlumeUtils._printErrorMsg(sc)
             raise
 
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index a70b99249d..02a88699a2 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -192,13 +192,9 @@ class KafkaUtils(object):
     @staticmethod
     def _get_helper(sc):
         try:
-            # Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027)
-            helperClass = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-                .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
-            return helperClass.newInstance()
-        except Py4JJavaError as e:
-            # TODO: use --jar once it also work on driver
-            if 'ClassNotFoundException' in str(e.java_exception):
+            return sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()
+        except TypeError as e:
+            if str(e) == "'JavaPackage' object is not callable":
                 KafkaUtils._printErrorMsg(sc)
             raise
 
diff --git a/python/pyspark/streaming/kinesis.py b/python/pyspark/streaming/kinesis.py
index e681301681..434ce83e1e 100644
--- a/python/pyspark/streaming/kinesis.py
+++ b/python/pyspark/streaming/kinesis.py
@@ -74,16 +74,14 @@
         try:
             # Use KinesisUtilsPythonHelper to access Scala's KinesisUtils
-            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
-                .loadClass("org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper")
-            helper = helperClass.newInstance()
-            jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
-                                          regionName, initialPositionInStream, jduration, jlevel,
-                                          awsAccessKeyId, awsSecretKey)
-        except Py4JJavaError as e:
-            if 'ClassNotFoundException' in str(e.java_exception):
+            helper = ssc._jvm.org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper()
+        except TypeError as e:
+            if str(e) == "'JavaPackage' object is not callable":
                 KinesisUtils._printErrorMsg(ssc.sparkContext)
             raise
+        jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
+                                      regionName, initialPositionInStream, jduration, jlevel,
+                                      awsAccessKeyId, awsSecretKey)
         stream = DStream(jstream, ssc, NoOpSerializer())
         return stream.map(lambda v: decoder(v))
diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py
index 388e9526ba..8848a70c75 100644
--- a/python/pyspark/streaming/mqtt.py
+++ b/python/pyspark/streaming/mqtt.py
@@ -38,18 +38,15 @@ class MQTTUtils(object):
         :param storageLevel: RDD storage level.
         :return: A DStream object
         """
-        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
-
         try:
-            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-                .loadClass("org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper")
-            helper = helperClass.newInstance()
-            jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
-        except Py4JJavaError as e:
-            if 'ClassNotFoundException' in str(e.java_exception):
+            helper = ssc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper()
+        except TypeError as e:
+            if str(e) == "'JavaPackage' object is not callable":
                 MQTTUtils._printErrorMsg(ssc.sparkContext)
             raise
 
+        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+        jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
         return DStream(jstream, ssc, UTF8Deserializer())
 
     @staticmethod
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 469c068134..f4bbb1b128 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1006,10 +1006,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
 
     def setUp(self):
         super(KafkaStreamTests, self).setUp()
-
-        kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
-            .loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils")
-        self._kafkaTestUtils = kafkaTestUtilsClz.newInstance()
+        self._kafkaTestUtils = self.ssc._jvm.org.apache.spark.streaming.kafka.KafkaTestUtils()
         self._kafkaTestUtils.setup()
 
     def tearDown(self):
@@ -1271,10 +1268,7 @@ class FlumeStreamTests(PySparkStreamingTestCase):
 
     def setUp(self):
         super(FlumeStreamTests, self).setUp()
-
-        utilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-            .loadClass("org.apache.spark.streaming.flume.FlumeTestUtils")
-        self._utils = utilsClz.newInstance()
+        self._utils = self.ssc._jvm.org.apache.spark.streaming.flume.FlumeTestUtils()
 
     def tearDown(self):
         if self._utils is not None:
@@ -1339,10 +1333,7 @@ class FlumePollingStreamTests(PySparkStreamingTestCase):
     maxAttempts = 5
 
     def setUp(self):
-        utilsClz = \
-            self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-            .loadClass("org.apache.spark.streaming.flume.PollingFlumeTestUtils")
-        self._utils = utilsClz.newInstance()
+        self._utils = self.sc._jvm.org.apache.spark.streaming.flume.PollingFlumeTestUtils()
 
     def tearDown(self):
         if self._utils is not None:
@@ -1419,10 +1410,7 @@ class MQTTStreamTests(PySparkStreamingTestCase):
 
     def setUp(self):
         super(MQTTStreamTests, self).setUp()
-
-        MQTTTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-            .loadClass("org.apache.spark.streaming.mqtt.MQTTTestUtils")
-        self._MQTTTestUtils = MQTTTestUtilsClz.newInstance()
+        self._MQTTTestUtils = self.ssc._jvm.org.apache.spark.streaming.mqtt.MQTTTestUtils()
         self._MQTTTestUtils.setup()
 
     def tearDown(self):
@@ -1498,10 +1486,7 @@ class KinesisStreamTests(PySparkStreamingTestCase):
 
         import random
        kinesisAppName = ("KinesisStreamTests-%d" % abs(random.randint(0, 10000000)))
-        kinesisTestUtilsClz = \
-            self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-            .loadClass("org.apache.spark.streaming.kinesis.KinesisTestUtils")
-        kinesisTestUtils = kinesisTestUtilsClz.newInstance()
+        kinesisTestUtils = self.ssc._jvm.org.apache.spark.streaming.kinesis.KinesisTestUtils()
         try:
             kinesisTestUtils.createStream()
             aWSCredentials = kinesisTestUtils.getAWSCredentials()
```
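A note on why the `except` clauses changed shape: with direct `_jvm` access, a class that is missing from the JVM classpath does not raise a wrapped `ClassNotFoundException`; Py4J hands back a generic `JavaPackage` placeholder, and only the attempt to call it fails. A small sketch of the failure mode the new checks catch (again assuming a live `SparkContext` named `sc`):

```python
# When the streaming-kafka assembly is absent, attribute access on _jvm
# silently yields a JavaPackage placeholder rather than a class:
pkg = sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper

try:
    helper = pkg()  # instantiating the placeholder raises TypeError
except TypeError as e:
    if str(e) == "'JavaPackage' object is not callable":
        # Same condition the patched _get_helper() methods use to decide
        # whether to print the "missing assembly" guidance before re-raising.
        print("streaming-kafka assembly missing; add it with --jars or --packages")
    raise
```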