diff options
author | Davies Liu <davies.liu@gmail.com> | 2014-09-12 18:42:50 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@apache.org> | 2014-09-12 18:42:50 -0700 |
commit | 71af030b46a89aaa9a87f18f56b9e1f1cd8ce2e7 (patch) | |
tree | 1cff9839428a177b129de830c3c175b1316a6626 /python/pyspark/daemon.py | |
parent | 25311c2c545a60eb9dcf704814d4600987852155 (diff) | |
download | spark-71af030b46a89aaa9a87f18f56b9e1f1cd8ce2e7.tar.gz spark-71af030b46a89aaa9a87f18f56b9e1f1cd8ce2e7.tar.bz2 spark-71af030b46a89aaa9a87f18f56b9e1f1cd8ce2e7.zip |
[SPARK-3094] [PySpark] compatitable with PyPy
After this patch, we can run PySpark in PyPy (testing with PyPy 2.3.1 in Mac 10.9), for example:
```
PYSPARK_PYTHON=pypy ./bin/spark-submit wordcount.py
```
The performance speed up will depend on work load (from 20% to 3000%). Here are some benchmarks:
Job | CPython 2.7 | PyPy 2.3.1 | Speed up
------- | ------------ | ------------- | -------
Word Count | 41s | 15s | 2.7x
Sort | 46s | 44s | 1.05x
Stats | 174s | 3.6s | 48x
Here is the code used for benchmark:
```python
rdd = sc.textFile("text")
def wordcount():
rdd.flatMap(lambda x:x.split('/'))\
.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).collectAsMap()
def sort():
rdd.sortBy(lambda x:x, 1).count()
def stats():
sc.parallelize(range(1024), 20).flatMap(lambda x: xrange(5024)).stats()
```
Author: Davies Liu <davies.liu@gmail.com>
Closes #2144 from davies/pypy and squashes the following commits:
9aed6c5 [Davies Liu] use protocol 2 in CloudPickle
4bc1f04 [Davies Liu] refactor
b20ab3a [Davies Liu] pickle sys.stdout and stderr in portable way
3ca2351 [Davies Liu] Merge branch 'master' into pypy
fae8b19 [Davies Liu] improve attrgetter, add tests
591f830 [Davies Liu] try to run tests with PyPy in run-tests
c8d62ba [Davies Liu] cleanup
f651fd0 [Davies Liu] fix tests using array with PyPy
1b98fb3 [Davies Liu] serialize itemgetter/attrgetter in portable ways
3c1dbfe [Davies Liu] Merge branch 'master' into pypy
42fb5fa [Davies Liu] Merge branch 'master' into pypy
cb2d724 [Davies Liu] fix tests
9986692 [Davies Liu] Merge branch 'master' into pypy
25b4ca7 [Davies Liu] support PyPy
Diffstat (limited to 'python/pyspark/daemon.py')
-rw-r--r-- | python/pyspark/daemon.py | 6 |
1 files changed, 1 insertions, 5 deletions
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index 22ab8d30c0..15445abf67 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -42,10 +42,6 @@ def worker(sock): """ Called by a worker process after the fork(). """ - # Redirect stdout to stderr - os.dup2(2, 1) - sys.stdout = sys.stderr # The sys.stdout object is different from file descriptor 1 - signal.signal(SIGHUP, SIG_DFL) signal.signal(SIGCHLD, SIG_DFL) signal.signal(SIGTERM, SIG_DFL) @@ -102,6 +98,7 @@ def manager(): listen_sock.listen(max(1024, SOMAXCONN)) listen_host, listen_port = listen_sock.getsockname() write_int(listen_port, sys.stdout) + sys.stdout.flush() def shutdown(code): signal.signal(SIGTERM, SIG_DFL) @@ -115,7 +112,6 @@ def manager(): signal.signal(SIGHUP, SIG_IGN) # Don't die on SIGHUP # Initialization complete - sys.stdout.close() try: while True: try: |