aboutsummaryrefslogtreecommitdiff
path: root/mllib/src
diff options
context:
space:
mode:
authorMatei Zaharia <matei@databricks.com>2014-07-25 00:32:32 -0700
committerMatei Zaharia <matei@databricks.com>2014-07-25 00:32:32 -0700
commit8529ced35c6b77a384d10a26b654a8073d57e03d (patch)
treea657574e398bf1c0b83cde30cc72b8bbdf6b3faf /mllib/src
parent2f75a4a30e1a3fdf384475b9660c6c43f093f68c (diff)
downloadspark-8529ced35c6b77a384d10a26b654a8073d57e03d.tar.gz
spark-8529ced35c6b77a384d10a26b654a8073d57e03d.tar.bz2
spark-8529ced35c6b77a384d10a26b654a8073d57e03d.zip
SPARK-2657 Use more compact data structures than ArrayBuffer in groupBy & cogroup
JIRA: https://issues.apache.org/jira/browse/SPARK-2657 Our current code uses ArrayBuffers for each group of values in groupBy, as well as for the key's elements in CoGroupedRDD. ArrayBuffers have a lot of overhead if there are few values in them, which is likely to happen in cases such as join. In particular, they have a pointer to an Object[] of size 16 by default, which is 24 bytes for the array header + 128 for the pointers in there, plus at least 32 for the ArrayBuffer data structure. This patch replaces the per-group buffers with a CompactBuffer class that can store up to 2 elements more efficiently (in fields of itself) and acts like an ArrayBuffer beyond that. For a key's elements in CoGroupedRDD, we use an Array of CompactBuffers instead of an ArrayBuffer of ArrayBuffers. There are some changes throughout the code to deal with CoGroupedRDD returning Array instead. We can also decide not to do that but CoGroupedRDD is a `DeveloperAPI` so I think it's okay to change it here. Author: Matei Zaharia <matei@databricks.com> Closes #1555 from mateiz/compact-groupby and squashes the following commits: 845a356 [Matei Zaharia] Lower initial size of CompactBuffer's vector to 8 07621a7 [Matei Zaharia] Review comments 0c1cd12 [Matei Zaharia] Don't use varargs in CompactBuffer.apply bdc8a39 [Matei Zaharia] Small tweak to +=, and typos f61f040 [Matei Zaharia] Fix line lengths 59da88b0 [Matei Zaharia] Fix line lengths 197cde8 [Matei Zaharia] Make CompactBuffer extend Seq to make its toSeq more efficient 775110f [Matei Zaharia] Change CoGroupedRDD to give (K, Array[Iterable[_]]) to avoid wrappers 9b4c6e8 [Matei Zaharia] Use CompactBuffer in CoGroupedRDD ed577ab [Matei Zaharia] Use CompactBuffer in groupByKey 10f0de1 [Matei Zaharia] A CompactBuffer that's more memory-efficient than ArrayBuffer for small buffers
Diffstat (limited to 'mllib/src')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala6
1 files changed, 5 insertions, 1 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
index 88de2c8247..1f7de630e7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
@@ -122,6 +122,10 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]], input: RDD[Vector]): RDD[Vector] = {
val partitioner = new HashPartitioner(input.partitions.size)
val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
- cogrouped.map { case (_, values: Seq[Seq[Double]]) => new DenseVector(values.flatten.toArray) }
+ cogrouped.map {
+ case (_, values: Array[Iterable[_]]) =>
+ val doubles = values.asInstanceOf[Array[Iterable[Double]]]
+ new DenseVector(doubles.flatten.toArray)
+ }
}
}