diff options
author | Yandu Oppacher <yandu.oppacher@jadedpixel.com> | 2015-01-28 13:48:06 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-01-28 13:48:06 -0800 |
commit | 3bead67d5926a2a798ca0e2bc71e747380493787 (patch) | |
tree | ce36d8e926702f8da17b9f43c49e1682d1e9fccb /python/pyspark/rdd.py | |
parent | a731314c319a6f265060e05267844069027804fd (diff) | |
download | spark-3bead67d5926a2a798ca0e2bc71e747380493787.tar.gz spark-3bead67d5926a2a798ca0e2bc71e747380493787.tar.bz2 spark-3bead67d5926a2a798ca0e2bc71e747380493787.zip |
[SPARK-4387][PySpark] Refactoring python profiling code to make it extensible
This PR is based on #3255 , fix conflicts and code style.
Closes #3255.
Author: Yandu Oppacher <yandu.oppacher@jadedpixel.com>
Author: Davies Liu <davies@databricks.com>
Closes #3901 from davies/refactor-python-profile-code and squashes the following commits:
b4a9306 [Davies Liu] fix tests
4b79ce8 [Davies Liu] add docstring for profiler_cls
2700e47 [Davies Liu] use BasicProfiler as default
349e341 [Davies Liu] more refactor
6a5d4df [Davies Liu] refactor and fix tests
31bf6b6 [Davies Liu] fix code style
0864b5d [Yandu Oppacher] Remove unused method
76a6c37 [Yandu Oppacher] Added a profile collector to accumulate the profilers per stage
9eefc36 [Yandu Oppacher] Fix doc
9ace076 [Yandu Oppacher] Refactor of profiler, and moved tests around
8739aff [Yandu Oppacher] Code review fixes
9bda3ec [Yandu Oppacher] Refactor profiler code
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- | python/pyspark/rdd.py | 15 |
1 files changed, 9 insertions, 6 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 014c0aa889..b6dd5a3bf0 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -31,7 +31,6 @@ import bisect import random from math import sqrt, log, isinf, isnan -from pyspark.accumulators import PStatsParam from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ BatchedSerializer, CloudPickleSerializer, PairDeserializer, \ PickleSerializer, pack_long, AutoBatchedSerializer @@ -2132,9 +2131,13 @@ class PipelinedRDD(RDD): return self._jrdd_val if self._bypass_serializer: self._jrdd_deserializer = NoOpSerializer() - enable_profile = self.ctx._conf.get("spark.python.profile", "false") == "true" - profileStats = self.ctx.accumulator(None, PStatsParam) if enable_profile else None - command = (self.func, profileStats, self._prev_jrdd_deserializer, + + if self.ctx.profiler_collector: + profiler = self.ctx.profiler_collector.new_profiler(self.ctx) + else: + profiler = None + + command = (self.func, profiler, self._prev_jrdd_deserializer, self._jrdd_deserializer) # the serialized command will be compressed by broadcast ser = CloudPickleSerializer() @@ -2157,9 +2160,9 @@ class PipelinedRDD(RDD): broadcast_vars, self.ctx._javaAccumulator) self._jrdd_val = python_rdd.asJavaRDD() - if enable_profile: + if profiler: self._id = self._jrdd_val.id() - self.ctx._add_profile(self._id, profileStats) + self.ctx.profiler_collector.add_profiler(self._id, profiler) return self._jrdd_val def id(self): |