author     Josh Rosen <joshrosen@databricks.com>    2015-09-19 21:40:21 -0700
committer  Reynold Xin <rxin@databricks.com>        2015-09-19 21:40:21 -0700
commit     2117eea71ece825fbc3797c8b38184ae221f5223 (patch)
tree       06481ef1968367118e89779335e24245f57f2017 /python
parent     e789000b88a6bd840f821c53f42c08b97dc02496 (diff)
download   spark-2117eea71ece825fbc3797c8b38184ae221f5223.tar.gz
           spark-2117eea71ece825fbc3797c8b38184ae221f5223.tar.bz2
           spark-2117eea71ece825fbc3797c8b38184ae221f5223.zip
[SPARK-10710] Remove ability to disable spilling in core and SQL
It does not make much sense to set `spark.shuffle.spill` or `spark.sql.planner.externalSort` to false: I believe that these configurations were initially added as "escape hatches" to guard against bugs in the external operators, but these operators are now mature and well-tested. In addition, these configurations are not handled in a consistent way anymore: SQL's Tungsten codepath ignores these configurations and will continue to use spilling operators. Similarly, Spark Core's `tungsten-sort` shuffle manager does not respect `spark.shuffle.spill=false`.

This pull request removes these configurations, adds warnings at the appropriate places, and deletes a large amount of code which was only used in code paths that did not support spilling.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #8831 from JoshRosen/remove-ability-to-disable-spilling.
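For reference, after this change the Python-side combine path always constructs the spilling merger, regardless of configuration. A minimal standalone sketch of that path, assuming a PySpark build containing the `pyspark.shuffle` module from this tree (the sample data and the 512 MB limit are illustrative, not taken from the patch):

    from pyspark.shuffle import Aggregator, ExternalMerger

    # Aggregator takes the same three functions combineByKey uses:
    agg = Aggregator(
        lambda v: [v],                    # createCombiner
        lambda c, v: c.append(v) or c,    # mergeValue
        lambda a, b: a.extend(b) or a)    # mergeCombiners

    # Previously the merger choice depended on spark.shuffle.spill; now the
    # spilling ExternalMerger is used unconditionally.
    memory_mb = 512  # parsed from spark.python.worker.memory
    merger = ExternalMerger(agg, memory_mb * 0.9)
    merger.mergeValues(iter([("a", 1), ("a", 2), ("b", 3)]))
    print(dict(merger.items()))  # expected: {'a': [1, 2], 'b': [3]}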
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/rdd.py      25
-rw-r--r--  python/pyspark/shuffle.py  30
-rw-r--r--  python/pyspark/tests.py    13
3 files changed, 8 insertions, 60 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index ab5aab1e11..73d7d9a569 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -48,7 +48,7 @@ from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler
from pyspark.storagelevel import StorageLevel
from pyspark.resultiterable import ResultIterable
-from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \
+from pyspark.shuffle import Aggregator, ExternalMerger, \
get_used_memory, ExternalSorter, ExternalGroupBy
from pyspark.traceback_utils import SCCallSiteSync
@@ -580,12 +580,11 @@ class RDD(object):
if numPartitions is None:
numPartitions = self._defaultReducePartitions()
- spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() == "true")
memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
serializer = self._jrdd_deserializer
def sortPartition(iterator):
- sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
+ sort = ExternalSorter(memory * 0.9, serializer).sorted
return iter(sort(iterator, key=lambda k_v: keyfunc(k_v[0]), reverse=(not ascending)))
return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True)
@@ -610,12 +609,11 @@ class RDD(object):
if numPartitions is None:
numPartitions = self._defaultReducePartitions()
- spill = self._can_spill()
memory = self._memory_limit()
serializer = self._jrdd_deserializer
def sortPartition(iterator):
- sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
+ sort = ExternalSorter(memory * 0.9, serializer).sorted
return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending)))
if numPartitions == 1:
@@ -1770,13 +1768,11 @@ class RDD(object):
numPartitions = self._defaultReducePartitions()
serializer = self.ctx.serializer
- spill = self._can_spill()
memory = self._memory_limit()
agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
def combineLocally(iterator):
- merger = ExternalMerger(agg, memory * 0.9, serializer) \
- if spill else InMemoryMerger(agg)
+ merger = ExternalMerger(agg, memory * 0.9, serializer)
merger.mergeValues(iterator)
return merger.items()
@@ -1784,8 +1780,7 @@ class RDD(object):
shuffled = locally_combined.partitionBy(numPartitions)
def _mergeCombiners(iterator):
- merger = ExternalMerger(agg, memory, serializer) \
- if spill else InMemoryMerger(agg)
+ merger = ExternalMerger(agg, memory, serializer)
merger.mergeCombiners(iterator)
return merger.items()
@@ -1824,9 +1819,6 @@ class RDD(object):
return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
- def _can_spill(self):
- return self.ctx._conf.get("spark.shuffle.spill", "True").lower() == "true"
-
def _memory_limit(self):
return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
@@ -1857,14 +1849,12 @@ class RDD(object):
a.extend(b)
return a
- spill = self._can_spill()
memory = self._memory_limit()
serializer = self._jrdd_deserializer
agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
def combine(iterator):
- merger = ExternalMerger(agg, memory * 0.9, serializer) \
- if spill else InMemoryMerger(agg)
+ merger = ExternalMerger(agg, memory * 0.9, serializer)
merger.mergeValues(iterator)
return merger.items()
@@ -1872,8 +1862,7 @@ class RDD(object):
shuffled = locally_combined.partitionBy(numPartitions)
def groupByKey(it):
- merger = ExternalGroupBy(agg, memory, serializer)\
- if spill else InMemoryMerger(agg)
+ merger = ExternalGroupBy(agg, memory, serializer)
merger.mergeCombiners(it)
return merger.items()
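The sortByKey hunks above likewise drop the plain `sorted` fallback and always go through ExternalSorter. A quick standalone check of the sorter, assuming the same PySpark build (the memory limit and input list are illustrative):

    from pyspark.shuffle import ExternalSorter

    # ExternalSorter(memory_limit_mb) spills batches to local disk once process
    # memory exceeds the limit; with a 512 MB limit this tiny input typically
    # stays in memory.
    data = [("b", 2), ("a", 1), ("c", 3)]
    result = list(ExternalSorter(512).sorted(iter(data), key=lambda kv: kv[0]))
    print(result)  # expected: [('a', 1), ('b', 2), ('c', 3)]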
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index b8118bdb7c..e974cda9fc 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -131,36 +131,6 @@ class Merger(object):
raise NotImplementedError
-class InMemoryMerger(Merger):
-
- """
- In memory merger based on in-memory dict.
- """
-
- def __init__(self, aggregator):
- Merger.__init__(self, aggregator)
- self.data = {}
-
- def mergeValues(self, iterator):
- """ Combine the items by creator and combiner """
- # speed up attributes lookup
- d, creator = self.data, self.agg.createCombiner
- comb = self.agg.mergeValue
- for k, v in iterator:
- d[k] = comb(d[k], v) if k in d else creator(v)
-
- def mergeCombiners(self, iterator):
- """ Merge the combined items by mergeCombiner """
- # speed up attributes lookup
- d, comb = self.data, self.agg.mergeCombiners
- for k, v in iterator:
- d[k] = comb(d[k], v) if k in d else v
-
- def items(self):
- """ Return the merged items ad iterator """
- return iter(self.data.items())
-
-
def _compressed_serializer(self, serializer=None):
# always use PickleSerializer to simplify implementation
ser = PickleSerializer()
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 647504c32f..f11aaf001c 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -62,7 +62,7 @@ from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer,
CloudPickleSerializer, CompressedSerializer, UTF8Deserializer, NoOpSerializer, \
PairDeserializer, CartesianDeserializer, AutoBatchedSerializer, AutoSerializer, \
FlattenedValuesSerializer
-from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, ExternalSorter
+from pyspark.shuffle import Aggregator, ExternalMerger, ExternalSorter
from pyspark import shuffle
from pyspark.profiler import BasicProfiler
@@ -95,17 +95,6 @@ class MergerTests(unittest.TestCase):
lambda x, y: x.append(y) or x,
lambda x, y: x.extend(y) or x)
- def test_in_memory(self):
- m = InMemoryMerger(self.agg)
- m.mergeValues(self.data)
- self.assertEqual(sum(sum(v) for k, v in m.items()),
- sum(xrange(self.N)))
-
- m = InMemoryMerger(self.agg)
- m.mergeCombiners(map(lambda x_y: (x_y[0], [x_y[1]]), self.data))
- self.assertEqual(sum(sum(v) for k, v in m.items()),
- sum(xrange(self.N)))
-
def test_small_dataset(self):
m = ExternalMerger(self.agg, 1000)
m.mergeValues(self.data)
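With the InMemoryMerger tests gone, the mergeCombiners path that the deleted test exercised is now covered only through ExternalMerger, along the lines of the following sketch (same illustrative aggregator as above; pre-combined values are wrapped in lists):

    from pyspark.shuffle import Aggregator, ExternalMerger

    agg = Aggregator(
        lambda v: [v],                    # createCombiner
        lambda c, v: c.append(v) or c,    # mergeValue
        lambda a, b: a.extend(b) or a)    # mergeCombiners

    # mergeCombiners consumes (key, combiner) pairs rather than (key, value) pairs.
    pre_combined = [("a", [1]), ("a", [2]), ("b", [3])]
    m = ExternalMerger(agg, 1000)
    m.mergeCombiners(iter(pre_combined))
    print(dict(m.items()))  # expected: {'a': [1, 2], 'b': [3]}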