aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@apache.org>2013-10-23 18:03:08 -0700
committerReynold Xin <rxin@apache.org>2013-10-23 18:03:08 -0700
commita098438c4824891fb44e9119e038e6e796c69741 (patch)
treec1984d0dd7f7ce12dea8df7334354a671ab2ae86
parentdadfc63b0314060876ac1787d4de72b37221139c (diff)
parent210858ac02350567666f6534ba62bf7978972814 (diff)
downloadspark-a098438c4824891fb44e9119e038e6e796c69741.tar.gz
spark-a098438c4824891fb44e9119e038e6e796c69741.tar.bz2
spark-a098438c4824891fb44e9119e038e6e796c69741.zip
Merge pull request #103 from JoshRosen/unpersist-fix
Add unpersist() to JavaDoubleRDD and JavaPairRDD. This fixes a minor inconsistency where [unpersist() was only available on JavaRDD](https://mail-archives.apache.org/mod_mbox/incubator-spark-user/201310.mbox/%3CCE8D8748.68C0%25YannLuppo%40livenation.com%3E) and not JavaPairRDD / JavaDoubleRDD. I also added support for the new optional `blocking` argument added in 0.8. Please merge this into branch-0.8, too.
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala8
3 files changed, 34 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index 5fd1fab580..f9b6ee351a 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -48,6 +48,19 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
*/
def persist(newLevel: StorageLevel): JavaDoubleRDD = fromRDD(srdd.persist(newLevel))
+ /**
+ * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ * This method blocks until all blocks are deleted.
+ */
+ def unpersist(): JavaDoubleRDD = fromRDD(srdd.unpersist())
+
+ /**
+ * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ *
+ * @param blocking Whether to block until all blocks are deleted.
+ */
+ def unpersist(blocking: Boolean): JavaDoubleRDD = fromRDD(srdd.unpersist(blocking))
+
// first() has to be overriden here in order for its return type to be Double instead of Object.
override def first(): Double = srdd.first()
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index a6518abf45..268f43b4e8 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -65,6 +65,19 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
def persist(newLevel: StorageLevel): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.persist(newLevel))
+ /**
+ * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ * This method blocks until all blocks are deleted.
+ */
+ def unpersist(): JavaPairRDD[K, V] = wrapRDD(rdd.unpersist())
+
+ /**
+ * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ *
+ * @param blocking Whether to block until all blocks are deleted.
+ */
+ def unpersist(blocking: Boolean): JavaPairRDD[K, V] = wrapRDD(rdd.unpersist(blocking))
+
// Transformations (return a new RDD)
/**
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index eec58abdd6..662990049b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -41,9 +41,17 @@ JavaRDDLike[T, JavaRDD[T]] {
/**
* Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ * This method blocks until all blocks are deleted.
*/
def unpersist(): JavaRDD[T] = wrapRDD(rdd.unpersist())
+ /**
+ * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+ *
+ * @param blocking Whether to block until all blocks are deleted.
+ */
+ def unpersist(blocking: Boolean): JavaRDD[T] = wrapRDD(rdd.unpersist(blocking))
+
// Transformations (return a new RDD)
/**