path: root/python/pyspark/worker.py
author     Davies Liu <davies.liu@gmail.com>    2014-09-26 09:27:42 -0700
committer  Josh Rosen <joshrosen@apache.org>    2014-09-26 09:27:42 -0700
commit     1aa549ba9839565274a12c52fa1075b424f138a6 (patch)
tree       101560e54ca47299aa273728821e79f5340b4f92 /python/pyspark/worker.py
parent     b235e013638685758885842dc3268e9800af3678 (diff)
[SPARK-3478] [PySpark] Profile the Python tasks
This patch adds profiling support for PySpark. It shows the profiling results before the driver exits; here is one example:

```
============================================================
Profile of RDD<id=3>
============================================================
         5146507 function calls (5146487 primitive calls) in 71.094 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
  5144576   68.331    0.000   68.331    0.000 statcounter.py:44(merge)
       20    2.735    0.137   71.071    3.554 statcounter.py:33(__init__)
       20    0.017    0.001    0.017    0.001 {cPickle.dumps}
     1024    0.003    0.000    0.003    0.000 t.py:16(<lambda>)
       20    0.001    0.000    0.001    0.000 {reduce}
       21    0.001    0.000    0.001    0.000 {cPickle.loads}
       20    0.001    0.000    0.001    0.000 copy_reg.py:95(_slotnames)
       41    0.001    0.000    0.001    0.000 serializers.py:461(read_int)
       40    0.001    0.000    0.002    0.000 serializers.py:179(_batched)
       62    0.000    0.000    0.000    0.000 {method 'read' of 'file' objects}
       20    0.000    0.000   71.072    3.554 rdd.py:863(<lambda>)
       20    0.000    0.000    0.001    0.000 serializers.py:198(load_stream)
    40/20    0.000    0.000   71.072    3.554 rdd.py:2093(pipeline_func)
       41    0.000    0.000    0.002    0.000 serializers.py:130(load_stream)
       40    0.000    0.000   71.072    1.777 rdd.py:304(func)
       20    0.000    0.000   71.094    3.555 worker.py:82(process)
```

Users can also show the profiling results manually with `sc.show_profiles()` or dump them to disk with `sc.dump_profiles(path)`, for example:

```python
>>> sc._conf.set("spark.python.profile", "true")
>>> rdd = sc.parallelize(range(100)).map(str)
>>> rdd.count()
100
>>> sc.show_profiles()
============================================================
Profile of RDD<id=1>
============================================================
         284 function calls (276 primitive calls) in 0.001 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        4    0.000    0.000    0.000    0.000 serializers.py:198(load_stream)
        4    0.000    0.000    0.000    0.000 {reduce}
     12/4    0.000    0.000    0.001    0.000 rdd.py:2092(pipeline_func)
        4    0.000    0.000    0.000    0.000 {cPickle.loads}
        4    0.000    0.000    0.000    0.000 {cPickle.dumps}
      104    0.000    0.000    0.000    0.000 rdd.py:852(<genexpr>)
        8    0.000    0.000    0.000    0.000 serializers.py:461(read_int)
       12    0.000    0.000    0.000    0.000 rdd.py:303(func)
```

Profiling is disabled by default; it can be enabled with "spark.python.profile=true". Users can also have the results dumped to disk automatically for later analysis by setting "spark.python.profile.dump=path_to_dump".

Author: Davies Liu <davies.liu@gmail.com>

Closes #2351 from davies/profiler and squashes the following commits:

7ef2aa0 [Davies Liu] bugfix, add tests for show_profiles and dump_profiles()
2b0daf2 [Davies Liu] fix docs
7a56c24 [Davies Liu] bugfix
cba9463 [Davies Liu] move show_profiles and dump_profiles to SparkContext
fb9565b [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
116d52a [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
09d02c3 [Davies Liu] Merge branch 'master' into profiler
c23865c [Davies Liu] Merge branch 'master' into profiler
15d6f18 [Davies Liu] add docs for two configs
dadee1a [Davies Liu] add docs string and clear profiles after show or dump
4f8309d [Davies Liu] address comment, add tests
0a5b6eb [Davies Liu] fix Python UDF
4b20494 [Davies Liu] add profile for python
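To make the end-to-end workflow concrete, here is a minimal usage sketch based on the message above. The app name, the dump directory `/tmp/pyspark_profiles`, and the `rdd_1.pstats` file name are illustrative assumptions, not part of the commit; `SparkConf`, `sc.show_profiles()`, and the standard `pstats` module are as described above.

```python
from pyspark import SparkConf, SparkContext
import pstats

# Enable Python-side profiling and automatic dumping (paths are illustrative).
conf = (SparkConf()
        .set("spark.python.profile", "true")
        .set("spark.python.profile.dump", "/tmp/pyspark_profiles"))
sc = SparkContext(appName="profile-demo", conf=conf)

sc.parallelize(range(100)).map(str).count()
sc.show_profiles()  # print the accumulated per-RDD profiles on the driver

# A dumped profile can later be inspected offline with pstats; the exact
# file name inside the dump directory is an assumption for illustration.
st = pstats.Stats("/tmp/pyspark_profiles/rdd_1.pstats")
st.sort_stats("time", "cumulative").print_stats(10)
```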
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r--    python/pyspark/worker.py    19
1 file changed, 16 insertions(+), 3 deletions(-)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index c1f6e3e4a1..8257dddfee 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -23,6 +23,8 @@ import sys
 import time
 import socket
 import traceback
+import cProfile
+import pstats
 
 from pyspark.accumulators import _accumulatorRegistry
 from pyspark.broadcast import Broadcast, _broadcastRegistry
@@ -90,10 +92,21 @@ def main(infile, outfile):
         command = pickleSer._read_with_length(infile)
         if isinstance(command, Broadcast):
             command = pickleSer.loads(command.value)
-        (func, deserializer, serializer) = command
+        (func, stats, deserializer, serializer) = command
         init_time = time.time()
-        iterator = deserializer.load_stream(infile)
-        serializer.dump_stream(func(split_index, iterator), outfile)
+
+        def process():
+            iterator = deserializer.load_stream(infile)
+            serializer.dump_stream(func(split_index, iterator), outfile)
+
+        if stats:
+            p = cProfile.Profile()
+            p.runcall(process)
+            st = pstats.Stats(p)
+            st.stream = None  # make it picklable
+            stats.add(st.strip_dirs())
+        else:
+            process()
     except Exception:
         try:
             write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
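For readers unfamiliar with the pattern in the hunk above, here is a self-contained sketch of the same `cProfile`/`pstats` idiom outside Spark. The `process` body and the three-iteration loop are illustrative stand-ins for the worker's per-task loop; only the standard-library calls mirror the patch.

```python
import sys
import cProfile
import pstats


def process():
    # Stand-in for the worker's deserialize -> func -> serialize loop.
    sum(i * i for i in range(100000))


accumulated = None
for _ in range(3):  # pretend each iteration is one task run
    p = cProfile.Profile()
    p.runcall(process)       # profile exactly one call, as the worker does
    st = pstats.Stats(p)
    st.stream = None         # drop the output stream so the Stats object pickles
    st.strip_dirs()          # strip path prefixes so entries merge cleanly
    if accumulated is None:
        accumulated = st
    else:
        accumulated.add(st)  # merge per-run stats, like the patch's stats.add()

accumulated.stream = sys.stdout  # restore a stream before printing
accumulated.sort_stats("time", "cumulative").print_stats(5)
```

Clearing `stream` matters because the worker ships the `Stats` object back to the driver through an accumulator, and a file handle cannot be pickled; `strip_dirs()` normalizes file names so stats gathered on different executors aggregate into the same rows.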