diff options
author | Shuo Xiang <shuoxiangpub@gmail.com> | 2015-03-20 14:45:44 -0400 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-03-20 14:45:44 -0400 |
commit | 5e6ad24ff645a9b0f63d9c0f17193550963aa0a7 (patch) | |
tree | 157de119c6a682bea87fe1a09ba1e6bbecb1cc11 /mllib/src | |
parent | 48866f789712b0cdbaf76054d1014c6df032fff1 (diff) | |
download | spark-5e6ad24ff645a9b0f63d9c0f17193550963aa0a7.tar.gz spark-5e6ad24ff645a9b0f63d9c0f17193550963aa0a7.tar.bz2 spark-5e6ad24ff645a9b0f63d9c0f17193550963aa0a7.zip |
[MLlib] SPARK-5954: Top by key
This PR implements two functions
- `topByKey(num: Int): RDD[(K, Array[V])]` finds the top-k values for each key in a pair RDD. This can be used, e.g., in computing top recommendations.
- `takeOrderedByKey(num: Int): RDD[(K, Array[V])] ` does the opposite of `topByKey`
The `sorted` is used here as the `toArray` method of the PriorityQueue does not return a necessarily sorted array.
Author: Shuo Xiang <shuoxiangpub@gmail.com>
Closes #5075 from coderxiang/topByKey and squashes the following commits:
1611c37 [Shuo Xiang] code clean up
6f565c0 [Shuo Xiang] naming
a80e0ec [Shuo Xiang] typo and warning
82dded9 [Shuo Xiang] Merge remote-tracking branch 'upstream/master' into topByKey
d202745 [Shuo Xiang] move to MLPairRDDFunctions
901b0af [Shuo Xiang] style check
70c6e35 [Shuo Xiang] remove takeOrderedByKey, update doc and test
0895c17 [Shuo Xiang] Merge remote-tracking branch 'upstream/master' into topByKey
b10e325 [Shuo Xiang] Merge remote-tracking branch 'upstream/master' into topByKey
debccad [Shuo Xiang] topByKey
Diffstat (limited to 'mllib/src')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala | 60 | ||||
-rw-r--r-- | mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala | 36 |
2 files changed, 96 insertions, 0 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala new file mode 100644 index 0000000000..9213fd3f59 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.rdd + +import scala.language.implicitConversions +import scala.reflect.ClassTag + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.util.BoundedPriorityQueue + +/** + * Machine learning specific Pair RDD functions. + */ +@DeveloperApi +class MLPairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) extends Serializable { + /** + * Returns the top k (largest) elements for each key from this RDD as defined by the specified + * implicit Ordering[T]. + * If the number of elements for a certain key is less than k, all of them will be returned. + * + * @param num k, the number of top elements to return + * @param ord the implicit ordering for T + * @return an RDD that contains the top k values for each key + */ + def topByKey(num: Int)(implicit ord: Ordering[V]): RDD[(K, Array[V])] = { + self.aggregateByKey(new BoundedPriorityQueue[V](num)(ord))( + seqOp = (queue, item) => { + queue += item + queue + }, + combOp = (queue1, queue2) => { + queue1 ++= queue2 + queue1 + } + ).mapValues(_.toArray.sorted(ord.reverse)) + } +} + +@DeveloperApi +object MLPairRDDFunctions { + /** Implicit conversion from a pair RDD to MLPairRDDFunctions. */ + implicit def fromPairRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): MLPairRDDFunctions[K, V] = + new MLPairRDDFunctions[K, V](rdd) +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala new file mode 100644 index 0000000000..1ac7c12c4e --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.rdd + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.mllib.rdd.MLPairRDDFunctions._ + +class MLPairRDDFunctionsSuite extends FunSuite with MLlibTestSparkContext { + test("topByKey") { + val topMap = sc.parallelize(Array((1, 1), (1, 2), (3, 2), (3, 7), (3, 5), (5, 1), (5, 3)), 2) + .topByKey(2) + .collectAsMap() + + assert(topMap.size === 3) + assert(topMap(1) === Array(2, 1)) + assert(topMap(3) === Array(7, 5)) + assert(topMap(5) === Array(3, 1)) + } +} |