about summary refs log tree commit diff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- python/pyspark/rdd.py 10
1 files changed, 8 insertions, 2 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 680140d72d..8ed89e2f97 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -15,7 +15,6 @@
# limitations under the License.
#
-from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
from itertools import chain, ifilter, imap
@@ -32,6 +31,7 @@ import bisect
from random import Random
from math import sqrt, log, isinf, isnan
+from pyspark.accumulators import PStatsParam
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
PickleSerializer, pack_long, AutoBatchedSerializer
@@ -2080,7 +2080,9 @@ class PipelinedRDD(RDD):
return self._jrdd_val
if self._bypass_serializer:
self._jrdd_deserializer = NoOpSerializer()
- command = (self.func, self._prev_jrdd_deserializer,
+ enable_profile = self.ctx._conf.get("spark.python.profile", "false") == "true"
+ profileStats = self.ctx.accumulator(None, PStatsParam) if enable_profile else None
+ command = (self.func, profileStats, self._prev_jrdd_deserializer,
self._jrdd_deserializer)
# the serialized command will be compressed by broadcast
ser = CloudPickleSerializer()
@@ -2102,6 +2104,10 @@ class PipelinedRDD(RDD):
self.ctx.pythonExec,
broadcast_vars, self.ctx._javaAccumulator)
self._jrdd_val = python_rdd.asJavaRDD()
+
+ if enable_profile:
+ self._id = self._jrdd_val.id()
+ self.ctx._add_profile(self._id, profileStats)
return self._jrdd_val
def id(self):