diff options
author | Patrick Wendell <pwendell@gmail.com> | 2014-01-10 16:25:01 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-01-10 16:25:01 -0800 |
commit | d37408f39ca3fd94f45b50a65f919f4d7007a533 (patch) | |
tree | 156e7f6639c22f919a932db2a9b90e803d26c94d /streaming | |
parent | 0eaf01c5ed856c9aeb60c0841c3be9305c6da174 (diff) | |
parent | 2e393cd5fdfbf3a85fced370b5c42315e86dad49 (diff) | |
download | spark-d37408f39ca3fd94f45b50a65f919f4d7007a533.tar.gz spark-d37408f39ca3fd94f45b50a65f919f4d7007a533.tar.bz2 spark-d37408f39ca3fd94f45b50a65f919f4d7007a533.zip |
Merge pull request #377 from andrewor14/master
External Sorting for Aggregator and CoGroupedRDDs (Revisited)
(This pull request is re-opened from https://github.com/apache/incubator-spark/pull/303, which was closed because Jenkins / github was misbehaving)
The target issue for this patch is the out-of-memory exceptions triggered by aggregate operations such as reduce, groupBy, join, and cogroup. The existing AppendOnlyMap used by these operations resides purely in memory, and grows with the size of the input data until the amount of allocated memory is exceeded. Under large workloads, this problem is aggravated by the fact that OOM frequently occurs only after a very long (> 1 hour) map phase, in which case the entire job must be restarted.
The solution is to spill the contents of this map to disk once a certain memory threshold is exceeded. This functionality is provided by ExternalAppendOnlyMap, which additionally sorts this buffer before writing it out to disk, and later merges these buffers back in sorted order.
Under normal circumstances in which OOM is not triggered, ExternalAppendOnlyMap is simply a wrapper around AppendOnlyMap and incurs little overhead. Only when the memory usage is expected to exceed the given threshold does ExternalAppendOnlyMap spill to disk.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 29 |
1 files changed, 19 insertions, 10 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index 0d2145da9a..8b7d7709bf 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -28,6 +28,7 @@ import java.util.*; import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.io.Files; +import com.google.common.collect.Sets; import org.apache.spark.SparkConf; import org.apache.spark.HashPartitioner; @@ -441,13 +442,13 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa new Tuple2<String, String>("new york", "islanders"))); - List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( - Arrays.asList( + List<HashSet<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList( + Sets.newHashSet( new Tuple2<String, Tuple2<String, String>>("california", new Tuple2<String, String>("dodgers", "giants")), new Tuple2<String, Tuple2<String, String>>("new york", - new Tuple2<String, String>("yankees", "mets"))), - Arrays.asList( + new Tuple2<String, String>("yankees", "mets"))), + Sets.newHashSet( new Tuple2<String, Tuple2<String, String>>("california", new Tuple2<String, String>("sharks", "ducks")), new Tuple2<String, Tuple2<String, String>>("new york", @@ -482,8 +483,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa JavaTestUtils.attachTestOutputStream(joined); List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2); + List<HashSet<Tuple2<String, Tuple2<String, String>>>> unorderedResult = Lists.newArrayList(); + for (List<Tuple2<String, Tuple2<String, String>>> res: result) { + unorderedResult.add(Sets.newHashSet(res)); + } - Assert.assertEquals(expected, result); + Assert.assertEquals(expected, unorderedResult); } @@ -1196,15 +1201,15 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa Arrays.asList("hello", "moon"), Arrays.asList("hello")); - List<List<Tuple2<String, Long>>> expected = Arrays.asList( - Arrays.asList( + List<HashSet<Tuple2<String, Long>>> expected = Arrays.asList( + Sets.newHashSet( new Tuple2<String, Long>("hello", 1L), new Tuple2<String, Long>("world", 1L)), - Arrays.asList( + Sets.newHashSet( new Tuple2<String, Long>("hello", 2L), new Tuple2<String, Long>("world", 1L), new Tuple2<String, Long>("moon", 1L)), - Arrays.asList( + Sets.newHashSet( new Tuple2<String, Long>("hello", 2L), new Tuple2<String, Long>("moon", 1L))); @@ -1214,8 +1219,12 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa stream.countByValueAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(counted); List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3); + List<HashSet<Tuple2<String, Long>>> unorderedResult = Lists.newArrayList(); + for (List<Tuple2<String, Long>> res: result) { + unorderedResult.add(Sets.newHashSet(res)); + } - Assert.assertEquals(expected, result); + Assert.assertEquals(expected, unorderedResult); } @Test |