about summary refs log tree commit diff
path: root/python/pyspark/streaming/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/streaming/util.py')
-rw-r--r-- python/pyspark/streaming/util.py 7
1 files changed, 6 insertions, 1 deletions
diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py
index a9bfec2aab..b20613b128 100644
--- a/python/pyspark/streaming/util.py
+++ b/python/pyspark/streaming/util.py
@@ -37,6 +37,11 @@ class TransformFunction(object):
self.ctx = ctx
self.func = func
self.deserializers = deserializers
+ self._rdd_wrapper = lambda jrdd, ctx, ser: RDD(jrdd, ctx, ser)
+
+ def rdd_wrapper(self, func):
+ self._rdd_wrapper = func
+ return self
def call(self, milliseconds, jrdds):
try:
@@ -51,7 +56,7 @@ class TransformFunction(object):
if len(sers) < len(jrdds):
sers += (sers[0],) * (len(jrdds) - len(sers))
- rdds = [RDD(jrdd, self.ctx, ser) if jrdd else None
+ rdds = [self._rdd_wrapper(jrdd, self.ctx, ser) if jrdd else None
for jrdd, ser in zip(jrdds, sers)]
t = datetime.fromtimestamp(milliseconds / 1000.0)
r = self.func(t, *rdds)