From 08c1a42d7d9edef02a24a3bc5045b2dce035a93b Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Wed, 23 Oct 2013 22:13:49 -0700 Subject: Add a `repartition` operator. This patch adds an operator called repartition with more straightforward semantics than the current `coalesce` operator. There are a few use cases where this operator is useful: 1. If a user wants to increase the number of partitions in the RDD. This is more common now with streaming. E.g. a user is ingesting data on one node but they want to add more partitions to ensure parallelism of subsequent operations across threads or the cluster. Right now they have to call rdd.coalesce(numSplits, shuffle=true) - that's super confusing. 2. If a user has input data where the number of partitions is not known. E.g. > sc.textFile("some file").coalesce(50).... This is both vague semantically (am I growing or shrinking this RDD) but also, may not work correctly if the base RDD has fewer than 50 partitions. The new operator forces shuffles every time, so it will always produce exactly the number of new partitions. It also throws an exception rather than silently not-working if a bad input is passed. I am currently adding streaming tests (requires refactoring some of the test suite to allow testing at partition granularity), so this is not ready for merge yet. But feedback is welcome. --- streaming/src/main/scala/org/apache/spark/streaming/DStream.scala | 7 +++++++ .../org/apache/spark/streaming/api/java/JavaDStreamLike.scala | 7 +++++++ 2 files changed, 14 insertions(+) (limited to 'streaming/src/main') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala index 80da6bd30b..6da2261f06 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala @@ -438,6 +438,13 @@ abstract class DStream[T: ClassManifest] ( */ def glom(): DStream[Array[T]] = new GlommedDStream(this) + + /** + * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the + * returned DStream has exactly numPartitions partitions. + */ + def repartition(numPartitions: Int): DStream[T] = this.transform(_.repartition(numPartitions)) + /** * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala index 459695b7ca..eae517cff0 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala @@ -123,6 +123,13 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T def glom(): JavaDStream[JList[T]] = new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq))) + /** + * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the + * returned DStream has exactly numPartitions partitions. + */ + def repartition(numPartitions: Int): JavaDStream[T] = + new JavaDStream(dstream.repartition(numPartitions)) + /** Return the StreamingContext associated with this DStream */ def context(): StreamingContext = dstream.context() -- cgit v1.2.3