Diffstat (limited to 'python/pyspark/shuffle.py')
 python/pyspark/shuffle.py | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index e3923d1c36..2c68cd4921 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -45,7 +45,7 @@ except ImportError:
return int(line.split()[1]) >> 10
else:
warnings.warn("Please install psutil to have better "
- "support with spilling")
+ "support with spilling")
if platform.system() == "Darwin":
import resource
rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
@@ -141,7 +141,7 @@ class ExternalMerger(Merger):

This class works as follows:

- - It repeatedly combine the items and save them in one dict in
+ - It repeatedly combine the items and save them in one dict in
memory.

- When the used memory goes above memory limit, it will split
@@ -190,12 +190,12 @@ class ExternalMerger(Merger):
MAX_TOTAL_PARTITIONS = 4096

def __init__(self, aggregator, memory_limit=512, serializer=None,
- localdirs=None, scale=1, partitions=59, batch=1000):
+ localdirs=None, scale=1, partitions=59, batch=1000):
Merger.__init__(self, aggregator)
self.memory_limit = memory_limit
# default serializer is only used for tests
self.serializer = serializer or \
- BatchedSerializer(PickleSerializer(), 1024)
+ BatchedSerializer(PickleSerializer(), 1024)
self.localdirs = localdirs or self._get_dirs()
# number of partitions when spill data into disks
self.partitions = partitions
@@ -341,7 +341,7 @@ class ExternalMerger(Merger):
self.pdata[i].clear()

self.spills += 1
- gc.collect() # release the memory as much as possible
+ gc.collect() # release the memory as much as possible

def iteritems(self):
""" Return all merged items as iterator """
@@ -370,8 +370,8 @@ class ExternalMerger(Merger):
if (self.scale * self.partitions < self.MAX_TOTAL_PARTITIONS
and j < self.spills - 1
and get_used_memory() > hard_limit):
- self.data.clear() # will read from disk again
- gc.collect() # release the memory as much as possible
+ self.data.clear() # will read from disk again
+ gc.collect() # release the memory as much as possible
for v in self._recursive_merged_items(i):
yield v
return
@@ -409,9 +409,9 @@

for i in range(start, self.partitions):
subdirs = [os.path.join(d, "parts", str(i))
- for d in self.localdirs]
+ for d in self.localdirs]
m = ExternalMerger(self.agg, self.memory_limit, self.serializer,
- subdirs, self.scale * self.partitions)
+ subdirs, self.scale * self.partitions)
m.pdata = [{} for _ in range(self.partitions)]
limit = self._next_limit()

@@ -419,7 +419,7 @@
path = self._get_spill_dir(j)
p = os.path.join(path, str(i))
m._partitioned_mergeCombiners(
- self.serializer.load_stream(open(p)))
+ self.serializer.load_stream(open(p)))

if get_used_memory() > limit:
m._spill()
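
For context, the constructor whose continuation lines are re-indented above takes an Aggregator plus spill tuning knobs (memory_limit in MB, partitions, batch). Below is a minimal usage sketch, assuming the Aggregator class defined in this same module; the summing callables and the 10 MB limit are illustrative choices, not part of this commit:

from pyspark.shuffle import Aggregator, ExternalMerger

# Sum values per key; spill to disk once used memory exceeds ~10 MB.
agg = Aggregator(createCombiner=lambda v: v,
                 mergeValue=lambda c, v: c + v,
                 mergeCombiners=lambda c1, c2: c1 + c2)
merger = ExternalMerger(agg, memory_limit=10)

# Feed (key, value) pairs; the merger hashes keys into partitions and,
# when over the limit, dumps each partition to its own spill file.
merger.mergeValues((i % 64, 1) for i in range(100000))

# iteritems() streams merged (key, combiner) pairs back, re-reading and
# merging any spill files partition by partition.
total = sum(v for _, v in merger.iteritems())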