 core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 58
 core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala         | 10
 core/src/main/scala/org/apache/spark/rdd/RDD.scala              |  7
 core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala    |  2
 project/MimaExcludes.scala                                      |  4
 5 files changed, 54 insertions(+), 27 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 6f3b8faf03..c17ca12379 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -19,7 +19,7 @@ package org.apache.spark.api.java
import java.{lang => jl}
import java.lang.{Iterable => JIterable}
-import java.util.{Comparator, Iterator => JIterator, List => JList}
+import java.util.{Comparator, Iterator => JIterator, List => JList, Map => JMap}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
@@ -80,7 +80,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* This should ''not'' be called by users directly, but is available for implementors of custom
* subclasses of RDD.
*/
- def iterator(split: Partition, taskContext: TaskContext): java.util.Iterator[T] =
+ def iterator(split: Partition, taskContext: TaskContext): JIterator[T] =
rdd.iterator(split, taskContext).asJava
// Transformations (return a new RDD)
@@ -96,7 +96,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* of the original partition.
*/
def mapPartitionsWithIndex[R](
- f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]],
+ f: JFunction2[jl.Integer, JIterator[T], JIterator[R]],
preservesPartitioning: Boolean = false): JavaRDD[R] =
new JavaRDD(rdd.mapPartitionsWithIndex((a, b) => f.call(a, b.asJava).asScala,
preservesPartitioning)(fakeClassTag))(fakeClassTag)
@@ -147,7 +147,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
+ def mapPartitions[U](f: FlatMapFunction[JIterator[T], U]): JavaRDD[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
}
@@ -157,7 +157,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
+ def mapPartitions[U](f: FlatMapFunction[JIterator[T], U],
preservesPartitioning: Boolean): JavaRDD[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
@@ -169,7 +169,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
+ def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]]): JavaDoubleRDD = {
def fn: (Iterator[T]) => Iterator[jl.Double] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
}
@@ -179,7 +179,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
+ def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, V2]):
JavaPairRDD[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
@@ -190,7 +190,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
+ def mapPartitionsToDouble(f: DoubleFlatMapFunction[JIterator[T]],
preservesPartitioning: Boolean): JavaDoubleRDD = {
def fn: (Iterator[T]) => Iterator[jl.Double] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
@@ -202,7 +202,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
- def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
+ def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[JIterator[T], K2, V2],
preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
(x: Iterator[T]) => f.call(x.asJava).asScala
@@ -214,7 +214,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Applies a function f to each partition of this RDD.
*/
- def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
+ def foreachPartition(f: VoidFunction[JIterator[T]]): Unit = {
rdd.foreachPartition(x => f.call(x.asJava))
}
@@ -256,19 +256,33 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return an RDD created by piping elements to a forked external process.
*/
- def pipe(command: String): JavaRDD[String] = rdd.pipe(command)
+ def pipe(command: String): JavaRDD[String] = {
+ rdd.pipe(command)
+ }
/**
* Return an RDD created by piping elements to a forked external process.
*/
- def pipe(command: JList[String]): JavaRDD[String] =
+ def pipe(command: JList[String]): JavaRDD[String] = {
rdd.pipe(command.asScala)
+ }
/**
* Return an RDD created by piping elements to a forked external process.
*/
- def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] =
+ def pipe(command: JList[String], env: JMap[String, String]): JavaRDD[String] = {
rdd.pipe(command.asScala, env.asScala)
+ }
+
+ /**
+ * Return an RDD created by piping elements to a forked external process.
+ */
+ def pipe(command: JList[String],
+ env: JMap[String, String],
+ separateWorkingDir: Boolean,
+ bufferSize: Int): JavaRDD[String] = {
+ rdd.pipe(command.asScala, env.asScala, null, null, separateWorkingDir, bufferSize)
+ }
/**
* Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
@@ -288,7 +302,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def zipPartitions[U, V](
other: JavaRDDLike[U, _],
- f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
+ f: FlatMapFunction2[JIterator[T], JIterator[U], V]): JavaRDD[V] = {
def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
(x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
}
@@ -446,8 +460,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
* combine step happens locally on the master, equivalent to running a single reduce task.
*/
- def countByValue(): java.util.Map[T, jl.Long] =
- mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[java.util.Map[T, jl.Long]]
+ def countByValue(): JMap[T, jl.Long] =
+ mapAsSerializableJavaMap(rdd.countByValue()).asInstanceOf[JMap[T, jl.Long]]
/**
* (Experimental) Approximate version of countByValue().
@@ -455,13 +469,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def countByValueApprox(
timeout: Long,
confidence: Double
- ): PartialResult[java.util.Map[T, BoundedDouble]] =
+ ): PartialResult[JMap[T, BoundedDouble]] =
rdd.countByValueApprox(timeout, confidence).map(mapAsSerializableJavaMap)
/**
* (Experimental) Approximate version of countByValue().
*/
- def countByValueApprox(timeout: Long): PartialResult[java.util.Map[T, BoundedDouble]] =
+ def countByValueApprox(timeout: Long): PartialResult[JMap[T, BoundedDouble]] =
rdd.countByValueApprox(timeout).map(mapAsSerializableJavaMap)
/**
@@ -596,9 +610,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Returns the maximum element from this RDD as defined by the specified
* Comparator[T].
+ *
* @param comp the comparator that defines ordering
* @return the maximum of the RDD
- * */
+ */
def max(comp: Comparator[T]): T = {
rdd.max()(Ordering.comparatorToOrdering(comp))
}
@@ -606,9 +621,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Returns the minimum element from this RDD as defined by the specified
* Comparator[T].
+ *
* @param comp the comparator that defines ordering
* @return the minimum of the RDD
- * */
+ */
def min(comp: Comparator[T]): T = {
rdd.min()(Ordering.comparatorToOrdering(comp))
}
@@ -684,7 +700,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* The asynchronous version of the `foreachPartition` action, which
* applies a function f to each partition of this RDD.
*/
- def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): JavaFutureAction[Void] = {
+ def foreachPartitionAsync(f: VoidFunction[JIterator[T]]): JavaFutureAction[Void] = {
new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x.asJava)),
{ x => null.asInstanceOf[Void] })
}
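
For reference, a minimal sketch of how the new four-argument pipe overload on the Java API could be invoked; the SparkContext setup, the "cat" command, and the buffer size are illustrative assumptions, not part of the patch:

    import java.util.{Arrays => JArrays, Collections}
    import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}

    // Hypothetical driver code exercising pipe(command, env, separateWorkingDir, bufferSize).
    val jsc = new JavaSparkContext("local[2]", "pipe-buffer-example")
    val nums: JavaRDD[String] = jsc.parallelize(JArrays.asList("1", "2", "3"))
    val piped: JavaRDD[String] = nums.pipe(
      JArrays.asList("cat"),                   // external command to fork
      Collections.emptyMap[String, String](),  // no extra environment variables
      false,                                   // separateWorkingDir
      16384)                                   // stdin writer buffer size in bytes
    println(piped.collect())
    jsc.stop()
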
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index dd8e46ba0f..45616856fd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -17,9 +17,11 @@
package org.apache.spark.rdd
+import java.io.BufferedWriter
import java.io.File
import java.io.FilenameFilter
import java.io.IOException
+import java.io.OutputStreamWriter
import java.io.PrintWriter
import java.util.StringTokenizer
import java.util.concurrent.atomic.AtomicReference
@@ -45,7 +47,8 @@ private[spark] class PipedRDD[T: ClassTag](
envVars: Map[String, String],
printPipeContext: (String => Unit) => Unit,
printRDDElement: (T, String => Unit) => Unit,
- separateWorkingDir: Boolean)
+ separateWorkingDir: Boolean,
+ bufferSize: Int)
extends RDD[String](prev) {
// Similar to Runtime.exec(), if we are given a single string, split it into words
@@ -58,7 +61,7 @@ private[spark] class PipedRDD[T: ClassTag](
printRDDElement: (T, String => Unit) => Unit = null,
separateWorkingDir: Boolean = false) =
this(prev, PipedRDD.tokenize(command), envVars, printPipeContext, printRDDElement,
- separateWorkingDir)
+ separateWorkingDir, 8192)
override def getPartitions: Array[Partition] = firstParent[T].partitions
@@ -144,7 +147,8 @@ private[spark] class PipedRDD[T: ClassTag](
new Thread(s"stdin writer for $command") {
override def run(): Unit = {
TaskContext.setTaskContext(context)
- val out = new PrintWriter(proc.getOutputStream)
+ val out = new PrintWriter(new BufferedWriter(
+ new OutputStreamWriter(proc.getOutputStream), bufferSize))
try {
// scalastyle:off println
// input the pipe context firstly
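
Stripped of the Spark plumbing, the writer construction introduced above reduces to the following standalone sketch; the "cat" command and the try/finally handling are illustrative assumptions:

    import java.io.{BufferedWriter, OutputStreamWriter, PrintWriter}

    // Wrap the child process's stdin in a BufferedWriter of a caller-chosen size,
    // so that many small println calls reach the pipe in larger chunks.
    val bufferSize = 8192
    val proc = new ProcessBuilder("cat").start()
    val out = new PrintWriter(new BufferedWriter(
      new OutputStreamWriter(proc.getOutputStream), bufferSize))
    try {
      Seq("a", "b", "c").foreach(line => out.println(line))
    } finally {
      out.close()
    }
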
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 499a8b9aa1..d85d0fff46 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -724,6 +724,7 @@ abstract class RDD[T: ClassTag](
* def printRDDElement(record:(String, Seq[String]), f:String=&gt;Unit) =
* for (e &lt;- record._2) {f(e)}
* @param separateWorkingDir Use separate working directories for each task.
+ * @param bufferSize Buffer size for the stdin writer for the piped process.
* @return the result RDD
*/
def pipe(
@@ -731,11 +732,13 @@ abstract class RDD[T: ClassTag](
env: Map[String, String] = Map(),
printPipeContext: (String => Unit) => Unit = null,
printRDDElement: (T, String => Unit) => Unit = null,
- separateWorkingDir: Boolean = false): RDD[String] = withScope {
+ separateWorkingDir: Boolean = false,
+ bufferSize: Int = 8192): RDD[String] = withScope {
new PipedRDD(this, command, env,
if (printPipeContext ne null) sc.clean(printPipeContext) else null,
if (printRDDElement ne null) sc.clean(printRDDElement) else null,
- separateWorkingDir)
+ separateWorkingDir,
+ bufferSize)
}
/**
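
On the Scala side the new parameter is a defaulted argument, so existing callers compile unchanged; a caller who wants a larger stdin buffer can name it explicitly. A hedged usage sketch, where the "cat" command and the 64 KiB size are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative driver code; only the named bufferSize argument relates to this patch.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("pipe-buffer"))
    val piped = sc.parallelize(1 to 1000)
      .map(_.toString)
      .pipe(Seq("cat"), bufferSize = 64 * 1024)
    println(piped.count())
    sc.stop()
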
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index e9cc819524..fe2058d613 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -171,7 +171,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
val collectPwd = pipedPwd.collect()
assert(collectPwd(0).contains("tasks/"))
- val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true).collect()
+ val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect()
// make sure symlinks were created
assert(pipedLs.length > 0)
// clean up top level tasks directory
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a5d57e1b01..b0d862d006 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -686,6 +686,10 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleMethTypeProblem](
"org.apache.spark.sql.DataFrameReader.this")
) ++ Seq(
+ // SPARK-14542 configurable buffer size for pipe RDD
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.pipe"),
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.pipe")
+ ) ++ Seq(
// [SPARK-4452][Core]Shuffle data structures can starve others on the same thread for memory
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.util.collection.Spillable")
) ++ Seq(