aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/shuffle.py
diff options
context:
space:
mode:
authorNicholas Chammas <nicholas.chammas@gmail.com>2014-08-06 12:58:24 -0700
committerReynold Xin <rxin@apache.org>2014-08-06 12:58:24 -0700
commitd614967b0bad1e6c5277d612602ec0a653a00258 (patch)
tree8df1a52cbe074af4f928c0ac8f08a63075882d0b /python/pyspark/shuffle.py
parenta6cd31108f0d73ce6823daafe8447677e03cfd13 (diff)
downloadspark-d614967b0bad1e6c5277d612602ec0a653a00258.tar.gz
spark-d614967b0bad1e6c5277d612602ec0a653a00258.tar.bz2
spark-d614967b0bad1e6c5277d612602ec0a653a00258.zip
[SPARK-2627] [PySpark] have the build enforce PEP 8 automatically
As described in [SPARK-2627](https://issues.apache.org/jira/browse/SPARK-2627), we'd like Python code to automatically be checked for PEP 8 compliance by Jenkins. This pull request aims to do that. Notes: * We may need to install [`pep8`](https://pypi.python.org/pypi/pep8) on the build server. * I'm expecting tests to fail now that PEP 8 compliance is being checked as part of the build. I'm fine with cleaning up any remaining PEP 8 violations as part of this pull request. * I did not understand why the RAT and scalastyle reports are saved to text files. I did the same for the PEP 8 check, but only so that the console output style can match those for the RAT and scalastyle checks. The PEP 8 report is removed right after the check is complete. * Updates to the ["Contributing to Spark"](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark) guide will be submitted elsewhere, as I don't believe that text is part of the Spark repo. Author: Nicholas Chammas <nicholas.chammas@gmail.com> Author: nchammas <nicholas.chammas@gmail.com> Closes #1744 from nchammas/master and squashes the following commits: 274b238 [Nicholas Chammas] [SPARK-2627] [PySpark] minor indentation changes 983d963 [nchammas] Merge pull request #5 from apache/master 1db5314 [nchammas] Merge pull request #4 from apache/master 0e0245f [Nicholas Chammas] [SPARK-2627] undo erroneous whitespace fixes bf30942 [Nicholas Chammas] [SPARK-2627] PEP8: comment spacing 6db9a44 [nchammas] Merge pull request #3 from apache/master 7b4750e [Nicholas Chammas] merge upstream changes 91b7584 [Nicholas Chammas] [SPARK-2627] undo unnecessary line breaks 44e3e56 [Nicholas Chammas] [SPARK-2627] use tox.ini to exclude files b09fae2 [Nicholas Chammas] don't wrap comments unnecessarily bfb9f9f [Nicholas Chammas] [SPARK-2627] keep up with the PEP 8 fixes 9da347f [nchammas] Merge pull request #2 from apache/master aa5b4b5 [Nicholas Chammas] [SPARK-2627] follow Spark bash style for if blocks d0a83b9 [Nicholas Chammas] [SPARK-2627] check that pep8 downloaded fine dffb5dd [Nicholas Chammas] [SPARK-2627] download pep8 at runtime a1ce7ae [Nicholas Chammas] [SPARK-2627] space out test report sections 21da538 [Nicholas Chammas] [SPARK-2627] it's PEP 8, not PEP8 6f4900b [Nicholas Chammas] [SPARK-2627] more misc PEP 8 fixes fe57ed0 [Nicholas Chammas] removing merge conflict backups 9c01d4c [nchammas] Merge pull request #1 from apache/master 9a66cb0 [Nicholas Chammas] resolving merge conflicts a31ccc4 [Nicholas Chammas] [SPARK-2627] miscellaneous PEP 8 fixes beaa9ac [Nicholas Chammas] [SPARK-2627] fail check on non-zero status 723ed39 [Nicholas Chammas] always delete the report file 0541ebb [Nicholas Chammas] [SPARK-2627] call Python linter from run-tests 12440fa [Nicholas Chammas] [SPARK-2627] add Scala linter 61c07b9 [Nicholas Chammas] [SPARK-2627] add Python linter 75ad552 [Nicholas Chammas] make check output style consistent
Diffstat (limited to 'python/pyspark/shuffle.py')
-rw-r--r--python/pyspark/shuffle.py20
1 files changed, 10 insertions, 10 deletions
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index e3923d1c36..2c68cd4921 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -45,7 +45,7 @@ except ImportError:
return int(line.split()[1]) >> 10
else:
warnings.warn("Please install psutil to have better "
- "support with spilling")
+ "support with spilling")
if platform.system() == "Darwin":
import resource
rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
@@ -141,7 +141,7 @@ class ExternalMerger(Merger):
This class works as follows:
- - It repeatedly combine the items and save them in one dict in
+ - It repeatedly combine the items and save them in one dict in
memory.
- When the used memory goes above memory limit, it will split
@@ -190,12 +190,12 @@ class ExternalMerger(Merger):
MAX_TOTAL_PARTITIONS = 4096
def __init__(self, aggregator, memory_limit=512, serializer=None,
- localdirs=None, scale=1, partitions=59, batch=1000):
+ localdirs=None, scale=1, partitions=59, batch=1000):
Merger.__init__(self, aggregator)
self.memory_limit = memory_limit
# default serializer is only used for tests
self.serializer = serializer or \
- BatchedSerializer(PickleSerializer(), 1024)
+ BatchedSerializer(PickleSerializer(), 1024)
self.localdirs = localdirs or self._get_dirs()
# number of partitions when spill data into disks
self.partitions = partitions
@@ -341,7 +341,7 @@ class ExternalMerger(Merger):
self.pdata[i].clear()
self.spills += 1
- gc.collect() # release the memory as much as possible
+ gc.collect() # release the memory as much as possible
def iteritems(self):
""" Return all merged items as iterator """
@@ -370,8 +370,8 @@ class ExternalMerger(Merger):
if (self.scale * self.partitions < self.MAX_TOTAL_PARTITIONS
and j < self.spills - 1
and get_used_memory() > hard_limit):
- self.data.clear() # will read from disk again
- gc.collect() # release the memory as much as possible
+ self.data.clear() # will read from disk again
+ gc.collect() # release the memory as much as possible
for v in self._recursive_merged_items(i):
yield v
return
@@ -409,9 +409,9 @@ class ExternalMerger(Merger):
for i in range(start, self.partitions):
subdirs = [os.path.join(d, "parts", str(i))
- for d in self.localdirs]
+ for d in self.localdirs]
m = ExternalMerger(self.agg, self.memory_limit, self.serializer,
- subdirs, self.scale * self.partitions)
+ subdirs, self.scale * self.partitions)
m.pdata = [{} for _ in range(self.partitions)]
limit = self._next_limit()
@@ -419,7 +419,7 @@ class ExternalMerger(Merger):
path = self._get_spill_dir(j)
p = os.path.join(path, str(i))
m._partitioned_mergeCombiners(
- self.serializer.load_stream(open(p)))
+ self.serializer.load_stream(open(p)))
if get_used_memory() > limit:
m._spill()