author    Josh Rosen <joshrosen@databricks.com>  2016-03-14 12:22:02 -0700
committer Josh Rosen <joshrosen@databricks.com>  2016-03-14 12:22:02 -0700
commit    07cb323e7a128b87ef265ddc66f033365d9de463 (patch)
tree      41de1d7201b8592ffd8742aa09a98ee1beb207e6
parent    6a4bfcd62b7effcfbb865bdd301d41a0ba6e5c94 (diff)
[SPARK-13848][SPARK-5185] Update to Py4J 0.9.2 in order to fix classloading issue
This patch upgrades Py4J from 0.9.1 to 0.9.2 in order to include a patch which modifies Py4J to use the current thread's ContextClassLoader when performing reflection / class loading. This is necessary in order to fix [SPARK-5185](https://issues.apache.org/jira/browse/SPARK-5185), a longstanding issue affecting the use of `--jars` and `--packages` in PySpark.

In order to demonstrate that the fix works, I removed the workarounds which were added as part of [SPARK-6027](https://issues.apache.org/jira/browse/SPARK-6027) / #4779 and other patches.

Py4J diff: https://github.com/bartdag/py4j/compare/0.9.1...0.9.2

/cc zsxwing tdas davies brkyvz

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11687 from JoshRosen/py4j-0.9.2.
-rw-r--r--  LICENSE | 2
-rwxr-xr-x  bin/pyspark | 2
-rw-r--r--  bin/pyspark2.cmd | 2
-rw-r--r--  core/pom.xml | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala | 2
-rw-r--r--  dev/deps/spark-deps-hadoop-2.2 | 2
-rw-r--r--  dev/deps/spark-deps-hadoop-2.3 | 2
-rw-r--r--  dev/deps/spark-deps-hadoop-2.4 | 2
-rw-r--r--  dev/deps/spark-deps-hadoop-2.6 | 2
-rw-r--r--  dev/deps/spark-deps-hadoop-2.7 | 2
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala | 2
-rw-r--r--  python/docs/Makefile | 2
-rw-r--r--  python/lib/py4j-0.9.2-src.zip (renamed from python/lib/py4j-0.9.1-src.zip) | bin 47035 -> 55521 bytes
-rw-r--r--  python/pyspark/streaming/flume.py | 9
-rw-r--r--  python/pyspark/streaming/kafka.py | 10
-rw-r--r--  python/pyspark/streaming/kinesis.py | 14
-rw-r--r--  python/pyspark/streaming/mqtt.py | 13
-rw-r--r--  python/pyspark/streaming/tests.py | 25
-rwxr-xr-x  sbin/spark-config.sh | 2
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 4
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 2
21 files changed, 38 insertions, 65 deletions
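For context, the Python-side pattern this upgrade enables is sketched below. This is a minimal illustration, not part of the patch: it assumes a live SparkContext `sc` with the Flume assembly on the classpath, and it uses the FlumeUtilsPythonHelper class that appears in the diff that follows. It contrasts the old manual ContextClassLoader workaround with the direct instantiation that works once Py4J itself resolves classes through the thread's ContextClassLoader.

    # Before (Py4J 0.9.1): classes added via --jars / --packages were not visible
    # to Py4J's default classloader, so PySpark loaded helpers by hand through the
    # thread's ContextClassLoader:
    helper_class = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
        .loadClass("org.apache.spark.streaming.flume.FlumeUtilsPythonHelper")
    helper = helper_class.newInstance()

    # After (Py4J 0.9.2): Py4J resolves classes via the ContextClassLoader itself,
    # so the helper can be instantiated directly through the JVM gateway. A missing
    # class now surfaces as TypeError("'JavaPackage' object is not callable")
    # instead of a Py4JJavaError wrapping ClassNotFoundException, which is why the
    # except clauses in the diff below change accordingly:
    helper = sc._jvm.org.apache.spark.streaming.flume.FlumeUtilsPythonHelper()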
diff --git a/LICENSE b/LICENSE
index 3c6117f4aa..d7a790a628 100644
--- a/LICENSE
+++ b/LICENSE
@@ -263,7 +263,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
- (The New BSD License) Py4J (net.sf.py4j:py4j:0.9.1 - http://py4j.sourceforge.net/)
+ (The New BSD License) Py4J (net.sf.py4j:py4j:0.9.2 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
diff --git a/bin/pyspark b/bin/pyspark
index 2ac4a8be25..6962f4577d 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -67,7 +67,7 @@ export PYSPARK_PYTHON
# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:$PYTHONPATH"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.2-src.zip:$PYTHONPATH"
# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index 21fe28155a..cb788497ff 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)
set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9.1-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9.2-src.zip;%PYTHONPATH%
set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
diff --git a/core/pom.xml b/core/pom.xml
index be40d9936a..4c7e3a3662 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -314,7 +314,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
- <version>0.9.1</version>
+ <version>0.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index bda872746c..8bcd2903fe 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
- pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9.1-src.zip").mkString(File.separator)
+ pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9.2-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
pythonPath.mkString(File.pathSeparator)
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index 9991a3be36..512675a599 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -153,7 +153,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.1.jar
+py4j-0.9.2.jar
pyrolite-4.9.jar
reflectasm-1.07-shaded.jar
scala-compiler-2.11.7.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index c52af73b4e..31f8694fed 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -144,7 +144,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.1.jar
+py4j-0.9.2.jar
pyrolite-4.9.jar
reflectasm-1.07-shaded.jar
scala-compiler-2.11.7.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 092a090026..0fa8bccab0 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -145,7 +145,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.1.jar
+py4j-0.9.2.jar
pyrolite-4.9.jar
reflectasm-1.07-shaded.jar
scala-compiler-2.11.7.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 81d86ee3d4..8d2f6e6e32 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -151,7 +151,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.1.jar
+py4j-0.9.2.jar
pyrolite-4.9.jar
reflectasm-1.07-shaded.jar
scala-compiler-2.11.7.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index e38ad1e240..a114c4ae8d 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -152,7 +152,7 @@ pmml-agent-1.2.7.jar
pmml-model-1.2.7.jar
pmml-schema-1.2.7.jar
protobuf-java-2.5.0.jar
-py4j-0.9.1.jar
+py4j-0.9.2.jar
pyrolite-4.9.jar
reflectasm-1.07-shaded.jar
scala-compiler-2.11.7.jar
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index 72d9053355..edaafb912c 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -615,7 +615,7 @@ object KafkaUtils {
/**
* This is a helper class that wraps the KafkaUtils.createStream() into more
* Python-friendly class and function so that it can be easily
- * instantiated and called from Python's KafkaUtils (see SPARK-6027).
+ * instantiated and called from Python's KafkaUtils.
*
* The zero-arg constructor helps instantiate this class from the Class object
* classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream()
diff --git a/python/docs/Makefile b/python/docs/Makefile
index b6d24d8599..903009790b 100644
--- a/python/docs/Makefile
+++ b/python/docs/Makefile
@@ -7,7 +7,7 @@ SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = _build
-export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9.1-src.zip)
+export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.9.2-src.zip)
# User-friendly check for sphinx-build
ifeq ($(shell which $(SPHINXBUILD) >/dev/null 2>&1; echo $$?), 1)
diff --git a/python/lib/py4j-0.9.1-src.zip b/python/lib/py4j-0.9.2-src.zip
index fedde845fd..881bb759d7 100644
--- a/python/lib/py4j-0.9.1-src.zip
+++ b/python/lib/py4j-0.9.2-src.zip
Binary files differ
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
index edd5886a85..cd30483fc6 100644
--- a/python/pyspark/streaming/flume.py
+++ b/python/pyspark/streaming/flume.py
@@ -111,12 +111,9 @@ class FlumeUtils(object):
@staticmethod
def _get_helper(sc):
try:
- helperClass = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
- .loadClass("org.apache.spark.streaming.flume.FlumeUtilsPythonHelper")
- return helperClass.newInstance()
- except Py4JJavaError as e:
- # TODO: use --jar once it also work on driver
- if 'ClassNotFoundException' in str(e.java_exception):
+ return sc._jvm.org.apache.spark.streaming.flume.FlumeUtilsPythonHelper()
+ except TypeError as e:
+ if str(e) == "'JavaPackage' object is not callable":
FlumeUtils._printErrorMsg(sc)
raise
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index a70b99249d..02a88699a2 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -192,13 +192,9 @@ class KafkaUtils(object):
@staticmethod
def _get_helper(sc):
try:
- # Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027)
- helperClass = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
- .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
- return helperClass.newInstance()
- except Py4JJavaError as e:
- # TODO: use --jar once it also work on driver
- if 'ClassNotFoundException' in str(e.java_exception):
+ return sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()
+ except TypeError as e:
+ if str(e) == "'JavaPackage' object is not callable":
KafkaUtils._printErrorMsg(sc)
raise
diff --git a/python/pyspark/streaming/kinesis.py b/python/pyspark/streaming/kinesis.py
index e681301681..434ce83e1e 100644
--- a/python/pyspark/streaming/kinesis.py
+++ b/python/pyspark/streaming/kinesis.py
@@ -74,16 +74,14 @@ class KinesisUtils(object):
try:
# Use KinesisUtilsPythonHelper to access Scala's KinesisUtils
- helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
- .loadClass("org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper")
- helper = helperClass.newInstance()
- jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
- regionName, initialPositionInStream, jduration, jlevel,
- awsAccessKeyId, awsSecretKey)
- except Py4JJavaError as e:
- if 'ClassNotFoundException' in str(e.java_exception):
+ helper = ssc._jvm.org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper()
+ except TypeError as e:
+ if str(e) == "'JavaPackage' object is not callable":
KinesisUtils._printErrorMsg(ssc.sparkContext)
raise
+ jstream = helper.createStream(ssc._jssc, kinesisAppName, streamName, endpointUrl,
+ regionName, initialPositionInStream, jduration, jlevel,
+ awsAccessKeyId, awsSecretKey)
stream = DStream(jstream, ssc, NoOpSerializer())
return stream.map(lambda v: decoder(v))
diff --git a/python/pyspark/streaming/mqtt.py b/python/pyspark/streaming/mqtt.py
index 388e9526ba..8848a70c75 100644
--- a/python/pyspark/streaming/mqtt.py
+++ b/python/pyspark/streaming/mqtt.py
@@ -38,18 +38,15 @@ class MQTTUtils(object):
:param storageLevel: RDD storage level.
:return: A DStream object
"""
- jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
-
try:
- helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
- .loadClass("org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper")
- helper = helperClass.newInstance()
- jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
- except Py4JJavaError as e:
- if 'ClassNotFoundException' in str(e.java_exception):
+ helper = ssc._jvm.org.apache.spark.streaming.mqtt.MQTTUtilsPythonHelper()
+ except TypeError as e:
+ if str(e) == "'JavaPackage' object is not callable":
MQTTUtils._printErrorMsg(ssc.sparkContext)
raise
+ jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
+ jstream = helper.createStream(ssc._jssc, brokerUrl, topic, jlevel)
return DStream(jstream, ssc, UTF8Deserializer())
@staticmethod
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 469c068134..f4bbb1b128 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1006,10 +1006,7 @@ class KafkaStreamTests(PySparkStreamingTestCase):
def setUp(self):
super(KafkaStreamTests, self).setUp()
-
- kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
- .loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils")
- self._kafkaTestUtils = kafkaTestUtilsClz.newInstance()
+ self._kafkaTestUtils = self.ssc._jvm.org.apache.spark.streaming.kafka.KafkaTestUtils()
self._kafkaTestUtils.setup()
def tearDown(self):
@@ -1271,10 +1268,7 @@ class FlumeStreamTests(PySparkStreamingTestCase):
def setUp(self):
super(FlumeStreamTests, self).setUp()
-
- utilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
- .loadClass("org.apache.spark.streaming.flume.FlumeTestUtils")
- self._utils = utilsClz.newInstance()
+ self._utils = self.ssc._jvm.org.apache.spark.streaming.flume.FlumeTestUtils()
def tearDown(self):
if self._utils is not None:
@@ -1339,10 +1333,7 @@ class FlumePollingStreamTests(PySparkStreamingTestCase):
maxAttempts = 5
def setUp(self):
- utilsClz = \
- self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
- .loadClass("org.apache.spark.streaming.flume.PollingFlumeTestUtils")
- self._utils = utilsClz.newInstance()
+ self._utils = self.sc._jvm.org.apache.spark.streaming.flume.PollingFlumeTestUtils()
def tearDown(self):
if self._utils is not None:
@@ -1419,10 +1410,7 @@ class MQTTStreamTests(PySparkStreamingTestCase):
def setUp(self):
super(MQTTStreamTests, self).setUp()
-
- MQTTTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
- .loadClass("org.apache.spark.streaming.mqtt.MQTTTestUtils")
- self._MQTTTestUtils = MQTTTestUtilsClz.newInstance()
+ self._MQTTTestUtils = self.ssc._jvm.org.apache.spark.streaming.mqtt.MQTTTestUtils()
self._MQTTTestUtils.setup()
def tearDown(self):
@@ -1498,10 +1486,7 @@ class KinesisStreamTests(PySparkStreamingTestCase):
import random
kinesisAppName = ("KinesisStreamTests-%d" % abs(random.randint(0, 10000000)))
- kinesisTestUtilsClz = \
- self.sc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
- .loadClass("org.apache.spark.streaming.kinesis.KinesisTestUtils")
- kinesisTestUtils = kinesisTestUtilsClz.newInstance()
+ kinesisTestUtils = self.ssc._jvm.org.apache.spark.streaming.kinesis.KinesisTestUtils()
try:
kinesisTestUtils.createStream()
aWSCredentials = kinesisTestUtils.getAWSCredentials()
diff --git a/sbin/spark-config.sh b/sbin/spark-config.sh
index 0c37985a67..97df433a0b 100755
--- a/sbin/spark-config.sh
+++ b/sbin/spark-config.sh
@@ -27,4 +27,4 @@ fi
export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"
# Add the PySpark classes to the PYTHONPATH:
export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:${PYTHONPATH}"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.2-src.zip:${PYTHONPATH}"
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1035056457..40cf9b68ed 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1087,9 +1087,9 @@ private[spark] class Client(
val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
require(pyArchivesFile.exists(),
"pyspark.zip not found; cannot run pyspark application in YARN mode.")
- val py4jFile = new File(pyLibPath, "py4j-0.9.1-src.zip")
+ val py4jFile = new File(pyLibPath, "py4j-0.9.2-src.zip")
require(py4jFile.exists(),
- "py4j-0.9.1-src.zip not found; cannot run pyspark application in YARN mode.")
+ "py4j-0.9.2-src.zip not found; cannot run pyspark application in YARN mode.")
Seq(pyArchivesFile.getAbsolutePath(), py4jFile.getAbsolutePath())
}
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 5068c0cd20..8a92a7ecda 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -154,7 +154,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
// needed locations.
val sparkHome = sys.props("spark.test.home")
val pythonPath = Seq(
- s"$sparkHome/python/lib/py4j-0.9.1-src.zip",
+ s"$sparkHome/python/lib/py4j-0.9.2-src.zip",
s"$sparkHome/python")
val extraEnv = Map(
"PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),