From 3bead67d5926a2a798ca0e2bc71e747380493787 Mon Sep 17 00:00:00 2001 From: Yandu Oppacher Date: Wed, 28 Jan 2015 13:48:06 -0800 Subject: [SPARK-4387][PySpark] Refactoring python profiling code to make it extensible This PR is based on #3255 , fix conflicts and code style. Closes #3255. Author: Yandu Oppacher Author: Davies Liu Closes #3901 from davies/refactor-python-profile-code and squashes the following commits: b4a9306 [Davies Liu] fix tests 4b79ce8 [Davies Liu] add docstring for profiler_cls 2700e47 [Davies Liu] use BasicProfiler as default 349e341 [Davies Liu] more refactor 6a5d4df [Davies Liu] refactor and fix tests 31bf6b6 [Davies Liu] fix code style 0864b5d [Yandu Oppacher] Remove unused method 76a6c37 [Yandu Oppacher] Added a profile collector to accumulate the profilers per stage 9eefc36 [Yandu Oppacher] Fix doc 9ace076 [Yandu Oppacher] Refactor of profiler, and moved tests around 8739aff [Yandu Oppacher] Code review fixes 9bda3ec [Yandu Oppacher] Refactor profiler code --- python/pyspark/worker.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) (limited to 'python/pyspark/worker.py') diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 7e5343c973..8a93c320ec 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -23,8 +23,6 @@ import sys import time import socket import traceback -import cProfile -import pstats from pyspark.accumulators import _accumulatorRegistry from pyspark.broadcast import Broadcast, _broadcastRegistry @@ -90,19 +88,15 @@ def main(infile, outfile): command = pickleSer._read_with_length(infile) if isinstance(command, Broadcast): command = pickleSer.loads(command.value) - (func, stats, deserializer, serializer) = command + (func, profiler, deserializer, serializer) = command init_time = time.time() def process(): iterator = deserializer.load_stream(infile) serializer.dump_stream(func(split_index, iterator), outfile) - if stats: - p = cProfile.Profile() - p.runcall(process) - st = pstats.Stats(p) - st.stream = None # make it picklable - stats.add(st.strip_dirs()) + if profiler: + profiler.profile(process) else: process() except Exception: -- cgit v1.2.3