author     Tejas Patil <tejasp@fb.com>    2016-06-15 12:03:00 -0700
committer  Shixiong Zhu <shixiong@databricks.com>    2016-06-15 12:03:00 -0700
commit     279bd4aa5fddbabdb0383a3f6f0fc8d91780e092 (patch)
tree       46927c108ea194cdbc8f43701a641bd1ac4b1060 /core
parent     9b234b55d1b5e4a7c80e482b3e297bfb8b583a56 (diff)
[SPARK-15826][CORE] PipedRDD to allow configurable char encoding
## What changes were proposed in this pull request?

Link to the JIRA describing the problem: https://issues.apache.org/jira/browse/SPARK-15826

This PR lets users specify the character encoding in the pipe() operation. For backward compatibility, the default remains the system default encoding.

## How was this patch tested?

Ran the existing unit tests.

Author: Tejas Patil <tejasp@fb.com>

Closes #13563 from tejasapatil/pipedrdd_utf8.
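In user code the change looks roughly like this (a minimal sketch assuming an existing `SparkContext` named `sc` and a POSIX `cat` binary on the workers; the "UTF-8" value is illustrative):

```scala
val words = sc.parallelize(Seq("naïve", "café", "señor"), 2)

// Default behaviour is unchanged: the child process's stdin/stdout/stderr
// still use the JVM's default charset.
val piped = words.pipe(Seq("cat"))

// New: choose the charset used for the data exchanged with the process.
val pipedUtf8 = words.pipe(Seq("cat"), encoding = "UTF-8")

println(pipedUtf8.collect().mkString(", "))
```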
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala | 22
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala | 13
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala | 12
4 files changed, 36 insertions(+), 22 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index e4ccd9f11b..a37c52cbaf 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -285,6 +285,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
}
/**
+ * Return an RDD created by piping elements to a forked external process.
+ */
+ def pipe(command: JList[String],
+ env: JMap[String, String],
+ separateWorkingDir: Boolean,
+ bufferSize: Int,
+ encoding: String): JavaRDD[String] = {
+ rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize, encoding)
+ }
+
+ /**
* Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
* second element in each RDD, etc. Assumes that the two RDDs have the *same number of
* partitions* and the *same number of elements in each partition* (e.g. one was made through
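For illustration, the new Java-facing overload above could be called like this (a hypothetical driver-side sketch written in Scala against the Java API; the local master, `cat` command and "UTF-8" value are illustrative, not part of the patch):

```scala
import java.util.{Collections => JCollections}
import scala.collection.JavaConverters._
import org.apache.spark.api.java.JavaSparkContext

val jsc = new JavaSparkContext("local[2]", "pipe-encoding-java-api")
val data = jsc.parallelize(Seq("héllo", "wörld").asJava)
val piped = data.pipe(
  Seq("cat").asJava,                        // command
  JCollections.emptyMap[String, String](),  // env vars
  false,                                    // separateWorkingDir
  8192,                                     // bufferSize
  "UTF-8")                                  // charset for stdin/stdout/stderr
println(piped.collect())
jsc.stop()
```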
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 49625b7042..02b28b72fb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -47,22 +47,10 @@ private[spark] class PipedRDD[T: ClassTag](
printPipeContext: (String => Unit) => Unit,
printRDDElement: (T, String => Unit) => Unit,
separateWorkingDir: Boolean,
- bufferSize: Int)
+ bufferSize: Int,
+ encoding: String)
extends RDD[String](prev) {
- // Similar to Runtime.exec(), if we are given a single string, split it into words
- // using a standard StringTokenizer (i.e. by spaces)
- def this(
- prev: RDD[T],
- command: String,
- envVars: Map[String, String] = Map(),
- printPipeContext: (String => Unit) => Unit = null,
- printRDDElement: (T, String => Unit) => Unit = null,
- separateWorkingDir: Boolean = false) =
- this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
- separateWorkingDir, 8192)
-
-
override def getPartitions: Array[Partition] = firstParent[T].partitions
/**
@@ -129,7 +117,7 @@ private[spark] class PipedRDD[T: ClassTag](
override def run(): Unit = {
val err = proc.getErrorStream
try {
- for (line <- Source.fromInputStream(err).getLines) {
+ for (line <- Source.fromInputStream(err)(encoding).getLines) {
// scalastyle:off println
System.err.println(line)
// scalastyle:on println
@@ -147,7 +135,7 @@ private[spark] class PipedRDD[T: ClassTag](
override def run(): Unit = {
TaskContext.setTaskContext(context)
val out = new PrintWriter(new BufferedWriter(
- new OutputStreamWriter(proc.getOutputStream), bufferSize))
+ new OutputStreamWriter(proc.getOutputStream, encoding), bufferSize))
try {
// scalastyle:off println
// input the pipe context firstly
@@ -171,7 +159,7 @@ private[spark] class PipedRDD[T: ClassTag](
}.start()
// Return an iterator that read lines from the process's stdout
- val lines = Source.fromInputStream(proc.getInputStream).getLines()
+ val lines = Source.fromInputStream(proc.getInputStream)(encoding).getLines
new Iterator[String] {
def next(): String = {
if (!hasNext()) {
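A standalone sketch (not part of the patch) of why the `encoding` string argument above type-checks: `scala.io.Codec`'s companion object provides an implicit `String => Codec` conversion, so the second argument list selects the charset used to decode the child process's output.

```scala
import java.io.ByteArrayInputStream
import scala.io.Source

// Decode a byte stream with an explicitly named charset, as PipedRDD now does
// for the child process's stdout and stderr.
val utf8Bytes = "café".getBytes("UTF-8")
val lines = Source.fromInputStream(new ByteArrayInputStream(utf8Bytes))("UTF-8").getLines()
lines.foreach(println) // prints "café" when the charset matches the bytes
```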
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index e251421c48..b7a5b22208 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -21,6 +21,7 @@ import java.util.Random
import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
+import scala.io.Codec
import scala.language.implicitConversions
import scala.reflect.{classTag, ClassTag}
@@ -698,14 +699,14 @@ abstract class RDD[T: ClassTag](
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: String): RDD[String] = withScope {
- new PipedRDD(this, command)
+ pipe(PipedRDD.tokenize(command))
}
/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: String, env: Map[String, String]): RDD[String] = withScope {
- new PipedRDD(this, command, env)
+ pipe(PipedRDD.tokenize(command), env)
}
/**
@@ -726,6 +727,8 @@ abstract class RDD[T: ClassTag](
* for (e <- record._2) {f(e)}
* @param separateWorkingDir Use separate working directories for each task.
* @param bufferSize Buffer size for the stdin writer for the piped process.
+ * @param encoding Char encoding used for interacting (via stdin, stdout and stderr) with
+ * the piped process
* @return the result RDD
*/
def pipe(
@@ -734,12 +737,14 @@ abstract class RDD[T: ClassTag](
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null,
separateWorkingDir: Boolean = false,
- bufferSize: Int = 8192): RDD[String] = withScope {
+ bufferSize: Int = 8192,
+ encoding: String = Codec.defaultCharsetCodec.name): RDD[String] = withScope {
new PipedRDD(this, command, env,
if (printPipeContext ne null) sc.clean(printPipeContext) else null,
if (printRDDElement ne null) sc.clean(printRDDElement) else null,
separateWorkingDir,
- bufferSize)
+ bufferSize,
+ encoding)
}
/**
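The default value above keeps the old behaviour: `Codec.defaultCharsetCodec` wraps the JVM's default charset, so callers that omit `encoding` pipe exactly as before. A quick check of that assumption, outside the patch:

```scala
import java.nio.charset.Charset
import scala.io.Codec

// Codec.defaultCharsetCodec is built from Charset.defaultCharset, so the new
// parameter's default matches what OutputStreamWriter/Source used implicitly before.
val defaultName = Codec.defaultCharsetCodec.name
println(defaultName)                                // e.g. "UTF-8" on most Linux JVMs
println(defaultName == Charset.defaultCharset.name) // expected: true
```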
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index fe2058d613..27cfdc7ace 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.rdd
import java.io.File
import scala.collection.Map
+import scala.io.Codec
import scala.language.postfixOps
import scala.sys.process._
import scala.util.Try
@@ -207,7 +208,16 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
}
}
val hadoopPart1 = generateFakeHadoopPartition()
- val pipedRdd = new PipedRDD(nums, "printenv " + varName)
+ val pipedRdd =
+ new PipedRDD(
+ nums,
+ PipedRDD.tokenize("printenv " + varName),
+ Map(),
+ null,
+ null,
+ false,
+ 4092,
+ Codec.defaultCharsetCodec.name)
val tContext = TaskContext.empty()
val rddIter = pipedRdd.compute(hadoopPart1, tContext)
val arr = rddIter.toArray
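A natural follow-up test (hypothetical, not part of this patch) would pipe non-ASCII records with an explicit encoding, reusing this suite's `testCommandAvailable` guard and `cat` the way the existing tests do:

```scala
test("pipe non-ASCII records with an explicit UTF-8 encoding") {
  // Hypothetical sketch; skips when `cat` is not available, like the other tests here.
  if (testCommandAvailable("cat")) {
    val words = sc.makeRDD(Seq("naïve", "café", "señor"), 2)
    // Force UTF-8 for the data exchanged with the child process, independent of the JVM default.
    val piped = words.pipe(Seq("cat"), encoding = "UTF-8")
    assert(piped.collect().toSet === Set("naïve", "café", "señor"))
  }
}
```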