diff options
author | Holden Karau <holden@pigscanfly.ca> | 2014-04-08 18:15:52 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-08 18:15:59 -0700 |
commit | ce8ec5456169682f27f846e7b8d51e6c4bcf75e3 (patch) | |
tree | 029a7ba0926eb1a8384ba73e74fc0bb018121528 /mllib/src | |
parent | 12c077d5aa0b76a808a55db625c9677a52bd43f9 (diff) | |
download | spark-ce8ec5456169682f27f846e7b8d51e6c4bcf75e3.tar.gz spark-ce8ec5456169682f27f846e7b8d51e6c4bcf75e3.tar.bz2 spark-ce8ec5456169682f27f846e7b8d51e6c4bcf75e3.zip |
Spark 1271: Co-Group and Group-By should pass Iterable[X]
Author: Holden Karau <holden@pigscanfly.ca>
Closes #242 from holdenk/spark-1320-cogroupandgroupshouldpassiterator and squashes the following commits:
f289536 [Holden Karau] Fix bad merge, should have been Iterable rather than Iterator
77048f8 [Holden Karau] Fix merge up to master
d3fe909 [Holden Karau] use toSeq instead
7a092a3 [Holden Karau] switch resultitr to resultiterable
eb06216 [Holden Karau] maybe I should have had a coffee first. use correct import for guava iterables
c5075aa [Holden Karau] If guava 14 had iterables
2d06e10 [Holden Karau] Fix Java 8 cogroup tests for the new API
11e730c [Holden Karau] Fix streaming tests
66b583d [Holden Karau] Fix the core test suite to compile
4ed579b [Holden Karau] Refactor from iterator to iterable
d052c07 [Holden Karau] Python tests now pass with iterator pandas
3bcd81d [Holden Karau] Revert "Try and make pickling list iterators work"
cd1e81c [Holden Karau] Try and make pickling list iterators work
c60233a [Holden Karau] Start investigating moving to iterators for python API like the Java/Scala one. tl;dr: We will have to write our own iterator since the default one doesn't pickle well
88a5cef [Holden Karau] Fix cogroup test in JavaAPISuite for streaming
a5ee714 [Holden Karau] oops, was checking wrong iterator
e687f21 [Holden Karau] Fix groupbykey test in JavaAPISuite of streaming
ec8cc3e [Holden Karau] Fix test issues\!
4b0eeb9 [Holden Karau] Switch cast in PairDStreamFunctions
fa395c9 [Holden Karau] Revert "Add a join based on the problem in SVD"
ec99e32 [Holden Karau] Revert "Revert this but for now put things in list pandas"
b692868 [Holden Karau] Revert
7e533f7 [Holden Karau] Fix the bug
8a5153a [Holden Karau] Revert me, but we have some stuff to debug
b4e86a9 [Holden Karau] Add a join based on the problem in SVD
c4510e2 [Holden Karau] Revert this but for now put things in list pandas
b4e0b1d [Holden Karau] Fix style issues
71e8b9f [Holden Karau] I really need to stop calling size on iterators, it is the path of sadness.
b1ae51a [Holden Karau] Fix some of the types in the streaming JavaAPI suite. Probably still needs more work
37888ec [Holden Karau] core/tests now pass
249abde [Holden Karau] org.apache.spark.rdd.PairRDDFunctionsSuite passes
6698186 [Holden Karau] Revert "I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy"
fe992fe [Holden Karau] hmmm try and fix up basic operation suite
172705c [Holden Karau] Fix Java API suite
caafa63 [Holden Karau] I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy
88b3329 [Holden Karau] Fix groupbykey to actually give back an iterator
4991af6 [Holden Karau] Fix some tests
be50246 [Holden Karau] Calling size on an iterator is not so good if we want to use it after
687ffbc [Holden Karau] This is the it compiles point of replacing Seq with Iterator and JList with JIterator in the groupby and cogroup signatures
Diffstat (limited to 'mllib/src')
3 files changed, 8 insertions, 8 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala index 3e7cc648d1..0d97b7d92f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala @@ -69,11 +69,11 @@ class SVD { /** * Compute SVD using the current set parameters - * Returns (U, S, V) such that A = USV^T + * Returns (U, S, V) such that A = USV^T * U is a row-by-row dense matrix * S is a simple double array of singular values * V is a 2d array matrix - * See [[denseSVD]] for more documentation + * See [[denseSVD]] for more documentation */ def compute(matrix: RDD[Array[Double]]): (RDD[Array[Double]], Array[Double], Array[Array[Double]]) = { @@ -393,5 +393,3 @@ object SVD { System.exit(0) } } - - diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala index 0cc9f48769..3124fac326 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala @@ -421,12 +421,12 @@ class ALS private ( * Compute the new feature vectors for a block of the users matrix given the list of factors * it received from each product and its InLinkBlock. */ - private def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock, + private def updateBlock(messages: Iterable[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock, rank: Int, lambda: Double, alpha: Double, YtY: Option[Broadcast[DoubleMatrix]]) : Array[Array[Double]] = { // Sort the incoming block factor messages by block ID and make them an array - val blockFactors = messages.sortBy(_._1).map(_._2).toArray // Array[Array[Double]] + val blockFactors = messages.toSeq.sortBy(_._1).map(_._2).toArray // Array[Array[Double]] val numBlocks = blockFactors.length val numUsers = inLinkBlock.elementIds.length diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala index afe081295b..87aac34757 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala @@ -38,8 +38,10 @@ object LAUtils { case (i, cols) => val rowArray = Array.ofDim[Double](n) var j = 0 - while (j < cols.size) { - rowArray(cols(j)._1) = cols(j)._2 + val colsItr = cols.iterator + while (colsItr.hasNext) { + val element = colsItr.next + rowArray(element._1) = element._2 j += 1 } MatrixRow(i, rowArray) |