diff options
author | Sital Kedia <skedia@fb.com> | 2016-05-10 15:28:35 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-05-10 15:28:35 +0100 |
commit | a019e6efb71e4dce51ca91e41c3d293cf3a6ccb8 (patch) | |
tree | f1d3996e25faac01f7b78ac5ff526a589597913a | |
parent | 570647267055cbe33291232b375e08fa1f5d8e7a (diff) | |
download | spark-a019e6efb71e4dce51ca91e41c3d293cf3a6ccb8.tar.gz spark-a019e6efb71e4dce51ca91e41c3d293cf3a6ccb8.tar.bz2 spark-a019e6efb71e4dce51ca91e41c3d293cf3a6ccb8.zip |
[SPARK-14542][CORE] PipedRDD should allow a configurable buffer size for the stdin writer
## What changes were proposed in this pull request?
Currently, PipedRDD internally uses a PrintWriter to write data to the stdin of the piped process, which by default wraps a BufferedWriter with an 8k buffer. In our experiments, we have seen that an 8k buffer is too small: the job spends a significant amount of CPU time in system calls copying the data. We should have a way to configure the buffer size for the writer.
## How was this patch tested?
Ran PipedRDDSuite tests.
Author: Sital Kedia <skedia@fb.com>
Closes #12309 from sitalkedia/bufferedPipedRDD.
5 files changed, 54 insertions, 27 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 6f3b8faf03..c17ca12379 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -19,7 +19,7 @@ package org.apache.spark.api.java import java.{lang => jl} import java.lang.{Iterable => JIterable} -import java.util.{Comparator, Iterator => JIterator, List => JList} +import java.util.{Comparator, Iterator => JIterator, List => JList, Map => JMap} import scala.collection.JavaConverters._ import scala.reflect.ClassTag @@ -80,7 +80,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * This should ''not'' be called by users directly, but is available for implementors of custom * subclasses of RDD. */ - def iterator(split: Partition, taskContext: TaskContext): java.util.Iterator[T] = + def iterator(split: Partition, taskContext: TaskContext): JIterator[T] = rdd.iterator(split, taskContext).asJava // Transformations (return a new RDD) @@ -96,7 +96,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * of the original partition. */ def mapPartitionsWithIndex[R]( - f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]], + f: JFunction2[jl.Integer, JIterator[T], JIterator[R]], preservesPartitioning: Boolean = false): JavaRDD[R] = new JavaRDD(rdd.mapPartitionsWithIndex((a, b) => f.call(a, b.asJava).asScala, preservesPartitioning)(fakeClassTag))(fakeClassTag) @@ -147,7 +147,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Return a new RDD by applying a function to each partition of this RDD. 
*/ - def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = { + def mapPartitions[U](f: FlatMapFunction[JIterator[T], U]): JavaRDD[U] = { def fn: (Iterator[T]) => Iterator[U] = { (x: Iterator[T]) => f.call(x.asJava).asScala } @@ -157,7 +157,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Return a new RDD by applying a function to each partition of this RDD. */ - def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U], + def mapPartitions[U](f: FlatMapFunction[JIterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = { def fn: (Iterator[T]) => Iterator[U] = { (x: Iterator[T]) => f.call(x.asJava).asScala @@ -169,7 +169,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Return a new RDD by applying a function to each partition of this RDD. */ - def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = { + def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]]): JavaDoubleRDD = { def fn: (Iterator[T]) => Iterator[jl.Double] = { (x: Iterator[T]) => f.call(x.asJava).asScala } @@ -179,7 +179,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Return a new RDD by applying a function to each partition of this RDD. */ - def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]): + def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, V2]): JavaPairRDD[K2, V2] = { def fn: (Iterator[T]) => Iterator[(K2, V2)] = { (x: Iterator[T]) => f.call(x.asJava).asScala @@ -190,7 +190,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Return a new RDD by applying a function to each partition of this RDD. 
*/ - def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]], + def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]], preservesPartitioning: Boolean): JavaDoubleRDD = { def fn: (Iterator[T]) => Iterator[jl.Double] = { (x: Iterator[T]) => f.call(x.asJava).asScala @@ -202,7 +202,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Return a new RDD by applying a function to each partition of this RDD. */ - def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2], + def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, V2], preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = { def fn: (Iterator[T]) => Iterator[(K2, V2)] = { (x: Iterator[T]) => f.call(x.asJava).asScala @@ -214,7 +214,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Applies a function f to each partition of this RDD. */ - def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) { + def foreachPartition(f: VoidFunction[JIterator[T]]): Unit = { rdd.foreachPartition(x => f.call(x.asJava)) } @@ -256,19 +256,33 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Return an RDD created by piping elements to a forked external process. */ - def pipe(command: String): JavaRDD[String] = rdd.pipe(command) + def pipe(command: String): JavaRDD[String] = { + rdd.pipe(command) + } /** * Return an RDD created by piping elements to a forked external process. */ - def pipe(command: JList[String]): JavaRDD[String] = + def pipe(command: JList[String]): JavaRDD[String] = { rdd.pipe(command.asScala) + } /** * Return an RDD created by piping elements to a forked external process. 
*/ - def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] = + def pipe(command: JList[String], env: JMap[String, String]): JavaRDD[String] = { rdd.pipe(command.asScala, env.asScala) + } + + /** + * Return an RDD created by piping elements to a forked external process. + */ + def pipe(command: JList[String], + env: JMap[String, String], + separateWorkingDir: Boolean, + bufferSize: Int): JavaRDD[String] = { + rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize) + } /** * Zips this RDD with another one, returning key-value pairs with the first element in each RDD, @@ -288,7 +302,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def zipPartitions[U, V]( other: JavaRDDLike[U, _], - f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = { + f: FlatMapFunction2[JIterator[T], JIterator[U], V]): JavaRDD[V] = { def fn: (Iterator[T], Iterator[U]) => Iterator[V] = { (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala } @@ -446,8 +460,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final * combine step happens locally on the master, equivalent to running a single reduce task. */ - def countByValue(): java.util.Map[T, jl.Long] = - mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[java.util.Map[T, jl.Long]] + def countByValue(): JMap[T, jl.Long] = + mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[JMap[T, jl.Long]] /** * (Experimental) Approximate version of countByValue(). 
@@ -455,13 +469,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def countByValueApprox( timeout: Long, confidence: Double - ): PartialResult[java.util.Map[T, BoundedDouble]] = + ): PartialResult[JMap[T, BoundedDouble]] = rdd.countByValueApprox(timeout, confidence).map(mapAsSerializableJavaMap) /** * (Experimental) Approximate version of countByValue(). */ - def countByValueApprox(timeout: Long): PartialResult[java.util.Map[T, BoundedDouble]] = + def countByValueApprox(timeout: Long): PartialResult[JMap[T, BoundedDouble]] = rdd.countByValueApprox(timeout).map(mapAsSerializableJavaMap) /** @@ -596,9 +610,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Returns the maximum element from this RDD as defined by the specified * Comparator[T]. + * * @param comp the comparator that defines ordering * @return the maximum of the RDD - * */ + */ def max(comp: Comparator[T]): T = { rdd.max()(Ordering.comparatorToOrdering(comp)) } @@ -606,9 +621,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Returns the minimum element from this RDD as defined by the specified * Comparator[T]. + * * @param comp the comparator that defines ordering * @return the minimum of the RDD - * */ + */ def min(comp: Comparator[T]): T = { rdd.min()(Ordering.comparatorToOrdering(comp)) } @@ -684,7 +700,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * The asynchronous version of the `foreachPartition` action, which * applies a function f to each partition of this RDD. 
*/ - def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): JavaFutureAction[Void] = { + def foreachPartitionAsync(f: VoidFunction[JIterator[T]]): JavaFutureAction[Void] = { new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x.asJava)), { x => null.asInstanceOf[Void] }) } diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index dd8e46ba0f..45616856fd 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -17,9 +17,11 @@ package org.apache.spark.rdd +import java.io.BufferedWriter import java.io.File import java.io.FilenameFilter import java.io.IOException +import java.io.OutputStreamWriter import java.io.PrintWriter import java.util.StringTokenizer import java.util.concurrent.atomic.AtomicReference @@ -45,7 +47,8 @@ private[spark] class PipedRDD[T: ClassTag]( envVars: Map[String, String], printPipeContext: (String => Unit) => Unit, printRDDElement: (T, String => Unit) => Unit, - separateWorkingDir: Boolean) + separateWorkingDir: Boolean, + bufferSize: Int) extends RDD[String](prev) { // Similar to Runtime.exec(), if we are given a single string, split it into words @@ -58,7 +61,7 @@ private[spark] class PipedRDD[T: ClassTag]( printRDDElement: (T, String => Unit) => Unit = null, separateWorkingDir: Boolean = false) = this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement, - separateWorkingDir) + separateWorkingDir, 8192) override def getPartitions: Array[Partition] = firstParent[T].partitions @@ -144,7 +147,8 @@ private[spark] class PipedRDD[T: ClassTag]( new Thread(s"stdin writer for $command") { override def run(): Unit = { TaskContext.setTaskContext(context) - val out = new PrintWriter(proc.getOutputStream) + val out = new PrintWriter(new BufferedWriter( + new OutputStreamWriter(proc.getOutputStream), bufferSize)) try { // scalastyle:off println 
// input the pipe context firstly diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 499a8b9aa1..d85d0fff46 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -724,6 +724,7 @@ abstract class RDD[T: ClassTag]( * def printRDDElement(record:(String, Seq[String]), f:String=>Unit) = * for (e <- record._2) {f(e)} * @param separateWorkingDir Use separate working directories for each task. + * @param bufferSize Buffer size for the stdin writer for the piped process. * @return the result RDD */ def pipe( @@ -731,11 +732,13 @@ abstract class RDD[T: ClassTag]( env: Map[String, String] = Map(), printPipeContext: (String => Unit) => Unit = null, printRDDElement: (T, String => Unit) => Unit = null, - separateWorkingDir: Boolean = false): RDD[String] = withScope { + separateWorkingDir: Boolean = false, + bufferSize: Int = 8192): RDD[String] = withScope { new PipedRDD(this, command, env, if (printPipeContext ne null) sc.clean(printPipeContext) else null, if (printRDDElement ne null) sc.clean(printRDDElement) else null, - separateWorkingDir) + separateWorkingDir, + bufferSize) } /** diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala index e9cc819524..fe2058d613 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala @@ -171,7 +171,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext { val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true) val collectPwd = pipedPwd.collect() assert(collectPwd(0).contains("tasks/")) - val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true).collect() + val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect() // make sure symlinks were created assert(pipedLs.length > 0) // 
clean up top level tasks directory diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index a5d57e1b01..b0d862d006 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -686,6 +686,10 @@ object MimaExcludes { ProblemFilters.exclude[IncompatibleMethTypeProblem]( "org.apache.spark.sql.DataFrameReader.this") ) ++ Seq( + // SPARK-14542 configurable buffer size for pipe RDD + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.pipe"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.pipe") + ) ++ Seq( // [SPARK-4452][Core]Shuffle data structures can starve others on the same thread for memory ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.Spillable") ) ++ Seq( |