diff options
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 26 | ||||
-rw-r--r-- | core/src/main/scala/spark/rdd/PipedRDD.scala | 27 | ||||
-rw-r--r-- | core/src/test/scala/spark/PipedRDDSuite.scala | 39 |
3 files changed, 83 insertions, 9 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 7cb9bfebeb..05ff399a7b 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -14,6 +14,7 @@ import org.apache.hadoop.mapred.TextOutputFormat import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} +import spark.broadcast.Broadcast import spark.Partitioner._ import spark.partial.BoundedDouble import spark.partial.CountEvaluator @@ -354,13 +355,30 @@ abstract class RDD[T: ClassManifest]( /** * Return an RDD created by piping elements to a forked external process. */ - def pipe(command: Seq[String]): RDD[String] = new PipedRDD(this, command) + def pipe(command: String, env: Map[String, String]): RDD[String] = + new PipedRDD(this, command, env) + /** * Return an RDD created by piping elements to a forked external process. - */ - def pipe(command: Seq[String], env: Map[String, String]): RDD[String] = - new PipedRDD(this, command, env) + * How each record in RDD is outputed to the process can be controled by providing a + * function trasnform(T, outputFunction: String => Unit). transform() will be called with + * the currnet record in RDD as the 1st parameter, and the function to output the record to + * the external process (like out.println()) as the 2nd parameter. + * Here's an example on how to pipe the RDD data of groupBy() in a streaming way, + * instead of constructing a huge String to concat all the records: + * def tranform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)} + * pipeContext can be used to transfer additional context data to the external process + * besides the RDD. pipeContext is a broadcast Seq[String], each line would be piped to + * external process with "^A" as the delimiter in the end of context data. Delimiter can also + * be customized by the last parameter delimiter. + */ + def pipe( + command: Seq[String], + env: Map[String, String] = Map(), + printPipeContext: (String => Unit) => Unit = null, + printRDDElement: (T, String => Unit) => Unit = null): RDD[String] = + new PipedRDD(this, command, env, if (printPipeContext ne null) sc.clean(printPipeContext) else null, printRDDElement) /** * Return a new RDD by applying a function to each partition of this RDD. diff --git a/core/src/main/scala/spark/rdd/PipedRDD.scala b/core/src/main/scala/spark/rdd/PipedRDD.scala index 962a1b21ad..b2c07891ab 100644 --- a/core/src/main/scala/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/spark/rdd/PipedRDD.scala @@ -9,6 +9,7 @@ import scala.collection.mutable.ArrayBuffer import scala.io.Source import spark.{RDD, SparkEnv, Partition, TaskContext} +import spark.broadcast.Broadcast /** @@ -18,14 +19,21 @@ import spark.{RDD, SparkEnv, Partition, TaskContext} class PipedRDD[T: ClassManifest]( prev: RDD[T], command: Seq[String], - envVars: Map[String, String]) + envVars: Map[String, String], + printPipeContext: (String => Unit) => Unit, + printRDDElement: (T, String => Unit) => Unit) extends RDD[String](prev) { - def this(prev: RDD[T], command: Seq[String]) = this(prev, command, Map()) - // Similar to Runtime.exec(), if we are given a single string, split it into words // using a standard StringTokenizer (i.e. by spaces) - def this(prev: RDD[T], command: String) = this(prev, PipedRDD.tokenize(command)) + def this( + prev: RDD[T], + command: String, + envVars: Map[String, String] = Map(), + printPipeContext: (String => Unit) => Unit = null, + printRDDElement: (T, String => Unit) => Unit = null) = + this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement) + override def getPartitions: Array[Partition] = firstParent[T].partitions @@ -52,8 +60,17 @@ class PipedRDD[T: ClassManifest]( override def run() { SparkEnv.set(env) val out = new PrintWriter(proc.getOutputStream) + + // input the pipe context firstly + if ( printPipeContext != null) { + printPipeContext(out.println(_)) + } for (elem <- firstParent[T].iterator(split, context)) { - out.println(elem) + if (printRDDElement != null) { + printRDDElement(elem, out.println(_)) + } else { + out.println(elem) + } } out.close() } diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala index a6344edf8f..ed075f93ec 100644 --- a/core/src/test/scala/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/spark/PipedRDDSuite.scala @@ -19,6 +19,45 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext { assert(c(3) === "4") } + test("advanced pipe") { + sc = new SparkContext("local", "test") + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + val bl = sc.broadcast(List("0")) + + val piped = nums.pipe(Seq("cat"), + Map[String, String](), + (f: String => Unit) => {bl.value.map(f(_));f("\u0001")}, + (i:Int, f: String=> Unit) => f(i + "_")) + + val c = piped.collect() + + assert(c.size === 8) + assert(c(0) === "0") + assert(c(1) === "\u0001") + assert(c(2) === "1_") + assert(c(3) === "2_") + assert(c(4) === "0") + assert(c(5) === "\u0001") + assert(c(6) === "3_") + assert(c(7) === "4_") + + val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2) + val d = nums1.groupBy(str=>str.split("\t")(0)). + pipe(Seq("cat"), + Map[String, String](), + (f: String => Unit) => {bl.value.map(f(_));f("\u0001")}, + (i:Tuple2[String, Seq[String]], f: String=> Unit) => {for (e <- i._2){ f(e + "_")}}).collect() + assert(d.size === 8) + assert(d(0) === "0") + assert(d(1) === "\u0001") + assert(d(2) === "b\t2_") + assert(d(3) === "b\t4_") + assert(d(4) === "0") + assert(d(5) === "\u0001") + assert(d(6) === "a\t1_") + assert(d(7) === "a\t3_") + } + test("pipe with env variable") { sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) |