author    Gavin Li <lyo.gavin@gmail.com>  2013-06-16 22:32:55 +0000
committer Gavin Li <lyo.gavin@gmail.com>  2013-06-16 22:32:55 +0000
commit    fb6d733fa88aa124deecf155af40cc095ecca5b3 (patch)
tree      33fa219e31a107bbad8e4df59d1438ce8896728a
parent    e179ff8a32fc08cc308dc99bac2527d350d0d970 (diff)
update according to comments
-rw-r--r--   core/src/main/scala/spark/RDD.scala             71
-rw-r--r--   core/src/main/scala/spark/rdd/PipedRDD.scala    29
-rw-r--r--   core/src/test/scala/spark/PipedRDDSuite.scala   13
3 files changed, 24 insertions, 89 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index a1c9604324..152f7be9bb 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -355,68 +355,6 @@ abstract class RDD[T: ClassManifest](
def pipe(command: String, env: Map[String, String]): RDD[String] =
new PipedRDD(this, command, env)
- /**
- * Return an RDD created by piping elements to a forked external process.
- * How each record in RDD is outputed to the process can be controled by providing a
- * function trasnform(T, outputFunction: String => Unit). transform() will be called with
- * the currnet record in RDD as the 1st parameter, and the function to output the record to
- * the external process (like out.println()) as the 2nd parameter.
- * Here's an example on how to pipe the RDD data of groupBy() in a streaming way,
- * instead of constructing a huge String to concat all the records:
- * def tranform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)}
- * pipeContext can be used to transfer additional context data to the external process
- * besides the RDD. pipeContext is a broadcast Seq[String], each line would be piped to
- * external process with "^A" as the delimiter in the end of context data. Delimiter can also
- * be customized by the last parameter delimiter.
- */
- def pipe[U<: Seq[String]](
- command: String,
- env: Map[String, String],
- transform: (T,String => Unit) => Any,
- pipeContext: Broadcast[U],
- delimiter: String): RDD[String] =
- new PipedRDD(this, command, env, transform, pipeContext, delimiter)
-
- /**
- * Return an RDD created by piping elements to a forked external process.
- * How each record in RDD is outputed to the process can be controled by providing a
- * function trasnform(T, outputFunction: String => Unit). transform() will be called with
- * the currnet record in RDD as the 1st parameter, and the function to output the record to
- * the external process (like out.println()) as the 2nd parameter.
- * Here's an example on how to pipe the RDD data of groupBy() in a streaming way,
- * instead of constructing a huge String to concat all the records:
- * def tranform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)}
- * pipeContext can be used to transfer additional context data to the external process
- * besides the RDD. pipeContext is a broadcast Seq[String], each line would be piped to
- * external process with "^A" as the delimiter in the end of context data. Delimiter can also
- * be customized by the last parameter delimiter.
- */
- def pipe[U<: Seq[String]](
- command: String,
- transform: (T,String => Unit) => Any,
- pipeContext: Broadcast[U]): RDD[String] =
- new PipedRDD(this, command, Map[String, String](), transform, pipeContext, "\u0001")
-
- /**
- * Return an RDD created by piping elements to a forked external process.
- * How each record in RDD is outputed to the process can be controled by providing a
- * function trasnform(T, outputFunction: String => Unit). transform() will be called with
- * the currnet record in RDD as the 1st parameter, and the function to output the record to
- * the external process (like out.println()) as the 2nd parameter.
- * Here's an example on how to pipe the RDD data of groupBy() in a streaming way,
- * instead of constructing a huge String to concat all the records:
- * def tranform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)}
- * pipeContext can be used to transfer additional context data to the external process
- * besides the RDD. pipeContext is a broadcast Seq[String], each line would be piped to
- * external process with "^A" as the delimiter in the end of context data. Delimiter can also
- * be customized by the last parameter delimiter.
- */
- def pipe[U<: Seq[String]](
- command: String,
- env: Map[String, String],
- transform: (T,String => Unit) => Any,
- pipeContext: Broadcast[U]): RDD[String] =
- new PipedRDD(this, command, env, transform, pipeContext, "\u0001")
/**
* Return an RDD created by piping elements to a forked external process.
@@ -432,13 +370,12 @@ abstract class RDD[T: ClassManifest](
* external process with "^A" as the delimiter in the end of context data. Delimiter can also
* be customized by the last parameter delimiter.
*/
- def pipe[U<: Seq[String]](
+ def pipe(
command: Seq[String],
env: Map[String, String] = Map(),
- transform: (T,String => Unit) => Any = null,
- pipeContext: Broadcast[U] = null,
- delimiter: String = "\u0001"): RDD[String] =
- new PipedRDD(this, command, env, transform, pipeContext, delimiter)
+ printPipeContext: (String => Unit) => Unit = null,
+ printRDDElement: (T, String => Unit) => Unit = null): RDD[String] =
+ new PipedRDD(this, command, env, if (printPipeContext ne null) sc.clean(printPipeContext) else null, printRDDElement)
/**
* Return a new RDD by applying a function to each partition of this RDD.
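For reference, a minimal usage sketch of the revised pipe() signature (assuming a SparkContext named sc and a cat command on the PATH; the call mirrors the test changes further down). The broadcast value and the "\u0001" end-of-context marker are now the caller's responsibility inside printPipeContext, and printRDDElement controls how each record is written to the child process:

// Minimal sketch of the new pipe() API; sc and the external command are assumptions.
val bl = sc.broadcast(List("0"))
val piped = sc.makeRDD(Array(1, 2, 3, 4), 2).pipe(
  Seq("cat"),
  Map[String, String](),
  printPipeContext = (printLine: String => Unit) => {
    bl.value.foreach(printLine)   // pipe the broadcast context lines first
    printLine("\u0001")           // then an explicit end-of-context marker
  },
  printRDDElement = (i: Int, printLine: String => Unit) => printLine(i + "_"))
// piped.collect() then yields, per partition, the context lines followed by "1_".."4_".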
diff --git a/core/src/main/scala/spark/rdd/PipedRDD.scala b/core/src/main/scala/spark/rdd/PipedRDD.scala
index d58aaae709..b2c07891ab 100644
--- a/core/src/main/scala/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/spark/rdd/PipedRDD.scala
@@ -16,14 +16,12 @@ import spark.broadcast.Broadcast
* An RDD that pipes the contents of each parent partition through an external command
* (printing them one per line) and returns the output as a collection of strings.
*/
-class PipedRDD[T: ClassManifest, U <: Seq[String]](
+class PipedRDD[T: ClassManifest](
prev: RDD[T],
command: Seq[String],
envVars: Map[String, String],
- transform: (T, String => Unit) => Any,
- pipeContext: Broadcast[U],
- delimiter: String
- )
+ printPipeContext: (String => Unit) => Unit,
+ printRDDElement: (T, String => Unit) => Unit)
extends RDD[String](prev) {
// Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -32,10 +30,9 @@ class PipedRDD[T: ClassManifest, U <: Seq[String]](
prev: RDD[T],
command: String,
envVars: Map[String, String] = Map(),
- transform: (T, String => Unit) => Any = null,
- pipeContext: Broadcast[U] = null,
- delimiter: String = "\u0001") =
- this(prev, PipedRDD.tokenize(command), envVars, transform, pipeContext, delimiter)
+ printPipeContext: (String => Unit) => Unit = null,
+ printRDDElement: (T, String => Unit) => Unit = null) =
+ this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement)
override def getPartitions: Array[Partition] = firstParent[T].partitions
@@ -64,17 +61,13 @@ class PipedRDD[T: ClassManifest, U <: Seq[String]](
SparkEnv.set(env)
val out = new PrintWriter(proc.getOutputStream)
- // input the pipeContext firstly
- if ( pipeContext != null) {
- for (elem <- pipeContext.value) {
- out.println(elem)
- }
- // delimiter\n as the marker of the end of the pipeContext
- out.println(delimiter)
+ // input the pipe context firstly
+ if ( printPipeContext != null) {
+ printPipeContext(out.println(_))
}
for (elem <- firstParent[T].iterator(split, context)) {
- if (transform != null) {
- transform(elem, out.println(_))
+ if (printRDDElement != null) {
+ printRDDElement(elem, out.println(_))
} else {
out.println(elem)
}
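As a standalone illustration of the compute() flow changed above (not part of the patch; writeToProcess and elems are illustrative names), the order of writes to the child process is: the optional pipe context first, then each element, formatted either by the caller-supplied printRDDElement or by a plain println:

import java.io.PrintWriter

// Sketch of the stdin write order implemented by the hunk above.
def writeToProcess[T](
    out: PrintWriter,
    elems: Iterator[T],
    printPipeContext: (String => Unit) => Unit,
    printRDDElement: (T, String => Unit) => Unit): Unit = {
  if (printPipeContext != null) {
    printPipeContext(out.println(_))          // context lines are piped first
  }
  for (elem <- elems) {
    if (printRDDElement != null) {
      printRDDElement(elem, out.println(_))   // caller controls the per-record format
    } else {
      out.println(elem)                       // default: one record per line
    }
  }
  out.flush()
}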
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
index d2852867de..ed075f93ec 100644
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/spark/PipedRDDSuite.scala
@@ -22,9 +22,12 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
test("advanced pipe") {
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+ val bl = sc.broadcast(List("0"))
- val piped = nums.pipe(Seq("cat"), Map[String, String](),
- (i:Int, f: String=> Unit) => f(i + "_"), sc.broadcast(List("0")))
+ val piped = nums.pipe(Seq("cat"),
+ Map[String, String](),
+ (f: String => Unit) => {bl.value.map(f(_));f("\u0001")},
+ (i:Int, f: String=> Unit) => f(i + "_"))
val c = piped.collect()
@@ -40,8 +43,10 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
val d = nums1.groupBy(str=>str.split("\t")(0)).
- pipe(Seq("cat"), Map[String, String](), (i:Tuple2[String, Seq[String]], f: String=> Unit) =>
- {for (e <- i._2){ f(e + "_")}}, sc.broadcast(List("0"))).collect()
+ pipe(Seq("cat"),
+ Map[String, String](),
+ (f: String => Unit) => {bl.value.map(f(_));f("\u0001")},
+ (i:Tuple2[String, Seq[String]], f: String=> Unit) => {for (e <- i._2){ f(e + "_")}}).collect()
assert(d.size === 8)
assert(d(0) === "0")
assert(d(1) === "\u0001")
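To make the asserted values concrete, here is a standalone sketch (not taken from the suite) of what one partition of the groupBy test writes to cat, assuming the "a" and "b" groups land in different partitions:

// Replay of one partition's writes; the group ("a", Seq("1", "3")) is illustrative.
val written = scala.collection.mutable.Buffer[String]()
val printLine: String => Unit = s => written += s

List("0").foreach(printLine)                     // broadcast pipe context, printed first
printLine("\u0001")                              // end-of-context marker
Seq("1", "3").foreach(e => printLine(e + "_"))   // per-record output via printRDDElement

// cat echoes its input, so this partition contributes 0, \u0001, 1_, 3_;
// two such partitions give the asserted d.size === 8, d(0) === "0", d(1) === "\u0001".
assert(written == Seq("0", "\u0001", "1_", "3_"))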