aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorShuo Xiang <shuoxiangpub@gmail.com>2015-03-20 14:45:44 -0400
committerXiangrui Meng <meng@databricks.com>2015-03-20 14:45:44 -0400
commit5e6ad24ff645a9b0f63d9c0f17193550963aa0a7 (patch)
tree157de119c6a682bea87fe1a09ba1e6bbecb1cc11 /mllib
parent48866f789712b0cdbaf76054d1014c6df032fff1 (diff)
downloadspark-5e6ad24ff645a9b0f63d9c0f17193550963aa0a7.tar.gz
spark-5e6ad24ff645a9b0f63d9c0f17193550963aa0a7.tar.bz2
spark-5e6ad24ff645a9b0f63d9c0f17193550963aa0a7.zip
[MLlib] SPARK-5954: Top by key
This PR implements two functions - `topByKey(num: Int): RDD[(K, Array[V])]` finds the top-k values for each key in a pair RDD. This can be used, e.g., in computing top recommendations. - `takeOrderedByKey(num: Int): RDD[(K, Array[V])]` does the opposite of `topByKey`. The `sorted` call is used here because the `toArray` method of the PriorityQueue does not necessarily return a sorted array. Author: Shuo Xiang <shuoxiangpub@gmail.com> Closes #5075 from coderxiang/topByKey and squashes the following commits: 1611c37 [Shuo Xiang] code clean up 6f565c0 [Shuo Xiang] naming a80e0ec [Shuo Xiang] typo and warning 82dded9 [Shuo Xiang] Merge remote-tracking branch 'upstream/master' into topByKey d202745 [Shuo Xiang] move to MLPairRDDFunctions 901b0af [Shuo Xiang] style check 70c6e35 [Shuo Xiang] remove takeOrderedByKey, update doc and test 0895c17 [Shuo Xiang] Merge remote-tracking branch 'upstream/master' into topByKey b10e325 [Shuo Xiang] Merge remote-tracking branch 'upstream/master' into topByKey debccad [Shuo Xiang] topByKey
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala60
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala36
2 files changed, 96 insertions, 0 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala b/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala
new file mode 100644
index 0000000000..9213fd3f59
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import scala.language.implicitConversions
+import scala.reflect.ClassTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.BoundedPriorityQueue
+
+/**
+ * Machine learning specific Pair RDD functions.
+ */
+@DeveloperApi
+class MLPairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) extends Serializable {
+ /**
+ * Returns the top k (largest) elements for each key from this RDD as defined by the specified
+ * implicit Ordering[T].
+ * If the number of elements for a certain key is less than k, all of them will be returned.
+ *
+ * @param num k, the number of top elements to return
+ * @param ord the implicit ordering for T
+ * @return an RDD that contains the top k values for each key
+ */
+ def topByKey(num: Int)(implicit ord: Ordering[V]): RDD[(K, Array[V])] = {
+ // One BoundedPriorityQueue per key bounds memory to O(num) values per key.
+ self.aggregateByKey(new BoundedPriorityQueue[V](num)(ord))(
+ seqOp = (queue, item) => {
+ // Offer the element; the bounded queue drops the smallest when over capacity.
+ queue += item
+ queue
+ },
+ combOp = (queue1, queue2) => {
+ // Merge per-partition queues; the num-element bound is preserved.
+ queue1 ++= queue2
+ queue1
+ }
+ // PriorityQueue.toArray gives no ordering guarantee, so sort descending explicitly.
+ ).mapValues(_.toArray.sorted(ord.reverse))
+ }
+}
+
+@DeveloperApi
+object MLPairRDDFunctions {
+ /**
+ * Implicit conversion from a pair RDD to MLPairRDDFunctions, enabling
+ * `rdd.topByKey(k)` syntax after `import MLPairRDDFunctions._`.
+ */
+ implicit def fromPairRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): MLPairRDDFunctions[K, V] =
+ new MLPairRDDFunctions[K, V](rdd)
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala
new file mode 100644
index 0000000000..1ac7c12c4e
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.rdd
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
+
+class MLPairRDDFunctionsSuite extends FunSuite with MLlibTestSparkContext {
+ test("topByKey") {
+ // Two partitions force the combOp (queue-merge) path as well as seqOp.
+ val topMap = sc.parallelize(Array((1, 1), (1, 2), (3, 2), (3, 7), (3, 5), (5, 1), (5, 3)), 2)
+ .topByKey(2)
+ .collectAsMap()
+
+ // Each key keeps its top-2 values in descending order; key 1 has exactly 2 values.
+ assert(topMap.size === 3)
+ assert(topMap(1) === Array(2, 1))
+ assert(topMap(3) === Array(7, 5))
+ assert(topMap(5) === Array(3, 1))
+ }
+}