aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/daemon.py
diff options
context:
space:
mode:
authorDavies Liu <davies.liu@gmail.com>2014-09-12 18:42:50 -0700
committerJosh Rosen <joshrosen@apache.org>2014-09-12 18:42:50 -0700
commit71af030b46a89aaa9a87f18f56b9e1f1cd8ce2e7 (patch)
tree1cff9839428a177b129de830c3c175b1316a6626 /python/pyspark/daemon.py
parent25311c2c545a60eb9dcf704814d4600987852155 (diff)
downloadspark-71af030b46a89aaa9a87f18f56b9e1f1cd8ce2e7.tar.gz
spark-71af030b46a89aaa9a87f18f56b9e1f1cd8ce2e7.tar.bz2
spark-71af030b46a89aaa9a87f18f56b9e1f1cd8ce2e7.zip
[SPARK-3094] [PySpark] compatitable with PyPy
After this patch, we can run PySpark in PyPy (testing with PyPy 2.3.1 in Mac 10.9), for example: ``` PYSPARK_PYTHON=pypy ./bin/spark-submit wordcount.py ``` The performance speed up will depend on work load (from 20% to 3000%). Here are some benchmarks: Job | CPython 2.7 | PyPy 2.3.1 | Speed up ------- | ------------ | ------------- | ------- Word Count | 41s | 15s | 2.7x Sort | 46s | 44s | 1.05x Stats | 174s | 3.6s | 48x Here is the code used for benchmark: ```python rdd = sc.textFile("text") def wordcount(): rdd.flatMap(lambda x:x.split('/'))\ .map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).collectAsMap() def sort(): rdd.sortBy(lambda x:x, 1).count() def stats(): sc.parallelize(range(1024), 20).flatMap(lambda x: xrange(5024)).stats() ``` Author: Davies Liu <davies.liu@gmail.com> Closes #2144 from davies/pypy and squashes the following commits: 9aed6c5 [Davies Liu] use protocol 2 in CloudPickle 4bc1f04 [Davies Liu] refactor b20ab3a [Davies Liu] pickle sys.stdout and stderr in portable way 3ca2351 [Davies Liu] Merge branch 'master' into pypy fae8b19 [Davies Liu] improve attrgetter, add tests 591f830 [Davies Liu] try to run tests with PyPy in run-tests c8d62ba [Davies Liu] cleanup f651fd0 [Davies Liu] fix tests using array with PyPy 1b98fb3 [Davies Liu] serialize itemgetter/attrgetter in portable ways 3c1dbfe [Davies Liu] Merge branch 'master' into pypy 42fb5fa [Davies Liu] Merge branch 'master' into pypy cb2d724 [Davies Liu] fix tests 9986692 [Davies Liu] Merge branch 'master' into pypy 25b4ca7 [Davies Liu] support PyPy
Diffstat (limited to 'python/pyspark/daemon.py')
-rw-r--r--python/pyspark/daemon.py6
1 files changed, 1 insertions, 5 deletions
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index 22ab8d30c0..15445abf67 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -42,10 +42,6 @@ def worker(sock):
"""
Called by a worker process after the fork().
"""
- # Redirect stdout to stderr
- os.dup2(2, 1)
- sys.stdout = sys.stderr # The sys.stdout object is different from file descriptor 1
-
signal.signal(SIGHUP, SIG_DFL)
signal.signal(SIGCHLD, SIG_DFL)
signal.signal(SIGTERM, SIG_DFL)
@@ -102,6 +98,7 @@ def manager():
listen_sock.listen(max(1024, SOMAXCONN))
listen_host, listen_port = listen_sock.getsockname()
write_int(listen_port, sys.stdout)
+ sys.stdout.flush()
def shutdown(code):
signal.signal(SIGTERM, SIG_DFL)
@@ -115,7 +112,6 @@ def manager():
signal.signal(SIGHUP, SIG_IGN) # Don't die on SIGHUP
# Initialization complete
- sys.stdout.close()
try:
while True:
try: