author    Gavin Li <lyo.gavin@gmail.com>    2013-06-17 05:23:46 +0000
committer Gavin Li <lyo.gavin@gmail.com>    2013-06-17 05:23:46 +0000
commit    4508089fc342802a2f37fea6893cd47abd81fdd7 (patch)
tree      03457dd84de50b1cbc85d4a391d28fca55a65f85
parent    728665d34b2722740cc4989aff5279ae702d632e (diff)
refine comments and add sc.clean
-rw-r--r--  core/src/main/scala/spark/RDD.scala  30
1 file changed, 18 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 05ff399a7b..223dcdc19d 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -361,24 +361,30 @@ abstract class RDD[T: ClassManifest](
/**
* Return an RDD created by piping elements to a forked external process.
- * How each record in RDD is outputed to the process can be controled by providing a
- * function trasnform(T, outputFunction: String => Unit). transform() will be called with
- * the currnet record in RDD as the 1st parameter, and the function to output the record to
- * the external process (like out.println()) as the 2nd parameter.
- * Here's an example on how to pipe the RDD data of groupBy() in a streaming way,
- * instead of constructing a huge String to concat all the records:
- * def tranform(record:(String, Seq[String]), f:String=>Unit) = for (e <- record._2){f(e)}
- * pipeContext can be used to transfer additional context data to the external process
- * besides the RDD. pipeContext is a broadcast Seq[String], each line would be piped to
- * external process with "^A" as the delimiter in the end of context data. Delimiter can also
- * be customized by the last parameter delimiter.
+ * The print behavior can be customized by providing two functions.
+ *
+ * @param command command to run in forked process.
+ * @param env environment variables to set.
+ * @param printPipeContext Before piping elements, this function is called as an opportunity
+ *                         to pipe context data. The print line function (like out.println)
+ *                         will be passed as printPipeContext's parameter.
+ * @param printRDDElement Use this function to customize how to pipe elements. This function
+ *                        will be called with each RDD element as the 1st parameter, and the
+ *                        print line function (like out.println()) as the 2nd parameter.
+ *                        An example of piping the RDD data of groupBy() in a streaming way,
+ *                        instead of constructing a huge String to concat all the elements:
+ *                        def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
+ *                          for (e <- record._2) {f(e)}
+ * @return the result RDD
*/
def pipe(
command: Seq[String],
env: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null): RDD[String] =
- new PipedRDD(this, command, env, if (printPipeContext ne null) sc.clean(printPipeContext) else null, printRDDElement)
+ new PipedRDD(this, command, env,
+ if (printPipeContext ne null) sc.clean(printPipeContext) else null,
+ if (printRDDElement ne null) sc.clean(printRDDElement) else null)
/**
* Return a new RDD by applying a function to each partition of this RDD.
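As a usage sketch (not part of this patch): after this change, both printPipeContext and printRDDElement are passed through sc.clean before being handed to PipedRDD, so each closure is cleaned of non-serializable enclosing references. The sample data, the "cat" command, and the local-mode SparkContext below are illustrative assumptions only.

import spark.SparkContext

object PipeExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "PipeExample")  // assumed local mode

    // Group words by first letter: RDD[(String, Seq[String])].
    val grouped = sc.parallelize(Seq("apple", "avocado", "banana", "cherry"))
      .groupBy(word => word.substring(0, 1))

    // Pipe through an external command, streaming one line per element
    // instead of concatenating each whole group into a single String.
    val piped = grouped.pipe(
      command = Seq("cat"),  // illustrative command
      printPipeContext = (f: String => Unit) => f("header: fruit list"),
      printRDDElement = (record: (String, Seq[String]), f: String => Unit) =>
        for (e <- record._2) f(e))

    piped.collect().foreach(println)
    sc.stop()
  }
}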