diff options
Diffstat (limited to 'python/pyspark/streaming/dstream.py')
-rw-r--r-- | python/pyspark/streaming/dstream.py | 51 |
1 files changed, 28 insertions, 23 deletions
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 3fa4244423..ff097985fa 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -15,11 +15,15 @@ # limitations under the License. # -from itertools import chain, ifilter, imap +import sys import operator import time +from itertools import chain from datetime import datetime +if sys.version < "3": + from itertools import imap as map, ifilter as filter + from py4j.protocol import Py4JJavaError from pyspark import RDD @@ -76,7 +80,7 @@ class DStream(object): Return a new DStream containing only the elements that satisfy predicate. """ def func(iterator): - return ifilter(f, iterator) + return filter(f, iterator) return self.mapPartitions(func, True) def flatMap(self, f, preservesPartitioning=False): @@ -85,7 +89,7 @@ class DStream(object): this DStream, and then flattening the results """ def func(s, iterator): - return chain.from_iterable(imap(f, iterator)) + return chain.from_iterable(map(f, iterator)) return self.mapPartitionsWithIndex(func, preservesPartitioning) def map(self, f, preservesPartitioning=False): @@ -93,7 +97,7 @@ class DStream(object): Return a new DStream by applying a function to each element of DStream. """ def func(iterator): - return imap(f, iterator) + return map(f, iterator) return self.mapPartitions(func, preservesPartitioning) def mapPartitions(self, f, preservesPartitioning=False): @@ -150,7 +154,7 @@ class DStream(object): """ Apply a function to each RDD in this DStream. """ - if func.func_code.co_argcount == 1: + if func.__code__.co_argcount == 1: old_func = func func = lambda t, rdd: old_func(rdd) jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer) @@ -165,14 +169,14 @@ class DStream(object): """ def takeAndPrint(time, rdd): taken = rdd.take(num + 1) - print "-------------------------------------------" - print "Time: %s" % time - print "-------------------------------------------" + print("-------------------------------------------") + print("Time: %s" % time) + print("-------------------------------------------") for record in taken[:num]: - print record + print(record) if len(taken) > num: - print "..." - print + print("...") + print() self.foreachRDD(takeAndPrint) @@ -181,7 +185,7 @@ class DStream(object): Return a new DStream by applying a map function to the value of each key-value pairs in this DStream without changing the key. """ - map_values_fn = lambda (k, v): (k, f(v)) + map_values_fn = lambda kv: (kv[0], f(kv[1])) return self.map(map_values_fn, preservesPartitioning=True) def flatMapValues(self, f): @@ -189,7 +193,7 @@ class DStream(object): Return a new DStream by applying a flatmap function to the value of each key-value pairs in this DStream without changing the key. """ - flat_map_fn = lambda (k, v): ((k, x) for x in f(v)) + flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1])) return self.flatMap(flat_map_fn, preservesPartitioning=True) def glom(self): @@ -286,10 +290,10 @@ class DStream(object): `func` can have one argument of `rdd`, or have two arguments of (`time`, `rdd`) """ - if func.func_code.co_argcount == 1: + if func.__code__.co_argcount == 1: oldfunc = func func = lambda t, rdd: oldfunc(rdd) - assert func.func_code.co_argcount == 2, "func should take one or two arguments" + assert func.__code__.co_argcount == 2, "func should take one or two arguments" return TransformedDStream(self, func) def transformWith(self, func, other, keepSerializer=False): @@ -300,10 +304,10 @@ class DStream(object): `func` can have two arguments of (`rdd_a`, `rdd_b`) or have three arguments of (`time`, `rdd_a`, `rdd_b`) """ - if func.func_code.co_argcount == 2: + if func.__code__.co_argcount == 2: oldfunc = func func = lambda t, a, b: oldfunc(a, b) - assert func.func_code.co_argcount == 3, "func should take two or three arguments" + assert func.__code__.co_argcount == 3, "func should take two or three arguments" jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer, other._jrdd_deserializer) dstream = self._sc._jvm.PythonTransformed2DStream(self._jdstream.dstream(), other._jdstream.dstream(), jfunc) @@ -460,7 +464,7 @@ class DStream(object): keyed = self.map(lambda x: (1, x)) reduced = keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1) - return reduced.map(lambda (k, v): v) + return reduced.map(lambda kv: kv[1]) def countByWindow(self, windowDuration, slideDuration): """ @@ -489,7 +493,7 @@ class DStream(object): keyed = self.map(lambda x: (x, 1)) counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub, windowDuration, slideDuration, numPartitions) - return counted.filter(lambda (k, v): v > 0).count() + return counted.filter(lambda kv: kv[1] > 0).count() def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None): """ @@ -548,7 +552,8 @@ class DStream(object): def invReduceFunc(t, a, b): b = b.reduceByKey(func, numPartitions) joined = a.leftOuterJoin(b, numPartitions) - return joined.mapValues(lambda (v1, v2): invFunc(v1, v2) if v2 is not None else v1) + return joined.mapValues(lambda kv: invFunc(kv[0], kv[1]) + if kv[1] is not None else kv[0]) jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer) if invReduceFunc: @@ -579,9 +584,9 @@ class DStream(object): g = b.groupByKey(numPartitions).mapValues(lambda vs: (list(vs), None)) else: g = a.cogroup(b.partitionBy(numPartitions), numPartitions) - g = g.mapValues(lambda (va, vb): (list(vb), list(va)[0] if len(va) else None)) - state = g.mapValues(lambda (vs, s): updateFunc(vs, s)) - return state.filter(lambda (k, v): v is not None) + g = g.mapValues(lambda ab: (list(ab[1]), list(ab[0])[0] if len(ab[0]) else None)) + state = g.mapValues(lambda vs_s: updateFunc(vs_s[0], vs_s[1])) + return state.filter(lambda k_v: k_v[1] is not None) jreduceFunc = TransformFunction(self._sc, reduceFunc, self._sc.serializer, self._jrdd_deserializer) |