diff options
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- | python/pyspark/rdd.py | 10 |
1 files changed, 8 insertions, 2 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 680140d72d..8ed89e2f97 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -15,7 +15,6 @@ # limitations under the License. # -from base64 import standard_b64encode as b64enc import copy from collections import defaultdict from itertools import chain, ifilter, imap @@ -32,6 +31,7 @@ import bisect from random import Random from math import sqrt, log, isinf, isnan +from pyspark.accumulators import PStatsParam from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ BatchedSerializer, CloudPickleSerializer, PairDeserializer, \ PickleSerializer, pack_long, AutoBatchedSerializer @@ -2080,7 +2080,9 @@ class PipelinedRDD(RDD): return self._jrdd_val if self._bypass_serializer: self._jrdd_deserializer = NoOpSerializer() - command = (self.func, self._prev_jrdd_deserializer, + enable_profile = self.ctx._conf.get("spark.python.profile", "false") == "true" + profileStats = self.ctx.accumulator(None, PStatsParam) if enable_profile else None + command = (self.func, profileStats, self._prev_jrdd_deserializer, self._jrdd_deserializer) # the serialized command will be compressed by broadcast ser = CloudPickleSerializer() @@ -2102,6 +2104,10 @@ class PipelinedRDD(RDD): self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator) self._jrdd_val = python_rdd.asJavaRDD() + + if enable_profile: + self._id = self._jrdd_val.id() + self.ctx._add_profile(self._id, profileStats) return self._jrdd_val def id(self): |