aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/daemon.py
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-04-16 16:20:57 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-04-16 16:20:57 -0700
commit04e44b37cc04f62fbf9e08c7076349e0a4d12ea8 (patch)
treeb6429253955210445ddc37faa4d5166ea25a91e2 /python/pyspark/daemon.py
parent55f553a979db925aa0c3559f7e80b99d2bf3feb4 (diff)
downloadspark-04e44b37cc04f62fbf9e08c7076349e0a4d12ea8.tar.gz
spark-04e44b37cc04f62fbf9e08c7076349e0a4d12ea8.tar.bz2
spark-04e44b37cc04f62fbf9e08c7076349e0a4d12ea8.zip
[SPARK-4897] [PySpark] Python 3 support
This PR update PySpark to support Python 3 (tested with 3.4). Known issue: unpickle array from Pyrolite is broken in Python 3, those tests are skipped. TODO: ec2/spark-ec2.py is not fully tested with python3. Author: Davies Liu <davies@databricks.com> Author: twneale <twneale@gmail.com> Author: Josh Rosen <joshrosen@databricks.com> Closes #5173 from davies/python3 and squashes the following commits: d7d6323 [Davies Liu] fix tests 6c52a98 [Davies Liu] fix mllib test 99e334f [Davies Liu] update timeout b716610 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 cafd5ec [Davies Liu] adddress comments from @mengxr bf225d7 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 179fc8d [Davies Liu] tuning flaky tests 8c8b957 [Davies Liu] fix ResourceWarning in Python 3 5c57c95 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 4006829 [Davies Liu] fix test 2fc0066 [Davies Liu] add python3 path 71535e9 [Davies Liu] fix xrange and divide 5a55ab4 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 125f12c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ed498c8 [Davies Liu] fix compatibility with python 3 820e649 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 e8ce8c9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ad7c374 [Davies Liu] fix mllib test and warning ef1fc2f [Davies Liu] fix tests 4eee14a [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 20112ff [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 59bb492 [Davies Liu] fix tests 1da268c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ca0fdd3 [Davies Liu] fix code style 9563a15 [Davies Liu] add imap back for python 2 0b1ec04 [Davies Liu] make python examples work with Python 3 d2fd566 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 a716d34 [Davies Liu] test with python 3.4 f1700e8 [Davies Liu] fix test in python3 671b1db [Davies Liu] fix test in python3 692ff47 [Davies Liu] fix flaky test 7b9699f [Davies Liu] invalidate import cache for Python 3.3+ 9c58497 [Davies Liu] fix kill worker 309bfbf [Davies Liu] keep compatibility 5707476 [Davies Liu] cleanup, fix hash of string in 3.3+ 8662d5b [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 f53e1f0 [Davies Liu] fix tests 70b6b73 [Davies Liu] compile ec2/spark_ec2.py in python 3 a39167e [Davies Liu] support customize class in __main__ 814c77b [Davies Liu] run unittests with python 3 7f4476e [Davies Liu] mllib tests passed d737924 [Davies Liu] pass ml tests 375ea17 [Davies Liu] SQL tests pass 6cc42a9 [Davies Liu] rename 431a8de [Davies Liu] streaming tests pass 78901a7 [Davies Liu] fix hash of serializer in Python 3 24b2f2e [Davies Liu] pass all RDD tests 35f48fe [Davies Liu] run future again 1eebac2 [Davies Liu] fix conflict in ec2/spark_ec2.py 6e3c21d [Davies Liu] make cloudpickle work with Python3 2fb2db3 [Josh Rosen] Guard more changes behind sys.version; still doesn't run 1aa5e8f [twneale] Turned out `pickle.DictionaryType is dict` == True, so swapped it out 7354371 [twneale] buffer --> memoryview I'm not super sure if this a valid change, but the 2.7 docs recommend using memoryview over buffer where possible, so hoping it'll work. b69ccdf [twneale] Uses the pure python pickle._Pickler instead of c-extension _pickle.Pickler. It appears pyspark 2.7 uses the pure python pickler as well, so this shouldn't degrade pickling performance (?). f40d925 [twneale] xrange --> range e104215 [twneale] Replaces 2.7 types.InstsanceType with 3.4 `object`....could be horribly wrong depending on how types.InstanceType is used elsewhere in the package--see http://bugs.python.org/issue8206 79de9d0 [twneale] Replaces python2.7 `file` with 3.4 _io.TextIOWrapper 2adb42d [Josh Rosen] Fix up some import differences between Python 2 and 3 854be27 [Josh Rosen] Run `futurize` on Python code: 7c5b4ce [Josh Rosen] Remove Python 3 check in shell.py.
Diffstat (limited to 'python/pyspark/daemon.py')
-rw-r--r--python/pyspark/daemon.py36
1 files changed, 14 insertions, 22 deletions
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 93885985fe..7f06d4288c 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -24,9 +24,10 @@ import sys
import traceback
import time
import gc
-from errno import EINTR, ECHILD, EAGAIN
+from errno import EINTR, EAGAIN
from socket import AF_INET, SOCK_STREAM, SOMAXCONN
from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
+
from pyspark.worker import main as worker_main
from pyspark.serializers import read_int, write_int
@@ -53,8 +54,8 @@ def worker(sock):
# Read the socket using fdopen instead of socket.makefile() because the latter
# seems to be very slow; note that we need to dup() the file descriptor because
# otherwise writes also cause a seek that makes us miss data on the read side.
- infile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
- outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
+ infile = os.fdopen(os.dup(sock.fileno()), "rb", 65536)
+ outfile = os.fdopen(os.dup(sock.fileno()), "wb", 65536)
exit_code = 0
try:
worker_main(infile, outfile)
@@ -68,17 +69,6 @@ def worker(sock):
return exit_code
-# Cleanup zombie children
-def cleanup_dead_children():
- try:
- while True:
- pid, _ = os.waitpid(0, os.WNOHANG)
- if not pid:
- break
- except:
- pass
-
-
def manager():
# Create a new process group to corral our children
os.setpgid(0, 0)
@@ -88,8 +78,12 @@ def manager():
listen_sock.bind(('127.0.0.1', 0))
listen_sock.listen(max(1024, SOMAXCONN))
listen_host, listen_port = listen_sock.getsockname()
- write_int(listen_port, sys.stdout)
- sys.stdout.flush()
+
+ # re-open stdin/stdout in 'wb' mode
+ stdin_bin = os.fdopen(sys.stdin.fileno(), 'rb', 4)
+ stdout_bin = os.fdopen(sys.stdout.fileno(), 'wb', 4)
+ write_int(listen_port, stdout_bin)
+ stdout_bin.flush()
def shutdown(code):
signal.signal(SIGTERM, SIG_DFL)
@@ -101,6 +95,7 @@ def manager():
shutdown(1)
signal.signal(SIGTERM, handle_sigterm) # Gracefully exit on SIGTERM
signal.signal(SIGHUP, SIG_IGN) # Don't die on SIGHUP
+ signal.signal(SIGCHLD, SIG_IGN)
reuse = os.environ.get("SPARK_REUSE_WORKER")
@@ -115,12 +110,9 @@ def manager():
else:
raise
- # cleanup in signal handler will cause deadlock
- cleanup_dead_children()
-
if 0 in ready_fds:
try:
- worker_pid = read_int(sys.stdin)
+ worker_pid = read_int(stdin_bin)
except EOFError:
# Spark told us to exit by closing stdin
shutdown(0)
@@ -145,7 +137,7 @@ def manager():
time.sleep(1)
pid = os.fork() # error here will shutdown daemon
else:
- outfile = sock.makefile('w')
+ outfile = sock.makefile(mode='wb')
write_int(e.errno, outfile) # Signal that the fork failed
outfile.flush()
outfile.close()
@@ -157,7 +149,7 @@ def manager():
listen_sock.close()
try:
# Acknowledge that the fork was successful
- outfile = sock.makefile("w")
+ outfile = sock.makefile(mode="wb")
write_int(os.getpid(), outfile)
outfile.flush()
outfile.close()