aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2014-07-25 00:32:32 -0700
committerMatei Zaharia <matei@databricks.com>2014-07-25 00:32:32 -0700
commit8529ced35c6b77a384d10a26b654a8073d57e03d (patch)
treea657574e398bf1c0b83cde30cc72b8bbdf6b3faf /streaming
parent2f75a4a30e1a3fdf384475b9660c6c43f093f68c (diff)
downloadspark-8529ced35c6b77a384d10a26b654a8073d57e03d.tar.gz
spark-8529ced35c6b77a384d10a26b654a8073d57e03d.tar.bz2
spark-8529ced35c6b77a384d10a26b654a8073d57e03d.zip
SPARK-2657 Use more compact data structures than ArrayBuffer in groupBy & cogroup
JIRA: https://issues.apache.org/jira/browse/SPARK-2657 Our current code uses ArrayBuffers for each group of values in groupBy, as well as for the key's elements in CoGroupedRDD. ArrayBuffers have a lot of overhead if there are few values in them, which is likely to happen in cases such as join. In particular, they have a pointer to an Object[] of size 16 by default, which is 24 bytes for the array header + 128 for the pointers in there, plus at least 32 for the ArrayBuffer data structure. This patch replaces the per-group buffers with a CompactBuffer class that can store up to 2 elements more efficiently (in fields of itself) and acts like an ArrayBuffer beyond that. For a key's elements in CoGroupedRDD, we use an Array of CompactBuffers instead of an ArrayBuffer of ArrayBuffers. There are some changes throughout the code to deal with CoGroupedRDD returning Array instead. We can also decide not to do that but CoGroupedRDD is a `DeveloperAPI` so I think it's okay to change it here. Author: Matei Zaharia <matei@databricks.com> Closes #1555 from mateiz/compact-groupby and squashes the following commits: 845a356 [Matei Zaharia] Lower initial size of CompactBuffer's vector to 8 07621a7 [Matei Zaharia] Review comments 0c1cd12 [Matei Zaharia] Don't use varargs in CompactBuffer.apply bdc8a39 [Matei Zaharia] Small tweak to +=, and typos f61f040 [Matei Zaharia] Fix line lengths 59da88b0 [Matei Zaharia] Fix line lengths 197cde8 [Matei Zaharia] Make CompactBuffer extend Seq to make its toSeq more efficient 775110f [Matei Zaharia] Change CoGroupedRDD to give (K, Array[Iterable[_]]) to avoid wrappers 9b4c6e8 [Matei Zaharia] Use CompactBuffer in CoGroupedRDD ed577ab [Matei Zaharia] Use CompactBuffer in groupByKey 10f0de1 [Matei Zaharia] A CompactBuffer that's more memory-efficient than ArrayBuffer for small buffers
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala15
1 file changed, 8 insertions(+), 7 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index 40da313189..1a47089e51 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -133,17 +133,17 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
val numOldValues = oldRDDs.size
val numNewValues = newRDDs.size
- val mergeValues = (seqOfValues: Seq[Seq[V]]) => {
- if (seqOfValues.size != 1 + numOldValues + numNewValues) {
+ val mergeValues = (arrayOfValues: Array[Iterable[V]]) => {
+ if (arrayOfValues.size != 1 + numOldValues + numNewValues) {
throw new Exception("Unexpected number of sequences of reduced values")
}
// Getting reduced values "old time steps" that will be removed from current window
- val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head)
+ val oldValues = (1 to numOldValues).map(i => arrayOfValues(i)).filter(!_.isEmpty).map(_.head)
// Getting reduced values "new time steps"
val newValues =
- (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
+ (1 to numNewValues).map(i => arrayOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
- if (seqOfValues(0).isEmpty) {
+ if (arrayOfValues(0).isEmpty) {
// If previous window's reduce value does not exist, then at least new values should exist
if (newValues.isEmpty) {
throw new Exception("Neither previous window has value for key, nor new values found. " +
@@ -153,7 +153,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
newValues.reduce(reduceF) // return
} else {
// Get the previous window's reduced value
- var tempValue = seqOfValues(0).head
+ var tempValue = arrayOfValues(0).head
// If old values exists, then inverse reduce then from previous value
if (!oldValues.isEmpty) {
tempValue = invReduceF(tempValue, oldValues.reduce(reduceF))
@@ -166,7 +166,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
}
}
- val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues)
+ val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]]
+ .mapValues(mergeValues)
if (filterFunc.isDefined) {
Some(mergedValuesRDD.filter(filterFunc.get))