author    Matei Zaharia <matei@eecs.berkeley.edu>  2013-10-09 12:08:04 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2013-10-09 12:08:04 -0700
commit    478b2b7edcf42fa3e16f625d4b8676f2bb31f8dc (patch)
tree      4e98e75485e8450fb4dd7f9cd7917e52b9910297 /python/pyspark/rdd.py
parent    b4fa11f6c96ee37ecd30231c1e22630055f52115 (diff)
Fix PySpark docs and an overly long line of code after fdbae41e
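
The "overly long line" fix below wraps a chained RDD expression in parentheses so it can continue across lines without backslashes. A minimal, self-contained sketch of that style in plain Python (the string and method names here are illustrative, not from the commit):

    # Outer parentheses allow implicit line continuation, so each
    # chained call sits on its own line, as in the new sortByKey return.
    raw = "  Fix PySpark docs  "
    cleaned = (raw
               .strip()    # drop surrounding whitespace
               .lower()    # normalize case
               .replace("pyspark", "PySpark"))
    assert cleaned == "fix PySpark docs"
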
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- python/pyspark/rdd.py | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 7dfabb0b7d..7019fb8bee 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -117,8 +117,6 @@ class RDD(object):
else:
return None
- # TODO persist(self, storageLevel)
-
def map(self, f, preservesPartitioning=False):
"""
Return a new RDD containing the distinct elements in this RDD.
@@ -227,7 +225,7 @@ class RDD(object):
total = num
samples = self.sample(withReplacement, fraction, seed).collect()
-
+
# If the first sample didn't turn out large enough, keep trying to take samples;
# this shouldn't happen often because we use a big multiplier for their initial size.
# See: scala/spark/RDD.scala
@@ -288,7 +286,7 @@ class RDD(object):
maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner
fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
- samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
+ samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
samples = sorted(samples, reverse=(not ascending), key=keyfunc)
# we have numPartitions many parts but one of them has
@@ -309,7 +307,9 @@ class RDD(object):
def mapFunc(iterator):
yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))
- return self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc).mapPartitions(mapFunc,preservesPartitioning=True).flatMap(lambda x: x, preservesPartitioning=True)
+ return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc)
+ .mapPartitions(mapFunc,preservesPartitioning=True)
+ .flatMap(lambda x: x, preservesPartitioning=True))
def glom(self):
"""
@@ -471,7 +471,7 @@ class RDD(object):
3
"""
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
-
+
def stats(self):
"""
Return a L{StatCounter} object that captures the mean, variance
@@ -508,7 +508,7 @@ class RDD(object):
0.816...
"""
return self.stats().stdev()
-
+
def sampleStdev(self):
"""
Compute the sample standard deviation of this RDD's elements (which corrects for bias in
@@ -878,7 +878,7 @@ class RDD(object):
>>> y = sc.parallelize([("a", 3), ("c", None)])
>>> sorted(x.subtractByKey(y).collect())
[('b', 4), ('b', 5)]
- """
+ """
filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
map_func = lambda (key, vals): [(key, val) for val in vals[0]]
return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
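
An aside on the untouched context lines above: lambdas such as lambda (key, vals): ... rely on Python 2 tuple parameter unpacking, which PEP 3113 removed in Python 3. A hedged sketch of a version-neutral spelling, assuming the same (key, (vals0, vals1)) pairs that cogroup() produces:

    # Python 2 only (as in the diff):
    #   filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
    # Python 3-compatible equivalent: take one argument and index into it.
    filter_func = lambda kv: len(kv[1][0]) > 0 and len(kv[1][1]) == 0
    map_func = lambda kv: [(kv[0], val) for val in kv[1][0]]

    # Tiny self-check with plain tuples shaped like cogroup() output:
    record = ("a", ([2], []))   # key "a" appears only on the left side
    assert filter_func(record)
    assert map_func(record) == [("a", 2)]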