diff options
author | Gavin Li <lyo.gavin@gmail.com> | 2013-06-16 22:32:55 +0000 |
---|---|---|
committer | Gavin Li <lyo.gavin@gmail.com> | 2013-06-16 22:32:55 +0000 |
commit | fb6d733fa88aa124deecf155af40cc095ecca5b3 (patch) | |
tree | 33fa219e31a107bbad8e4df59d1438ce8896728a | |
parent | e179ff8a32fc08cc308dc99bac2527d350d0d970 (diff) | |
download | spark-fb6d733fa88aa124deecf155af40cc095ecca5b3.tar.gz spark-fb6d733fa88aa124deecf155af40cc095ecca5b3.tar.bz2 spark-fb6d733fa88aa124deecf155af40cc095ecca5b3.zip |
update according to comments
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 71 | ||||
-rw-r--r-- | core/src/main/scala/spark/rdd/PipedRDD.scala | 29 | ||||
-rw-r--r-- | core/src/test/scala/spark/PipedRDDSuite.scala | 13 |
3 files changed, 24 insertions, 89 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index a1c9604324..152f7be9bb 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -355,68 +355,6 @@ abstract class RDD[T: ClassManifest]( def pipe(command: String, env: Map[String, String]): RDD[String] = new PipedRDD(this, command, env) - /** - * Return an RDD created by piping elements to a forked external process. - * How each record in RDD is outputed to the process can be controled by providing a - * function trasnform(T, outputFunction: String => Unit). transform() will be called with - * the currnet record in RDD as the 1st parameter, and the function to output the record to - * the external process (like out.println()) as the 2nd parameter. - * Here's an example on how to pipe the RDD data of groupBy() in a streaming way, - * instead of constructing a huge String to concat all the records: - * def tranform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)} - * pipeContext can be used to transfer additional context data to the external process - * besides the RDD. pipeContext is a broadcast Seq[String], each line would be piped to - * external process with "^A" as the delimiter in the end of context data. Delimiter can also - * be customized by the last parameter delimiter. - */ - def pipe[U<: Seq[String]]( - command: String, - env: Map[String, String], - transform: (T,String => Unit) => Any, - pipeContext: Broadcast[U], - delimiter: String): RDD[String] = - new PipedRDD(this, command, env, transform, pipeContext, delimiter) - - /** - * Return an RDD created by piping elements to a forked external process. - * How each record in RDD is outputed to the process can be controled by providing a - * function trasnform(T, outputFunction: String => Unit). transform() will be called with - * the currnet record in RDD as the 1st parameter, and the function to output the record to - * the external process (like out.println()) as the 2nd parameter. - * Here's an example on how to pipe the RDD data of groupBy() in a streaming way, - * instead of constructing a huge String to concat all the records: - * def tranform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)} - * pipeContext can be used to transfer additional context data to the external process - * besides the RDD. pipeContext is a broadcast Seq[String], each line would be piped to - * external process with "^A" as the delimiter in the end of context data. Delimiter can also - * be customized by the last parameter delimiter. - */ - def pipe[U<: Seq[String]]( - command: String, - transform: (T,String => Unit) => Any, - pipeContext: Broadcast[U]): RDD[String] = - new PipedRDD(this, command, Map[String, String](), transform, pipeContext, "\u0001") - - /** - * Return an RDD created by piping elements to a forked external process. - * How each record in RDD is outputed to the process can be controled by providing a - * function trasnform(T, outputFunction: String => Unit). transform() will be called with - * the currnet record in RDD as the 1st parameter, and the function to output the record to - * the external process (like out.println()) as the 2nd parameter. - * Here's an example on how to pipe the RDD data of groupBy() in a streaming way, - * instead of constructing a huge String to concat all the records: - * def tranform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)} - * pipeContext can be used to transfer additional context data to the external process - * besides the RDD. pipeContext is a broadcast Seq[String], each line would be piped to - * external process with "^A" as the delimiter in the end of context data. Delimiter can also - * be customized by the last parameter delimiter. - */ - def pipe[U<: Seq[String]]( - command: String, - env: Map[String, String], - transform: (T,String => Unit) => Any, - pipeContext: Broadcast[U]): RDD[String] = - new PipedRDD(this, command, env, transform, pipeContext, "\u0001") /** * Return an RDD created by piping elements to a forked external process. @@ -432,13 +370,12 @@ abstract class RDD[T: ClassManifest]( * external process with "^A" as the delimiter in the end of context data. Delimiter can also * be customized by the last parameter delimiter. */ - def pipe[U<: Seq[String]]( + def pipe( command: Seq[String], env: Map[String, String] = Map(), - transform: (T,String => Unit) => Any = null, - pipeContext: Broadcast[U] = null, - delimiter: String = "\u0001"): RDD[String] = - new PipedRDD(this, command, env, transform, pipeContext, delimiter) + printPipeContext: (String => Unit) => Unit = null, + printRDDElement: (T, String => Unit) => Unit = null): RDD[String] = + new PipedRDD(this, command, env, if (printPipeContext ne null) sc.clean(printPipeContext) else null, printRDDElement) /** * Return a new RDD by applying a function to each partition of this RDD. diff --git a/core/src/main/scala/spark/rdd/PipedRDD.scala b/core/src/main/scala/spark/rdd/PipedRDD.scala index d58aaae709..b2c07891ab 100644 --- a/core/src/main/scala/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/spark/rdd/PipedRDD.scala @@ -16,14 +16,12 @@ import spark.broadcast.Broadcast * An RDD that pipes the contents of each parent partition through an external command * (printing them one per line) and returns the output as a collection of strings. */ -class PipedRDD[T: ClassManifest, U <: Seq[String]]( +class PipedRDD[T: ClassManifest]( prev: RDD[T], command: Seq[String], envVars: Map[String, String], - transform: (T, String => Unit) => Any, - pipeContext: Broadcast[U], - delimiter: String - ) + printPipeContext: (String => Unit) => Unit, + printRDDElement: (T, String => Unit) => Unit) extends RDD[String](prev) { // Similar to Runtime.exec(), if we are given a single string, split it into words @@ -32,10 +30,9 @@ class PipedRDD[T: ClassManifest, U <: Seq[String]]( prev: RDD[T], command: String, envVars: Map[String, String] = Map(), - transform: (T, String => Unit) => Any = null, - pipeContext: Broadcast[U] = null, - delimiter: String = "\u0001") = - this(prev, PipedRDD.tokenize(command), envVars, transform, pipeContext, delimiter) + printPipeContext: (String => Unit) => Unit = null, + printRDDElement: (T, String => Unit) => Unit = null) = + this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement) override def getPartitions: Array[Partition] = firstParent[T].partitions @@ -64,17 +61,13 @@ class PipedRDD[T: ClassManifest, U <: Seq[String]]( SparkEnv.set(env) val out = new PrintWriter(proc.getOutputStream) - // input the pipeContext firstly - if ( pipeContext != null) { - for (elem <- pipeContext.value) { - out.println(elem) - } - // delimiter\n as the marker of the end of the pipeContext - out.println(delimiter) + // input the pipe context firstly + if ( printPipeContext != null) { + printPipeContext(out.println(_)) } for (elem <- firstParent[T].iterator(split, context)) { - if (transform != null) { - transform(elem, out.println(_)) + if (printRDDElement != null) { + printRDDElement(elem, out.println(_)) } else { out.println(elem) } diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala index d2852867de..ed075f93ec 100644 --- a/core/src/test/scala/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/spark/PipedRDDSuite.scala @@ -22,9 +22,12 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext { test("advanced pipe") { sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + val bl = sc.broadcast(List("0")) - val piped = nums.pipe(Seq("cat"), Map[String, String](), - (i:Int, f: String=> Unit) => f(i + "_"), sc.broadcast(List("0"))) + val piped = nums.pipe(Seq("cat"), + Map[String, String](), + (f: String => Unit) => {bl.value.map(f(_));f("\u0001")}, + (i:Int, f: String=> Unit) => f(i + "_")) val c = piped.collect() @@ -40,8 +43,10 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext { val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2) val d = nums1.groupBy(str=>str.split("\t")(0)). - pipe(Seq("cat"), Map[String, String](), (i:Tuple2[String, Seq[String]], f: String=> Unit) => - {for (e <- i._2){ f(e + "_")}}, sc.broadcast(List("0"))).collect() + pipe(Seq("cat"), + Map[String, String](), + (f: String => Unit) => {bl.value.map(f(_));f("\u0001")}, + (i:Tuple2[String, Seq[String]], f: String=> Unit) => {for (e <- i._2){ f(e + "_")}}).collect() assert(d.size === 8) assert(d(0) === "0") assert(d(1) === "\u0001") |