diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2011-06-19 23:06:21 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2011-06-19 23:06:21 -0700 |
commit | 23b42af70ad1ee0bfb3bd1936bba3dc224f9eb42 (patch) | |
tree | 47dcc2b8b4ecabd808f593e0cd443a61f7d82550 | |
parent | c62bb4091bfcec8936b74f703ae8051046ddcfb1 (diff) | |
parent | 23b1c309fbbba70999177be319525c197c9fa72b (diff) | |
download | spark-23b42af70ad1ee0bfb3bd1936bba3dc224f9eb42.tar.gz spark-23b42af70ad1ee0bfb3bd1936bba3dc224f9eb42.tar.bz2 spark-23b42af70ad1ee0bfb3bd1936bba3dc224f9eb42.zip |
Merge branch 'master' into scala-2.9
-rw-r--r-- | core/src/main/scala/spark/PipedRDD.scala | 61 | ||||
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 6 |
2 files changed, 67 insertions, 0 deletions
package spark

import java.io.PrintWriter
import java.util.StringTokenizer

import scala.collection.mutable.ArrayBuffer
import scala.io.Source


/**
 * An RDD that pipes the contents of each parent partition through an external command
 * (printing them one per line) and returns the output as a collection of strings.
 *
 * One child process is forked per partition in `compute()`; two helper threads copy the
 * child's stderr to our stderr and feed the parent partition's elements to the child's
 * stdin, while the returned iterator lazily reads the child's stdout.
 */
class PipedRDD[T: ClassManifest](parent: RDD[T], command: Seq[String])
extends RDD[String](parent.context) {
  // Similar to Runtime.exec(), if we are given a single string, split it into words
  // using a standard StringTokenizer (i.e. by spaces)
  def this(parent: RDD[T], command: String) = this(parent, PipedRDD.tokenize(command))

  // Piping is a one-to-one transformation, so we reuse the parent's partitioning.
  override def splits = parent.splits

  override val dependencies = List(new OneToOneDependency(parent))

  override def compute(split: Split): Iterator[String] = {
    val proc = Runtime.getRuntime.exec(command.toArray)
    // Capture the caller's SparkEnv so the writer thread can install it; SparkEnv
    // is per-thread state and would otherwise be missing in the forked thread.
    val env = SparkEnv.get

    // Start a thread to print the process's stderr to ours. Draining stderr is also
    // required so the child does not block on a full stderr pipe buffer.
    new Thread("stderr reader for " + command) {
      override def run() {
        for (line <- Source.fromInputStream(proc.getErrorStream).getLines)
          System.err.println(line)
      }
    }.start()

    // Start a thread to feed the process input from our parent's iterator
    new Thread("stdin writer for " + command) {
      override def run() {
        SparkEnv.set(env)
        val out = new PrintWriter(proc.getOutputStream)
        // Close the child's stdin even if the parent iterator throws; otherwise the
        // child never sees EOF and the consumer of its stdout can hang forever.
        try {
          for (elem <- parent.iterator(split))
            out.println(elem)
        } finally {
          out.close()
        }
      }
    }.start()

    // Return an iterator that reads lines from the process's stdout.
    // NOTE(review): the child's exit code is never checked, so a failing command
    // silently yields whatever partial output it produced — confirm this is intended.
    Source.fromInputStream(proc.getInputStream).getLines
  }
}

object PipedRDD {
  // Split a string into words using a standard StringTokenizer (i.e. on whitespace),
  // mirroring Runtime.exec(String)'s own tokenization of a single command string.
  def tokenize(command: String): Seq[String] = {
    val buf = new ArrayBuffer[String]
    val tok = new StringTokenizer(command)
    while (tok.hasMoreElements)
      buf += tok.nextToken()
    buf
  }
}