author    Junyang <fly.shenjy@gmail.com>  2016-02-11 09:33:11 +0000
committer Sean Owen <sowen@cloudera.com>  2016-02-11 09:33:11 +0000
commit    f9ae99fee13681e436fde9899b6a189746348ba1 (patch)
tree      c41f10dc6ff68e18fcb2a447ed07eaf2a1572b8a
parent    c2f21d88981789fe3366f2c4040445aeff5bf083 (diff)
[SPARK-13074][CORE] Add JavaSparkContext.getPersistentRDDs method
The "getPersistentRDDs()" is a useful API of SparkContext to get cached RDDs. However, the JavaSparkContext does not have this API. Add a simple getPersistentRDDs() to get java.util.Map<Integer, JavaRDD> for Java users. Author: Junyang <fly.shenjy@gmail.com> Closes #10978 from flyjy/master.
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala  10
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java                  12
2 files changed, 22 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 01433ca2ef..f1aebbcd39 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -774,6 +774,16 @@ class JavaSparkContext(val sc: SparkContext)
   /** Cancel all jobs that have been scheduled or are running. */
   def cancelAllJobs(): Unit = sc.cancelAllJobs()
+
+  /**
+   * Returns a Java map of JavaRDDs that have marked themselves as persistent via cache() call.
+   * Note that this does not necessarily mean the caching or computation was successful.
+   */
+  def getPersistentRDDs: JMap[java.lang.Integer, JavaRDD[_]] = {
+    sc.getPersistentRDDs.mapValues(s => JavaRDD.fromRDD(s))
+      .asJava.asInstanceOf[JMap[java.lang.Integer, JavaRDD[_]]]
+  }
+
 }
 
 object JavaSparkContext {
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 8117ad9e60..e6a4ab7550 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1811,4 +1811,16 @@ public class JavaAPISuite implements Serializable {
conf.get("spark.kryo.classesToRegister"));
}
+ @Test
+ public void testGetPersistentRDDs() {
+ java.util.Map<Integer, JavaRDD<?>> cachedRddsMap = sc.getPersistentRDDs();
+ Assert.assertTrue(cachedRddsMap.isEmpty());
+ JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b")).setName("RDD1").cache();
+ JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("c", "d")).setName("RDD2").cache();
+ cachedRddsMap = sc.getPersistentRDDs();
+ Assert.assertEquals(2, cachedRddsMap.size());
+ Assert.assertEquals("RDD1", cachedRddsMap.get(0).name());
+ Assert.assertEquals("RDD2", cachedRddsMap.get(1).name());
+ }
+
}