about summary refs log tree commit diff
path: root/python/pyspark/worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/worker.py')
-rw-r--r-- python/pyspark/worker.py 19
1 files changed, 16 insertions, 3 deletions
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index c1f6e3e4a1..8257dddfee 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -23,6 +23,8 @@ import sys
import time
import socket
import traceback
+import cProfile
+import pstats
from pyspark.accumulators import _accumulatorRegistry
from pyspark.broadcast import Broadcast, _broadcastRegistry
@@ -90,10 +92,21 @@ def main(infile, outfile):
command = pickleSer._read_with_length(infile)
if isinstance(command, Broadcast):
command = pickleSer.loads(command.value)
- (func, deserializer, serializer) = command
+ (func, stats, deserializer, serializer) = command
init_time = time.time()
- iterator = deserializer.load_stream(infile)
- serializer.dump_stream(func(split_index, iterator), outfile)
+
+ def process():
+ iterator = deserializer.load_stream(infile)
+ serializer.dump_stream(func(split_index, iterator), outfile)
+
+ if stats:
+ p = cProfile.Profile()
+ p.runcall(process)
+ st = pstats.Stats(p)
+ st.stream = None # make it picklable
+ stats.add(st.strip_dirs())
+ else:
+ process()
except Exception:
try:
write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)