author     Patrick Wendell <pwendell@gmail.com>  2013-10-24 21:14:56 -0700
committer  Patrick Wendell <pwendell@gmail.com>  2013-10-24 21:14:56 -0700
commit     31e92b72e31910be1694c348ab5de8b14f2df44b
tree       bda6766bc86cf575e74b838992f8ff81b57b0aa4
parent     39f6f75588b69f07cd963c5e211045fed103695b
Adding Java versions and associated tests
 core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala                  | 11 +
 core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala                    | 11 +
 core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala                        | 11 +
 core/src/main/scala/org/apache/spark/rdd/RDD.scala                                 |  2 +-
 core/src/test/scala/org/apache/spark/JavaAPISuite.java                             | 21 +
 streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala     |  6 +
 streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala |  6 +
 streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java               | 33 +
 streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala             | 23 +-
 9 files changed, 123 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index f9b6ee351a..043cb183ba 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -94,6 +94,17 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
fromRDD(srdd.coalesce(numPartitions, shuffle))
/**
+ * Return a new RDD that has exactly numPartitions partitions.
+ *
+ * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
+ * a shuffle to redistribute data.
+ *
+ * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
+ * which can avoid performing a shuffle.
+ */
+ def repartition(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.repartition(numPartitions))
+
+ /**
* Return an RDD with the elements from `this` that are not in `other`.
*
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
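
As a usage sketch of the new JavaDoubleRDD method (the local master, app name, class name, and sample values below are assumptions for illustration, not part of this commit), growing a two-partition RDD of doubles to four partitions:

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaDoubleRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class DoubleRepartitionSketch {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[4]", "DoubleRepartitionSketch");
        // Start with two partitions.
        JavaDoubleRDD doubles =
          sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 5.0, 8.0), 2);
        // Shuffle the data out to exactly four partitions.
        JavaDoubleRDD wider = doubles.repartition(4);
        // glom() yields one list per partition, so its size is the partition count.
        System.out.println(wider.glom().collect().size()); // prints 4
        sc.stop();
      }
    }
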
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 268f43b4e8..39f408b8c8 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -108,6 +108,17 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
fromRDD(rdd.coalesce(numPartitions, shuffle))
/**
+ * Return a new RDD that has exactly numPartitions partitions.
+ *
+ * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
+ * a shuffle to redistribute data.
+ *
+ * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
+ * which can avoid performing a shuffle.
+ */
+ def repartition(numPartitions: Int): JavaPairRDD[K, V] = fromRDD(rdd.repartition(numPartitions))
+
+ /**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaPairRDD[K, V] =
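
A similar sketch for the pair variant (again, the setup and values are illustrative assumptions). When shrinking, the scaladoc above recommends coalesce, which avoids the shuffle:

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    public class PairRepartitionSketch {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[4]", "PairRepartitionSketch");
        JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
            new Tuple2<String, Integer>("a", 1),
            new Tuple2<String, Integer>("b", 2),
            new Tuple2<String, Integer>("c", 3)), 4);
        // Shrinking with a full shuffle:
        JavaPairRDD<String, Integer> viaRepartition = pairs.repartition(2);
        // Shrinking without a shuffle, as the scaladoc recommends:
        JavaPairRDD<String, Integer> viaCoalesce = pairs.coalesce(2);
        System.out.println(viaRepartition.glom().collect().size()); // prints 2
        System.out.println(viaCoalesce.glom().collect().size());    // prints 2
        sc.stop();
      }
    }
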
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 662990049b..3b359a8fd6 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -82,6 +82,17 @@ JavaRDDLike[T, JavaRDD[T]] {
rdd.coalesce(numPartitions, shuffle)
/**
+ * Return a new RDD that has exactly numPartitions partitions.
+ *
+ * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
+ * a shuffle to redistribute data.
+ *
+ * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
+ * which can avoid performing a shuffle.
+ */
+ def repartition(numPartitions: Int): JavaRDD[T] = rdd.repartition(numPartitions)
+
+ /**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] =
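
One motivating pattern for the plain JavaRDD method (hypothetical class and values): an RDD with only a few partitions under-uses the cluster, so it is widened before an expensive per-element step:

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;

    public class RepartitionSketch {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[8]", "RepartitionSketch");
        // Only two partitions means at most two concurrent tasks...
        JavaRDD<Integer> input = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 2);
        // ...so shuffle out to eight partitions before the costly stage.
        JavaRDD<Integer> result = input.repartition(8).map(
            new Function<Integer, Integer>() {
              public Integer call(Integer x) {
                return x * x; // stand-in for an expensive computation
              }
            });
        System.out.println(result.collect());
        sc.stop();
      }
    }
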
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 17bc2515f2..6e88be6f6a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -268,7 +268,7 @@ abstract class RDD[T: ClassManifest](
/**
* Return a new RDD that has exactly numPartitions partitions.
*
- * Used to increase or decrease the level of parallelism in this RDD. This will use
+ * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
* a shuffle to redistribute data.
*
* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 7b0bb89ab2..f38c607d65 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -473,6 +473,27 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void repartition() {
+ // Growing number of partitions
+ JavaRDD<Integer> in1 = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8), 2);
+ JavaRDD<Integer> repartitioned1 = in1.repartition(4);
+ List<List<Integer>> result1 = repartitioned1.glom().collect();
+ Assert.assertEquals(4, result1.size());
+ for (List<Integer> l: result1) {
+ Assert.assertTrue(l.size() > 0);
+ }
+
+ // Shrinking number of partitions
+ JavaRDD<Integer> in2 = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8), 4);
+ JavaRDD<Integer> repartitioned2 = in2.repartition(2);
+ List<List<Integer>> result2 = repartitioned2.glom().collect();
+ Assert.assertEquals(2, result2.size());
+ for (List<Integer> l: result2) {
+ Assert.assertTrue(l.size() > 0);
+ }
+ }
+
+ @Test
public void persist() {
JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
doubleRDD = doubleRDD.persist(StorageLevel.DISK_ONLY());
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index d1932b6b05..1a2aeaa879 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -94,6 +94,12 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
*/
def union(that: JavaDStream[T]): JavaDStream[T] =
dstream.union(that.dstream)
+
+ /**
+ * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
+ * returned DStream has exactly numPartitions partitions.
+ */
+ def repartition(numPartitions: Int): JavaDStream[T] = dstream.repartition(numPartitions)
}
object JavaDStream {
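
A streaming sketch (host, port, and batch interval are assumptions for the example): a single socket receiver produces narrowly partitioned batch RDDs, and repartition spreads each batch out before processing:

    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class StreamRepartitionSketch {
      public static void main(String[] args) {
        JavaStreamingContext ssc =
            new JavaStreamingContext("local[4]", "StreamRepartitionSketch", new Duration(1000));
        // All data from one receiver lands in few partitions per batch...
        JavaDStream<String> lines = ssc.socketTextStream("localhost", 9999);
        // ...so give every batch RDD exactly four partitions.
        JavaDStream<String> spread = lines.repartition(4);
        spread.print();
        ssc.start();
      }
    }
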
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 978fca33ad..faf8f36182 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -59,6 +59,12 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/** Persist the RDDs of this DStream with the given storage level */
def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel)
+ /**
+ * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the
+ * returned DStream has exactly numPartitions partitions.
+ */
+ def repartition(numPartitions: Int): JavaPairDStream[K, V] = dstream.repartition(numPartitions)
+
/** Method that generates a RDD for the given Duration */
def compute(validTime: Time): JavaPairRDD[K, V] = {
dstream.compute(validTime) match {
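
And the pair-stream variant (same illustrative setup), pairing each input line with a count before repartitioning; in this era of the Java API, map with a PairFunction yields a JavaPairDStream:

    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;

    public class PairStreamRepartitionSketch {
      public static void main(String[] args) {
        JavaStreamingContext ssc = new JavaStreamingContext(
            "local[4]", "PairStreamRepartitionSketch", new Duration(1000));
        JavaDStream<String> lines = ssc.socketTextStream("localhost", 9999);
        JavaPairDStream<String, Integer> counts = lines.map(
            new PairFunction<String, String, Integer>() {
              public Tuple2<String, Integer> call(String line) {
                return new Tuple2<String, Integer>(line, 1);
              }
            });
        // Each batch RDD of pairs now gets exactly eight partitions.
        JavaPairDStream<String, Integer> spread = counts.repartition(8);
        spread.print();
        ssc.start();
      }
    }
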
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index dc01f1e8aa..5a9836a415 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -184,6 +184,39 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void testRepartitionMorePartitions() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3,4,5,6,7,8,9,10),
+ Arrays.asList(1,2,3,4,5,6,7,8,9,10));
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 2);
+ JavaDStream<Integer> repartitioned = stream.repartition(4);
+ JavaTestUtils.attachTestOutputStream(repartitioned);
+ List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
+ Assert.assertEquals(2, result.size());
+ for (List<List<Integer>> rdd : result) {
+ Assert.assertEquals(4, rdd.size());
+ Assert.assertEquals(
+ 10, rdd.get(0).size() + rdd.get(1).size() + rdd.get(2).size() + rdd.get(3).size());
+ }
+ }
+
+ @Test
+ public void testRepartitionFewerPartitions() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3,4,5,6,7,8,9,10),
+ Arrays.asList(1,2,3,4,5,6,7,8,9,10));
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 4);
+ JavaDStream<Integer> repartitioned = stream.repartition(2);
+ JavaTestUtils.attachTestOutputStream(repartitioned);
+ List<List<List<Integer>>> result = JavaTestUtils.runStreamsWithPartitions(ssc, 2, 2);
+ Assert.assertEquals(2, result.size());
+ for (List<List<Integer>> rdd : result) {
+ Assert.assertEquals(2, rdd.size());
+ Assert.assertEquals(10, rdd.get(0).size() + rdd.get(1).size());
+ }
+ }
+
+ @Test
public void testGlom() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants", "dodgers"),
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 5344ae7682..780f7b823b 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -62,6 +62,8 @@ trait JavaTestBase extends TestSuiteBase {
* Process all registered streams for a numBatches batches, failing if
* numExpectedOutput RDD's are not generated. Generated RDD's are collected
* and returned, represented as a list for each batch interval.
+ *
+ * Returns a list of items for each RDD.
*/
def runStreams[V](
ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
@@ -72,6 +74,27 @@ trait JavaTestBase extends TestSuiteBase {
res.map(entry => out.append(new ArrayList[V](entry)))
out
}
+
+ /**
+ * Process all registered streams for numBatches batches, failing if
+ * numExpectedOutput RDDs are not generated. Generated RDDs are collected
+ * and returned, represented as a list for each batch interval.
+ *
+ * Returns a sequence of RDDs. Each RDD is represented as several sequences of items, each
+ * representing one partition.
+ */
+ def runStreamsWithPartitions[V](
+ ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[JList[V]]] = {
+ implicit val cm: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput)
+ val out = new ArrayList[JList[JList[V]]]()
+ res.map(entry => {
+ val lists = entry.map(new ArrayList[V](_))
+ out.append(new ArrayList[JList[V]](lists))
+ })
+ out
+ }
}
object JavaTestUtils extends JavaTestBase {