author	Xiangrui Meng <meng@databricks.com>	2014-02-12 00:42:42 -0800
committer	Reynold Xin <rxin@apache.org>	2014-02-12 00:42:42 -0800
commit	e733d655df6bf569d3d16fdd65c11ef3d2b9de16 (patch)
tree	408290ad8f015dcb64a48495bd6d8fbe5730f821 /core/src/test
parent	68b2c0d02dbdca246ca686b871c06af53845d5b5 (diff)
download	spark-e733d655df6bf569d3d16fdd65c11ef3d2b9de16.tar.gz
	spark-e733d655df6bf569d3d16fdd65c11ef3d2b9de16.tar.bz2
	spark-e733d655df6bf569d3d16fdd65c11ef3d2b9de16.zip
Merge pull request #578 from mengxr/rank.
SPARK-1076: zipWithIndex and zipWithUniqueId to RDD

Assigning ranks to an ordered or unordered data set is a common operation. It could be done by first counting the records in each partition and then assigning ranks in parallel. The purpose of assigning ranks to an unordered set is usually to get a unique id for each item, e.g., to map feature names to feature indices. In such cases, the assignment can be done without counting records, saving one Spark job.

https://spark-project.atlassian.net/browse/SPARK-1076

== update ==

Because assigning ranks is very similar to Scala's zipWithIndex, I changed the method name to zipWithIndex and put the index in the value field.

Author: Xiangrui Meng <meng@databricks.com>

Closes #578 and squashes the following commits:

52a05e1 [Xiangrui Meng] changed assignRanks to zipWithIndex; changed assignUniqueIds to zipWithUniqueId; minor updates
756881c [Xiangrui Meng] simplified RankedRDD by implementing assignUniqueIds separately; moved counting iterator size to Utils; do not count items in the last partition and skip counting if there is only one partition
630868c [Xiangrui Meng] newline
21b434b [Xiangrui Meng] add assignRanks and assignUniqueIds to RDD
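For context, below is a minimal standalone sketch of the two indexing schemes described in the commit message. Partitions are modeled as plain Scala sequences so the code runs without a Spark cluster; ZipSketch and its method names are hypothetical illustrations of the idea, not Spark's actual implementation.

// Hypothetical sketch: an "RDD" is modeled as Seq[Seq[T]] (one inner Seq per partition).
object ZipSketch {

  // zipWithIndex idea: count the records in each partition, scan the counts
  // into per-partition start offsets (this is the extra pass / extra job),
  // then add each item's local position to its partition's offset.
  def zipWithIndex[T](partitions: Seq[Seq[T]]): Seq[Seq[(T, Long)]] = {
    val startIndices = partitions.map(_.size.toLong).scanLeft(0L)(_ + _)
    partitions.zip(startIndices).map { case (part, start) =>
      part.zipWithIndex.map { case (item, i) => (item, start + i) }
    }
  }

  // zipWithUniqueId idea: the i-th item of partition k gets id i * n + k,
  // where n is the number of partitions. Ids are unique but not necessarily
  // consecutive, and no counting pass is needed.
  def zipWithUniqueId[T](partitions: Seq[Seq[T]]): Seq[Seq[(T, Long)]] = {
    val n = partitions.size
    partitions.zipWithIndex.map { case (part, k) =>
      part.zipWithIndex.map { case (item, i) => (item, i.toLong * n + k) }
    }
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq("a", "b"), Seq("c"), Seq("d", "e", "f"))
    println(zipWithIndex(parts).flatten)    // consecutive indices 0..5
    println(zipWithUniqueId(parts).flatten) // unique but non-consecutive ids
  }
}

With a single partition the only start offset is 0, which is why the commit can skip the counting step in that case.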
Diffstat (limited to 'core/src/test')
-rw-r--r--	core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala	26
-rw-r--r--	core/src/test/scala/org/apache/spark/util/UtilsSuite.scala	7
2 files changed, 33 insertions, 0 deletions
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 879c4e5f17..308c7cc8c3 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -525,4 +525,30 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(a.intersection(b).collect.sorted === intersection)
     assert(b.intersection(a).collect.sorted === intersection)
   }
+
+  test("zipWithIndex") {
+    val n = 10
+    val data = sc.parallelize(0 until n, 3)
+    val ranked = data.zipWithIndex()
+    ranked.collect().foreach { x =>
+      assert(x._1 === x._2)
+    }
+  }
+
+  test("zipWithIndex with a single partition") {
+    val n = 10
+    val data = sc.parallelize(0 until n, 1)
+    val ranked = data.zipWithIndex()
+    ranked.collect().foreach { x =>
+      assert(x._1 === x._2)
+    }
+  }
+
+  test("zipWithUniqueId") {
+    val n = 10
+    val data = sc.parallelize(0 until n, 3)
+    val ranked = data.zipWithUniqueId()
+    val ids = ranked.map(_._1).distinct().collect()
+    assert(ids.length === n)
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 4684c8c972..7030ba4858 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -146,5 +146,12 @@ class UtilsSuite extends FunSuite {
     assert(bbuf.array.length === 8)
     assert(Utils.deserializeLongValue(bbuf.array) === testval)
   }
+
+  test("get iterator size") {
+    val empty = Seq[Int]()
+    assert(Utils.getIteratorSize(empty.toIterator) === 0L)
+    val iterator = Iterator.range(0, 5)
+    assert(Utils.getIteratorSize(iterator) === 5L)
+  }
 }