diff options
author | Tejas Patil <tejasp@fb.com> | 2016-06-15 12:03:00 -0700 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-06-15 12:03:00 -0700 |
commit | 279bd4aa5fddbabdb0383a3f6f0fc8d91780e092 (patch) | |
tree | 46927c108ea194cdbc8f43701a641bd1ac4b1060 /core/src | |
parent | 9b234b55d1b5e4a7c80e482b3e297bfb8b583a56 (diff) | |
download | spark-279bd4aa5fddbabdb0383a3f6f0fc8d91780e092.tar.gz spark-279bd4aa5fddbabdb0383a3f6f0fc8d91780e092.tar.bz2 spark-279bd4aa5fddbabdb0383a3f6f0fc8d91780e092.zip |
[SPARK-15826][CORE] PipedRDD to allow configurable char encoding
## What changes were proposed in this pull request?
Link to jira which describes the problem: https://issues.apache.org/jira/browse/SPARK-15826
The fix in this PR is to allow users to specify the encoding in the pipe() operation. For backward compatibility,
keeping the default value to be system default.
## How was this patch tested?
Ran existing unit tests
Author: Tejas Patil <tejasp@fb.com>
Closes #13563 from tejasapatil/pipedrdd_utf8.
Diffstat (limited to 'core/src')
4 files changed, 36 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index e4ccd9f11b..a37c52cbaf 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -285,6 +285,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { } /** + * Return an RDD created by piping elements to a forked external process. + */ + def pipe(command: JList[String], + env: JMap[String, String], + separateWorkingDir: Boolean, + bufferSize: Int, + encoding: String): JavaRDD[String] = { + rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize, encoding) + } + + /** * Zips this RDD with another one, returning key-value pairs with the first element in each RDD, * second element in each RDD, etc. Assumes that the two RDDs have the *same number of * partitions* and the *same number of elements in each partition* (e.g. one was made through diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 49625b7042..02b28b72fb 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -47,22 +47,10 @@ private[spark] class PipedRDD[T: ClassTag]( printPipeContext: (String => Unit) => Unit, printRDDElement: (T, String => Unit) => Unit, separateWorkingDir: Boolean, - bufferSize: Int) + bufferSize: Int, + encoding: String) extends RDD[String](prev) { - // Similar to Runtime.exec(), if we are given a single string, split it into words - // using a standard StringTokenizer (i.e. 
by spaces) - def this( - prev: RDD[T], - command: String, - envVars: Map[String, String] = Map(), - printPipeContext: (String => Unit) => Unit = null, - printRDDElement: (T, String => Unit) => Unit = null, - separateWorkingDir: Boolean = false) = - this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement, - separateWorkingDir, 8192) - - override def getPartitions: Array[Partition] = firstParent[T].partitions /** @@ -129,7 +117,7 @@ private[spark] class PipedRDD[T: ClassTag]( override def run(): Unit = { val err = proc.getErrorStream try { - for (line <- Source.fromInputStream(err).getLines) { + for (line <- Source.fromInputStream(err)(encoding).getLines) { // scalastyle:off println System.err.println(line) // scalastyle:on println @@ -147,7 +135,7 @@ private[spark] class PipedRDD[T: ClassTag]( override def run(): Unit = { TaskContext.setTaskContext(context) val out = new PrintWriter(new BufferedWriter( - new OutputStreamWriter(proc.getOutputStream), bufferSize)) + new OutputStreamWriter(proc.getOutputStream, encoding), bufferSize)) try { // scalastyle:off println // input the pipe context firstly @@ -171,7 +159,7 @@ private[spark] class PipedRDD[T: ClassTag]( }.start() // Return an iterator that read lines from the process's stdout - val lines = Source.fromInputStream(proc.getInputStream).getLines() + val lines = Source.fromInputStream(proc.getInputStream)(encoding).getLines new Iterator[String] { def next(): String = { if (!hasNext()) { diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index e251421c48..b7a5b22208 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -21,6 +21,7 @@ import java.util.Random import scala.collection.{mutable, Map} import scala.collection.mutable.ArrayBuffer +import scala.io.Codec import scala.language.implicitConversions import scala.reflect.{classTag, ClassTag} @@ -698,14 
+699,14 @@ abstract class RDD[T: ClassTag]( * Return an RDD created by piping elements to a forked external process. */ def pipe(command: String): RDD[String] = withScope { - new PipedRDD(this, command) + pipe(PipedRDD.tokenize(command)) } /** * Return an RDD created by piping elements to a forked external process. */ def pipe(command: String, env: Map[String, String]): RDD[String] = withScope { - new PipedRDD(this, command, env) + pipe(PipedRDD.tokenize(command), env) } /** @@ -726,6 +727,8 @@ abstract class RDD[T: ClassTag]( * for (e <- record._2) {f(e)} * @param separateWorkingDir Use separate working directories for each task. * @param bufferSize Buffer size for the stdin writer for the piped process. + * @param encoding Char encoding used for interacting (via stdin, stdout and stderr) with + * the piped process * @return the result RDD */ def pipe( @@ -734,12 +737,14 @@ abstract class RDD[T: ClassTag]( printPipeContext: (String => Unit) => Unit = null, printRDDElement: (T, String => Unit) => Unit = null, separateWorkingDir: Boolean = false, - bufferSize: Int = 8192): RDD[String] = withScope { + bufferSize: Int = 8192, + encoding: String = Codec.defaultCharsetCodec.name): RDD[String] = withScope { new PipedRDD(this, command, env, if (printPipeContext ne null) sc.clean(printPipeContext) else null, if (printRDDElement ne null) sc.clean(printRDDElement) else null, separateWorkingDir, - bufferSize) + bufferSize, + encoding) } /** diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala index fe2058d613..27cfdc7ace 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.rdd import java.io.File import scala.collection.Map +import scala.io.Codec import scala.language.postfixOps import scala.sys.process._ import scala.util.Try @@ -207,7 +208,16 @@ class PipedRDDSuite extends SparkFunSuite with
SharedSparkContext { } } val hadoopPart1 = generateFakeHadoopPartition() - val pipedRdd = new PipedRDD(nums, "printenv " + varName) + val pipedRdd = + new PipedRDD( + nums, + PipedRDD.tokenize("printenv " + varName), + Map(), + null, + null, + false, + 4092, + Codec.defaultCharsetCodec.name) val tContext = TaskContext.empty() val rddIter = pipedRdd.compute(hadoopPart1, tContext) val arr = rddIter.toArray |