From 07cb323e7a128b87ef265ddc66f033365d9de463 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 14 Mar 2016 12:22:02 -0700
Subject: [SPARK-13848][SPARK-5185] Update to Py4J 0.9.2 in order to fix
 classloading issue

This patch upgrades Py4J from 0.9.1 to 0.9.2 in order to include a patch
which modifies Py4J to use the current thread's ContextClassLoader when
performing reflection / class loading. This is necessary in order to fix
[SPARK-5185](https://issues.apache.org/jira/browse/SPARK-5185), a
longstanding issue affecting the use of `--jars` and `--packages` in
PySpark.

In order to demonstrate that the fix works, I removed the workarounds
which were added as part of
[SPARK-6027](https://issues.apache.org/jira/browse/SPARK-6027) / #4779
and other patches.

Py4J diff: https://github.com/bartdag/py4j/compare/0.9.1...0.9.2

/cc zsxwing tdas davies brkyvz

Author: Josh Rosen

Closes #11687 from JoshRosen/py4j-0.9.2.
---
 LICENSE                                            |  2 +-
 bin/pyspark                                        |  2 +-
 bin/pyspark2.cmd                                   |  2 +-
 core/pom.xml                                       |  2 +-
 .../org/apache/spark/api/python/PythonUtils.scala  |  2 +-
 dev/deps/spark-deps-hadoop-2.2                     |  2 +-
 dev/deps/spark-deps-hadoop-2.3                     |  2 +-
 dev/deps/spark-deps-hadoop-2.4                     |  2 +-
 dev/deps/spark-deps-hadoop-2.6                     |  2 +-
 dev/deps/spark-deps-hadoop-2.7                     |  2 +-
 .../apache/spark/streaming/kafka/KafkaUtils.scala  |  2 +-
 python/docs/Makefile                               |  2 +-
 python/lib/py4j-0.9.1-src.zip                      | Bin 47035 -> 0 bytes
 python/lib/py4j-0.9.2-src.zip                      | Bin 0 -> 55521 bytes
 python/pyspark/streaming/flume.py                  |  9 +++-----
 python/pyspark/streaming/kafka.py                  | 10 +++------
 python/pyspark/streaming/kinesis.py                | 14 +++++-------
 python/pyspark/streaming/mqtt.py                   | 13 +++++------
 python/pyspark/streaming/tests.py                  | 25 +++++----------------
 sbin/spark-config.sh                               |  2 +-
 .../org/apache/spark/deploy/yarn/Client.scala      |  4 ++--
 .../spark/deploy/yarn/YarnClusterSuite.scala       |  2 +-
 22 files changed, 38 insertions(+), 65 deletions(-)
 delete mode 100644 python/lib/py4j-0.9.1-src.zip
 create mode 100644 python/lib/py4j-0.9.2-src.zip
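The effect of the Py4J change is easiest to see in the Python diffs below,
where every call site drops the reflective workaround in favor of constructing
the helper directly through the gateway's JVM view. A minimal before/after
sketch (hypothetical interactive session; assumes a live SparkContext `sc` and
the Kafka assembly jar shipped via --jars or --packages):

    # Before (Py4J 0.9.1): Py4J's reflection could not see classes added via
    # --jars/--packages, so callers loaded them by hand through the current
    # thread's context classloader:
    helperClass = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
        .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
    helper = helperClass.newInstance()

    # After (Py4J 0.9.2): direct construction works; if the class is absent,
    # the attribute chain stays a JavaPackage and calling it raises
    # TypeError("'JavaPackage' object is not callable"), which the updated
    # _get_helper() methods catch in order to print a friendly error:
    helper = sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()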
diff --git a/LICENSE b/LICENSE
index 3c6117f4aa..d7a790a628 100644
--- a/LICENSE
+++ b/LICENSE
@@ -263,7 +263,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
      (New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
      (The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
      (The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
-     (The New BSD License) Py4J (net.sf.py4j:py4j:0.9.1 - http://py4j.sourceforge.net/)
+     (The New BSD License) Py4J (net.sf.py4j:py4j:0.9.2 - http://py4j.sourceforge.net/)
      (Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
      (BSD licence) sbt and sbt-launch-lib.bash
      (BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
diff --git a/bin/pyspark b/bin/pyspark
index 2ac4a8be25..6962f4577d 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -67,7 +67,7 @@ export PYSPARK_PYTHON

 # Add the PySpark classes to the Python path:
 export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:$PYTHONPATH"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.2-src.zip:$PYTHONPATH"

 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index 21fe28155a..cb788497ff 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
 )

 set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9.1-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9.2-src.zip;%PYTHONPATH%

 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
diff --git a/core/pom.xml b/core/pom.xml
index be40d9936a..4c7e3a3662 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -314,7 +314,7 @@
     <dependency>
       <groupId>net.sf.py4j</groupId>
       <artifactId>py4j</artifactId>
-      <version>0.9.1</version>
+      <version>0.9.2</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index bda872746c..8bcd2903fe 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
     val pythonPath = new ArrayBuffer[String]
     for (sparkHome <- sys.env.get("SPARK_HOME")) {
       pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
-      pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9.1-src.zip").mkString(File.separator)
+      pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9.2-src.zip").mkString(File.separator)
     }
     pythonPath ++= SparkContext.jarOfObject(this)
     pythonPath.mkString(File.pathSeparator)
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index 9991a3be36..512675a599 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -153,7 +153,7 @@ pmml-agent-1.2.7.jar
 pmml-model-1.2.7.jar
 pmml-schema-1.2.7.jar
 protobuf-java-2.5.0.jar
-py4j-0.9.1.jar
+py4j-0.9.2.jar
 pyrolite-4.9.jar
 reflectasm-1.07-shaded.jar
 scala-compiler-2.11.7.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index c52af73b4e..31f8694fed 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -144,7 +144,7 @@ pmml-agent-1.2.7.jar
 pmml-model-1.2.7.jar
 pmml-schema-1.2.7.jar
 protobuf-java-2.5.0.jar
-py4j-0.9.1.jar
+py4j-0.9.2.jar
 pyrolite-4.9.jar
 reflectasm-1.07-shaded.jar
 scala-compiler-2.11.7.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 092a090026..0fa8bccab0 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -145,7 +145,7 @@ pmml-agent-1.2.7.jar
 pmml-model-1.2.7.jar
 pmml-schema-1.2.7.jar
 protobuf-java-2.5.0.jar
-py4j-0.9.1.jar
+py4j-0.9.2.jar
 pyrolite-4.9.jar
 reflectasm-1.07-shaded.jar
 scala-compiler-2.11.7.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 81d86ee3d4..8d2f6e6e32 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -151,7 +151,7 @@ pmml-agent-1.2.7.jar
 pmml-model-1.2.7.jar
 pmml-schema-1.2.7.jar
 protobuf-java-2.5.0.jar
-py4j-0.9.1.jar
+py4j-0.9.2.jar
 pyrolite-4.9.jar
 reflectasm-1.07-shaded.jar
 scala-compiler-2.11.7.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index e38ad1e240..a114c4ae8d 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -152,7 +152,7 @@ pmml-agent-1.2.7.jar
 pmml-model-1.2.7.jar
 pmml-schema-1.2.7.jar
 protobuf-java-2.5.0.jar
-py4j-0.9.1.jar
+py4j-0.9.2.jar
 pyrolite-4.9.jar
 reflectasm-1.07-shaded.jar
 scala-compiler-2.11.7.jar
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 72d9053355..edaafb912c 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -615,7 +615,7 @@ object KafkaUtils {
 /**
  * This is a helper class that wraps the KafkaUtils.createStream() into more
  * Python-friendly class and function so that it can be easily
- * instantiated and called from Python's KafkaUtils (see SPARK-6027).
+ * instantiated and called from Python's KafkaUtils.
  *
  * The zero-arg constructor helps instantiate this class from the Class object
  * classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream()
diff --git a/python/docs/Makefile b/python/docs/Makefile
index b6d24d8599..903009790b 100644
--- a/python/docs/Makefile
+++ b/python/docs/Makefile
@@ -7,7 +7,7 @@ SPHINXBUILD = sphinx-build
 PAPER         =
 BUILDDIR      = _build

-export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9.1-src.zip)
+export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9.2-src.zip)

 # User-friendly check for sphinx-build
 ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)
diff --git a/python/lib/py4j-0.9.1-src.zip b/python/lib/py4j-0.9.1-src.zip
deleted file mode 100644
index fedde845fd..0000000000
Binary files a/python/lib/py4j-0.9.1-src.zip and /dev/null differ
diff --git a/python/lib/py4j-0.9.2-src.zip b/python/lib/py4j-0.9.2-src.zip
new file mode 100644
index 0000000000..881bb759d7
Binary files /dev/null and b/python/lib/py4j-0.9.2-src.zip differ
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
index edd5886a85..cd30483fc6 100644
--- a/python/pyspark/streaming/flume.py
+++ b/python/pyspark/streaming/flume.py
@@ -111,12 +111,9 @@ class FlumeUtils(object):
     @staticmethod
     def _get_helper(sc):
         try:
-            helperClass = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-                .loadClass("org.apache.spark.streaming.flume.FlumeUtilsPythonHelper")
-            return helperClass.newInstance()
-        except Py4JJavaError as e:
-            # TODO: use --jar once it also work on driver
-            if 'ClassNotFoundException' in str(e.java_exception):
+            return sc._jvm.org.apache.spark.streaming.flume.FlumeUtilsPythonHelper()
+        except TypeError as e:
+            if str(e) == "'JavaPackage' object is not callable":
                 FlumeUtils._printErrorMsg(sc)
             raise
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index a70b99249d..02a88699a2 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -192,13 +192,9 @@ class KafkaUtils(object):
     @staticmethod
     def _get_helper(sc):
         try:
-            # Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027)
-            helperClass = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-                .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
-            return helperClass.newInstance()
-        except Py4JJavaError as e:
-            # TODO: use --jar once it also work on driver
-            if 'ClassNotFoundException' in str(e.java_exception):
+            return sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()
+        except TypeError as e:
+            if str(e) == "'JavaPackage' object is not callable":
                 KafkaUtils._printErrorMsg(sc)
             raise
diff --git a/python/pyspark/streaming/kinesis.py b/python/pyspark/streaming/kinesis.py
index e681301681..434ce83e1e 100644
--- a/python/pyspark/streaming/kinesis.py
+++ b/python/pyspark/streaming/kinesis.py
@@ -74,16 +74,14 @@ class KinesisUtils(object):

         try:
             # Use KinesisUtilsPythonHelper to access Scala's KinesisUtils
-            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
-                .loadClass("org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper")
-            helper = helperClass.newInstance()
-            jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
-                                          regionName, initialPositionInStream, jduration, jlevel,
-                                          awsAccessKeyId, awsSecretKey)
-        except Py4JJavaError as e:
-            if 'ClassNotFoundException' in str(e.java_exception):
+            helper = ssc._jvm.org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper()
+        except TypeError as e:
+            if str(e) == "'JavaPackage' object is not callable":
                 KinesisUtils._printErrorMsg(ssc.sparkContext)
             raise
+        jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
+                                      regionName, initialPositionInStream, jduration, jlevel,
+                                      awsAccessKeyId, awsSecretKey)
         stream = DStream(jstream, ssc, NoOpSerializer())
         return stream.map(lambda v: decoder(v))
diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py
index 388e9526ba..8848a70c75 100644
--- a/python/pyspark/streaming/mqtt.py
+++ b/python/pyspark/streaming/mqtt.py
@@ -38,18 +38,15 @@ class MQTTUtils(object):
         :param storageLevel: RDD storage level.
         :return: A DStream object
         """
-        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
-
         try:
-            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-                .loadClass("org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper")
-            helper = helperClass.newInstance()
-            jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
-        except Py4JJavaError as e:
-            if 'ClassNotFoundException' in str(e.java_exception):
+            helper = ssc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper()
+        except TypeError as e:
+            if str(e) == "'JavaPackage' object is not callable":
                 MQTTUtils._printErrorMsg(ssc.sparkContext)
             raise
+        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+        jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
         return DStream(jstream, ssc, UTF8Deserializer())

     @staticmethod
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 469c068134..f4bbb1b128 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1006,10 +1006,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):

     def setUp(self):
         super(KafkaStreamTests, self).setUp()
-
-        kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
-            .loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils")
-        self._kafkaTestUtils = kafkaTestUtilsClz.newInstance()
+        self._kafkaTestUtils = self.ssc._jvm.org.apache.spark.streaming.kafka.KafkaTestUtils()
         self._kafkaTestUtils.setup()

     def tearDown(self):
@@ -1271,10 +1268,7 @@ class FlumeStreamTests(PySparkStreamingTestCase):

     def setUp(self):
         super(FlumeStreamTests, self).setUp()
-
-        utilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-            .loadClass("org.apache.spark.streaming.flume.FlumeTestUtils")
-        self._utils = utilsClz.newInstance()
+        self._utils = self.ssc._jvm.org.apache.spark.streaming.flume.FlumeTestUtils()

     def tearDown(self):
         if self._utils is not None:
@@ -1339,10 +1333,7 @@ class FlumePollingStreamTests(PySparkStreamingTestCase):
     maxAttempts = 5

     def setUp(self):
-        utilsClz = \
-            self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-                .loadClass("org.apache.spark.streaming.flume.PollingFlumeTestUtils")
-        self._utils = utilsClz.newInstance()
+        self._utils = self.sc._jvm.org.apache.spark.streaming.flume.PollingFlumeTestUtils()

     def tearDown(self):
         if self._utils is not None:
@@ -1419,10 +1410,7 @@ class MQTTStreamTests(PySparkStreamingTestCase):

     def setUp(self):
         super(MQTTStreamTests, self).setUp()
-
-        MQTTTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-            .loadClass("org.apache.spark.streaming.mqtt.MQTTTestUtils")
-        self._MQTTTestUtils = MQTTTestUtilsClz.newInstance()
+        self._MQTTTestUtils = self.ssc._jvm.org.apache.spark.streaming.mqtt.MQTTTestUtils()
         self._MQTTTestUtils.setup()

     def tearDown(self):
@@ -1498,10 +1486,7 @@ class KinesisStreamTests(PySparkStreamingTestCase):
         import random
         kinesisAppName = ("KinesisStreamTests-%d" % abs(random.randint(0, 10000000)))
-        kinesisTestUtilsClz = \
-            self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
-                .loadClass("org.apache.spark.streaming.kinesis.KinesisTestUtils")
-        kinesisTestUtils = kinesisTestUtilsClz.newInstance()
+        kinesisTestUtils = self.ssc._jvm.org.apache.spark.streaming.kinesis.KinesisTestUtils()
         try:
             kinesisTestUtils.createStream()
             aWSCredentials = kinesisTestUtils.getAWSCredentials()
diff --git a/sbin/spark-config.sh b/sbin/spark-config.sh
index 0c37985a67..97df433a0b 100755
--- a/sbin/spark-config.sh
+++ b/sbin/spark-config.sh
@@ -27,4 +27,4 @@ fi
 export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"
 # Add the PySpark classes to the PYTHONPATH:
 export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:${PYTHONPATH}"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.2-src.zip:${PYTHONPATH}"
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1035056457..40cf9b68ed 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1087,9 +1087,9 @@ private[spark] class Client(
       val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
       require(pyArchivesFile.exists(),
         "pyspark.zip not found; cannot run pyspark application in YARN mode.")
-      val py4jFile = new File(pyLibPath, "py4j-0.9.1-src.zip")
+      val py4jFile = new File(pyLibPath, "py4j-0.9.2-src.zip")
       require(py4jFile.exists(),
-        "py4j-0.9.1-src.zip not found; cannot run pyspark application in YARN mode.")
+        "py4j-0.9.2-src.zip not found; cannot run pyspark application in YARN mode.")
       Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
     }
   }
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 5068c0cd20..8a92a7ecda 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -154,7 +154,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     // needed locations.
     val sparkHome = sys.props("spark.test.home")
     val pythonPath = Seq(
-      s"$sparkHome/python/lib/py4j-0.9.1-src.zip",
+      s"$sparkHome/python/lib/py4j-0.9.2-src.zip",
       s"$sparkHome/python")
     val extraEnv = Map(
       "PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),
-- 
cgit v1.2.3
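
For reviewers who want to exercise the SPARK-5185 fix end to end, a rough
smoke test follows (not part of the patch; the package coordinate, ZooKeeper
address, and topic name are illustrative placeholders):

    # Launch a shell with the Kafka connector fetched as a package, e.g.:
    #   bin/pyspark --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.1
    # With Py4J 0.9.1 this only worked because of the reflective workaround
    # removed in this patch; with 0.9.2 the direct sc._jvm lookup inside
    # _get_helper() resolves the class through the context classloader.
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    ssc = StreamingContext(sc, 2)  # 2-second batches; `sc` comes from the shell
    stream = KafkaUtils.createStream(ssc, "localhost:2181", "test-group", {"topic1": 1})
    stream.pprint()
    ssc.start()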