author     Gavin Li <lyo.gavin@gmail.com>    2013-06-02 23:21:09 +0000
committer  Gavin Li <lyo.gavin@gmail.com>    2013-06-02 23:21:09 +0000
commit     4a9913d66a61ac9ef9cab0e08f6151dc2624fd11 (patch)
tree       3f47ce1ae8da633313eda55c707bbecfd5aa8938 /core/src/test/scala
parent     9f84315c055d7a53da8787eb26b336726fc33e8a (diff)
add unit test for pipe enhancement
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--  core/src/test/scala/spark/PipedRDDSuite.scala | 31
1 file changed, 31 insertions(+), 0 deletions(-)
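For context, here is a minimal standalone sketch of the enhanced pipe overload this test exercises. The three-argument form pipe(command, transform, pipeContext) is inferred from the call sites in the diff below rather than from a documented API, and the object name and local setup are illustrative:

    import spark.SparkContext

    object PipeEnhancementSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "pipe-sketch")
        val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
        // "cat" echoes its stdin; the transform appends "_" to each element,
        // and Array("0") is written once per partition before the elements,
        // separated from them by "\u0001" (per the asserts in the diff below).
        val piped = nums.pipe(
          Seq("cat"),
          (i: Int, f: String => Unit) => f(i + "_"),
          Array("0"))
        piped.collect().foreach(println)
        sc.stop()
      }
    }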
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
index a6344edf8f..ee55952a94 100644
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/spark/PipedRDDSuite.scala
@@ -19,6 +19,37 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
assert(c(3) === "4")
}
+ test("advanced pipe") {
+ sc = new SparkContext("local", "test")
+ val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+
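+ // Enhanced pipe: run "cat", transform each element by appending "_", and
+ // pass Array("0") as per-partition pipe context. Each partition emits the
+ // context ("0"), then a "\u0001" separator, then its transformed elements,
+ // which the asserts below check partition by partition.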
+ val piped = nums.pipe(Seq("cat"), (i: Int, f: String => Unit) => f(i + "_"), Array("0"))
+
+ val c = piped.collect()
+
+ assert(c.size === 8)
+ assert(c(0) === "0")
+ assert(c(1) === "\u0001")
+ assert(c(2) === "1_")
+ assert(c(3) === "2_")
+ assert(c(4) === "0")
+ assert(c(5) === "\u0001")
+ assert(c(6) === "3_")
+ assert(c(7) === "4_")
+
+ val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
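+ // Same enhancement over a grouped RDD: groupBy keys on the tab-separated
+ // prefix, the transform writes each grouped value with a "_" suffix, and
+ // each partition is again prefixed by the "0" context and "\u0001".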
+ val d = nums1.groupBy(str => str.split("\t")(0)).pipe(Seq("cat"),
+   (i: Tuple2[String, Seq[String]], f: String => Unit) => { for (e <- i._2) { f(e + "_") } },
+   Array("0")).collect()
+ assert(d.size === 8)
+ assert(d(0) === "0")
+ assert(d(1) === "\u0001")
+ assert(d(2) === "b\t2_")
+ assert(d(3) === "b\t4_")
+ assert(d(4) === "0")
+ assert(d(5) === "\u0001")
+ assert(d(6) === "a\t1_")
+ assert(d(7) === "a\t3_")
+ }
+
test("pipe with env variable") {
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)