path: root/python/pyspark/rdd.py
author    Prashant Sharma <prashant.s@imaginea.com>  2014-07-14 00:42:59 -0700
committer Reynold Xin <rxin@apache.org>  2014-07-14 00:42:59 -0700
commit    aab5349660109481ee944721d611771da5a93109 (patch)
tree      2be964ad38cfe0ef5df18be212f7fa812dec180f /python/pyspark/rdd.py
parent    635888cbed0e3f4127252fb84db449f0cc9ed659 (diff)
download  spark-aab5349660109481ee944721d611771da5a93109.tar.gz
          spark-aab5349660109481ee944721d611771da5a93109.tar.bz2
          spark-aab5349660109481ee944721d611771da5a93109.zip
Made rdd.py pep8 compliant by using Autopep8 and a little manual editing.

Author: Prashant Sharma <prashant.s@imaginea.com>

Closes #1354 from ScrapCodes/pep8-comp-1 and squashes the following commits:

9858ea8 [Prashant Sharma] Code Review
d8851b7 [Prashant Sharma] Found # noqa works even inside comment blocks. Not sure if it works with all versions of python.
10c0cef [Prashant Sharma] Made rdd.py pep8 compliant by using Autopep8 and a little manual tweaking.
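The "# noqa" marker mentioned in commit d8851b7 tells the pep8/flake8 checkers to skip whatever they would report on that physical line, and since the check is purely line-based it is honored even on lines inside docstrings, which is how the over-long doctest output in sortByKey below survives the style check. A minimal illustration (hypothetical snippet, not taken from this diff; as the commit itself notes, behavior may vary across checker versions):

    # Without the trailing marker this line would trip E501 (line too long).
    banner = "a deliberately long string literal padded out well past the seventy-nine column limit xxxxxxxxx"  # noqa

    def first_upper(words):
        """
        >>> first_upper(['spark', 'rdd'])  # noqa
        ['SPARK', 'RDD']
        """
        return [w.upper() for w in words]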
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--  python/pyspark/rdd.py | 150
1 file changed, 92 insertions(+), 58 deletions(-)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index f64f48e3a4..0c35c66680 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -69,16 +69,19 @@ def _extract_concise_traceback():
file, line, fun, what = tb[0]
return callsite(function=fun, file=file, linenum=line)
sfile, sline, sfun, swhat = tb[first_spark_frame]
- ufile, uline, ufun, uwhat = tb[first_spark_frame-1]
+ ufile, uline, ufun, uwhat = tb[first_spark_frame - 1]
return callsite(function=sfun, file=ufile, linenum=uline)
_spark_stack_depth = 0
+
class _JavaStackTrace(object):
+
def __init__(self, sc):
tb = _extract_concise_traceback()
if tb is not None:
- self._traceback = "%s at %s:%s" % (tb.function, tb.file, tb.linenum)
+ self._traceback = "%s at %s:%s" % (
+ tb.function, tb.file, tb.linenum)
else:
self._traceback = "Error! Could not extract traceback info"
self._context = sc
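For readers skimming the surrounding code: _JavaStackTrace is a re-entrant context manager that captures the user's call site once and forwards it to the JVM, clearing it only when the outermost with-block exits (the _spark_stack_depth counter visible in the next hunk). A minimal model of that pattern, with the JVM call stubbed out as a print (hypothetical names):

    _depth = 0

    class CallSiteScope(object):

        def __init__(self, label):
            self.label = label

        def __enter__(self):
            global _depth
            if _depth == 0:
                print('setCallSite(%s)' % self.label)  # stands in for sc._jsc.setCallSite
            _depth += 1

        def __exit__(self, exc_type, exc_value, tb):
            global _depth
            _depth -= 1
            if _depth == 0:
                print('setCallSite(None)')

    with CallSiteScope('collect at app.py:12'):
        with CallSiteScope('inner'):  # nested scope: no extra JVM round-trips
            pass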
@@ -95,7 +98,9 @@ class _JavaStackTrace(object):
if _spark_stack_depth == 0:
self._context._jsc.setCallSite(None)
+
class MaxHeapQ(object):
+
"""
An implementation of MaxHeap.
>>> import pyspark.rdd
@@ -117,14 +122,14 @@ class MaxHeapQ(object):
"""
def __init__(self, maxsize):
- # we start from q[1], this makes calculating children as trivial as 2 * k
+ # We start from q[1], so its children are always 2 * k
self.q = [0]
self.maxsize = maxsize
def _swim(self, k):
- while (k > 1) and (self.q[k/2] < self.q[k]):
- self._swap(k, k/2)
- k = k/2
+ while (k > 1) and (self.q[k / 2] < self.q[k]):
+ self._swap(k, k / 2)
+ k = k / 2
def _swap(self, i, j):
t = self.q[i]
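For context on the _swim change above: MaxHeapQ keeps its first element in q[1], so the parent of slot k lives at k / 2 and its children at 2 * k and 2 * k + 1; the edit only adds whitespace around the operator, and on Python 2 `/` between ints is still floor division, so behavior is unchanged. A standalone sketch of the same sift-up step (hypothetical, written with `//` so it also runs on Python 3):

    def swim(q, k):
        # Bubble q[k] upward while it beats its parent q[k // 2];
        # q[0] is a dummy slot so children of k sit at 2*k and 2*k + 1.
        while k > 1 and q[k // 2] < q[k]:
            q[k // 2], q[k] = q[k], q[k // 2]
            k //= 2

    q = [0, 5, 9]   # 9 was just appended below its parent 5
    swim(q, 2)
    print(q)        # [0, 9, 5]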
@@ -162,7 +167,9 @@ class MaxHeapQ(object):
self.q[1] = value
self._sink(1)
+
class RDD(object):
+
"""
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
Represents an immutable, partitioned collection of elements that can be
@@ -257,7 +264,8 @@ class RDD(object):
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[('a', 1), ('b', 1), ('c', 1)]
"""
- def func(split, iterator): return imap(f, iterator)
+ def func(split, iterator):
+ return imap(f, iterator)
return PipelinedRDD(self, func, preservesPartitioning)
def flatMap(self, f, preservesPartitioning=False):
@@ -271,7 +279,8 @@ class RDD(object):
>>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
"""
- def func(s, iterator): return chain.from_iterable(imap(f, iterator))
+ def func(s, iterator):
+ return chain.from_iterable(imap(f, iterator))
return self.mapPartitionsWithIndex(func, preservesPartitioning)
def mapPartitions(self, f, preservesPartitioning=False):
@@ -283,7 +292,8 @@ class RDD(object):
>>> rdd.mapPartitions(f).collect()
[3, 7]
"""
- def func(s, iterator): return f(iterator)
+ def func(s, iterator):
+ return f(iterator)
return self.mapPartitionsWithIndex(func)
def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
@@ -311,17 +321,17 @@ class RDD(object):
6
"""
warnings.warn("mapPartitionsWithSplit is deprecated; "
- "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
+ "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
return self.mapPartitionsWithIndex(f, preservesPartitioning)
def getNumPartitions(self):
- """
- Returns the number of partitions in RDD
- >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
- >>> rdd.getNumPartitions()
- 2
- """
- return self._jrdd.partitions().size()
+ """
+ Returns the number of partitions in RDD
+ >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
+ >>> rdd.getNumPartitions()
+ 2
+ """
+ return self._jrdd.partitions().size()
def filter(self, f):
"""
@@ -331,7 +341,8 @@ class RDD(object):
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4]
"""
- def func(iterator): return ifilter(f, iterator)
+ def func(iterator):
+ return ifilter(f, iterator)
return self.mapPartitions(func)
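Several hunks here (map, flatMap, mapPartitions, filter, and glom below) are the same mechanical fix: a statement on the same line as a def is flagged by the pep8 checker (E704 in today's pycodestyle numbering; an assumption on my part for the 2014-era tool), so autopep8 splits the one-liners onto two lines. A self-contained before/after sketch:

    def is_even(x):
        return x % 2 == 0

    def func_oneline(iterator): return filter(is_even, iterator)  # style-flagged form

    def func(iterator):
        return filter(is_even, iterator)  # pep8-clean equivalent

    print(list(func([1, 2, 3, 4])))  # [2, 4]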
def distinct(self):
@@ -391,9 +402,11 @@ class RDD(object):
maxSampleSize = sys.maxint - int(numStDev * sqrt(sys.maxint))
if num > maxSampleSize:
- raise ValueError("Sample size cannot be greater than %d." % maxSampleSize)
+ raise ValueError(
+ "Sample size cannot be greater than %d." % maxSampleSize)
- fraction = RDD._computeFractionForSampleSize(num, initialCount, withReplacement)
+ fraction = RDD._computeFractionForSampleSize(
+ num, initialCount, withReplacement)
samples = self.sample(withReplacement, fraction, seed).collect()
# If the first sample didn't turn out large enough, keep trying to take samples;
@@ -499,17 +512,17 @@ class RDD(object):
raise TypeError
return self.union(other)
- def sortByKey(self, ascending=True, numPartitions=None, keyfunc = lambda x: x):
+ def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
"""
Sorts this RDD, which is assumed to consist of (key, value) pairs.
-
+ # noqa
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortByKey(True, 2).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
>>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
>>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
- [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
+ [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
"""
if numPartitions is None:
numPartitions = self._defaultReducePartitions()
@@ -521,10 +534,12 @@ class RDD(object):
# number of (key, value) pairs falling into them
if numPartitions > 1:
rddSize = self.count()
- maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner
+ # constant from Spark's RangePartitioner
+ maxSampleSize = numPartitions * 20.0
fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
- samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
+ samples = self.sample(False, fraction, 1).map(
+ lambda (k, v): k).collect()
samples = sorted(samples, reverse=(not ascending), key=keyfunc)
# we have numPartitions many parts but one of the them has
@@ -540,13 +555,13 @@ class RDD(object):
if ascending:
return p
else:
- return numPartitions-1-p
+ return numPartitions - 1 - p
def mapFunc(iterator):
yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))
return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc)
- .mapPartitions(mapFunc,preservesPartitioning=True)
+ .mapPartitions(mapFunc, preservesPartitioning=True)
.flatMap(lambda x: x, preservesPartitioning=True))
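The sampling logic in the two hunks above feeds sortByKey's range partitioner: roughly 20 keys per partition are sampled (the RangePartitioner constant), sorted, and each record is then routed by where its key falls among the sampled cut points. A rough single-machine sketch of that idea (hypothetical helper, not the code Spark actually runs):

    import bisect

    def make_range_partitioner(sample_keys, num_partitions):
        # Choose num_partitions - 1 evenly spaced cut points from the sorted sample.
        sample_keys = sorted(sample_keys)
        step = max(len(sample_keys) // num_partitions, 1)
        bounds = sample_keys[step::step][:num_partitions - 1]
        # A key's partition index is the number of cut points below it.
        return lambda key: bisect.bisect_left(bounds, key)

    part = make_range_partitioner(['d', 'a', 'c', 'b'], 2)
    print(part('a'), part('z'))   # 0 1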
def sortBy(self, keyfunc, ascending=True, numPartitions=None):
@@ -570,7 +585,8 @@ class RDD(object):
>>> sorted(rdd.glom().collect())
[[1, 2], [3, 4]]
"""
- def func(iterator): yield list(iterator)
+ def func(iterator):
+ yield list(iterator)
return self.mapPartitions(func)
def cartesian(self, other):
@@ -607,7 +623,9 @@ class RDD(object):
['1', '2', '', '3']
"""
def func(iterator):
- pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
+ pipe = Popen(
+ shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
+
def pipe_objs(out):
for obj in iterator:
out.write(str(obj).rstrip('\n') + '\n')
@@ -646,7 +664,7 @@ class RDD(object):
Return a list that contains all of the elements in this RDD.
"""
with _JavaStackTrace(self.context) as st:
- bytesInJava = self._jrdd.collect().iterator()
+ bytesInJava = self._jrdd.collect().iterator()
return list(self._collect_iterator_through_file(bytesInJava))
def _collect_iterator_through_file(self, iterator):
@@ -736,7 +754,6 @@ class RDD(object):
return self.mapPartitions(func).fold(zeroValue, combOp)
-
def max(self):
"""
Find the maximum item in this RDD.
@@ -844,6 +861,7 @@ class RDD(object):
for obj in iterator:
counts[obj] += 1
yield counts
+
def mergeMaps(m1, m2):
for (k, v) in m2.iteritems():
m1[k] += v
@@ -888,22 +906,22 @@ class RDD(object):
def topNKeyedElems(iterator, key_=None):
q = MaxHeapQ(num)
for k in iterator:
- if key_ != None:
+ if key_ is not None:
k = (key_(k), k)
q.insert(k)
yield q.getElements()
def unKey(x, key_=None):
- if key_ != None:
+ if key_ is not None:
x = [i[1] for i in x]
return x
def merge(a, b):
return next(topNKeyedElems(a + b))
- result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge)
+ result = self.mapPartitions(
+ lambda i: topNKeyedElems(i, key)).reduce(merge)
return sorted(unKey(result, key), key=key)
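The bounded MaxHeapQ in the hunk above holds at most num elements per partition, evicting the current maximum whenever something smaller arrives, and merging two partitions' results is just re-heaping their concatenation; the single-machine equivalent of the whole pipeline is heapq's nsmallest. A hedged sketch of that equivalence (assuming this hunk belongs to takeOrdered, which the truncated hunk header does not show):

    import heapq

    def take_ordered_local(items, num, key=None):
        # num smallest elements in ascending order, optionally ranked by key.
        return heapq.nsmallest(num, items, key=key)

    print(take_ordered_local([10, 1, 2, 9, 3, 4, 5, 6, 7], 6))
    # [1, 2, 3, 4, 5, 6]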
-
def take(self, num):
"""
Take the first num elements of the RDD.
@@ -947,7 +965,8 @@ class RDD(object):
yield next(iterator)
taken += 1
- p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
+ p = range(
+ partsScanned, min(partsScanned + numPartsToTry, totalParts))
res = self.context.runJob(self, takeUpToNumLeft, p, True)
items += res
@@ -977,7 +996,7 @@ class RDD(object):
[1, 2, 'rdd', 'spark']
"""
self._reserialize(BatchedSerializer(PickleSerializer(),
- batchSize))._jrdd.saveAsObjectFile(path)
+ batchSize))._jrdd.saveAsObjectFile(path)
def saveAsTextFile(self, path):
"""
@@ -1075,6 +1094,7 @@ class RDD(object):
for (k, v) in iterator:
m[k] = v if k not in m else func(m[k], v)
yield m
+
def mergeMaps(m1, m2):
for (k, v) in m2.iteritems():
m1[k] = v if k not in m1 else func(m1[k], v)
@@ -1162,6 +1182,7 @@ class RDD(object):
# form the hash buckets in Python, transferring O(numPartitions) objects
# to Java. Each object is a (splitNumber, [objects]) pair.
outputSerializer = self.ctx._unbatched_serializer
+
def add_shuffle_key(split, iterator):
buckets = defaultdict(list)
@@ -1174,7 +1195,8 @@ class RDD(object):
keyed = PipelinedRDD(self, add_shuffle_key)
keyed._bypass_serializer = True
with _JavaStackTrace(self.context) as st:
- pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
+ pairRDD = self.ctx._jvm.PairwiseRDD(
+ keyed._jrdd.rdd()).asJavaPairRDD()
partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
id(partitionFunc))
jrdd = pairRDD.partitionBy(partitioner).values()
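add_shuffle_key above is the Python half of partitionBy: records are hashed into numPartitions buckets, and each bucket crosses into the JVM as a single (splitNumber, [objects]) pair, so only O(numPartitions) objects pass the language boundary rather than one per record. A simplified local model of the bucketing step (hypothetical, with the serializer plumbing omitted):

    from collections import defaultdict

    def add_shuffle_key(iterator, num_partitions, partition_func=hash):
        # Group (k, v) records by target partition, then emit one
        # (splitNumber, records) pair per non-empty bucket.
        buckets = defaultdict(list)
        for k, v in iterator:
            buckets[partition_func(k) % num_partitions].append((k, v))
        for split, records in buckets.items():
            yield (split, records)

    print(sorted(add_shuffle_key([('a', 1), ('b', 2), ('a', 3)], 2)))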
@@ -1213,6 +1235,7 @@ class RDD(object):
"""
if numPartitions is None:
numPartitions = self._defaultReducePartitions()
+
def combineLocally(iterator):
combiners = {}
for x in iterator:
@@ -1224,10 +1247,11 @@ class RDD(object):
return combiners.iteritems()
locally_combined = self.mapPartitions(combineLocally)
shuffled = locally_combined.partitionBy(numPartitions)
+
def _mergeCombiners(iterator):
combiners = {}
for (k, v) in iterator:
- if not k in combiners:
+ if k not in combiners:
combiners[k] = v
else:
combiners[k] = mergeCombiners(combiners[k], v)
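combineLocally and _mergeCombiners above form the classic map-side aggregation pair: each partition first collapses its records into per-key combiners, and only those combiners are shuffled and merged, which is why combineByKey moves far less data than a naive groupByKey. A minimal local model of the two phases (hypothetical names):

    def combine_locally(records, create_combiner, merge_value):
        combiners = {}
        for k, v in records:
            if k not in combiners:
                combiners[k] = create_combiner(v)
            else:
                combiners[k] = merge_value(combiners[k], v)
        return combiners

    def merge_combiners_phase(per_partition, merge_combiners):
        out = {}
        for d in per_partition:          # one dict per shuffled partition
            for k, c in d.items():
                out[k] = c if k not in out else merge_combiners(out[k], c)
        return out

    # word-count style: per-partition sums, then one global merge
    p1 = combine_locally([('a', 1), ('a', 1)], lambda v: v, lambda c, v: c + v)
    p2 = combine_locally([('a', 1), ('b', 1)], lambda v: v, lambda c, v: c + v)
    print(merge_combiners_phase([p1, p2], lambda a, b: a + b))   # {'a': 3, 'b': 1}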
@@ -1236,17 +1260,19 @@ class RDD(object):
def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
"""
- Aggregate the values of each key, using given combine functions and a neutral "zero value".
- This function can return a different result type, U, than the type of the values in this RDD,
- V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
- The former operation is used for merging values within a partition, and the latter is used
- for merging values between partitions. To avoid memory allocation, both of these functions are
+ Aggregate the values of each key, using given combine functions and a neutral
+ "zero value". This function can return a different result type, U, than the type
+ of the values in this RDD, V. Thus, we need one operation for merging a V into
+ a U and one operation for merging two U's, The former operation is used for merging
+ values within a partition, and the latter is used for merging values between
+ partitions. To avoid memory allocation, both of these functions are
allowed to modify and return their first argument instead of creating a new U.
"""
def createZero():
- return copy.deepcopy(zeroValue)
+ return copy.deepcopy(zeroValue)
- return self.combineByKey(lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions)
+ return self.combineByKey(
+ lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions)
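The reflowed docstring above is dense, so a concrete instance helps: computing a per-key (sum, count), where each value is an int (the V type) and the accumulator is a tuple (the U type). A hedged sketch of the two functions; the commented lines show how they would plug into aggregateByKey given a live SparkContext sc:

    zero = (0, 0)                 # neutral U

    def seq(u, v):                # merge a V into a U
        return (u[0] + v, u[1] + 1)

    def comb(u1, u2):             # merge two U's
        return (u1[0] + u2[0], u1[1] + u2[1])

    print(comb(seq(seq(zero, 1), 2), seq(zero, 3)))   # (6, 3)

    # sc.parallelize([('a', 1), ('a', 2), ('b', 3)]) \
    #   .aggregateByKey(zero, seq, comb).collect()
    # -> [('a', (3, 2)), ('b', (3, 1))]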
def foldByKey(self, zeroValue, func, numPartitions=None):
"""
@@ -1261,11 +1287,10 @@ class RDD(object):
[('a', 2), ('b', 1)]
"""
def createZero():
- return copy.deepcopy(zeroValue)
+ return copy.deepcopy(zeroValue)
return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
-
# TODO: support variant with custom partitioner
def groupByKey(self, numPartitions=None):
"""
@@ -1292,7 +1317,7 @@ class RDD(object):
return a + b
return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
- numPartitions).mapValues(lambda x: ResultIterable(x))
+ numPartitions).mapValues(lambda x: ResultIterable(x))
# TODO: add tests
def flatMapValues(self, f):
@@ -1362,7 +1387,8 @@ class RDD(object):
>>> sorted(x.subtractByKey(y).collect())
[('b', 4), ('b', 5)]
"""
- filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
+ def filter_func((key, vals)):
+ return len(vals[0]) > 0 and len(vals[1]) == 0
map_func = lambda (key, vals): [(key, val) for val in vals[0]]
return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
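One portability note on the hunk above: the new def uses Python 2 tuple parameter unpacking (def filter_func((key, vals)):), which PEP 3113 removed in Python 3; the forward-compatible spelling takes one argument and unpacks inside the body:

    def filter_func(kv):
        key, vals = kv    # unpack inside the body (valid on Python 2 and 3)
        return len(vals[0]) > 0 and len(vals[1]) == 0

    print(filter_func(('b', ([4, 5], []))))   # True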
@@ -1375,8 +1401,9 @@ class RDD(object):
>>> sorted(x.subtract(y).collect())
[('a', 1), ('b', 4), ('b', 5)]
"""
- rdd = other.map(lambda x: (x, True)) # note: here 'True' is just a placeholder
- return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0]) # note: here 'True' is just a placeholder
+ # note: here 'True' is just a placeholder
+ rdd = other.map(lambda x: (x, True))
+ return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])
def keyBy(self, f):
"""
@@ -1434,7 +1461,7 @@ class RDD(object):
"""
pairRDD = self._jrdd.zip(other._jrdd)
deserializer = PairDeserializer(self._jrdd_deserializer,
- other._jrdd_deserializer)
+ other._jrdd_deserializer)
return RDD(pairRDD, self.ctx, deserializer)
def name(self):
@@ -1503,7 +1530,9 @@ class RDD(object):
# keys in the pairs. This could be an expensive operation, since those
# hashes aren't retained.
+
class PipelinedRDD(RDD):
+
"""
Pipelined maps:
>>> rdd = sc.parallelize([1, 2, 3, 4])
@@ -1519,6 +1548,7 @@ class PipelinedRDD(RDD):
>>> rdd.flatMap(lambda x: [x, x]).reduce(add)
20
"""
+
def __init__(self, prev, func, preservesPartitioning=False):
if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
# This transformation is the first in its stage:
@@ -1528,6 +1558,7 @@ class PipelinedRDD(RDD):
self._prev_jrdd_deserializer = prev._jrdd_deserializer
else:
prev_func = prev.func
+
def pipeline_func(split, iterator):
return func(split, prev_func(split, iterator))
self.func = pipeline_func
@@ -1560,11 +1591,13 @@ class PipelinedRDD(RDD):
env = MapConverter().convert(self.ctx.environment,
self.ctx._gateway._gateway_client)
includes = ListConverter().convert(self.ctx._python_includes,
- self.ctx._gateway._gateway_client)
+ self.ctx._gateway._gateway_client)
python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
- bytearray(pickled_command), env, includes, self.preservesPartitioning,
- self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator,
- class_tag)
+ bytearray(pickled_command),
+ env, includes, self.preservesPartitioning,
+ self.ctx.pythonExec,
+ broadcast_vars, self.ctx._javaAccumulator,
+ class_tag)
self._jrdd_val = python_rdd.asJavaRDD()
return self._jrdd_val
@@ -1579,7 +1612,8 @@ def _test():
# The small batch size here ensures that we see multiple batches,
# even in these small test examples:
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
- (failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS)
+ (failure_count, test_count) = doctest.testmod(
+ globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
exit(-1)
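The doctest.ELLIPSIS flag passed above is also what lets the shortened sortByKey doctest earlier in the diff ([('a', 3), ('fleece', 7), ... ('whose', 6)]) pass: inside an expected value, "..." matches any substring of the actual output. A self-contained sketch of the mechanism (hypothetical module, not Spark's test harness):

    import doctest

    def spell():
        """
        >>> spell()
        'three little...lambs'
        """
        return 'three little white fleecy lambs'

    failures, _ = doctest.testmod(optionflags=doctest.ELLIPSIS)
    print('failures:', failures)   # failures: 0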