author     Davies Liu <davies@databricks.com>      2015-04-16 16:20:57 -0700
committer  Josh Rosen <joshrosen@databricks.com>   2015-04-16 16:20:57 -0700
commit     04e44b37cc04f62fbf9e08c7076349e0a4d12ea8 (patch)
tree       b6429253955210445ddc37faa4d5166ea25a91e2 /python/pyspark/streaming/dstream.py
parent     55f553a979db925aa0c3559f7e80b99d2bf3feb4 (diff)
[SPARK-4897] [PySpark] Python 3 support
This PR updates PySpark to support Python 3 (tested with 3.4).

Known issue: unpickling arrays from Pyrolite is broken in Python 3; those tests are skipped.

TODO: ec2/spark-ec2.py is not fully tested with python3.

Author: Davies Liu <davies@databricks.com>
Author: twneale <twneale@gmail.com>
Author: Josh Rosen <joshrosen@databricks.com>

Closes #5173 from davies/python3 and squashes the following commits:

d7d6323 [Davies Liu] fix tests
6c52a98 [Davies Liu] fix mllib test
99e334f [Davies Liu] update timeout
b716610 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
cafd5ec [Davies Liu] address comments from @mengxr
bf225d7 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
179fc8d [Davies Liu] tuning flaky tests
8c8b957 [Davies Liu] fix ResourceWarning in Python 3
5c57c95 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
4006829 [Davies Liu] fix test
2fc0066 [Davies Liu] add python3 path
71535e9 [Davies Liu] fix xrange and divide
5a55ab4 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
125f12c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ed498c8 [Davies Liu] fix compatibility with python 3
820e649 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
e8ce8c9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ad7c374 [Davies Liu] fix mllib test and warning
ef1fc2f [Davies Liu] fix tests
4eee14a [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
20112ff [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
59bb492 [Davies Liu] fix tests
1da268c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
ca0fdd3 [Davies Liu] fix code style
9563a15 [Davies Liu] add imap back for python 2
0b1ec04 [Davies Liu] make python examples work with Python 3
d2fd566 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
a716d34 [Davies Liu] test with python 3.4
f1700e8 [Davies Liu] fix test in python3
671b1db [Davies Liu] fix test in python3
692ff47 [Davies Liu] fix flaky test
7b9699f [Davies Liu] invalidate import cache for Python 3.3+
9c58497 [Davies Liu] fix kill worker
309bfbf [Davies Liu] keep compatibility
5707476 [Davies Liu] cleanup, fix hash of string in 3.3+
8662d5b [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3
f53e1f0 [Davies Liu] fix tests
70b6b73 [Davies Liu] compile ec2/spark_ec2.py in python 3
a39167e [Davies Liu] support customize class in __main__
814c77b [Davies Liu] run unittests with python 3
7f4476e [Davies Liu] mllib tests passed
d737924 [Davies Liu] pass ml tests
375ea17 [Davies Liu] SQL tests pass
6cc42a9 [Davies Liu] rename
431a8de [Davies Liu] streaming tests pass
78901a7 [Davies Liu] fix hash of serializer in Python 3
24b2f2e [Davies Liu] pass all RDD tests
35f48fe [Davies Liu] run future again
1eebac2 [Davies Liu] fix conflict in ec2/spark_ec2.py
6e3c21d [Davies Liu] make cloudpickle work with Python3
2fb2db3 [Josh Rosen] Guard more changes behind sys.version; still doesn't run
1aa5e8f [twneale] Turned out `pickle.DictionaryType is dict` == True, so swapped it out
7354371 [twneale] buffer --> memoryview. I'm not super sure if this is a valid change, but the 2.7 docs recommend using memoryview over buffer where possible, so hoping it'll work.
b69ccdf [twneale] Uses the pure python pickle._Pickler instead of c-extension _pickle.Pickler. It appears pyspark 2.7 uses the pure python pickler as well, so this shouldn't degrade pickling performance (?).
f40d925 [twneale] xrange --> range
e104215 [twneale] Replaces 2.7 types.InstanceType with 3.4 `object`... could be horribly wrong depending on how types.InstanceType is used elsewhere in the package -- see http://bugs.python.org/issue8206
79de9d0 [twneale] Replaces python2.7 `file` with 3.4 _io.TextIOWrapper
2adb42d [Josh Rosen] Fix up some import differences between Python 2 and 3
854be27 [Josh Rosen] Run `futurize` on Python code
7c5b4ce [Josh Rosen] Remove Python 3 check in shell.py.
Diffstat (limited to 'python/pyspark/streaming/dstream.py')
-rw-r--r--  python/pyspark/streaming/dstream.py | 51
1 file changed, 28 insertions(+), 23 deletions(-)
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 3fa4244423..ff097985fa 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -15,11 +15,15 @@
# limitations under the License.
#
-from itertools import chain, ifilter, imap
+import sys
import operator
import time
+from itertools import chain
from datetime import datetime
+if sys.version < "3":
+ from itertools import imap as map, ifilter as filter
+
from py4j.protocol import Py4JJavaError
from pyspark import RDD
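Note: the hunk above is the heart of the compatibility story for this file. On Python 2, the lazy iterator variants itertools.imap/ifilter are rebound to the builtin names map/filter, so the rest of the module can use the Python 3 spelling everywhere and get lazy iterators on both interpreter lines. A minimal standalone sketch of the same pattern (not part of the diff):

    import sys

    # On Python 2, the builtins map/filter return lists; itertools.imap and
    # ifilter are the lazy equivalents. Rebinding them to the builtin names
    # gives Python 3 semantics (iterators) on both interpreter lines.
    if sys.version < "3":
        from itertools import imap as map, ifilter as filter

    lazy = map(lambda x: x * 2, range(3))
    print(list(lazy))  # [0, 2, 4] on both Python 2 and Python 3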
@@ -76,7 +80,7 @@ class DStream(object):
Return a new DStream containing only the elements that satisfy predicate.
"""
def func(iterator):
- return ifilter(f, iterator)
+ return filter(f, iterator)
return self.mapPartitions(func, True)
def flatMap(self, f, preservesPartitioning=False):
@@ -85,7 +89,7 @@ class DStream(object):
this DStream, and then flattening the results
"""
def func(s, iterator):
- return chain.from_iterable(imap(f, iterator))
+ return chain.from_iterable(map(f, iterator))
return self.mapPartitionsWithIndex(func, preservesPartitioning)
def map(self, f, preservesPartitioning=False):
@@ -93,7 +97,7 @@ class DStream(object):
Return a new DStream by applying a function to each element of DStream.
"""
def func(iterator):
- return imap(f, iterator)
+ return map(f, iterator)
return self.mapPartitions(func, preservesPartitioning)
def mapPartitions(self, f, preservesPartitioning=False):
@@ -150,7 +154,7 @@ class DStream(object):
"""
Apply a function to each RDD in this DStream.
"""
- if func.func_code.co_argcount == 1:
+ if func.__code__.co_argcount == 1:
old_func = func
func = lambda t, rdd: old_func(rdd)
jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer)
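Note: func.func_code was removed in Python 3; a function's code object is spelled __code__ there, and that alias also exists on Python 2.6+, which is why the new spelling works on both lines. A small sketch of the arity-sniffing trick this hunk relies on, using a hypothetical one-argument callback:

    def describe(rdd):  # hypothetical callback taking only the RDD
        return "got %s" % rdd

    # __code__.co_argcount works on Python 2.6+ and Python 3,
    # unlike the removed func_code attribute.
    if describe.__code__.co_argcount == 1:
        old_func = describe
        describe = lambda t, rdd: old_func(rdd)  # promote to (time, rdd) form

    print(describe("2015-04-16 16:20:57", [1, 2, 3]))  # got [1, 2, 3]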
@@ -165,14 +169,14 @@ class DStream(object):
"""
def takeAndPrint(time, rdd):
taken = rdd.take(num + 1)
- print "-------------------------------------------"
- print "Time: %s" % time
- print "-------------------------------------------"
+ print("-------------------------------------------")
+ print("Time: %s" % time)
+ print("-------------------------------------------")
for record in taken[:num]:
- print record
+ print(record)
if len(taken) > num:
- print "..."
- print
+ print("...")
+ print()
self.foreachRDD(takeAndPrint)
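Note: the print statement is gone in Python 3, so takeAndPrint switches to the function form. One subtlety when porting by hand: on Python 2 without a __future__ import, print("x") happens to work (the parentheses read as grouping), but a bare print() prints an empty tuple. The belt-and-braces fix is the future import; a standalone sketch:

    from __future__ import print_function  # at module top; makes print a function on Python 2

    print("-" * 43)
    print("Time: %s" % "2015-04-16 16:20:57")  # hypothetical batch time
    print("...")
    print()  # a real blank line on both Python 2 and 3, thanks to the import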
@@ -181,7 +185,7 @@ class DStream(object):
Return a new DStream by applying a map function to the value of
each key-value pairs in this DStream without changing the key.
"""
- map_values_fn = lambda (k, v): (k, f(v))
+ map_values_fn = lambda kv: (kv[0], f(kv[1]))
return self.map(map_values_fn, preservesPartitioning=True)
def flatMapValues(self, f):
@@ -189,7 +193,7 @@ class DStream(object):
Return a new DStream by applying a flatmap function to the value
of each key-value pairs in this DStream without changing the key.
"""
- flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
+ flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1]))
return self.flatMap(flat_map_fn, preservesPartitioning=True)
def glom(self):
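Note: the two hunks above are instances of PEP 3113. Python 3 removed tuple parameter unpacking, so lambda (k, v): ... is a syntax error there and has to become a one-argument lambda that indexes into the pair. The same mechanical rewrite recurs through the rest of this diff. A standalone before/after sketch with a hypothetical value transform f:

    pair = ("spark", 7)

    # Python 2 only -- tuple parameter unpacking (SyntaxError on Python 3):
    #   map_values_fn = lambda (k, v): (k, f(v))

    # Portable rewrite: take the pair as one argument and index into it.
    f = lambda v: v + 1  # hypothetical value transform
    map_values_fn = lambda kv: (kv[0], f(kv[1]))
    print(map_values_fn(pair))  # ('spark', 8)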
@@ -286,10 +290,10 @@ class DStream(object):
`func` can have one argument of `rdd`, or have two arguments of
(`time`, `rdd`)
"""
- if func.func_code.co_argcount == 1:
+ if func.__code__.co_argcount == 1:
oldfunc = func
func = lambda t, rdd: oldfunc(rdd)
- assert func.func_code.co_argcount == 2, "func should take one or two arguments"
+ assert func.__code__.co_argcount == 2, "func should take one or two arguments"
return TransformedDStream(self, func)
def transformWith(self, func, other, keepSerializer=False):
@@ -300,10 +304,10 @@ class DStream(object):
`func` can have two arguments of (`rdd_a`, `rdd_b`) or have three
arguments of (`time`, `rdd_a`, `rdd_b`)
"""
- if func.func_code.co_argcount == 2:
+ if func.__code__.co_argcount == 2:
oldfunc = func
func = lambda t, a, b: oldfunc(a, b)
- assert func.func_code.co_argcount == 3, "func should take two or three arguments"
+ assert func.__code__.co_argcount == 3, "func should take two or three arguments"
jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer, other._jrdd_deserializer)
dstream = self._sc._jvm.PythonTransformed2DStream(self._jdstream.dstream(),
other._jdstream.dstream(), jfunc)
@@ -460,7 +464,7 @@ class DStream(object):
keyed = self.map(lambda x: (1, x))
reduced = keyed.reduceByKeyAndWindow(reduceFunc, invReduceFunc,
windowDuration, slideDuration, 1)
- return reduced.map(lambda (k, v): v)
+ return reduced.map(lambda kv: kv[1])
def countByWindow(self, windowDuration, slideDuration):
"""
@@ -489,7 +493,7 @@ class DStream(object):
keyed = self.map(lambda x: (x, 1))
counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
windowDuration, slideDuration, numPartitions)
- return counted.filter(lambda (k, v): v > 0).count()
+ return counted.filter(lambda kv: kv[1] > 0).count()
def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None):
"""
@@ -548,7 +552,8 @@ class DStream(object):
def invReduceFunc(t, a, b):
b = b.reduceByKey(func, numPartitions)
joined = a.leftOuterJoin(b, numPartitions)
- return joined.mapValues(lambda (v1, v2): invFunc(v1, v2) if v2 is not None else v1)
+ return joined.mapValues(lambda kv: invFunc(kv[0], kv[1])
+ if kv[1] is not None else kv[0])
jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
if invReduceFunc:
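Note: besides dropping tuple unpacking, the rewritten mapValues lambda in this hunk still has to guard against None: leftOuterJoin yields (k, (v1, v2)) with v2 = None for keys absent from b, and only in that case is the inverse function skipped. A plain-Python sketch of the same guard, with hypothetical values and no Spark required:

    # (current window value, value leaving the window), as leftOuterJoin
    # would pair them; None means the key had nothing to subtract.
    joined = {"a": (3, 1), "b": (5, None)}

    invFunc = lambda v1, v2: v1 - v2  # hypothetical inverse of the reduce
    result = {k: (invFunc(kv[0], kv[1]) if kv[1] is not None else kv[0])
              for k, kv in joined.items()}
    print(result)  # {'a': 2, 'b': 5}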
@@ -579,9 +584,9 @@ class DStream(object):
g = b.groupByKey(numPartitions).mapValues(lambda vs: (list(vs), None))
else:
g = a.cogroup(b.partitionBy(numPartitions), numPartitions)
- g = g.mapValues(lambda (va, vb): (list(vb), list(va)[0] if len(va) else None))
- state = g.mapValues(lambda (vs, s): updateFunc(vs, s))
- return state.filter(lambda (k, v): v is not None)
+ g = g.mapValues(lambda ab: (list(ab[1]), list(ab[0])[0] if len(ab[0]) else None))
+ state = g.mapValues(lambda vs_s: updateFunc(vs_s[0], vs_s[1]))
+ return state.filter(lambda k_v: k_v[1] is not None)
jreduceFunc = TransformFunction(self._sc, reduceFunc,
self._sc.serializer, self._jrdd_deserializer)
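Note: the function rewritten in this final hunk boils down to: pair each key's new values with its previous state, call updateFunc(new_values, old_state) per key, and drop keys whose new state is None. A simplified pure-Python sketch of that pipeline (names hypothetical; the real cogroup also keeps keys that have state but no new values, omitted here):

    new_values = {"a": [1, 2], "b": [5]}  # values arriving in this batch
    old_state = {"a": 10}                 # state carried over from the last batch

    updateFunc = lambda vs, s: (s or 0) + sum(vs)  # e.g. a running sum

    # Pair each key's new values with its previous state (None if unseen).
    g = {k: (vs, old_state.get(k)) for k, vs in new_values.items()}

    # Apply the update, then drop keys whose state became None.
    state = {k: updateFunc(vs_s[0], vs_s[1]) for k, vs_s in g.items()}
    state = {k: v for k, v in state.items() if v is not None}
    print(state)  # {'a': 13, 'b': 5}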