diff options
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r-- | python/pyspark/worker.py | 19 |
1 files changed, 16 insertions, 3 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index c1f6e3e4a1..8257dddfee 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -23,6 +23,8 @@ import sys import time import socket import traceback +import cProfile +import pstats from pyspark.accumulators import _accumulatorRegistry from pyspark.broadcast import Broadcast, _broadcastRegistry @@ -90,10 +92,21 @@ def main(infile, outfile): command = pickleSer._read_with_length(infile) if isinstance(command, Broadcast): command = pickleSer.loads(command.value) - (func, deserializer, serializer) = command + (func, stats, deserializer, serializer) = command init_time = time.time() - iterator = deserializer.load_stream(infile) - serializer.dump_stream(func(split_index, iterator), outfile) + + def process(): + iterator = deserializer.load_stream(infile) + serializer.dump_stream(func(split_index, iterator), outfile) + + if stats: + p = cProfile.Profile() + p.runcall(process) + st = pstats.Stats(p) + st.stream = None # make it picklable + stats.add(st.strip_dirs()) + else: + process() except Exception: try: write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile) |