aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--python/pyspark/rdd.py15
1 files changed, 9 insertions, 6 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 014c0aa889..b6dd5a3bf0 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -31,7 +31,6 @@ import bisect
import random
from math import sqrt, log, isinf, isnan
-from pyspark.accumulators import PStatsParam
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
PickleSerializer, pack_long, AutoBatchedSerializer
@@ -2132,9 +2131,13 @@ class PipelinedRDD(RDD):
return self._jrdd_val
if self._bypass_serializer:
self._jrdd_deserializer = NoOpSerializer()
- enable_profile = self.ctx._conf.get("spark.python.profile", "false") == "true"
- profileStats = self.ctx.accumulator(None, PStatsParam) if enable_profile else None
- command = (self.func, profileStats, self._prev_jrdd_deserializer,
+
+ if self.ctx.profiler_collector:
+ profiler = self.ctx.profiler_collector.new_profiler(self.ctx)
+ else:
+ profiler = None
+
+ command = (self.func, profiler, self._prev_jrdd_deserializer,
self._jrdd_deserializer)
# the serialized command will be compressed by broadcast
ser = CloudPickleSerializer()
@@ -2157,9 +2160,9 @@ class PipelinedRDD(RDD):
broadcast_vars, self.ctx._javaAccumulator)
self._jrdd_val = python_rdd.asJavaRDD()
- if enable_profile:
+ if profiler:
self._id = self._jrdd_val.id()
- self.ctx._add_profile(self._id, profileStats)
+ self.ctx.profiler_collector.add_profiler(self._id, profiler)
return self._jrdd_val
def id(self):