aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2011-06-19 23:06:21 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2011-06-19 23:06:21 -0700
commit23b42af70ad1ee0bfb3bd1936bba3dc224f9eb42 (patch)
tree47dcc2b8b4ecabd808f593e0cd443a61f7d82550 /core
parentc62bb4091bfcec8936b74f703ae8051046ddcfb1 (diff)
parent23b1c309fbbba70999177be319525c197c9fa72b (diff)
downloadspark-23b42af70ad1ee0bfb3bd1936bba3dc224f9eb42.tar.gz
spark-23b42af70ad1ee0bfb3bd1936bba3dc224f9eb42.tar.bz2
spark-23b42af70ad1ee0bfb3bd1936bba3dc224f9eb42.zip
Merge branch 'master' into scala-2.9
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/PipedRDD.scala61
-rw-r--r--core/src/main/scala/spark/RDD.scala6
2 files changed, 67 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/PipedRDD.scala b/core/src/main/scala/spark/PipedRDD.scala
new file mode 100644
index 0000000000..7c00492986
--- /dev/null
+++ b/core/src/main/scala/spark/PipedRDD.scala
@@ -0,0 +1,61 @@
+package spark
+
+import java.io.PrintWriter
+import java.util.StringTokenizer
+
+import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
+
+
+/**
+ * An RDD that pipes the contents of each parent partition through an external command
+ * (printing them one per line) and returns the output as a collection of strings.
+ */
+class PipedRDD[T: ClassManifest](parent: RDD[T], command: Seq[String])
+extends RDD[String](parent.context) {
+ // Similar to Runtime.exec(), if we are given a single string, split it into words
+ // using a standard StringTokenizer (i.e. by spaces)
+ def this(parent: RDD[T], command: String) = this(parent, PipedRDD.tokenize(command))
+
+ override def splits = parent.splits
+
+ override val dependencies = List(new OneToOneDependency(parent))
+
+ override def compute(split: Split): Iterator[String] = {
+ val proc = Runtime.getRuntime.exec(command.toArray)
+ val env = SparkEnv.get
+
+ // Start a thread to print the process's stderr to ours
+ new Thread("stderr reader for " + command) {
+ override def run() {
+ for(line <- Source.fromInputStream(proc.getErrorStream).getLines)
+ System.err.println(line)
+ }
+ }.start()
+
+ // Start a thread to feed the process input from our parent's iterator
+ new Thread("stdin writer for " + command) {
+ override def run() {
+ SparkEnv.set(env)
+ val out = new PrintWriter(proc.getOutputStream)
+ for(elem <- parent.iterator(split))
+ out.println(elem)
+ out.close()
+ }
+ }.start()
+
+ // Return an iterator that read lines from the process's stdout
+ Source.fromInputStream(proc.getInputStream).getLines
+ }
+}
+
+object PipedRDD {
+ // Split a string into words using a standard StringTokenizer
+ def tokenize(command: String): Seq[String] = {
+ val buf = new ArrayBuffer[String]
+ val tok = new StringTokenizer(command)
+ while(tok.hasMoreElements)
+ buf += tok.nextToken()
+ buf
+ }
+}
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 505ef76941..ee9d747de6 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -78,6 +78,12 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
def groupBy[K](func: T => K): RDD[(K, Seq[T])] =
groupBy[K](func, sc.numCores)
+ def pipe(command: String): RDD[String] =
+ new PipedRDD(this, command)
+
+ def pipe(command: Seq[String]): RDD[String] =
+ new PipedRDD(this, command)
+
// Parallel operations
def foreach(f: T => Unit) {