5 files changed, 54 insertions(+), 27 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 6f3b8faf03..c17ca12379 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -19,7 +19,7 @@ package org.apache.spark.api.java
 
 import java.{lang => jl}
 import java.lang.{Iterable => JIterable}
-import java.util.{Comparator, Iterator => JIterator, List => JList}
+import java.util.{Comparator, Iterator => JIterator, List => JList, Map => JMap}
 
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
@@ -80,7 +80,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * This should ''not'' be called by users directly, but is available for implementors of custom
    * subclasses of RDD.
    */
-  def iterator(split: Partition, taskContext: TaskContext): java.util.Iterator[T] =
+  def iterator(split: Partition, taskContext: TaskContext): JIterator[T] =
     rdd.iterator(split, taskContext).asJava
 
   // Transformations (return a new RDD)
@@ -96,7 +96,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * of the original partition.
    */
   def mapPartitionsWithIndex[R](
-      f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]],
+      f: JFunction2[jl.Integer, JIterator[T], JIterator[R]],
       preservesPartitioning: Boolean = false): JavaRDD[R] =
     new JavaRDD(rdd.mapPartitionsWithIndex((a, b) => f.call(a, b.asJava).asScala,
       preservesPartitioning)(fakeClassTag))(fakeClassTag)
@@ -147,7 +147,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
+  def mapPartitions[U](f: FlatMapFunction[JIterator[T], U]): JavaRDD[U] = {
     def fn: (Iterator[T]) => Iterator[U] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
     }
@@ -157,7 +157,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
+  def mapPartitions[U](f: FlatMapFunction[JIterator[T], U],
       preservesPartitioning: Boolean): JavaRDD[U] = {
     def fn: (Iterator[T]) => Iterator[U] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
@@ -169,7 +169,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
+  def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]]): JavaDoubleRDD = {
     def fn: (Iterator[T]) => Iterator[jl.Double] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
     }
@@ -179,7 +179,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
+  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, V2]):
     JavaPairRDD[K2, V2] = {
     def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
@@ -190,7 +190,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
+  def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]],
       preservesPartitioning: Boolean): JavaDoubleRDD = {
     def fn: (Iterator[T]) => Iterator[jl.Double] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
@@ -202,7 +202,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return a new RDD by applying a function to each partition of this RDD.
    */
-  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
+  def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, V2],
       preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
     def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
       (x: Iterator[T]) => f.call(x.asJava).asScala
@@ -214,7 +214,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Applies a function f to each partition of this RDD.
    */
-  def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
+  def foreachPartition(f: VoidFunction[JIterator[T]]): Unit = {
     rdd.foreachPartition(x => f.call(x.asJava))
   }
 
@@ -256,19 +256,33 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
-  def pipe(command: String): JavaRDD[String] = rdd.pipe(command)
+  def pipe(command: String): JavaRDD[String] = {
+    rdd.pipe(command)
+  }
 
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
-  def pipe(command: JList[String]): JavaRDD[String] =
+  def pipe(command: JList[String]): JavaRDD[String] = {
     rdd.pipe(command.asScala)
+  }
 
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
-  def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] =
+  def pipe(command: JList[String], env: JMap[String, String]): JavaRDD[String] = {
     rdd.pipe(command.asScala, env.asScala)
+  }
+
+  /**
+   * Return an RDD created by piping elements to a forked external process.
+   */
+  def pipe(command: JList[String],
+           env: JMap[String, String],
+           separateWorkingDir: Boolean,
+           bufferSize: Int): JavaRDD[String] = {
+    rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize)
+  }
 
   /**
    * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
@@ -288,7 +302,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def zipPartitions[U, V](
       other: JavaRDDLike[U, _],
-      f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
+      f: FlatMapFunction2[JIterator[T], JIterator[U], V]): JavaRDD[V] = {
     def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
       (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
     }
@@ -446,8 +460,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
    * combine step happens locally on the master, equivalent to running a single reduce task.
    */
-  def countByValue(): java.util.Map[T, jl.Long] =
-    mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[java.util.Map[T, jl.Long]]
+  def countByValue(): JMap[T, jl.Long] =
+    mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[JMap[T, jl.Long]]
 
   /**
    * (Experimental) Approximate version of countByValue().
@@ -455,13 +469,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def countByValueApprox(
     timeout: Long,
     confidence: Double
-  ): PartialResult[java.util.Map[T, BoundedDouble]] =
+  ): PartialResult[JMap[T, BoundedDouble]] =
     rdd.countByValueApprox(timeout, confidence).map(mapAsSerializableJavaMap)
 
   /**
    * (Experimental) Approximate version of countByValue().
    */
-  def countByValueApprox(timeout: Long): PartialResult[java.util.Map[T, BoundedDouble]] =
+  def countByValueApprox(timeout: Long): PartialResult[JMap[T, BoundedDouble]] =
     rdd.countByValueApprox(timeout).map(mapAsSerializableJavaMap)
 
   /**
@@ -596,9 +610,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Returns the maximum element from this RDD as defined by the specified
    * Comparator[T].
+   *
    * @param comp the comparator that defines ordering
    * @return the maximum of the RDD
-   * */
+   */
   def max(comp: Comparator[T]): T = {
     rdd.max()(Ordering.comparatorToOrdering(comp))
   }
@@ -606,9 +621,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /**
    * Returns the minimum element from this RDD as defined by the specified
    * Comparator[T].
+   *
   * @param comp the comparator that defines ordering
    * @return the minimum of the RDD
-   * */
+   */
   def min(comp: Comparator[T]): T = {
     rdd.min()(Ordering.comparatorToOrdering(comp))
   }
@@ -684,7 +700,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * The asynchronous version of the `foreachPartition` action, which
    * applies a function f to each partition of this RDD.
    */
-  def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): JavaFutureAction[Void] = {
+  def foreachPartitionAsync(f: VoidFunction[JIterator[T]]): JavaFutureAction[Void] = {
     new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x.asJava)),
       { x => null.asInstanceOf[Void] })
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index dd8e46ba0f..45616856fd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.rdd
 
+import java.io.BufferedWriter
 import java.io.File
 import java.io.FilenameFilter
 import java.io.IOException
+import java.io.OutputStreamWriter
 import java.io.PrintWriter
 import java.util.StringTokenizer
 import java.util.concurrent.atomic.AtomicReference
@@ -45,7 +47,8 @@ private[spark] class PipedRDD[T: ClassTag](
     envVars: Map[String, String],
     printPipeContext: (String => Unit) => Unit,
     printRDDElement: (T, String => Unit) => Unit,
-    separateWorkingDir: Boolean)
+    separateWorkingDir: Boolean,
+    bufferSize: Int)
   extends RDD[String](prev) {
 
   // Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -58,7 +61,7 @@ private[spark] class PipedRDD[T: ClassTag](
       printRDDElement: (T, String => Unit) => Unit = null,
       separateWorkingDir: Boolean = false) =
     this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
-      separateWorkingDir)
+      separateWorkingDir, 8192)
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
 
@@ -144,7 +147,8 @@ private[spark] class PipedRDD[T: ClassTag](
     new Thread(s"stdin writer for $command") {
       override def run(): Unit = {
         TaskContext.setTaskContext(context)
-        val out = new PrintWriter(proc.getOutputStream)
+        val out = new PrintWriter(new BufferedWriter(
+          new OutputStreamWriter(proc.getOutputStream), bufferSize))
         try {
           // scalastyle:off println
           // input the pipe context firstly
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 499a8b9aa1..d85d0fff46 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -724,6 +724,7 @@ abstract class RDD[T: ClassTag](
    *                        def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
    *                          for (e <- record._2) {f(e)}
    * @param separateWorkingDir Use separate working directories for each task.
+   * @param bufferSize Buffer size for the stdin writer for the piped process.
    * @return the result RDD
    */
   def pipe(
@@ -731,11 +732,13 @@ abstract class RDD[T: ClassTag](
       env: Map[String, String] = Map(),
       printPipeContext: (String => Unit) => Unit = null,
       printRDDElement: (T, String => Unit) => Unit = null,
-      separateWorkingDir: Boolean = false): RDD[String] = withScope {
+      separateWorkingDir: Boolean = false,
+      bufferSize: Int = 8192): RDD[String] = withScope {
     new PipedRDD(this, command, env,
       if (printPipeContext ne null) sc.clean(printPipeContext) else null,
       if (printRDDElement ne null) sc.clean(printRDDElement) else null,
-      separateWorkingDir)
+      separateWorkingDir,
+      bufferSize)
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index e9cc819524..fe2058d613 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -171,7 +171,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
     val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
     val collectPwd = pipedPwd.collect()
     assert(collectPwd(0).contains("tasks/"))
-    val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true).collect()
+    val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect()
     // make sure symlinks were created
     assert(pipedLs.length > 0)
     // clean up top level tasks directory
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a5d57e1b01..b0d862d006 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -686,6 +686,10 @@ object MimaExcludes {
       ProblemFilters.exclude[IncompatibleMethTypeProblem](
         "org.apache.spark.sql.DataFrameReader.this")
     ) ++ Seq(
+      // SPARK-14542 configurable buffer size for pipe RDD
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.pipe"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.pipe")
+    ) ++ Seq(
       // [SPARK-4452][Core]Shuffle data structures can starve others on the same thread for memory
       ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.Spillable")
     ) ++ Seq(
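
Usage note (not part of the patch): the sketch below shows how the new bufferSize argument added by this change could be exercised from the Scala API. The local master, sample data, and the cat command are illustrative assumptions; the named-argument call style mirrors the PipedRDDSuite test above. From the Java API, the corresponding new JavaRDDLike.pipe overload takes command, env, separateWorkingDir, and bufferSize explicitly and forwards them to rdd.pipe.

// Illustrative sketch only; assumes spark-core on the classpath.
import org.apache.spark.{SparkConf, SparkContext}

object PipeBufferSizeExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("pipe-buffer-size-example").setMaster("local[2]"))

    // Each element is written as one line to the forked process's stdin.
    val nums = sc.parallelize(1 to 1000, 4).map(_.toString)

    // bufferSize (default 8192) sizes the BufferedWriter that now wraps the
    // stdin writer in PipedRDD; a larger buffer can cut down on small writes
    // when piping many short records.
    val piped = nums.pipe(Seq("cat"), bufferSize = 65536)

    println(piped.count())  // 1000 lines echoed back by cat
    sc.stop()
  }
}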