author    Davies Liu <davies@databricks.com>    2015-04-16 16:20:57 -0700
committer Josh Rosen <joshrosen@databricks.com> 2015-04-16 16:20:57 -0700
commit    04e44b37cc04f62fbf9e08c7076349e0a4d12ea8 (patch)
tree      b6429253955210445ddc37faa4d5166ea25a91e2 /python/pyspark/context.py
parent    55f553a979db925aa0c3559f7e80b99d2bf3feb4 (diff)
[SPARK-4897] [PySpark] Python 3 support
This PR updates PySpark to support Python 3 (tested with 3.4).

Known issue: unpickling arrays from Pyrolite is broken in Python 3, so those tests are skipped.

TODO: ec2/spark-ec2.py is not fully tested with Python 3.

Author: Davies Liu <davies@databricks.com>
Author: twneale <twneale@gmail.com>
Author: Josh Rosen <joshrosen@databricks.com>

Closes #5173 from davies/python3 and squashes the following commits:

d7d6323 [Davies Liu] fix tests
6c52a98 [Davies Liu] fix mllib test
99e334f [Davies Liu] update timeout
b716610 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
cafd5ec [Davies Liu] adddress comments from @mengxr
bf225d7 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
179fc8d [Davies Liu] tuning flaky tests
8c8b957 [Davies Liu] fix ResourceWarning in Python 3
5c57c95 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
4006829 [Davies Liu] fix test
2fc0066 [Davies Liu] add python3 path
71535e9 [Davies Liu] fix xrange and divide
5a55ab4 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
125f12c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ed498c8 [Davies Liu] fix compatibility with python 3
820e649 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
e8ce8c9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ad7c374 [Davies Liu] fix mllib test and warning
ef1fc2f [Davies Liu] fix tests
4eee14a [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
20112ff [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
59bb492 [Davies Liu] fix tests
1da268c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ca0fdd3 [Davies Liu] fix code style
9563a15 [Davies Liu] add imap back for python 2
0b1ec04 [Davies Liu] make python examples work with Python 3
d2fd566 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
a716d34 [Davies Liu] test with python 3.4
f1700e8 [Davies Liu] fix test in python3
671b1db [Davies Liu] fix test in python3
692ff47 [Davies Liu] fix flaky test
7b9699f [Davies Liu] invalidate import cache for Python 3.3+
9c58497 [Davies Liu] fix kill worker
309bfbf [Davies Liu] keep compatibility
5707476 [Davies Liu] cleanup, fix hash of string in 3.3+
8662d5b [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
f53e1f0 [Davies Liu] fix tests
70b6b73 [Davies Liu] compile ec2/spark_ec2.py in python 3
a39167e [Davies Liu] support customize class in __main__
814c77b [Davies Liu] run unittests with python 3
7f4476e [Davies Liu] mllib tests passed
d737924 [Davies Liu] pass ml tests
375ea17 [Davies Liu] SQL tests pass
6cc42a9 [Davies Liu] rename
431a8de [Davies Liu] streaming tests pass
78901a7 [Davies Liu] fix hash of serializer in Python 3
24b2f2e [Davies Liu] pass all RDD tests
35f48fe [Davies Liu] run future again
1eebac2 [Davies Liu] fix conflict in ec2/spark_ec2.py
6e3c21d [Davies Liu] make cloudpickle work with Python3
2fb2db3 [Josh Rosen] Guard more changes behind sys.version; still doesn't run
1aa5e8f [twneale] Turned out `pickle.DictionaryType is dict` == True, so swapped it out
7354371 [twneale] buffer --> memoryview I'm not super sure if this a valid change, but the 2.7 docs recommend using memoryview over buffer where possible, so hoping it'll work.
b69ccdf [twneale] Uses the pure python pickle._Pickler instead of c-extension _pickle.Pickler. It appears pyspark 2.7 uses the pure python pickler as well, so this shouldn't degrade pickling performance (?).
f40d925 [twneale] xrange --> range
e104215 [twneale] Replaces 2.7 types.InstsanceType with 3.4 `object`....could be horribly wrong depending on how types.InstanceType is used elsewhere in the package--see http://bugs.python.org/issue8206
79de9d0 [twneale] Replaces python2.7 `file` with 3.4 _io.TextIOWrapper
2adb42d [Josh Rosen] Fix up some import differences between Python 2 and 3
854be27 [Josh Rosen] Run `futurize` on Python code
7c5b4ce [Josh Rosen] Remove Python 3 check in shell.py.
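Most of the hunks below follow one single-source compatibility pattern rather than pulling in a helper library such as six: a string comparison on sys.version guards Python-3-only behaviour, Python-2-only builtins are aliased once near the top of the module, and dict iteration goes through items(), which exists in both versions. A minimal standalone sketch of that pattern (the set_executor_env function and its arguments are illustrative, not part of the PR):

import sys

# Alias Python-2-only builtins once, behind the same version check the PR uses.
if sys.version > '3':
    xrange = range            # xrange was removed in Python 3; range is lazy there

def set_executor_env(conf, environment):
    # dict.items() exists on both 2 and 3; dict.iteritems() is gone in Python 3
    for key, value in environment.items():
        conf[key] = value
    return conf

print(set_executor_env({}, {"SPARK_LOCAL_DIRS": "/tmp"}))
print(list(xrange(3)))

Run under either interpreter, this prints the same dict and [0, 1, 2], which is the point: one code path, no conditional imports at the call sites.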
Diffstat (limited to 'python/pyspark/context.py')
-rw-r--r--  python/pyspark/context.py  42
1 file changed, 28 insertions, 14 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 78dccc4047..1dc2fec0ae 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -15,6 +15,8 @@
# limitations under the License.
#
+from __future__ import print_function
+
import os
import shutil
import sys
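The hunk above adds a __future__ import so that print is a function on Python 2 as well; that is what later lets the doctest near the bottom of this diff use print(result) unchanged on both interpreters. A small hedged illustration, independent of the PySpark code:

from __future__ import print_function  # must appear before any other code in the module

import sys

# With the future import this is a function call on Python 2 and 3 alike;
# without it, Python 2 would treat the parentheses as a tuple and print
# ("python", "2.7.x") instead of two space-separated words.
print("python", sys.version.split()[0])
print("no trailing newline", end="")   # keyword arguments need print-as-function
print()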
@@ -32,11 +34,14 @@ from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
PairDeserializer, AutoBatchedSerializer, NoOpSerializer
from pyspark.storagelevel import StorageLevel
-from pyspark.rdd import RDD, _load_from_socket
+from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
from pyspark.traceback_utils import CallSite, first_spark_call
from pyspark.status import StatusTracker
from pyspark.profiler import ProfilerCollector, BasicProfiler
+if sys.version > '3':
+ xrange = range
+
__all__ = ['SparkContext']
@@ -133,7 +138,7 @@ class SparkContext(object):
if sparkHome:
self._conf.setSparkHome(sparkHome)
if environment:
- for key, value in environment.iteritems():
+ for key, value in environment.items():
self._conf.setExecutorEnv(key, value)
for key, value in DEFAULT_CONFIGS.items():
self._conf.setIfMissing(key, value)
@@ -153,6 +158,10 @@ class SparkContext(object):
if k.startswith("spark.executorEnv."):
varName = k[len("spark.executorEnv."):]
self.environment[varName] = v
+ if sys.version >= '3.3' and 'PYTHONHASHSEED' not in os.environ:
+ # disable randomness of hash of string in worker, if this is not
+ # launched by spark-submit
+ self.environment["PYTHONHASHSEED"] = "0"
# Create the Java SparkContext through Py4J
self._jsc = jsc or self._initialize_context(self._conf._jconf)
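Setting PYTHONHASHSEED here matters because Python 3.3+ salts hash() for str and bytes differently in every interpreter process, while PySpark relies on all workers hashing the same key to the same value for hash partitioning. The hunk pins the seed to "0" only when spark-submit has not already done so. A hedged sketch of the underlying behaviour (the helper name is made up for illustration):

import os
import subprocess
import sys

def str_hash_in_fresh_interpreter(seed=None):
    # Each PySpark worker is a separate interpreter; on Python 3.3+ every
    # interpreter salts str hashes differently unless PYTHONHASHSEED is pinned.
    env = dict(os.environ)
    env.pop("PYTHONHASHSEED", None)    # start from the interpreter default
    if seed is not None:
        env["PYTHONHASHSEED"] = seed
    out = subprocess.check_output([sys.executable, "-c", "print(hash('spark'))"], env=env)
    return int(out.decode("ascii").strip())

# Usually two different values on Python 3.3+ (hash randomization enabled):
print([str_hash_in_fresh_interpreter() for _ in range(2)])
# Always the same value once the seed is fixed, which is what the hunk ensures:
print([str_hash_in_fresh_interpreter("0") for _ in range(2)])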
@@ -323,7 +332,7 @@ class SparkContext(object):
start0 = c[0]
def getStart(split):
- return start0 + (split * size / numSlices) * step
+ return start0 + int((split * size / numSlices)) * step
def f(split, iterator):
return xrange(getStart(split), getStart(split + 1), step)
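The int(...) added above is needed because / is true division in Python 3: split * size / numSlices produces a float even for integer operands, and Python 3's range() refuses float arguments. A hedged, self-contained rework of the same arithmetic (the constants are arbitrary example values):

from __future__ import division   # make Python 2's / behave like Python 3's here

size, numSlices, step, start0 = 100, 8, 1, 0

def get_start(split):
    # int(...) truncates the float quotient back to an integer offset,
    # mirroring the change in the hunk above.
    return start0 + int(split * size / numSlices) * step

print([get_start(i) for i in range(numSlices + 1)])
# [0, 12, 25, 37, 50, 62, 75, 87, 100]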
@@ -357,6 +366,7 @@ class SparkContext(object):
minPartitions = minPartitions or self.defaultMinPartitions
return RDD(self._jsc.objectFile(name, minPartitions), self)
+ @ignore_unicode_prefix
def textFile(self, name, minPartitions=None, use_unicode=True):
"""
Read a text file from HDFS, a local file system (available on all
@@ -369,7 +379,7 @@ class SparkContext(object):
>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
- ... testFile.write("Hello world!")
+ ... _ = testFile.write("Hello world!")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
[u'Hello world!']
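The doctest edits in this and the following hunks compensate for two small behavioural differences. On Python 3, file.write() returns the number of characters written, so a bare write() call inside a doctest would emit an unexpected line such as 12; binding the result to _ keeps the expected output identical on both versions. Likewise, text strings repr as u'...' on Python 2 but '...' on Python 3, which is why the affected methods also gain the @ignore_unicode_prefix decorator imported from pyspark.rdd. A small sketch of the write() half, using io.StringIO so it runs anywhere:

import io

buf = io.StringIO()
n = buf.write(u"Hello world!")   # io.StringIO reports the character count on both interpreters
print(n)                         # 12 -- exactly the stray output a doctest must avoid

# Inside a doctest the same call is therefore spelled
#     >>> _ = testFile.write("Hello world!")
# so nothing is printed regardless of the Python version.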
@@ -378,6 +388,7 @@ class SparkContext(object):
return RDD(self._jsc.textFile(name, minPartitions), self,
UTF8Deserializer(use_unicode))
+ @ignore_unicode_prefix
def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
"""
Read a directory of text files from HDFS, a local file system
@@ -411,9 +422,9 @@ class SparkContext(object):
>>> dirPath = os.path.join(tempdir, "files")
>>> os.mkdir(dirPath)
>>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
- ... file1.write("1")
+ ... _ = file1.write("1")
>>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
- ... file2.write("2")
+ ... _ = file2.write("2")
>>> textFiles = sc.wholeTextFiles(dirPath)
>>> sorted(textFiles.collect())
[(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]
@@ -456,7 +467,7 @@ class SparkContext(object):
jm = self._jvm.java.util.HashMap()
if not d:
d = {}
- for k, v in d.iteritems():
+ for k, v in d.items():
jm[k] = v
return jm
@@ -608,6 +619,7 @@ class SparkContext(object):
jrdd = self._jsc.checkpointFile(name)
return RDD(jrdd, self, input_deserializer)
+ @ignore_unicode_prefix
def union(self, rdds):
"""
Build the union of a list of RDDs.
@@ -618,7 +630,7 @@ class SparkContext(object):
>>> path = os.path.join(tempdir, "union-text.txt")
>>> with open(path, "w") as testFile:
- ... testFile.write("Hello")
+ ... _ = testFile.write("Hello")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
[u'Hello']
@@ -677,7 +689,7 @@ class SparkContext(object):
>>> from pyspark import SparkFiles
>>> path = os.path.join(tempdir, "test.txt")
>>> with open(path, "w") as testFile:
- ... testFile.write("100")
+ ... _ = testFile.write("100")
>>> sc.addFile(path)
>>> def func(iterator):
... with open(SparkFiles.get("test.txt")) as testFile:
@@ -705,11 +717,13 @@ class SparkContext(object):
"""
self.addFile(path)
(dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix
-
if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
self._python_includes.append(filename)
# for tests in local mode
sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
+ if sys.version > '3':
+ import importlib
+ importlib.invalidate_caches()
def setCheckpointDir(self, dirName):
"""
@@ -744,7 +758,7 @@ class SparkContext(object):
The application can use L{SparkContext.cancelJobGroup} to cancel all
running jobs in this group.
- >>> import thread, threading
+ >>> import threading
>>> from time import sleep
>>> result = "Not Set"
>>> lock = threading.Lock()
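The Python-2-only thread module was renamed to _thread in Python 3, so the doctest switches to the higher-level threading API that both versions share; thread.start_new_thread(func, args) maps onto threading.Thread(target=func, args=args).start(), whose None return value is what the supress assignments below swallow. A version-neutral sketch of the same pattern, outside any doctest:

import threading
import time

result = []

def start_job(x):
    time.sleep(0.1)
    result.append(x * x)

# Python-2-only form:        thread.start_new_thread(start_job, (10,))
# Form used by the doctest:  threading.Thread(target=start_job, args=(10,)).start()
worker = threading.Thread(target=start_job, args=(10,))
worker.start()
worker.join()
print(result)   # [100]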
@@ -763,10 +777,10 @@ class SparkContext(object):
... sleep(5)
... sc.cancelJobGroup("job_to_cancel")
>>> supress = lock.acquire()
- >>> supress = thread.start_new_thread(start_job, (10,))
- >>> supress = thread.start_new_thread(stop_job, tuple())
+ >>> supress = threading.Thread(target=start_job, args=(10,)).start()
+ >>> supress = threading.Thread(target=stop_job).start()
>>> supress = lock.acquire()
- >>> print result
+ >>> print(result)
Cancelled
If interruptOnCancel is set to true for the job group, then job cancellation will result