diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-10-25 14:22:23 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-10-25 14:22:23 -0700 |
commit | dc9570782a90d731152246b347996ee12cf68aa3 (patch) | |
tree | 5553b1f5446f437607b39f620bc34c349fdbe820 /streaming/src/main | |
parent | e962a6e6ee8d8ef9d1245d85616fe50554f7f689 (diff) | |
parent | ab35ec4f0f6c6892ad6457e58b1d95c9224ab5b8 (diff) | |
download | spark-dc9570782a90d731152246b347996ee12cf68aa3.tar.gz spark-dc9570782a90d731152246b347996ee12cf68aa3.tar.bz2 spark-dc9570782a90d731152246b347996ee12cf68aa3.zip |
Merge branch 'apache-master' into transform
Diffstat (limited to 'streaming/src/main')
3 files changed, 19 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
index 38e34795b4..9ceff754c4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStream.scala
@@ -438,6 +438,13 @@ abstract class DStream[T: ClassManifest] (
    */
   def glom(): DStream[Array[T]] = new GlommedDStream(this)
 
+
+  /**
+   * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
+   * returned DStream has exactly numPartitions partitions.
+   */
+  def repartition(numPartitions: Int): DStream[T] = this.transform(_.repartition(numPartitions))
+
   /**
    * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs
    * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index d1932b6b05..1a2aeaa879 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -94,6 +94,12 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
    */
   def union(that: JavaDStream[T]): JavaDStream[T] =
     dstream.union(that.dstream)
+
+  /**
+   * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
+   * returned DStream has exactly numPartitions partitions.
+   */
+  def repartition(numPartitions: Int): JavaDStream[T] = dstream.repartition(numPartitions)
 }
 
 object JavaDStream {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 4dd6b7d096..c6cd635afa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -59,6 +59,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   /** Persist the RDDs of this DStream with the given storage level */
   def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel)
 
+  /**
+   * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
+   * returned DStream has exactly numPartitions partitions.
+   */
+  def repartition(numPartitions: Int): JavaPairDStream[K, V] = dstream.repartition(numPartitions)
+
   /** Method that generates a RDD for the given Duration */
   def compute(validTime: Time): JavaPairRDD[K, V] = {
     dstream.compute(validTime) match {