Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--  python/pyspark/rdd.py  189
1 file changed, 109 insertions, 80 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 93e658eded..d9cdbb666f 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -16,21 +16,29 @@
#
import copy
-from collections import defaultdict
-from itertools import chain, ifilter, imap
-import operator
import sys
+import os
+import re
+import operator
import shlex
-from subprocess import Popen, PIPE
-from tempfile import NamedTemporaryFile
-from threading import Thread
import warnings
import heapq
import bisect
import random
import socket
+from subprocess import Popen, PIPE
+from tempfile import NamedTemporaryFile
+from threading import Thread
+from collections import defaultdict
+from itertools import chain
+from functools import reduce
from math import sqrt, log, isinf, isnan, pow, ceil
+if sys.version > '3':
+ basestring = unicode = str
+else:
+ from itertools import imap as map, ifilter as filter
+
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
PickleSerializer, pack_long, AutoBatchedSerializer
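
The import block above also introduces the single-codebase compatibility idiom used throughout the rest of the file: on Python 3 the removed names basestring and unicode are aliased to str, while on Python 2 the lazy itertools variants are bound to the builtin names map and filter. A minimal standalone sketch of the same pattern (the is_text helper is illustrative only):

from __future__ import print_function
import sys

if sys.version_info[0] >= 3:
    # Python 3: str covers what basestring/unicode meant on Python 2,
    # and the builtin map/filter already return lazy iterators.
    basestring = unicode = str
else:
    # Python 2: rebind the eager builtins to their lazy itertools twins
    # so the rest of the module sees iterator semantics on both versions.
    from itertools import imap as map, ifilter as filter

def is_text(x):
    # Works unchanged on both interpreters thanks to the aliases above.
    return isinstance(x, basestring)

print(list(map(str.upper, ['a', 'b'])), is_text(u'rdd'))
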
@@ -50,20 +58,21 @@ from py4j.java_collections import ListConverter, MapConverter
__all__ = ["RDD"]
-# TODO: for Python 3.3+, PYTHONHASHSEED should be reset to disable randomized
-# hash for string
def portable_hash(x):
"""
- This function returns consistant hash code for builtin types, especially
+ This function returns consistent hash code for builtin types, especially
for None and tuple with None.
- The algrithm is similar to that one used by CPython 2.7
+ The algorithm is similar to that one used by CPython 2.7
>>> portable_hash(None)
0
>>> portable_hash((None, 1)) & 0xffffffff
219750521
"""
+ if sys.version >= '3.3' and 'PYTHONHASHSEED' not in os.environ:
+ raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED")
+
if x is None:
return 0
if isinstance(x, tuple):
@@ -71,7 +80,7 @@ def portable_hash(x):
for i in x:
h ^= portable_hash(i)
h *= 1000003
- h &= sys.maxint
+ h &= sys.maxsize
h ^= len(x)
if h == -1:
h = -2
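
The new PYTHONHASHSEED check exists because Python 3.3+ salts hash() for str and bytes per interpreter process, so two workers could otherwise disagree on which partition a key belongs to. A small demonstration, independent of the patch, of why the seed has to be pinned (it launches child interpreters):

import os
import subprocess
import sys

code = 'print(hash("spark"))'

salted = dict(os.environ)
salted.pop('PYTHONHASHSEED', None)             # hash of str differs per process
pinned = dict(os.environ, PYTHONHASHSEED='0')  # hash of str is reproducible

for env in (salted, pinned):
    runs = [subprocess.check_output([sys.executable, '-c', code], env=env)
            for _ in range(2)]
    print(runs[0] == runs[1])  # typically False when salted, True when pinned
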
@@ -123,6 +132,19 @@ def _load_from_socket(port, serializer):
sock.close()
+def ignore_unicode_prefix(f):
+ """
+ Ignore the 'u' prefix of strings in doc tests, to make them work
+ in both Python 2 and 3
+ """
+ if sys.version >= '3':
+ # the representation of unicode string in Python 3 does not have prefix 'u',
+ # so remove the prefix 'u' for doc tests
+ literal_re = re.compile(r"(\W|^)[uU](['])", re.UNICODE)
+ f.__doc__ = literal_re.sub(r'\1\2', f.__doc__)
+ return f
+
+
class Partitioner(object):
def __init__(self, numPartitions, partitionFunc):
self.numPartitions = numPartitions
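
ignore_unicode_prefix only rewrites the decorated function's docstring, so a doctest written with u-prefixed literals matches Python 2's reprs verbatim and, after the substitution, Python 3's prefix-free reprs as well. The regex in action:

import re

literal_re = re.compile(r"(\W|^)[uU](['])", re.UNICODE)

doc = ">>> sc.parallelize(['1', '2']).pipe('cat').collect()\n[u'1', u'2']"
print(literal_re.sub(r'\1\2', doc))
# >>> sc.parallelize(['1', '2']).pipe('cat').collect()
# ['1', '2']
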
@@ -251,7 +273,7 @@ class RDD(object):
[('a', 1), ('b', 1), ('c', 1)]
"""
def func(_, iterator):
- return imap(f, iterator)
+ return map(f, iterator)
return self.mapPartitionsWithIndex(func, preservesPartitioning)
def flatMap(self, f, preservesPartitioning=False):
@@ -266,7 +288,7 @@ class RDD(object):
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
"""
def func(s, iterator):
- return chain.from_iterable(imap(f, iterator))
+ return chain.from_iterable(map(f, iterator))
return self.mapPartitionsWithIndex(func, preservesPartitioning)
def mapPartitions(self, f, preservesPartitioning=False):
@@ -329,7 +351,7 @@ class RDD(object):
[2, 4]
"""
def func(iterator):
- return ifilter(f, iterator)
+ return filter(f, iterator)
return self.mapPartitions(func, True)
def distinct(self, numPartitions=None):
@@ -341,7 +363,7 @@ class RDD(object):
"""
return self.map(lambda x: (x, None)) \
.reduceByKey(lambda x, _: x, numPartitions) \
- .map(lambda (x, _): x)
+ .map(lambda x: x[0])
def sample(self, withReplacement, fraction, seed=None):
"""
@@ -354,8 +376,8 @@ class RDD(object):
:param seed: seed for the random number generator
>>> rdd = sc.parallelize(range(100), 4)
- >>> rdd.sample(False, 0.1, 81).count()
- 10
+ >>> 6 <= rdd.sample(False, 0.1, 81).count() <= 14
+ True
"""
assert fraction >= 0.0, "Negative fraction value: %s" % fraction
return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
@@ -368,12 +390,14 @@ class RDD(object):
:param seed: random seed
:return: split RDDs in a list
- >>> rdd = sc.parallelize(range(5), 1)
+ >>> rdd = sc.parallelize(range(500), 1)
>>> rdd1, rdd2 = rdd.randomSplit([2, 3], 17)
- >>> rdd1.collect()
- [1, 3]
- >>> rdd2.collect()
- [0, 2, 4]
+ >>> len(rdd1.collect() + rdd2.collect())
+ 500
+ >>> 150 < rdd1.count() < 250
+ True
+ >>> 250 < rdd2.count() < 350
+ True
"""
s = float(sum(weights))
cweights = [0.0]
@@ -416,7 +440,7 @@ class RDD(object):
rand.shuffle(samples)
return samples
- maxSampleSize = sys.maxint - int(numStDev * sqrt(sys.maxint))
+ maxSampleSize = sys.maxsize - int(numStDev * sqrt(sys.maxsize))
if num > maxSampleSize:
raise ValueError(
"Sample size cannot be greater than %d." % maxSampleSize)
@@ -430,7 +454,7 @@ class RDD(object):
# See: scala/spark/RDD.scala
while len(samples) < num:
# TODO: add log warning for when more than one iteration was run
- seed = rand.randint(0, sys.maxint)
+ seed = rand.randint(0, sys.maxsize)
samples = self.sample(withReplacement, fraction, seed).collect()
rand.shuffle(samples)
@@ -507,7 +531,7 @@ class RDD(object):
"""
return self.map(lambda v: (v, None)) \
.cogroup(other.map(lambda v: (v, None))) \
- .filter(lambda (k, vs): all(vs)) \
+ .filter(lambda k_vs: all(k_vs[1])) \
.keys()
def _reserialize(self, serializer=None):
@@ -549,7 +573,7 @@ class RDD(object):
def sortPartition(iterator):
sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
- return iter(sort(iterator, key=lambda (k, v): keyfunc(k), reverse=(not ascending)))
+ return iter(sort(iterator, key=lambda k_v: keyfunc(k_v[0]), reverse=(not ascending)))
return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True)
@@ -579,7 +603,7 @@ class RDD(object):
def sortPartition(iterator):
sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
- return iter(sort(iterator, key=lambda (k, v): keyfunc(k), reverse=(not ascending)))
+ return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending)))
if numPartitions == 1:
if self.getNumPartitions() > 1:
@@ -594,12 +618,12 @@ class RDD(object):
return self # empty RDD
maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner
fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
- samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
+ samples = self.sample(False, fraction, 1).map(lambda kv: kv[0]).collect()
samples = sorted(samples, key=keyfunc)
# we have numPartitions many parts but one of them has
# an implicit boundary
- bounds = [samples[len(samples) * (i + 1) / numPartitions]
+ bounds = [samples[int(len(samples) * (i + 1) / numPartitions)]
for i in range(0, numPartitions - 1)]
def rangePartitioner(k):
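
Wrapping the boundary expression in int() is needed because / is true division on Python 3, so len(samples) * (i + 1) / numPartitions yields a float that can no longer be used as a list index. A minimal illustration:

samples = sorted(range(10))
numPartitions = 4

# Python 2 truncated the integer division; Python 3 returns e.g. 2.5, and
# samples[2.5] raises TypeError, hence the int() (or //) wrapper.
bounds = [samples[int(len(samples) * (i + 1) / numPartitions)]
          for i in range(0, numPartitions - 1)]
print(bounds)  # [2, 5, 7]
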
@@ -662,12 +686,13 @@ class RDD(object):
"""
return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
+ @ignore_unicode_prefix
def pipe(self, command, env={}):
"""
Return an RDD created by piping elements to a forked external process.
>>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
- ['1', '2', '', '3']
+ [u'1', u'2', u'', u'3']
"""
def func(iterator):
pipe = Popen(
@@ -675,17 +700,18 @@ class RDD(object):
def pipe_objs(out):
for obj in iterator:
- out.write(str(obj).rstrip('\n') + '\n')
+ s = str(obj).rstrip('\n') + '\n'
+ out.write(s.encode('utf-8'))
out.close()
Thread(target=pipe_objs, args=[pipe.stdin]).start()
- return (x.rstrip('\n') for x in iter(pipe.stdout.readline, ''))
+ return (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b''))
return self.mapPartitions(func)
def foreach(self, f):
"""
Applies a function to all elements of this RDD.
- >>> def f(x): print x
+ >>> def f(x): print(x)
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
"""
def processPartition(iterator):
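
The pipe() rewrite above is driven by Python 3's bytes-only subprocess pipes: each element is encoded to UTF-8 before it is written to the child's stdin, and each output line is compared against a bytes sentinel, stripped as bytes, then decoded back to text. A self-contained sketch of the same round trip, assuming a Unix cat is on the PATH:

from subprocess import Popen, PIPE

proc = Popen(['cat'], stdin=PIPE, stdout=PIPE)

for obj in [1, 'spark', u'rdd']:
    # proc.stdin expects bytes on Python 3, so encode explicitly.
    proc.stdin.write((str(obj).rstrip('\n') + '\n').encode('utf-8'))
proc.stdin.close()

# readline() yields bytes, so the iter() sentinel and rstrip argument are bytes too.
print([x.rstrip(b'\n').decode('utf-8') for x in iter(proc.stdout.readline, b'')])
# ['1', 'spark', 'rdd']
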
@@ -700,7 +726,7 @@ class RDD(object):
>>> def f(iterator):
... for x in iterator:
- ... print x
+ ... print(x)
>>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
"""
def func(it):
@@ -874,7 +900,7 @@ class RDD(object):
# aggregation.
while numPartitions > scale + numPartitions / scale:
numPartitions /= scale
- curNumPartitions = numPartitions
+ curNumPartitions = int(numPartitions)
def mapPartition(i, iterator):
for obj in iterator:
@@ -984,7 +1010,7 @@ class RDD(object):
(('a', 'b', 'c'), [2, 2])
"""
- if isinstance(buckets, (int, long)):
+ if isinstance(buckets, int):
if buckets < 1:
raise ValueError("number of buckets must be >= 1")
@@ -1020,6 +1046,7 @@ class RDD(object):
raise ValueError("Can not generate buckets with infinite value")
# keep them as integer if possible
+ inc = int(inc)
if inc * buckets != maxv - minv:
inc = (maxv - minv) * 1.0 / buckets
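
Because Python 3 folds long into int, the bucket-count check now tests for int alone, and inc is coerced back to an int so that evenly divisible ranges keep integer bucket boundaries; only when the range does not divide evenly does it fall back to a float width. A rough sketch of that logic (not the patch's exact surrounding code):

minv, maxv, buckets = 0, 10, 5

inc = int((maxv - minv) / buckets)       # true division gives 2.0; keep it integral
if inc * buckets != maxv - minv:         # range not evenly divisible by bucket count
    inc = (maxv - minv) * 1.0 / buckets  # fall back to float-width buckets

print([minv + i * inc for i in range(buckets)] + [maxv])  # [0, 2, 4, 6, 8, 10]
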
@@ -1137,7 +1164,7 @@ class RDD(object):
yield counts
def mergeMaps(m1, m2):
- for k, v in m2.iteritems():
+ for k, v in m2.items():
m1[k] += v
return m1
return self.mapPartitions(countPartition).reduce(mergeMaps)
@@ -1378,8 +1405,8 @@ class RDD(object):
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)
- >>> sorted(sc.pickleFile(tmpFile.name, 5).collect())
- [1, 2, 'rdd', 'spark']
+ >>> sorted(sc.pickleFile(tmpFile.name, 5).map(str).collect())
+ ['1', '2', 'rdd', 'spark']
"""
if batchSize == 0:
ser = AutoBatchedSerializer(PickleSerializer())
@@ -1387,6 +1414,7 @@ class RDD(object):
ser = BatchedSerializer(PickleSerializer(), batchSize)
self._reserialize(ser)._jrdd.saveAsObjectFile(path)
+ @ignore_unicode_prefix
def saveAsTextFile(self, path, compressionCodecClass=None):
"""
Save this RDD as a text file, using string representations of elements.
@@ -1418,12 +1446,13 @@ class RDD(object):
>>> codec = "org.apache.hadoop.io.compress.GzipCodec"
>>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec)
>>> from fileinput import input, hook_compressed
- >>> ''.join(sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed)))
- 'bar\\nfoo\\n'
+ >>> result = sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed))
+ >>> b''.join(result).decode('utf-8')
+ u'bar\\nfoo\\n'
"""
def func(split, iterator):
for x in iterator:
- if not isinstance(x, basestring):
+ if not isinstance(x, (unicode, bytes)):
x = unicode(x)
if isinstance(x, unicode):
x = x.encode("utf-8")
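
The rewritten check in saveAsTextFile() accepts both text and bytes: anything else is converted to text first, and text is then encoded so the underlying writer always receives UTF-8 bytes (unicode here is the str alias installed at the top of the module on Python 3). Roughly:

import sys
if sys.version >= '3':
    unicode = str  # mirror the module-level alias

def to_utf8_bytes(x):
    # Stringify non-text values, then encode text; bytes pass through untouched.
    if not isinstance(x, (unicode, bytes)):
        x = unicode(x)
    if isinstance(x, unicode):
        x = x.encode('utf-8')
    return x

print([to_utf8_bytes(v) for v in [1, u'caf\xe9', b'raw']])
# on Python 3: [b'1', b'caf\xc3\xa9', b'raw']
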
@@ -1458,7 +1487,7 @@ class RDD(object):
>>> m.collect()
[1, 3]
"""
- return self.map(lambda (k, v): k)
+ return self.map(lambda x: x[0])
def values(self):
"""
@@ -1468,7 +1497,7 @@ class RDD(object):
>>> m.collect()
[2, 4]
"""
- return self.map(lambda (k, v): v)
+ return self.map(lambda x: x[1])
def reduceByKey(self, func, numPartitions=None):
"""
@@ -1507,7 +1536,7 @@ class RDD(object):
yield m
def mergeMaps(m1, m2):
- for k, v in m2.iteritems():
+ for k, v in m2.items():
m1[k] = func(m1[k], v) if k in m1 else v
return m1
return self.mapPartitions(reducePartition).reduce(mergeMaps)
@@ -1604,8 +1633,8 @@ class RDD(object):
>>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
>>> sets = pairs.partitionBy(2).glom().collect()
- >>> set(sets[0]).intersection(set(sets[1]))
- set([])
+ >>> len(set(sets[0]).intersection(set(sets[1])))
+ 0
"""
if numPartitions is None:
numPartitions = self._defaultReducePartitions()
@@ -1637,22 +1666,22 @@ class RDD(object):
if (c % 1000 == 0 and get_used_memory() > limit
or c > batch):
n, size = len(buckets), 0
- for split in buckets.keys():
+ for split in list(buckets.keys()):
yield pack_long(split)
d = outputSerializer.dumps(buckets[split])
del buckets[split]
yield d
size += len(d)
- avg = (size / n) >> 20
+ avg = int(size / n) >> 20
# let 1M < avg < 10M
if avg < 1:
batch *= 1.5
elif avg > 10:
- batch = max(batch / 1.5, 1)
+ batch = max(int(batch / 1.5), 1)
c = 0
- for split, items in buckets.iteritems():
+ for split, items in buckets.items():
yield pack_long(split)
yield outputSerializer.dumps(items)
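
Taking a list() snapshot of buckets.keys() matters because the loop deletes entries as it spills them: in Python 3, keys() is a live view, and mutating the dict while iterating over it raises RuntimeError (items() likewise replaces the removed iteritems()). For example:

buckets = {0: [b'a'], 1: [b'b'], 2: [b'c']}

# for split in buckets.keys(): del buckets[split]   # RuntimeError on Python 3
for split in list(buckets.keys()):                   # snapshot before mutating
    del buckets[split]

print(buckets)  # {}
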
@@ -1707,7 +1736,7 @@ class RDD(object):
merger = ExternalMerger(agg, memory * 0.9, serializer) \
if spill else InMemoryMerger(agg)
merger.mergeValues(iterator)
- return merger.iteritems()
+ return merger.items()
locally_combined = self.mapPartitions(combineLocally, preservesPartitioning=True)
shuffled = locally_combined.partitionBy(numPartitions)
@@ -1716,7 +1745,7 @@ class RDD(object):
merger = ExternalMerger(agg, memory, serializer) \
if spill else InMemoryMerger(agg)
merger.mergeCombiners(iterator)
- return merger.iteritems()
+ return merger.items()
return shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True)
@@ -1745,7 +1774,7 @@ class RDD(object):
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> from operator import add
- >>> rdd.foldByKey(0, add).collect()
+ >>> sorted(rdd.foldByKey(0, add).collect())
[('a', 2), ('b', 1)]
"""
def createZero():
@@ -1769,10 +1798,10 @@ class RDD(object):
sum or average) over each key, using reduceByKey or aggregateByKey will
provide much better performance.
- >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
- >>> sorted(x.groupByKey().mapValues(len).collect())
+ >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
+ >>> sorted(rdd.groupByKey().mapValues(len).collect())
[('a', 2), ('b', 1)]
- >>> sorted(x.groupByKey().mapValues(list).collect())
+ >>> sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]
"""
def createCombiner(x):
@@ -1795,7 +1824,7 @@ class RDD(object):
merger = ExternalMerger(agg, memory * 0.9, serializer) \
if spill else InMemoryMerger(agg)
merger.mergeValues(iterator)
- return merger.iteritems()
+ return merger.items()
locally_combined = self.mapPartitions(combine, preservesPartitioning=True)
shuffled = locally_combined.partitionBy(numPartitions)
@@ -1804,7 +1833,7 @@ class RDD(object):
merger = ExternalGroupBy(agg, memory, serializer)\
if spill else InMemoryMerger(agg)
merger.mergeCombiners(it)
- return merger.iteritems()
+ return merger.items()
return shuffled.mapPartitions(groupByKey, True).mapValues(ResultIterable)
@@ -1819,7 +1848,7 @@ class RDD(object):
>>> x.flatMapValues(f).collect()
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
"""
- flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
+ flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1]))
return self.flatMap(flat_map_fn, preservesPartitioning=True)
def mapValues(self, f):
@@ -1833,7 +1862,7 @@ class RDD(object):
>>> x.mapValues(f).collect()
[('a', 3), ('b', 1)]
"""
- map_values_fn = lambda (k, v): (k, f(v))
+ map_values_fn = lambda kv: (kv[0], f(kv[1]))
return self.map(map_values_fn, preservesPartitioning=True)
def groupWith(self, other, *others):
@@ -1844,8 +1873,7 @@ class RDD(object):
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> z = sc.parallelize([("b", 42)])
- >>> map((lambda (x,y): (x, (list(y[0]), list(y[1]), list(y[2]), list(y[3])))), \
- sorted(list(w.groupWith(x, y, z).collect())))
+ >>> [(x, tuple(map(list, y))) for x, y in sorted(list(w.groupWith(x, y, z).collect()))]
[('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]
"""
@@ -1860,7 +1888,7 @@ class RDD(object):
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
- >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
+ >>> [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
[('a', ([1], [2])), ('b', ([4], []))]
"""
return python_cogroup((self, other), numPartitions)
@@ -1896,8 +1924,9 @@ class RDD(object):
>>> sorted(x.subtractByKey(y).collect())
[('b', 4), ('b', 5)]
"""
- def filter_func((key, vals)):
- return vals[0] and not vals[1]
+ def filter_func(pair):
+ key, (val1, val2) = pair
+ return val1 and not val2
return self.cogroup(other, numPartitions).filter(filter_func).flatMapValues(lambda x: x[0])
def subtract(self, other, numPartitions=None):
@@ -1919,8 +1948,8 @@ class RDD(object):
>>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
>>> y = sc.parallelize(zip(range(0,5), range(0,5)))
- >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect()))
- [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
+ >>> [(x, list(map(list, y))) for x, y in sorted(x.cogroup(y).collect())]
+ [(0, [[0], [0]]), (1, [[1], [1]]), (2, [[], [2]]), (3, [[], [3]]), (4, [[2], [4]])]
"""
return self.map(lambda x: (f(x), x))
@@ -2049,17 +2078,18 @@ class RDD(object):
"""
Return the name of this RDD.
"""
- name_ = self._jrdd.name()
- if name_:
- return name_.encode('utf-8')
+ n = self._jrdd.name()
+ if n:
+ return n
+ @ignore_unicode_prefix
def setName(self, name):
"""
Assign a name to this RDD.
- >>> rdd1 = sc.parallelize([1,2])
+ >>> rdd1 = sc.parallelize([1, 2])
>>> rdd1.setName('RDD1').name()
- 'RDD1'
+ u'RDD1'
"""
self._jrdd.setName(name)
return self
@@ -2121,7 +2151,7 @@ class RDD(object):
>>> sorted.lookup(1024)
[]
"""
- values = self.filter(lambda (k, v): k == key).values()
+ values = self.filter(lambda kv: kv[0] == key).values()
if self.partitioner is not None:
return self.ctx.runJob(values, lambda x: x, [self.partitioner(key)], False)
@@ -2159,7 +2189,7 @@ class RDD(object):
or meet the confidence.
>>> rdd = sc.parallelize(range(1000), 10)
- >>> r = sum(xrange(1000))
+ >>> r = sum(range(1000))
>>> (rdd.sumApprox(1000) - r) / r < 0.05
True
"""
@@ -2176,7 +2206,7 @@ class RDD(object):
or meet the confidence.
>>> rdd = sc.parallelize(range(1000), 10)
- >>> r = sum(xrange(1000)) / 1000.0
+ >>> r = sum(range(1000)) / 1000.0
>>> (rdd.meanApprox(1000) - r) / r < 0.05
True
"""
@@ -2201,10 +2231,10 @@ class RDD(object):
It must be greater than 0.000017.
>>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
- >>> 950 < n < 1050
+ >>> 900 < n < 1100
True
>>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
- >>> 18 < n < 22
+ >>> 16 < n < 24
True
"""
if relativeSD < 0.000017:
@@ -2223,8 +2253,7 @@ class RDD(object):
>>> [x for x in rdd.toLocalIterator()]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
"""
- partitions = xrange(self.getNumPartitions())
- for partition in partitions:
+ for partition in range(self.getNumPartitions()):
rows = self.context.runJob(self, lambda x: x, [partition])
for row in rows:
yield row