aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSital Kedia <skedia@fb.com>2016-05-10 15:28:35 +0100
committerSean Owen <sowen@cloudera.com>2016-05-10 15:28:35 +0100
commita019e6efb71e4dce51ca91e41c3d293cf3a6ccb8 (patch)
treef1d3996e25faac01f7b78ac5ff526a589597913a
parent570647267055cbe33291232b375e08fa1f5d8e7a (diff)
downloadspark-a019e6efb71e4dce51ca91e41c3d293cf3a6ccb8.tar.gz
spark-a019e6efb71e4dce51ca91e41c3d293cf3a6ccb8.tar.bz2
spark-a019e6efb71e4dce51ca91e41c3d293cf3a6ccb8.zip
[SPARK-14542][CORE] PipeRDD should allow configurable buffer size for…
## What changes were proposed in this pull request? Currently PipedRDD internally uses PrintWriter to write data to the stdin of the piped process, which by default uses a BufferedWriter of buffer size 8k. In our experiment, we have seen that 8k buffer size is too small and the job spends significant amount of CPU time in system calls to copy the data. We should have a way to configure the buffer size for the writer. ## How was this patch tested? Ran PipedRDDSuite tests. Author: Sital Kedia <skedia@fb.com> Closes #12309 from sitalkedia/bufferedPipedRDD.
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala58
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala2
-rw-r--r--project/MimaExcludes.scala4
5 files changed, 54 insertions, 27 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 6f3b8faf03..c17ca12379 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -19,7 +19,7 @@ package org.apache.spark.api.java
import java.{lang => jl}
import java.lang.{Iterable => JIterable}
-import java.util.{Comparator, Iterator => JIterator, List => JList}
+import java.util.{Comparator, Iterator => JIterator, List => JList, Map => JMap}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
@@ -80,7 +80,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* This should ''not'' be called by users directly, but is available for implementors of custom
* subclasses of RDD.
*/
- def iterator(split: Partition, taskContext: TaskContext): java.util.Iterator[T] =
+ def iterator(split: Partition, taskContext: TaskContext): JIterator[T] =
rdd.iterator(split, taskContext).asJava
// Transformations (return a new RDD)
@@ -96,7 +96,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* of the original partition.
*/
def mapPartitionsWithIndex[R](
- f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]],
+ f: JFunction2[jl.Integer, JIterator[T], JIterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] =
new JavaRDD(rdd.mapPartitionsWithIndex((a, b) => f.call(a, b.asJava).asScala,
preservesPartitioning)(fakeClassTag))(fakeClassTag)
@@ -147,7 +147,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
+ def mapPartitions[U](f: FlatMapFunction[JIterator[T], U]): JavaRDD[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
}
@@ -157,7 +157,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
+ def mapPartitions[U](f: FlatMapFunction[JIterator[T], U],
preservesPartitioning: Boolean): JavaRDD[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
@@ -169,7 +169,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
+ def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]]): JavaDoubleRDD = {
def fn: (Iterator[T]) => Iterator[jl.Double] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
}
@@ -179,7 +179,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
+ def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, V2]):
JavaPairRDD[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
@@ -190,7 +190,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
+ def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]],
preservesPartitioning: Boolean): JavaDoubleRDD = {
def fn: (Iterator[T]) => Iterator[jl.Double] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
@@ -202,7 +202,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
+ def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, V2],
preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
@@ -214,7 +214,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Applies a function f to each partition of this RDD.
*/
- def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
+ def foreachPartition(f: VoidFunction[JIterator[T]]): Unit = {
rdd.foreachPartition(x => f.call(x.asJava))
}
@@ -256,19 +256,33 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return an RDD created by piping elements to a forked external process.
*/
- def pipe(command: String): JavaRDD[String] = rdd.pipe(command)
+ def pipe(command: String): JavaRDD[String] = {
+ rdd.pipe(command)
+ }
/**
* Return an RDD created by piping elements to a forked external process.
*/
- def pipe(command: JList[String]): JavaRDD[String] =
+ def pipe(command: JList[String]): JavaRDD[String] = {
rdd.pipe(command.asScala)
+ }
/**
* Return an RDD created by piping elements to a forked external process.
*/
- def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] =
+ def pipe(command: JList[String], env: JMap[String, String]): JavaRDD[String] = {
rdd.pipe(command.asScala, env.asScala)
+ }
+
+ /**
+ * Return an RDD created by piping elements to a forked external process.
+ */
+ def pipe(command: JList[String],
+ env: JMap[String, String],
+ separateWorkingDir: Boolean,
+ bufferSize: Int): JavaRDD[String] = {
+ rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize)
+ }
/**
* Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
@@ -288,7 +302,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def zipPartitions[U, V](
other: JavaRDDLike[U, _],
- f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
+ f: FlatMapFunction2[JIterator[T], JIterator[U], V]): JavaRDD[V] = {
def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
(x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
}
@@ -446,8 +460,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
* combine step happens locally on the master, equivalent to running a single reduce task.
*/
- def countByValue(): java.util.Map[T, jl.Long] =
- mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[java.util.Map[T, jl.Long]]
+ def countByValue(): JMap[T, jl.Long] =
+ mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[JMap[T, jl.Long]]
/**
* (Experimental) Approximate version of countByValue().
@@ -455,13 +469,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def countByValueApprox(
timeout: Long,
confidence: Double
- ): PartialResult[java.util.Map[T, BoundedDouble]] =
+ ): PartialResult[JMap[T, BoundedDouble]] =
rdd.countByValueApprox(timeout, confidence).map(mapAsSerializableJavaMap)
/**
* (Experimental) Approximate version of countByValue().
*/
- def countByValueApprox(timeout: Long): PartialResult[java.util.Map[T, BoundedDouble]] =
+ def countByValueApprox(timeout: Long): PartialResult[JMap[T, BoundedDouble]] =
rdd.countByValueApprox(timeout).map(mapAsSerializableJavaMap)
/**
@@ -596,9 +610,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Returns the maximum element from this RDD as defined by the specified
* Comparator[T].
+ *
* @param comp the comparator that defines ordering
* @return the maximum of the RDD
- * */
+ */
def max(comp: Comparator[T]): T = {
rdd.max()(Ordering.comparatorToOrdering(comp))
}
@@ -606,9 +621,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Returns the minimum element from this RDD as defined by the specified
* Comparator[T].
+ *
* @param comp the comparator that defines ordering
* @return the minimum of the RDD
- * */
+ */
def min(comp: Comparator[T]): T = {
rdd.min()(Ordering.comparatorToOrdering(comp))
}
@@ -684,7 +700,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* The asynchronous version of the `foreachPartition` action, which
* applies a function f to each partition of this RDD.
*/
- def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): JavaFutureAction[Void] = {
+ def foreachPartitionAsync(f: VoidFunction[JIterator[T]]): JavaFutureAction[Void] = {
new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x.asJava)),
{ x => null.asInstanceOf[Void] })
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index dd8e46ba0f..45616856fd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -17,9 +17,11 @@
package org.apache.spark.rdd
+import java.io.BufferedWriter
import java.io.File
import java.io.FilenameFilter
import java.io.IOException
+import java.io.OutputStreamWriter
import java.io.PrintWriter
import java.util.StringTokenizer
import java.util.concurrent.atomic.AtomicReference
@@ -45,7 +47,8 @@ private[spark] class PipedRDD[T: ClassTag](
envVars: Map[String, String],
printPipeContext: (String => Unit) => Unit,
printRDDElement: (T, String => Unit) => Unit,
- separateWorkingDir: Boolean)
+ separateWorkingDir: Boolean,
+ bufferSize: Int)
extends RDD[String](prev) {
// Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -58,7 +61,7 @@ private[spark] class PipedRDD[T: ClassTag](
printRDDElement: (T, String => Unit) => Unit = null,
separateWorkingDir: Boolean = false) =
this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
- separateWorkingDir)
+ separateWorkingDir, 8192)
override def getPartitions: Array[Partition] = firstParent[T].partitions
@@ -144,7 +147,8 @@ private[spark] class PipedRDD[T: ClassTag](
new Thread(s"stdin writer for $command") {
override def run(): Unit = {
TaskContext.setTaskContext(context)
- val out = new PrintWriter(proc.getOutputStream)
+ val out = new PrintWriter(new BufferedWriter(
+ new OutputStreamWriter(proc.getOutputStream), bufferSize))
try {
// scalastyle:off println
// input the pipe context firstly
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 499a8b9aa1..d85d0fff46 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -724,6 +724,7 @@ abstract class RDD[T: ClassTag](
* def printRDDElement(record:(String, Seq[String]), f:String=&gt;Unit) =
* for (e &lt;- record._2) {f(e)}
* @param separateWorkingDir Use separate working directories for each task.
+ * @param bufferSize Buffer size for the stdin writer for the piped process.
* @return the result RDD
*/
def pipe(
@@ -731,11 +732,13 @@ abstract class RDD[T: ClassTag](
env: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null,
- separateWorkingDir: Boolean = false): RDD[String] = withScope {
+ separateWorkingDir: Boolean = false,
+ bufferSize: Int = 8192): RDD[String] = withScope {
new PipedRDD(this, command, env,
if (printPipeContext ne null) sc.clean(printPipeContext) else null,
if (printRDDElement ne null) sc.clean(printRDDElement) else null,
- separateWorkingDir)
+ separateWorkingDir,
+ bufferSize)
}
/**
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index e9cc819524..fe2058d613 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -171,7 +171,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
val collectPwd = pipedPwd.collect()
assert(collectPwd(0).contains("tasks/"))
- val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true).collect()
+ val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect()
// make sure symlinks were created
assert(pipedLs.length > 0)
// clean up top level tasks directory
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a5d57e1b01..b0d862d006 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -686,6 +686,10 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"org.apache.spark.sql.DataFrameReader.this")
) ++ Seq(
+ // SPARK-14542 configurable buffer size for pipe RDD
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.pipe"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.pipe")
+ ) ++ Seq(
// [SPARK-4452][Core]Shuffle data structures can starve others on the same thread for memory
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.Spillable")
) ++ Seq(