aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/dstream.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/streaming/dstream.py')
-rw-r--r--python/pyspark/streaming/dstream.py51
1 files changed, 28 insertions, 23 deletions
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 3fa4244423..ff097985fa 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -15,11 +15,15 @@
# limitations under the License.
#
-from itertools import chain, ifilter, imap
+import sys
import operator
import time
+from itertools import chain
from datetime import datetime
+if sys.version < "3":
+ from itertools import imap as map, ifilter as filter
+
from py4j.protocol import Py4JJavaError
from pyspark import RDD
@@ -76,7 +80,7 @@ class DStream(object):
Return a new DStream containing only the elements that satisfy predicate.
"""
def func(iterator):
- return ifilter(f, iterator)
+ return filter(f, iterator)
return self.mapPartitions(func, True)
def flatMap(self, f, preservesPartitioning=False):
@@ -85,7 +89,7 @@ class DStream(object):
this DStream, and then flattening the results
"""
def func(s, iterator):
- return chain.from_iterable(imap(f, iterator))
+ return chain.from_iterable(map(f, iterator))
return self.mapPartitionsWithIndex(func, preservesPartitioning)
def map(self, f, preservesPartitioning=False):
@@ -93,7 +97,7 @@ class DStream(object):
Return a new DStream by applying a function to each element of DStream.
"""
def func(iterator):
- return imap(f, iterator)
+ return map(f, iterator)
return self.mapPartitions(func, preservesPartitioning)
def mapPartitions(self, f, preservesPartitioning=False):
@@ -150,7 +154,7 @@ class DStream(object):
"""
Apply a function to each RDD in this DStream.
"""
- if func.func_code.co_argcount == 1:
+ if func.__code__.co_argcount == 1:
old_func = func
func = lambda t, rdd: old_func(rdd)
jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer)
@@ -165,14 +169,14 @@ class DStream(object):
"""
def takeAndPrint(time, rdd):
taken = rdd.take(num + 1)
- print "-------------------------------------------"
- print "Time: %s" % time
- print "-------------------------------------------"
+ print("-------------------------------------------")
+ print("Time: %s" % time)
+ print("-------------------------------------------")
for record in taken[:num]:
- print record
+ print(record)
if len(taken) > num:
- print "..."
- print
+ print("...")
+ print()
self.foreachRDD(takeAndPrint)
@@ -181,7 +185,7 @@ class DStream(object):
Return a new DStream by applying a map function to the value of
each key-value pairs in this DStream without changing the key.
"""
- map_values_fn = lambda (k, v): (k, f(v))
+ map_values_fn = lambda kv: (kv[0], f(kv[1]))
return self.map(map_values_fn, preservesPartitioning=True)
def flatMapValues(self, f):
@@ -189,7 +193,7 @@ class DStream(object):
Return a new DStream by applying a flatmap function to the value
of each key-value pairs in this DStream without changing the key.
"""
- flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
+ flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1]))
return self.flatMap(flat_map_fn, preservesPartitioning=True)
def glom(self):
@@ -286,10 +290,10 @@ class DStream(object):
`func` can have one argument of `rdd`, or have two arguments of
(`time`, `rdd`)
"""
- if func.func_code.co_argcount == 1:
+ if func.__code__.co_argcount == 1:
oldfunc = func
func = lambda t, rdd: oldfunc(rdd)
- assert func.func_code.co_argcount == 2, "func should take one or two arguments"
+ assert func.__code__.co_argcount == 2, "func should take one or two arguments"
return TransformedDStream(self, func)
def transformWith(self, func, other, keepSerializer=False):
@@ -300,10 +304,10 @@ class DStream(object):
`func` can have two arguments of (`rdd_a`, `rdd_b`) or have three
arguments of (`time`, `rdd_a`, `rdd_b`)
"""
- if func.func_code.co_argcount == 2:
+ if func.__code__.co_argcount == 2:
oldfunc = func
func = lambda t, a, b: oldfunc(a, b)
- assert func.func_code.co_argcount == 3, "func should take two or three arguments"
+ assert func.__code__.co_argcount == 3, "func should take two or three arguments"
jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer, other._jrdd_deserializer)
dstream = self._sc._jvm.PythonTransformed2DStream(self._jdstream.dstream(),
other._jdstream.dstream(), jfunc)
@@ -460,7 +464,7 @@ class DStream(object):
keyed = self.map(lambda x: (1, x))
reduced = keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc,
windowDuration, slideDuration, 1)
- return reduced.map(lambda (k, v): v)
+ return reduced.map(lambda kv: kv[1])
def countByWindow(self, windowDuration, slideDuration):
"""
@@ -489,7 +493,7 @@ class DStream(object):
keyed = self.map(lambda x: (x, 1))
counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
windowDuration, slideDuration, numPartitions)
- return counted.filter(lambda (k, v): v > 0).count()
+ return counted.filter(lambda kv: kv[1] > 0).count()
def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None):
"""
@@ -548,7 +552,8 @@ class DStream(object):
def invReduceFunc(t, a, b):
b = b.reduceByKey(func, numPartitions)
joined = a.leftOuterJoin(b, numPartitions)
- return joined.mapValues(lambda (v1, v2): invFunc(v1, v2) if v2 is not None else v1)
+ return joined.mapValues(lambda kv: invFunc(kv[0], kv[1])
+ if kv[1] is not None else kv[0])
jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
if invReduceFunc:
@@ -579,9 +584,9 @@ class DStream(object):
g = b.groupByKey(numPartitions).mapValues(lambda vs: (list(vs), None))
else:
g = a.cogroup(b.partitionBy(numPartitions), numPartitions)
- g = g.mapValues(lambda (va, vb): (list(vb), list(va)[0] if len(va) else None))
- state = g.mapValues(lambda (vs, s): updateFunc(vs, s))
- return state.filter(lambda (k, v): v is not None)
+ g = g.mapValues(lambda ab: (list(ab[1]), list(ab[0])[0] if len(ab[0]) else None))
+ state = g.mapValues(lambda vs_s: updateFunc(vs_s[0], vs_s[1]))
+ return state.filter(lambda k_v: k_v[1] is not None)
jreduceFunc = TransformFunction(self._sc, reduceFunc,
self._sc.serializer, self._jrdd_deserializer)